MongoDB change stream replayer
ChangeEvent is defined with reference to change-events/#change-stream-output
type ChangeEvent struct {
SrcDB string
SrcColl string
DstDB string
DstColl string
OperationType string
DocumentKey bson.D
UpdatedFields *bson.D
RemovedFields *bson.A
FullDocument *bson.D
}
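To illustrate how the fields above relate to MongoDB's change stream output, here is a minimal sketch using only the standard library. It assumes the event arrives as plain JSON, which may not match the byte format this package actually expects. The names rawEvent, simpleEvent and parseEvent are hypothetical and are not part of this package. Only operationType, ns, documentKey and updateDescription are taken from MongoDB's documented change event fields.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rawEvent mirrors a subset of MongoDB's change stream output, rendered as JSON.
type rawEvent struct {
	OperationType string `json:"operationType"`
	NS            struct {
		DB   string `json:"db"`
		Coll string `json:"coll"`
	} `json:"ns"`
	DocumentKey       map[string]any `json:"documentKey"`
	UpdateDescription *struct {
		UpdatedFields map[string]any `json:"updatedFields"`
		RemovedFields []string       `json:"removedFields"`
	} `json:"updateDescription"`
}

// simpleEvent is a hypothetical, BSON-free stand-in for ChangeEvent.
type simpleEvent struct {
	SrcDB, SrcColl, OperationType string
	DocumentKey                   map[string]any
	UpdatedFields                 map[string]any
	RemovedFields                 []string
}

// parseEvent flattens a JSON change event into a simpleEvent.
func parseEvent(data []byte) (simpleEvent, error) {
	var raw rawEvent
	if err := json.Unmarshal(data, &raw); err != nil {
		return simpleEvent{}, err
	}
	ev := simpleEvent{
		SrcDB:         raw.NS.DB,
		SrcColl:       raw.NS.Coll,
		OperationType: raw.OperationType,
		DocumentKey:   raw.DocumentKey,
	}
	// updateDescription is only present on update events.
	if raw.UpdateDescription != nil {
		ev.UpdatedFields = raw.UpdateDescription.UpdatedFields
		ev.RemovedFields = raw.UpdateDescription.RemovedFields
	}
	return ev, nil
}

func main() {
	data := []byte(`{"operationType":"update","ns":{"db":"src_db","coll":"users"},` +
		`"documentKey":{"_id":1},"updateDescription":{"updatedFields":{"name":"a"},"removedFields":["age"]}}`)
	ev, err := parseEvent(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(ev.SrcDB, ev.SrcColl, ev.OperationType, ev.RemovedFields)
}
```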
- Basic example
ctx := context.TODO()
conf := events.Config{
DBMap: map[string]string{"src_db": "dst_db"},
}
client, _ := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
mono := NewMonolog(client, &conf)
for _, changeEventBytes := range yourChangeStreamBytes {
mono.Process(ctx, changeEventBytes)
}
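The DBMap setting in the example above suggests that events from a source database are replayed into a mapped destination database. The following is a rough sketch of that lookup and not the package's actual code. It assumes DBMap is a map[string]string and that unmapped databases keep their source name. The Config type here is a local stand-in, and destDB is a hypothetical helper.

```go
package main

import "fmt"

// Config is a hypothetical stand-in for events.Config; only DBMap is modeled,
// and its type (map[string]string) is an assumption.
type Config struct {
	DBMap map[string]string
}

// destDB returns the destination database for a source database.
// Falling back to the source name when no mapping exists is an assumption.
func destDB(conf Config, srcDB string) string {
	if dst, ok := conf.DBMap[srcDB]; ok {
		return dst
	}
	return srcDB
}

func main() {
	conf := Config{DBMap: map[string]string{"src_db": "dst_db"}}
	fmt.Println(destDB(conf, "src_db"))   // mapped database
	fmt.Println(destDB(conf, "other_db")) // unmapped database keeps its name
}
```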
- Custom Filter
The filter function type is defined as
type FilterFunc func(ChangeEvent) bool
Pass your custom filters to NewMonolog:
mono := NewMonolog(client, &conf, func(ce events.ChangeEvent) bool {
return ce.SrcColl == "ignore_collection"
})
...
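When several filters are passed, one plausible way to evaluate them is to check whether any of them matches an event. The sketch below shows that idea with a trimmed, local copy of ChangeEvent. It assumes a filter returning true means the event is skipped, which the collection name "ignore_collection" hints at but this document does not state. The helper anyMatch is hypothetical and not part of this package.

```go
package main

import "fmt"

// ChangeEvent is a trimmed, hypothetical copy of the struct above (BSON fields omitted).
type ChangeEvent struct {
	SrcDB, SrcColl, OperationType string
}

// FilterFunc has the same shape as the filter type defined above.
type FilterFunc func(ChangeEvent) bool

// anyMatch reports whether at least one filter returns true for the event.
// Treating a match as "skip this event" is an assumption.
func anyMatch(ce ChangeEvent, filters ...FilterFunc) bool {
	for _, f := range filters {
		if f(ce) {
			return true
		}
	}
	return false
}

func main() {
	ignoreColl := func(ce ChangeEvent) bool { return ce.SrcColl == "ignore_collection" }
	ignoreDeletes := func(ce ChangeEvent) bool { return ce.OperationType == "delete" }

	events := []ChangeEvent{
		{SrcDB: "src_db", SrcColl: "users", OperationType: "insert"},
		{SrcDB: "src_db", SrcColl: "ignore_collection", OperationType: "insert"},
		{SrcDB: "src_db", SrcColl: "users", OperationType: "delete"},
	}
	for _, ce := range events {
		fmt.Println(ce.SrcColl, ce.OperationType, "skip:", anyMatch(ce, ignoreColl, ignoreDeletes))
	}
}
```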