- Published on
monstache 深度解析:MongoDB 到 Elasticsearch 数据同步机制与实现
- Authors
- Name
- Liant
monstache 简述
Go语言编写的一款将 MongoDB
数据库中的数据同步至 Elasticsearch
的软件.
特点
- 完整同步
- 实时同步
- 同步过程中,对
MongoDB
中的document处理 - 提供http接口的做状态检查
- resume
- ...
程序启动执行流程
服务启动
流程示意图 

四句话和一张图描述
- 加载配置文件
- 初始化资源(连接数据库等)
- 运行服务
- 条件退出
服务运行
服务示意图 

服务退出几种情况
- 命令行查看版本,配置等
exit-after-direct-reads=true
,当直接读取同步之后便退出程序- 服务运行处理
changeStream
和tailOps
- 程序异常退出,系统KILL通知
两种场景,三种方案
- 两种场景:完整同步,实时同步
- 三种方案:
- ProcessDirectReads(直接读取数据库)
- changeStream(mongodb的changeStream特性)
- tailOps(读取
oplog.rs
)
1. DirectReadNs 直接同步
DirectReadNs 代码执行流程
# 读取collection,如:user.info,user.class
direct-read-namespaces = ["user.info", "user.class]
direct-read-namespaces = [""] // 所有的collections
# 配置文件加载
main()->mustConfig()->config.build()->config.loadConfigFile()
=>
# 细节处理,ENV的优先级高于配置文件
if len(config.DirectReadNs) == 0 {
config.DirectReadNs = tomlConfig.DirectReadNs
}
# 启动监听时,处理所有collection
ic.run()->ic.startListen()->ic.buildGtmOptions()
=>
if config.dynamicDirectReadList() {
config.DirectReadNs = ic.buildDynamicDirectReadNs(nsFilter)
}
->gtm.StartMulti(conns, gtmOpts)
# 启动了监听,会去读取mongo相关的属性
gtm.ProcessDirectReads
gtm.ConsumeChangeStream
gtm.TailOps <-> (inOps=>FetchDocuments) 读取 oplog.rs 中的oplog,在获取文档内容,到->opC
# startReadWait 读取等待,当 ic.gtmCtx.DirectReadWg.Wait() 时,保存 resume 数据
# direct-read-stateful 记录读取状态,读取完成之后
# exit-after-direct-reads = true,则程序处理收尾工作,然后退出程序
ic.startReadWait()->ic.config.ExitAfterDirectReads
# 最后 eventLoop,处理OpC, (op, open := <-ic.gtmCtx.OpC:)
ic.eventLoop()->ic.routeOp(op)->ic.routeData(op)->(ic.indexC <- op)
ic.startIndex() 中接收 ic.indexC, 最后esclient索引到ES中,是批量操作
数据同步演示
如果同时配置 direct-read-namespaces与change-stream-namespaces,同时执行.
disable-change-events=true 会禁用changeStream和tailOps 事件
示例:
INFO 2021/10/07 20:22:17 Started monstache version 6.7.6
INFO 2021/10/07 20:22:17 Go version go1.17.1
INFO 2021/10/07 20:22:17 MongoDB go driver v1.7.2
INFO 2021/10/07 20:22:17 Elasticsearch go driver 7.0.28
INFO 2021/10/07 20:22:17 Successfully connected to MongoDB version 4.4.9
INFO 2021/10/07 20:22:17 Successfully connected to Elasticsearch version 7.15.0
ConsumeChangeStream
INFO 2021/10/07 20:22:17 Listening for events
ProcessDirectReads
INFO 2021/10/07 20:22:17 Watching changes on collection demo.sites
demo.news
demo.news
demo.news
demo.news
demo.sites
map[_id:ObjectID("615ee67d31b7dc28638f4a9e") number:no637 site:site637 title:title637]
demo.news
demo.news
demo.news
demo.news
demo.sites
map[_id:ObjectID("615ee68231b7dc28638f4a9f") number:no638 site:site638 title:title638]
由于是使用MongoDB中的objectId作为es的_id所以执行无需考虑数据重复问题.
2. changeStream
MongoDB的changeStream事件,依赖于MongoDB的oplog
注意: changeStream特性理论上是只依赖于oplog,oplog是集群同步时用到的特性.但是单机也能开启oplog的.所以changeStream也是单机可用的.
代码执行流程
# 使用changeStream,监控所有的的collection
change-stream-namespaces=[""]
# 执行监听,当 n.database = "",n.collection = ""时,watch整个mongo变更事件
启动监听
gtm.Start()->
# 开启协程监控MongoDB的changeStream事件
ConsumeChangeStream(ctx, client, ns, o)->
n.parseForChanges(ns)->
c := client.Database(n.database).Collection(n.collection)->
stream, err = c.Watch(task.ctx, pipeline, opts)->
# 监控事件,使用MongoDB的驱动接口
# 该消息会不确定的更新
stream.Next(task.ctx)->
# 将操作归类,ns过滤和DirectReadFilter过滤,最后将数据放入OpC中,等待indexClient消费
changeDoc.isDrop() # 删除
changeDoc.isDropDatabase() # 删除数据库
changeDoc.isInvalidate() #无效操作,会刷新changeStream
oper != ""-> ctx.OpC <- op # 将消息放入OpC,为IndexClient提供doc消息
# indexClient的流程,同 DirectReadNs
eventLoop()->
op, open := <-ic.gtmCtx.OpC: # 处理基本数据等
replay=true
会将所有记录的oplog,通过changeStream
事件重新播放一次,这是是属于MongoDB的特性
3. tailOps
该特性依赖 MongoDB
的 oplog
,需要 MongoDB
开启 MongoDB
,读取 oplog.rs
集合.
# 启用了oplog
monstache.EnableOplog = true
gtm.OpLogDisabled = false
# 启用监听,启动协程
gtm.Start()->
构建WorkerCount个inOps,同时启用协程 FetchDocuments 用于获取文档内容,最后执行
tailOps(ctx, client, inOps, o)
# 从oplog.rs中,读取文档
cursor, err = GetOpLogCursor(client, cts, o) # 读取oplog.rs的文档,实际上是oplog
cursor.Next(task.ctx)
op.ParseLogEntry(&entry, o) # 用于解析log
# 数据两个去处,检查是否有数据,有数据直接投递到OpC中;否则投递到inOps中,去获取文档内容
if opDataReady(op, o) {
ctx.OpC <- op
} else {
// 处理数据,需要去执行 FetchDocuments 文档
// broadcast to fetch channels
for _, channel := range channels {
channel <- op
}
}
# 最后数据流转同, DirectReadNs
eventLoop()->
op, open := <-ic.gtmCtx.OpC: # 处理基本数据等
4. 写入 ElasticSearch
// 路由操作,删除表等操作
if err = ic.routeOp(op); err != nil {
ic.processErr(err)
}
=> 路由Op操作
// 路由操作
func (ic *indexClient) routeOp(op *gtm.Op) (err error) {
// 插件处理,通过插件处理之后,将op放入processC中
if processPlugin != nil {
// 插件处理,ProcessPluginInput具有的es与mongo连接对象
err = ic.routeProcess(op)
}
if op.IsDrop() { // 删除db或者删除col命令
err = ic.routeDrop(op)
} else if op.IsDelete() { // 删除文档命令
err = ic.routeDelete(op)
} else if op.Data != nil { // 处理数据
err = ic.routeData(op)
}
return
}
=> 处理op数据:先处理关联数据,处理GridFS,最后处理一般消息
func (ic *indexClient) routeData(op *gtm.Op) (err error) {
skip := false
// oplog 数据(tailOplog/changeStream) 并且有关联数据,处理关联数据
// 只有oplog可用关联信息同步
if op.IsSourceOplog() && len(ic.config.Relate) > 0 {
skip, err = ic.routeDataRelate(op)
}
// 不需要跳过的数据,需要处理文件内容
if !skip {
if ic.hasFileContent(op) {
// 写入文件
ic.fileC <- op
} else {
// 写入数据
ic.indexC <- op
}
}
return
}
=> 处理 IndexC
for op := range ic.indexC {
if err := ic.doIndex(op); err != nil {
ic.processErr(err)
}
}
=> 将数据插入 ElasticSearch 中
// 保存文档数据
func (ic *indexClient) doIndex(op *gtm.Op) (err error) {
if err = ic.mapData(op); err == nil {
if op.Data != nil {
err = ic.doIndexing(op)
} else if op.IsUpdate() {
ic.doDelete(op)
}
}
return
}
其他特性
HTTP服务
查看服务器的一些统计状态
# 启用http服务
enable-http-server = true
127.0.0.1:8080/started => 44.3916393s
127.0.0.1:8080/healthz => ok
# 需要同时配置 stats=true,才可以使用,否则404
127.0.0.1:8080/stats =>
{
"Flushed": 59,
"Committed": 65,
"Indexed": 65,
"Created": 0,
"Updated": 0,
"Deleted": 0,
"Succeeded": 65,
"Failed": 0,
"Workers": [
{
"Queued": 0,
"LastDuration": 9000000
},
{
"Queued": 0,
"LastDuration": 3000000
},
{
"Queued": 0,
"LastDuration": 0
},
{
"Queued": 0,
"LastDuration": 0
},
{
"Queued": 0,
"LastDuration": 0
},
{
"Queued": 0,
"LastDuration": 0
}
]
}
127.0.0.1:8080/instance =>
{
"enabled": true,
"pid": 16256,
"hostname": "LAPTOP-MAT9G0DE",
"cluster": "",
"resumeName": "default",
"lastTs": {
"T": 1633606975,
"I": 1
},
"lastTsFormat": "2021-10-07T19:42:55"
}
resume
是对于changeStream
使用,继续上次的记录,为MongoDB驱动提供接口
# 配置项
resume=true
# 0=timestamp,1=token
resume-strategy=1
resume-from-timestamp=
resume-from-earliest-timestamp=
Workers & Worker 协作
使用了ConsistentHashFilter
做消息分发,提高处理能力
# 代码示例
workers = ["Tom", "Dick", "Harry"]
monstache -f config.toml -worker Tom
monstache -f config.toml -worker Dick
monstache -f config.toml -worker Harry
plugin 插件
可以对同步的文档做处理,文档中的字段处理
# 配置
mapper-plugin-path="/path/to/myplugin.so"
# 插件示例
package main
import (
"github.com/rwynn/monstache/monstachemap"
"strings"
)
// 处理文档本身
func Map(input *monstachemap.MapperPluginInput) (output *monstachemap.MapperPluginOutput, err error) {
doc := input.Document
for k, v := range doc {
switch v.(type) {
case string:
doc[k] = strings.ToUpper(v.(string))
}
}
output = &monstachemap.MapperPluginOutput{Document: doc}
return
}
// 是否忽略文档,传输过程中
func Filter(input *monstachemap.MapperPluginInput) (keep bool, err error) {
}
// 读取collection使用pipeline过滤源数据,mongo读取数据过程中
func Pipeline(ns string, changeStream bool) (stages []interface{}, err error) {
}
// 处理文档,如删除es中的文档,将控制权转为plugin处理
func Process(input*monstachemap.ProcessPluginInput) error {
}
script && filter && pipeline
功能比plugin中的map更加强大,Map,Process功能
[[script]]
namespace = "mydb.mycollection"
script = """
module.exports = function(doc) {
if ( doc.score > {{index . "THRESHOLD"}} ) {
doc.important = true;
}
return doc;
}
"""
主要过滤文档
[[filter]]
namespace = "db.collection"
script = """
module.exports = function(doc, ns, updateDesc) {
return !!doc.interesting;
}
"""
[[filter]]
namespace = "db2.collection2"
path = "path/to/script.js"
从MongoDB中条件取值
[[pipeline]]
script = """
module.exports = function(ns, changeStream) {
if (changeStream) {
return [
{ $match: {"fullDocument.foo": 1} }
];
} else {
return [
{ $match: {"foo": 1} }
];
}
}
"""
注意点
- 配置文件中mapping映射不指定,则Elasticsearch的索引(index)名称为MongoDB中
db.collection
- 同时开启
direct-read-namespaces
与change-stream-namespace
,会同时执行 change-stream-namespace
需要开启MongoDB的oplogchange-stream-namespace
默认只执行当前状态下的changeStream,repaly=true
会重播所有changeStreamexit-after-direct-reads
会在direct-read-namespaces
执行完成之后退出程序.但是change-stream-namespace
会同时执行,可能会在es中生产记录- resume,是对changeStream而言,是MongoDB的特性.
- 文档处理,使用JavaScript比golang的plugin要慢几个数量级,是由于js执行加上锁.数量巨大的建议golang的plugin
- monstache执行时,会在MongoDB钟创建相关的数据库,保留运行记录.