/poc-kafka-connect-cdc-mongodb-to-elasticsearch

A repository PoC streaming data from MongoDB as upstream to Elasticsearch downstream by using Kafka Connect and additional tools for monitoring ⭐️

Primary LanguageShellApache License 2.0Apache-2.0

Stream data from MongoDB to Elasticsearch by using Kafka Connect

A repository PoC streaming data from MongoDB as upstream to Elasticsearch downstream by using Kafka Connect and additional tools for monitoring. inspired by https://github.com/mongodb-university/kafka-edu repository

Prerequisites 🚀

mongodb-source-to-elasticsearch-sink using JsonSchemaConverter

An environment will PoC about to capture data changes from MongoDB to Elasticsearch use case

docker-compose will contains these services

  • Apache Kafka
  • Zookeeper
  • Apache Kafka Connect
  • Confluent REST Proxy for Kafka
  • Confluent Schema Registry
  • MongoDB Connector
  • Elasticsearch Connector Sink
  • MongoDB single node replica set
  • Kibana
  • Elasticsearch
  • Redpanda Console (integrated with Schema Registry and Kafka Connect)

Start Development 🚧

step 1) you have to change directories and start all services by using

cd playgrounds/mongodb-source-to-elasticsearch-sink
make up

step 2) shell to some container (we will use mongo1)

make exe

step 3) we have to create collection first for initialing cursor that source connector use it to capture changes and produce it to kafka topic

3.1) shell to MongoDB replica

mongo mongodb://mongo1:27017/?replicaSet=rs0    # for MongoDB version 3.X
mongosh mongodb://mongo1:27017/?replicaSet=rs0  # for MongoDB version 6.X

3.2) switch to target database

use quickstart

3.3) create a collection

db.createCollection('sampleData')

step 4) add source and sink connector, these command will add mongo-source as source connector and elasticsearch-sink as sink connector to capture data changes from upstream data to Kafka topic then push it to downstream. for more commands, you can see at scripts/kafka-connect/requests.sh

(optional) you can do this step by using Redpanda Console to create/edit/delete connectors on this http://localhost:8888/connect-clusters/connect-local

4.1) shell and open new session for commanding connector

make exe

4.2) add connectors

# add new mongo-source connector as a source connector by using JsonSchemaConverter
curl -X POST \
  -H "Content-Type: application/json" \
  --data '
  {
    "name": "mongo-source",
    "config": {
      "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
      "connection.uri": "mongodb://mongo1:27017/?replicaSet=rs0",
      "database": "quickstart",
      "collection": "sampleData",
      "pipeline": "[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]",

      "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
      "output.format.value": "schema",
      "output.format.key": "json",

      "value.converter":"io.confluent.connect.json.JsonSchemaConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",

      "output.schema.infer.value" : true,
      "publish.full.document.only": true,

      "transforms": "createKey,extractString",
      "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
      "transforms.createKey.fields": "hello",
      "transforms.extractString.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
      "transforms.extractString.field": "hello"
    }
  }
  ' \
  http://connect:8083/connectors -w "\n"

# add new elasticsearch-sink connector as a sink connector by using JsonSchemaConverter
curl -X POST \
  -H "Content-Type: application/json" \
  --data '
	{
    "name": "elasticsearch-sink",
    "config": {
      "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "connection.url": "http://elasticsearch:9200",
      "topics": "quickstart.sampleData",
      "tasks.max": "1",

      "value.converter":"io.confluent.connect.json.JsonSchemaConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",

      "transforms": "createKey,extractString,ReplaceField",
      "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
      "transforms.createKey.fields": "_id",
      "transforms.extractString.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
      "transforms.extractString.field": "_id",
      "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.ReplaceField.exclude": "_id"
    }
	}
	' \
  http://connect:8083/connectors -w "\n"

step 5) we will try to trigger or make some change events to out upstream system by insert one document to collection. for more commands, you can see at scripts/mongodb/manual.js

5.1) insert or update document that make event changes. you can read other events from this Change Events - MongoDB

db.sampleData.insertOne({ "hello": "world"})


db.sampleData.updateOne(
  { _id: ObjectId("your-document-object-id") },
  { $set: { hello: "updated"} },
)

5.2) you can monitoring data flow from these URL

References 🙏