Start pulsar and elasticsearch
$ docker-compose up -d
Create new tennant
$ curl --header "Content-Type: application/json" \
--request PUT \
--data '{ "allowedClusters": ["standalone"]}' \
http://localhost:8080/admin/v2/tenants/tenant-1
Create new namespace
$ curl --header "Content-Type: application/json" \
--request PUT \
--data '{}' \
http://localhost:8080/admin/v2/namespaces/tenant-1/ns-1
Create Elasticsearch sink
$ curl --header "Content-Type: multipart/form-data" \
--request POST \
-F url='file:///pulsar/connectors/pulsar-io-elastic-search-2.7.0-SNAPSHOT.nar;type=text/plain' \
-F sinkConfig='{ "className": "org.apache.pulsar.io.elasticsearch.ElasticSearchSink", "archive": "/pulsar/connectors/pulsar-io-elastic-search-2.7.0-SNAPSHOT.nar", "inputs": ["persistent://tenant-1/ns-1/elastic-test"], "processingGuarantees": "EFFECTIVELY_ONCE", "parallelism": 1, "configs": {"elasticSearchUrl": "http://elasticsearch:9200", "indexName": "test_index" } };type=application/json' \
http://localhost:8080/admin/v3/sinks/tenant-1/ns-1/elasticsearch
Sink config class https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
Sink manageement
$ curl --request POST http://localhost:8080/admin/v3/sinks/tenant-1/ns-1/elasticsearch/start
$ curl --request POST http://localhost:8080/admin/v3/sinks/tenant-1/ns-1/elasticsearch/restart
$ curl --request GET http://localhost:8080/admin/v3/sinks/tenant-1/ns-1/elasticsearch/status
$ curl --request DELETE http://localhost:8080/admin/v3/sinks/tenant-1/ns-1/elasticsearch
On MacOS
$ brew install libpulsar
Run producer
$ npm install
$ node ./index.js
Check documents in Elasticsearch
$ curl -s http://localhost:9200/test_index/_refresh
$ curl -s http://localhost:9200/test_index/_search