Accompanying blog post:
This demo project showcases the use of Apache Kafka to create materialized views from a stream of events. This sample projects assumes you have read the following articles:
The application holds a single entity type: a job description for a fictive job board. A list of all published job descriptions is also held.
The app is split in three components:
- A stateless API which reads off of redis and schedules writes through kafka.
- A stateless consumer which reads the event stream and keeps redis up to date.
- A small single-page javascript application to interact with the API.
In the project directory, run lein uberjar
, a resulting standalone artifact
will be in target/kmodel-0.3.0-standalone.jar
Start the client by running java -jar target/kmodel-0.3.0-standalone.jar client
or the worker by running java -jar target/kmodel-0.3.0-standalone.jar worker
If you want to test modifications, you can run the project with leiningen as well,
by issuing either lein run -- client
or lein run -- worker
Additionally, a path pointing to an EDN configuration file may be supplied as the last argument.
The defaults are shown here and assume both redis, zookeeper and kafka are available locally, adapt as needed.
{:http {:port 8080}
:redis {:pool {}
:spec {:host ""
:port 6379}}
:producer {:bootstrap.servers "localhost:9092"
:value.serializer serializer
:key.serializer serializer}
:consumer {:zookeeper.connect "localhost:2181" "job-db"}}
You will of course need both zookeeper and kafka running, as well as a reachable redis instance.
You will need to create the kafka topic and indicate in its configuration that log compaction is enabled:
./bin/ --create --topic=job --zookeeper=localhost:2181 --partitions=10 --replication-factor=1 --config=cleanup.policy=compact
You will also need the following set in your kafka broker properties:
The code is split across the following namespaces:
: glue namespace, to start thingskmodel.worker
: kafka to redis consumerkmodel.client
: glue namespace for the clientkmodel.client.db
: persistence layer implementationkmodel.client.api
: HTTP API implementation
GET /api/job
: return jobsPOST /api/job
: insert a new jobPUT /api/job/:id
: update a job by IDDELETE /api/job/:id
: delete job by ID