db-caching

Demonstrates how to use MySQL binlogs to build an in-memory cache for stream-table joins during stream processing.
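The core idea can be sketched in a few lines of shell. This is a minimal, hypothetical illustration, not the repository's Samza code. Instead of Maxwell JSON on Kafka, it assumes simplified whitespace-separated lines: C table id value is a row change (as the binlog would report it), and E aid bid is a stream event. Row changes update an in-memory map keyed by table and id, and each event is enriched by a map lookup instead of a database query.

```shell
#!/usr/bin/env bash
# Hypothetical sketch of a stream-table join backed by an in-memory cache.
# ASSUMED input format (simplified, not Maxwell's JSON), one record per line:
#   C <table> <id> <value>   a row insert/update captured from the binlog
#   E <aid> <bid>            a stream event referencing rows of tables A and B
# Records are processed in arrival order, so each event is joined against
# the latest cached version of the rows it references.
join_with_cache() {
  awk '
    $1 == "C" {
      cache[$2 ":" $3] = $4            # upsert the row into the cache
      next
    }
    $1 == "E" {
      ka = "A:" $2; kb = "B:" $3
      # Look up both sides; emit JSON null when a row has not been seen yet.
      a = (ka in cache) ? "\"" cache[ka] "\"" : "null"
      b = (kb in cache) ? "\"" cache[kb] "\"" : "null"
      printf "{\"aid\":%s,\"bid\":%s,\"a\":%s,\"b\":%s}\n", $2, $3, a, b
    }
  '
}
```

For example, printf 'C A 1 alice\nC B 2 bob\nE 1 2\n' | join_with_cache prints {"aid":1,"bid":2,"a":"alice","b":"bob"}. Because every lookup is a local map access rather than a round trip to MySQL, this style of join can be expected to keep pace with the stream much more easily.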

Primary language: Shell. License: MIT.

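Setup (Mac OS X)

Start docker-machine

cd docker
docker-machine start docker-vm

If the docker-vm machine does not exist yet, create it first with docker-machine create --driver virtualbox docker-vm.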

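Note the Docker host IP

docker-machine env docker-vm

The IP is part of the DOCKER_HOST value this prints (docker-machine ip docker-vm prints it on its own). Use it wherever the setup needs the Docker host's address. To point docker and docker-compose in the current shell at the VM, run eval "$(docker-machine env docker-vm)".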
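Once DOCKER_HOST is set in your shell (docker-machine env sets it to a URL of the form tcp://IP:PORT), scripts can extract the address with plain parameter expansion. The docker_host_ip helper below is a hypothetical convenience, not part of this repository.

```shell
#!/usr/bin/env bash
# Hypothetical helper: extract the host part of a Docker host URL such as
# tcp://192.168.99.100:2376. Defaults to $DOCKER_HOST when no argument is given.
docker_host_ip() {
  local url=${1:-$DOCKER_HOST}
  url=${url#*://}              # drop the scheme, e.g. "tcp://"
  printf '%s\n' "${url%%:*}"   # drop the port and anything after it
}
```

Usage: export DOCKER_HOST_IP="$(docker_host_ip)" after the eval, then refer to $DOCKER_HOST_IP in later commands.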

Start MySQL

docker-compose up -d mysql

Grant privileges (each command prompts for the MySQL root password)

docker exec -it my-mysql bash -c "mysql -uroot -p -e \"GRANT ALL on demo.* to 'demo'@'%' identified by 'demo';CREATE DATABASE demo;SELECT User FROM mysql.user;\""
docker exec -it my-mysql bash -c "mysql -uroot -p -e \"GRANT ALL on maxwell.* to 'maxwell'@'%' identified by 'maxwell';GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';SELECT User FROM mysql.user;\""

Start Kafka container

docker-compose up -d kafka

Start the Maxwell binlog replicator (docker-compose run stays in the foreground, so use a new shell for the next steps)

docker-compose run maxwell bash -c "bin/maxwell --user=maxwell --password=maxwell --host=mysql --producer=kafka --kafka.bootstrap.servers=kafka:9092"

Check that the maxwell topic has been created

docker exec -it my-kafka bash -c "/opt/kafka_2.11-0.8.2.1/bin/kafka-topics.sh --zookeeper localhost:2181 --list"

Attach a console consumer to the maxwell topic, then create some users from another shell

docker exec -it my-kafka bash -c "/opt/kafka_2.11-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic maxwell"
cd data-generators/ruby
bundle exec ruby create_users.rb

This creates 1000 records each in demo.A and demo.B, and the maxwell consumer should print a change event for each insert.

Now let's look at how to denormalize events against the database while stream processing.

Create the events Kafka topic and publish some events to it

docker exec -it my-kafka bash -c "/opt/kafka_2.11-0.8.2.1/bin/kafka-topics.sh --create --topic events --zookeeper localhost:2181 --partitions 1 --replication-factor 1"
bundle exec ruby create_events.rb

Each run of create_events.rb publishes 1000 arbitrary events of the form {"aid":803,"bid":205}.
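To experiment without Ruby, events of the same shape can be produced in shell. The hypothetical gen_events below is not create_events.rb; it only relies on what is stated above (each event is a JSON object with an aid and a bid), and it additionally assumes ids range over 1 to 1000 to match the 1000 rows in demo.A and demo.B.

```shell
#!/usr/bin/env bash
# Hypothetical stand-in for the event shape produced by create_events.rb.
# ASSUMPTION: ids are drawn uniformly from 1..1000, matching the 1000 rows
# created in demo.A and demo.B.
gen_events() {
  local count=${1:-1000}
  awk -v n="$count" -v max_id=1000 'BEGIN {
    srand()
    for (i = 0; i < n; i++)
      printf "{\"aid\":%d,\"bid\":%d}\n", int(rand() * max_id) + 1, int(rand() * max_id) + 1
  }'
}
```

Assuming the broker listens on localhost:9092 inside the container, the output could be piped to Kafka's console producer: gen_events 1000 | docker exec -i my-kafka bash -c "/opt/kafka_2.11-0.8.2.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic events".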

Install and start YARN

cd data-processors/stream-processors/samza
bin/grid install yarn
bin/grid start yarn

Once YARN is up, you should be able to access http://localhost:8088/cluster.

Denormalize these events using Samza

mvn clean package
mkdir -p deploy/samza
tar -xvf ./target/db-caching-0.0.1-dist.tar.gz -C deploy/samza
deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/stream-db-denormalization.properties

Wait for the job to reach RUNNING status in YARN.
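Instead of refreshing the YARN UI, the wait can be scripted. In this hypothetical sketch, wait_until retries any command until it succeeds or a timeout passes, and yarn_job_running queries the ResourceManager REST API (/ws/v1/cluster/apps?states=RUNNING) for the job's name. It assumes the ResourceManager answers on localhost:8088 like the UI, and that the YARN application name contains the Samza job name; check the actual name in the YARN UI if the match fails.

```shell
#!/usr/bin/env bash
# Hypothetical helper: run "$@" until it succeeds, checking every $2 seconds
# for at most $1 seconds. Returns 1 if the timeout is reached.
# Use a positive interval when the command may never succeed.
wait_until() {
  local timeout=$1 interval=$2 waited=0
  shift 2
  until "$@"; do
    if [ "$waited" -ge "$timeout" ]; then
      return 1
    fi
    sleep "$interval"
    waited=$((waited + interval))
  done
}

# Succeeds if the list of RUNNING YARN applications mentions the given name.
# ASSUMPTION: ResourceManager REST API at localhost:8088 (same host as the UI).
yarn_job_running() {
  curl -s "http://localhost:8088/ws/v1/cluster/apps?states=RUNNING" | grep -q "$1"
}
```

For example, wait_until 300 5 yarn_job_running stream-db-denormalization waits up to five minutes; the name stream-db-denormalization is only a guess based on the properties file name.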

Then consume the denormalized events

docker exec -it my-kafka bash -c "/opt/kafka_2.11-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic db-de-normalized-events"

Leave this shell open.

In another shell, publish events again, and this time observe the pace of denormalization

bundle exec ruby create_events.rb

Now, use the DB cache denormalizer

deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/db-to-changelog.properties

Compare stream-de-normalized-events with db-de-normalized-events by running each consumer in its own shell and then publishing another batch of events

docker exec -it my-kafka bash -c "/opt/kafka_2.11-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic stream-de-normalized-events"
docker exec -it my-kafka bash -c "/opt/kafka_2.11-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic db-de-normalized-events"
bundle exec ruby create_events.rb

Note the difference in speed.
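To put a number on the difference, the hypothetical measure_rate below reads consumer output on stdin and reports how many messages arrived and how many whole seconds passed between the first message and the end of input. Timing starts at the first message, so the idle time before create_events.rb runs is not counted.

```shell
#!/usr/bin/env bash
# Hypothetical helper: count lines on stdin and report the elapsed whole
# seconds between the first line arriving and the end of input.
measure_rate() {
  local first start end rest
  if ! IFS= read -r first && [ -z "$first" ]; then
    echo "0 messages"
    return 0
  fi
  start=$(date +%s)
  rest=$(wc -l | tr -d ' ')   # consume and count the remaining lines
  end=$(date +%s)
  echo "$((rest + 1)) messages in $((end - start))s"
}
```

Assuming one output message per input event, one run of create_events.rb can be timed per topic with docker exec my-kafka bash -c "/opt/kafka_2.11-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic db-de-normalized-events" | head -n 1000 | measure_rate, and likewise for stream-de-normalized-events. The -it flags are dropped so the output can be piped cleanly.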

Enjoy!