Spring kafka mongodb - sample project
The following project implements a simple data pipeline, utilizing SpringBoot, Kafka and MongoDB.
Springboot's embedded Tomcat is used to serve REST endpoints.
Quick glance over the available flows:
Generated using draw.io, import media/skmd.drawio
in draw.io to modify
Run locally
Dependencies:
- docker
- docker-compose
- java11
In order to run this project locally with all of its dependencies in docker, first build the artifact and then use the provided docker-compose.yml:
./gradlew clean build docker
docker-compose up
this will create a local image com.github.viktorpenelski/spring-kafka-mongo-demo
and bring it up along all of the dependencies (zoo, kafka, mongo).
The service will be available on localhost:8080
Dev locally
Dependencies:
- docker
- docker-compose
- java11
In order to run only the dependencies (zoo, kafka, mongo) in docker, while developing the app, you can use the alternative docker-compose:
docker-compose -f docker-compose-dependencies-only.yml up
After that you can run application from your IDE, or using:
./gradlew clean build
java -jar build/libs/*.jar
API usage
Enqueue a request:
/jsonstore/enqueue
endpoint accepts any valid JSON as payload. Upon successful
submission, HttpStatus 202 - Accepted
is returned with an id
and href
where the resource is expected to be
available at a later point.
Note that enqueueing a request results in a REST resource almost immediately, but since it is an asynchronous process served by a messaging queue, no guarantee is provided for when that will happen.
POST {{url}}:{{port}}/jsonstore/enqueue
With any valid JSON as body.
Example:
curl -X POST \
http://localhost:8080/jsonstore/enqueue \
-H 'content-type: application/json' \
-d '{ "someKey": "some value" }'
Expected response:
Status: 202 Accepted
Body:
{
"message": "Successfully enqueued the payload. Resource will be available shortly.",
"id": "d3269c50-3525-4725-b0c0-f9c8598daae6",
"href": "/jsonstore/d3269c50-3525-4725-b0c0-f9c8598daae6"
}
Retrieve a resource
Get a single REST resource previously enqueued via the enqueue endpoint.
GET {{url}}:{{port}}/jsonstore/{id}
Example:
curl localhost:8080/jsonstore/d3269c50-3525-4725-b0c0-f9c8598daae6
Response:
{
"payload" : {
"someKay" : "some value"
},
"_links" : {
"self" : {
"href" : "http://localhost:8080/jsonstore/d3269c50-3525-4725-b0c0-f9c8598daae6"
},
"jsonstoreRecord" : {
"href" : "http://localhost:8080/jsonstore/d3269c50-3525-4725-b0c0-f9c8598daae6"
}
}
}
Retrieve all resources
Get all REST resources previously enqueued via the enqueue endpoint.
The endpoint supports client-requested pagination via query parameters:
page
- which page to start the current request form. First page is 0. 0 by default.size
- page size. Each page will contain that many resources. 10000 by default.
GET {{url}}:{{port}}/jsonstore{&page,size}
Example:
curl localhost:8080/jsonstore
Response:
{
"_embedded" : {
"jsonstoreRecords" : [ {
"payload" : {
"someKay" : "some value"
},
"_links" : {
"self" : {
"href" : "http://localhost:8080/jsonstore/d3269c50-3525-4725-b0c0-f9c8598daae6"
},
"jsonstoreRecord" : {
"href" : "http://localhost:8080/jsonstore/d3269c50-3525-4725-b0c0-f9c8598daae6"
}
}
}, {
"payload" : {
"someKay" : "some value"
},
"_links" : {
"self" : {
"href" : "http://localhost:8080/jsonstore/bbc3dd1f-b99c-4de7-ae45-ceca3b34d866"
},
"jsonstoreRecord" : {
"href" : "http://localhost:8080/jsonstore/bbc3dd1f-b99c-4de7-ae45-ceca3b34d866"
}
}
} ]
},
"_links" : {
"self" : {
"href" : "http://localhost:8080/jsonstore{?page,size,sort}",
"templated" : true
},
"profile" : {
"href" : "http://localhost:8080/profile/jsonstore"
}
},
"page" : {
"size" : 10000,
"totalElements" : 2,
"totalPages" : 1,
"number" : 0
}
Integration tests
The app contains an integration test suite.
Currently it attempts to:
- Create a resource via POST
/jsonstore/enqueue
- fails on unreachable endpoint
- fails on status other than 202 Accepted
- Retrieve the resource via GET
/jsonstore/{id}
- retries twice with timeout (in case the queue is slower than expected)
- fails if the resource created in 1. does not have a valid
.href
in the body - fails if after N retries no 200 OK is received.
In order to run integration tests, first make sure that an app is either deployed locally or running on a remote host with all of its dependencies available (in which case use env variable INTEGRATION_TEST_BASE_URL_PORT to direct the tests against it).
Execute using:
./gradlew integrationTest
Environment variables:
variable | default | description |
---|---|---|
MONGODB_URL |
localhost | url of the mongo db instance to connect to |
MONGODB_PORT |
27017 | port of the mongodb instace to connect to |
KAFKA_BOOTSTRAP_SERVERS |
localhost:9092 | comma-separated list of host and port pairs that are the addresses of Kafka brokers |
INTEGRATION_TEST_BASE_URL_PORT |
http://127.0.0.1:8080 | Location to run acceptance tests against |
Potential improvements:
- Implement a dead letter queue for messages that were not stored
- Add common error handling to have consistent messages.
- Add common logging for each accepted request.
- Аdd embedded kafka/mongo to allow for integration tests w/o dependencies