Published on

monstache 深度解析:MongoDB 到 Elasticsearch 数据同步机制与实现

Authors
  • avatar
    Name
    Liant
    Twitter

monstache 简述

Go语言编写的一款将 MongoDB 数据库中的数据同步至 Elasticsearch 的软件.

特点

  • 完整同步
  • 实时同步
  • 同步过程中,对 MongoDB 中的document处理
  • 提供http接口的做状态检查
  • resume
  • ...

程序启动执行流程

服务启动

流程示意图 服务启动

四句话和一张图描述

  1. 加载配置文件
  2. 初始化资源(连接数据库等)
  3. 运行服务
  4. 条件退出

服务运行

服务示意图 服务运行

服务退出几种情况

  1. 命令行查看版本,配置等
  2. exit-after-direct-reads=true,当直接读取同步之后便退出程序
  3. 服务运行处理changeStreamtailOps
  4. 程序异常退出,系统KILL通知

两种场景,三种方案

  • 两种场景:完整同步,实时同步
  • 三种方案:
    • ProcessDirectReads(直接读取数据库)
    • changeStream(mongodb的changeStream特性)
    • tailOps(读取oplog.rs)

1. DirectReadNs 直接同步

DirectReadNs 代码执行流程

# 读取collection,:user.info,user.class
direct-read-namespaces = ["user.info", "user.class]
direct-read-namespaces = [""] // 所有的collections


# 配置文件加载
main()->mustConfig()->config.build()->config.loadConfigFile()
=>
# 细节处理,ENV的优先级高于配置文件
if len(config.DirectReadNs) == 0 {
    config.DirectReadNs = tomlConfig.DirectReadNs
}


# 启动监听时,处理所有collection
ic.run()->ic.startListen()->ic.buildGtmOptions()
=>
if config.dynamicDirectReadList() {
    config.DirectReadNs = ic.buildDynamicDirectReadNs(nsFilter)
}
->gtm.StartMulti(conns, gtmOpts)


# 启动了监听,会去读取mongo相关的属性
gtm.ProcessDirectReads
gtm.ConsumeChangeStream
gtm.TailOps <-> (inOps=>FetchDocuments) 读取 oplog.rs 中的oplog,在获取文档内容,->opC 


# startReadWait 读取等待,当 ic.gtmCtx.DirectReadWg.Wait(),保存 resume 数据
# direct-read-stateful 记录读取状态,读取完成之后 
# exit-after-direct-reads = true,则程序处理收尾工作,然后退出程序
ic.startReadWait()->ic.config.ExitAfterDirectReads


# 最后 eventLoop,处理OpC, (op, open := <-ic.gtmCtx.OpC:)
ic.eventLoop()->ic.routeOp(op)->ic.routeData(op)->(ic.indexC <- op)
ic.startIndex() 中接收 ic.indexC, 最后esclient索引到ES,是批量操作

数据同步演示

如果同时配置 direct-read-namespaces与change-stream-namespaces,同时执行.
disable-change-events=true 会禁用changeStream和tailOps 事件

示例:

INFO 2021/10/07 20:22:17 Started monstache version 6.7.6
INFO 2021/10/07 20:22:17 Go version go1.17.1
INFO 2021/10/07 20:22:17 MongoDB go driver v1.7.2
INFO 2021/10/07 20:22:17 Elasticsearch go driver 7.0.28
INFO 2021/10/07 20:22:17 Successfully connected to MongoDB version 4.4.9
INFO 2021/10/07 20:22:17 Successfully connected to Elasticsearch version 7.15.0
ConsumeChangeStream
INFO 2021/10/07 20:22:17 Listening for events
ProcessDirectReads
INFO 2021/10/07 20:22:17 Watching changes on collection demo.sites
demo.news
demo.news
demo.news
demo.news
demo.sites
map[_id:ObjectID("615ee67d31b7dc28638f4a9e") number:no637 site:site637 title:title637]
demo.news
demo.news
demo.news
demo.news
demo.sites
map[_id:ObjectID("615ee68231b7dc28638f4a9f") number:no638 site:site638 title:title638]

由于是使用MongoDB中的objectId作为es的_id所以执行无需考虑数据重复问题.

2. changeStream

MongoDB的changeStream事件,依赖于MongoDB的oplog

注意: changeStream特性理论上是只依赖于oplog,oplog是集群同步时用到的特性.但是单机也能开启oplog的.所以changeStream也是单机可用的.

代码执行流程

# 使用changeStream,监控所有的的collection
change-stream-namespaces=[""]

# 执行监听,当 n.database = "",n.collection = "",watch整个mongo变更事件
启动监听
gtm.Start()->

# 开启协程监控MongoDB的changeStream事件
ConsumeChangeStream(ctx, client, ns, o)->
n.parseForChanges(ns)->
c := client.Database(n.database).Collection(n.collection)->
stream, err = c.Watch(task.ctx, pipeline, opts)->

# 监控事件,使用MongoDB的驱动接口
# 该消息会不确定的更新
stream.Next(task.ctx)->

# 将操作归类,ns过滤和DirectReadFilter过滤,最后将数据放入OpC中,等待indexClient消费
changeDoc.isDrop() # 删除
changeDoc.isDropDatabase() # 删除数据库
changeDoc.isInvalidate() #无效操作,会刷新changeStream
oper != ""-> ctx.OpC <- op # 将消息放入OpC,为IndexClient提供doc消息

# indexClient的流程,DirectReadNs
eventLoop()->
op, open := <-ic.gtmCtx.OpC: # 处理基本数据等

replay=true会将所有记录的oplog,通过changeStream事件重新播放一次,这是是属于MongoDB的特性

3. tailOps

该特性依赖 MongoDBoplog ,需要 MongoDB 开启 MongoDB ,读取 oplog.rs 集合.

# 启用了oplog
monstache.EnableOplog = true
gtm.OpLogDisabled = false

# 启用监听,启动协程
gtm.Start()->
构建WorkerCount个inOps,同时启用协程 FetchDocuments 用于获取文档内容,最后执行
tailOps(ctx, client, inOps, o)

# 从oplog.rs中,读取文档
cursor, err = GetOpLogCursor(client, cts, o) # 读取oplog.rs的文档,实际上是oplog
cursor.Next(task.ctx)
op.ParseLogEntry(&entry, o) # 用于解析log

# 数据两个去处,检查是否有数据,有数据直接投递到OpC中;否则投递到inOps中,去获取文档内容
if opDataReady(op, o) {
    ctx.OpC <- op
} else {
    // 处理数据,需要去执行 FetchDocuments 文档
    // broadcast to fetch channels
    for _, channel := range channels {
        channel <- op
    }
}

# 最后数据流转同, DirectReadNs
eventLoop()->
op, open := <-ic.gtmCtx.OpC: # 处理基本数据等

4. 写入 ElasticSearch

// 路由操作,删除表等操作
if err = ic.routeOp(op); err != nil {
    ic.processErr(err)
}
=> 路由Op操作
// 路由操作
func (ic *indexClient) routeOp(op *gtm.Op) (err error) {

	// 插件处理,通过插件处理之后,将op放入processC中
	if processPlugin != nil {
        // 插件处理,ProcessPluginInput具有的es与mongo连接对象
		err = ic.routeProcess(op)
	}
	if op.IsDrop() { // 删除db或者删除col命令
		err = ic.routeDrop(op)
	} else if op.IsDelete() { // 删除文档命令
		err = ic.routeDelete(op)
	} else if op.Data != nil { // 处理数据
		err = ic.routeData(op)
	}
	return
}

=> 处理op数据:先处理关联数据,处理GridFS,最后处理一般消息
func (ic *indexClient) routeData(op *gtm.Op) (err error) {
	skip := false
	// oplog 数据(tailOplog/changeStream) 并且有关联数据,处理关联数据
	// 只有oplog可用关联信息同步
	if op.IsSourceOplog() && len(ic.config.Relate) > 0 {
		skip, err = ic.routeDataRelate(op)
	}

	// 不需要跳过的数据,需要处理文件内容
	if !skip {
		if ic.hasFileContent(op) {
			// 写入文件
			ic.fileC <- op
		} else {
			// 写入数据
			ic.indexC <- op
		}
	}
	return
}

=> 处理 IndexC
for op := range ic.indexC {
    if err := ic.doIndex(op); err != nil {
        ic.processErr(err)
    }
}

=> 将数据插入 ElasticSearch// 保存文档数据
func (ic *indexClient) doIndex(op *gtm.Op) (err error) {
	if err = ic.mapData(op); err == nil {
		if op.Data != nil {
			err = ic.doIndexing(op)
		} else if op.IsUpdate() {
			ic.doDelete(op)
		}
	}
	return
}

其他特性

HTTP服务

查看服务器的一些统计状态

# 启用http服务
enable-http-server = true

127.0.0.1:8080/started => 44.3916393s
127.0.0.1:8080/healthz => ok
# 需要同时配置 stats=true,才可以使用,否则404
127.0.0.1:8080/stats =>
{
    "Flushed": 59,
    "Committed": 65,
    "Indexed": 65,
    "Created": 0,
    "Updated": 0,
    "Deleted": 0,
    "Succeeded": 65,
    "Failed": 0,
    "Workers": [
        {
            "Queued": 0,
            "LastDuration": 9000000
        },
        {
            "Queued": 0,
            "LastDuration": 3000000
        },
        {
            "Queued": 0,
            "LastDuration": 0
        },
        {
            "Queued": 0,
            "LastDuration": 0
        },
        {
            "Queued": 0,
            "LastDuration": 0
        },
        {
            "Queued": 0,
            "LastDuration": 0
        }
    ]
}
127.0.0.1:8080/instance =>
{
    "enabled": true,
    "pid": 16256,
    "hostname": "LAPTOP-MAT9G0DE",
    "cluster": "",
    "resumeName": "default",
    "lastTs": {
        "T": 1633606975,
        "I": 1
    },
    "lastTsFormat": "2021-10-07T19:42:55"
}

resume

是对于changeStream使用,继续上次的记录,为MongoDB驱动提供接口

# 配置项
resume=true
# 0=timestamp,1=token
resume-strategy=1
resume-from-timestamp=
resume-from-earliest-timestamp=

Workers & Worker 协作

使用了ConsistentHashFilter做消息分发,提高处理能力

# 代码示例
workers = ["Tom", "Dick", "Harry"]
monstache -f config.toml -worker Tom
monstache -f config.toml -worker Dick
monstache -f config.toml -worker Harry

plugin 插件

可以对同步的文档做处理,文档中的字段处理

# 配置
mapper-plugin-path="/path/to/myplugin.so"
# 插件示例
package main
import (
    "github.com/rwynn/monstache/monstachemap"
    "strings"
)
// 处理文档本身
func Map(input *monstachemap.MapperPluginInput) (output *monstachemap.MapperPluginOutput, err error) {
    doc := input.Document
    for k, v := range doc {
        switch v.(type) {
        case string:
            doc[k] = strings.ToUpper(v.(string))
        }
    }
    output = &monstachemap.MapperPluginOutput{Document: doc}
    return
}

// 是否忽略文档,传输过程中
func Filter(input *monstachemap.MapperPluginInput) (keep bool, err error) {

}

// 读取collection使用pipeline过滤源数据,mongo读取数据过程中
func Pipeline(ns string, changeStream bool) (stages []interface{}, err error) {

}

// 处理文档,如删除es中的文档,将控制权转为plugin处理
func Process(input*monstachemap.ProcessPluginInput) error {

}

文档 plugin
示例 myFoo

script && filter && pipeline

功能比plugin中的map更加强大,Map,Process功能

[[script]]
namespace = "mydb.mycollection"
script = """
module.exports = function(doc) {
    if ( doc.score > {{index . "THRESHOLD"}} ) {
      doc.important = true;
    }
    return doc;
}
"""

主要过滤文档

[[filter]]
namespace = "db.collection"
script = """
module.exports = function(doc, ns, updateDesc) {
    return !!doc.interesting;
}
"""

[[filter]]
namespace = "db2.collection2"
path = "path/to/script.js"

从MongoDB中条件取值

[[pipeline]]
script = """
module.exports = function(ns, changeStream) {
  if (changeStream) {
    return [
      { $match: {"fullDocument.foo": 1} }
    ];
  } else {
    return [
      { $match: {"foo": 1} }
    ];
  }
}
"""

注意点

  • 配置文件中mapping映射不指定,则Elasticsearch的索引(index)名称为MongoDB中db.collection
  • 同时开启direct-read-namespaceschange-stream-namespace,会同时执行
  • change-stream-namespace需要开启MongoDB的oplog
  • change-stream-namespace默认只执行当前状态下的changeStream,repaly=true会重播所有changeStream
  • exit-after-direct-reads会在direct-read-namespaces执行完成之后退出程序.但是change-stream-namespace会同时执行,可能会在es中生产记录
  • resume,是对changeStream而言,是MongoDB的特性.
  • 文档处理,使用JavaScript比golang的plugin要慢几个数量级,是由于js执行加上锁.数量巨大的建议golang的plugin
  • monstache执行时,会在MongoDB钟创建相关的数据库,保留运行记录.

1.编写插件时,需要更新依赖 github.com/klauspost/compress v1.13.6 ,不然编译不过

https://github.com/rwynn/monstache/issues/553