What do a navigation system and data engineering have in common? Both are about "movement from A to B with location transparency". Location transparency means that "A" does not need to know where "B" is or how to reach it. A navigation system handles location transparency by defining "routes", i.e. how to get from "A" to "B" by picking the correct highway, and as a driver "I don't want to know how".
Can we apply the same principle to data engineering? The answer is yes. Location transparency is achieved by implementing the "Router pattern". The Router pattern is widely recognized as an effective way to accomplish Enterprise Application Integration (EAI). A router is a component that connects its consumer to one of multiple output strategies (as described in the Strategy design pattern). It is also one of the most powerful design patterns for a microservices architecture, as it can help turn an application that is monolithic, non-modular and non-configurable, among other bad things, into a thing of beauty.
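To make the idea concrete, here is a minimal sketch of a router in plain Java, without Camel. The class name RouterSketch and the destination names "data-lake" and "data-pipeline" are my own placeholders for illustration, not code from the project. The sender only knows a logical destination name; the router picks the output strategy.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class RouterSketch {

    /** Maps a logical destination name to the strategy that delivers to it. */
    static final class Router {
        private final Map<String, Consumer<String>> routes = new LinkedHashMap<>();

        /** Registers an output strategy under a logical destination name. */
        Router register(String destination, Consumer<String> strategy) {
            routes.put(destination, strategy);
            return this;
        }

        /** Delivers the payload; the caller never sees how or where it ends up. */
        void send(String destination, String payload) {
            Consumer<String> strategy = routes.get(destination);
            if (strategy == null) {
                throw new IllegalArgumentException("No route registered for: " + destination);
            }
            strategy.accept(payload);
        }
    }

    public static void main(String[] args) {
        // In-memory lists stand in for real targets such as S3 or Kafka.
        List<String> lake = new ArrayList<>();
        List<String> pipeline = new ArrayList<>();

        Router router = new Router()
                .register("data-lake", lake::add)
                .register("data-pipeline", pipeline::add);

        router.send("data-lake", "order-1");
        router.send("data-pipeline", "order-2");

        System.out.println("lake=" + lake + " pipeline=" + pipeline);
    }
}
```

Swapping a target then means registering a different strategy under the same name, and the sending code stays untouched. Apache Camel gives you the same separation with far more features.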
Check out my GitHub project, which shows:
- Moving data to "Data Lake - AWS S3"
- Moving data to "Data Pipeline - AWS Kinesis & Apache Kafka"
- Moving API data to "Data Pipeline - AWS Kinesis"
- Content Based Routing (CBR) to "Data Pipeline Apache Kafka"
- REST POST to "Data Pipeline - AWS Kinesis"
It also demonstrates an opinionated way of doing AWS-based development without an AWS account; the same software will run on the AWS cloud with "no fuss".
The following EAI routes are implemented using Spring Boot & Apache Camel:
- Route to Spring bean
- Route to AWS S3
- Route to AWS Kinesis from REST endpoint
- Route to Apache Kafka
- Content based routing
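Content based routing deserves a quick illustration: the router looks inside the message and picks the destination from what it finds. Below is a minimal sketch in plain Java, assuming order messages with a country field like the ones in the logs further down. The Order class and the topic names "orders-us", "orders-nl" and "orders-other" are hypothetical and are not taken from the project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class ContentBasedRouterSketch {

    /** Minimal stand-in for an order message (hypothetical shape). */
    static final class Order {
        final int orderNumber;
        final String country;
        final int amount;

        Order(int orderNumber, String country, int amount) {
            this.orderNumber = orderNumber;
            this.country = country;
            this.amount = amount;
        }
    }

    /** One routing rule: if the condition matches, use the destination. */
    static final class Rule {
        final Predicate<Order> condition;
        final String destination;

        Rule(Predicate<Order> condition, String destination) {
            this.condition = condition;
            this.destination = destination;
        }
    }

    static final class ContentBasedRouter {
        private final List<Rule> rules = new ArrayList<>();
        private final String fallback;

        ContentBasedRouter(String fallback) {
            this.fallback = fallback;
        }

        ContentBasedRouter when(Predicate<Order> condition, String destination) {
            rules.add(new Rule(condition, destination));
            return this;
        }

        /** Rules are checked in registration order; the first match wins. */
        String destinationFor(Order order) {
            for (Rule rule : rules) {
                if (rule.condition.test(order)) {
                    return rule.destination;
                }
            }
            return fallback;
        }
    }

    public static void main(String[] args) {
        ContentBasedRouter router = new ContentBasedRouter("orders-other")
                .when(o -> "US".equals(o.country), "orders-us")
                .when(o -> "NL".equals(o.country), "orders-nl");

        System.out.println(router.destinationFor(new Order(1, "US", 100)));
        System.out.println(router.destinationFor(new Order(2, "NL", 100)));
        System.out.println(router.destinationFor(new Order(3, "DE", 100)));
    }
}
```

In Camel the same decision is usually written with the choice() / when() / otherwise() DSL, so treat this only as a picture of the idea.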
Let's check out one of the routes, which moves REST API data to AWS Kinesis:
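// REST configuration for the "rest:" endpoint used below
restConfiguration().host("localhost").port(4001);

from("timer:hi?period={{timer.period}}")
    // pick a random car id and store it in the "id" header
    .setHeader("id", simple("${random(1,3)}"))
    // call the REST API; {id} is resolved from the "id" header
    .to("rest:get:cars/{id}")
    .log("[going to Kinesis]" + "${body}")
    // constant partition key and shard id, good enough for a demo
    .setHeader(KinesisConstants.PARTITION_KEY, simple("1"))
    .setHeader(KinesisConstants.SHARD_ID, simple("1"))
    // write the REST response to the Kinesis stream
    .to("aws-kinesis://mykinesisstream?amazonKinesisClient=#amazonKinesisClient")
    .to("log:out?showAll=true")
    .log("Completed Writing to Kinesis");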
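A word on the partition key used in this route. Kinesis hashes the partition key with MD5 into a 128-bit number and uses that number to choose the shard. The sketch below models this in plain Java under one simplifying assumption of mine: the hash key space is split evenly across the shards, which is not guaranteed for a stream that has been resharded. The class and method names are hypothetical.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class PartitionKeySketch {

    // Size of the 128-bit hash key space: 2^128
    private static final BigInteger KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    /** MD5 of the partition key, read as an unsigned 128-bit integer. */
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    /**
     * Index of the shard owning the key, ASSUMING the key space is divided
     * into shardCount equal ranges (a simplification for illustration).
     */
    static int shardIndex(String partitionKey, int shardCount) {
        return hashKey(partitionKey)
                .multiply(BigInteger.valueOf(shardCount))
                .divide(KEY_SPACE)
                .intValueExact();
    }

    public static void main(String[] args) {
        int shards = 4;
        for (String key : new String[] {"1", "car-1", "car-2", "car-3"}) {
            System.out.println(key + " -> shard " + shardIndex(key, shards));
        }
    }
}
```

Because the hash is deterministic, a constant partition key such as "1" always lands on the same shard. That is fine for a local demo, but to spread load you would normally derive the key from the data, for example the car id.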
To run this project, you can set up (emulate) AWS locally on your laptop by following the steps below. The project also comes with a Docker image of Apache Kafka + ZooKeeper.
python3 -m virtualenv localstackenv
source localstackenv/bin/activate
pip install localstack
localstack start --docker
docker-compose up
Then just run the Spring Boot application, "SpringBootCamelApplication", from your IDE or Maven and observe the logs.
2019-04-06 11:50:50.010 INFO 22172 --- [ucer[TestTopic]] route2 :
{
"orderNumber": 1,
"country": "US",
"amount": 100,
"items": [
{
"itemId" : 123,
"itemCost": 33,
"itemQty": 12
}
]
}
2019-04-06 11:50:50.013 INFO 22172 --- [umer[TestTopic]] FromKafka : consumed message from Kafka
2019-04-06 11:50:50.013 INFO 22172 --- [umer[TestTopic]] FromKafka : Hello from Gonnect
2019-04-06 11:50:50.036 INFO 22172 --- [umer[TestTopic]] FromKafka : consumed message from Kafka
2019-04-06 11:50:50.036 INFO 22172 --- [umer[TestTopic]] FromKafka :
{
"orderNumber": 1,
"country": "NL",
"amount": 100,
"items": [
{
"itemId" : 123,
"itemCost": 33,
"itemQty": 12
}
]
}
.... ...... ........
The running routes can be listed through the actuator endpoint:
curl -XGET -s http://localhost:4001/actuator/camelroutes
[
{
"id": "route1",
"uptime": "9.780 seconds",
"uptimeMillis": 9781,
"properties": {
"parent": "64beb2b7",
"rest": "false",
"description": null,
"id": "route1"
},
"status": "Started"
},
{
"id": "RandomTextGeneratorRoute",
"uptime": "9.780 seconds",
"uptimeMillis": 9780,
"properties": {
"parent": "4e4b7abd",
"rest": "false",
"description": null,
"id": "RandomTextGeneratorRoute"
},
"status": "Started"
},
{
"id": "FromKafka",
"uptime": "9.736 seconds",
"uptimeMillis": 9736,
"properties": {
"parent": "730cd2d0",
"rest": "false",
"description": null,
"id": "FromKafka"
},
"status": "Started"
},
{
"id": "kafkaStartWithPartitioner",
"group": "kafka-route-group",
"uptime": "9.735 seconds",
"uptimeMillis": 9735,
"properties": {
"parent": "f10d3e4",
"rest": "false",
"description": null,
"id": "kafkaStartWithPartitioner",
"group": "kafka-route-group"
},
"status": "Started"
},
{
"id": "route2",
"uptime": "9.734 seconds",
"uptimeMillis": 9734,
"properties": {
"parent": "38e4f7b",
"rest": "false",
"description": null,
"id": "route2"
},
"status": "Started"
},
{
"id": "hello",
"group": "hello-group",
"uptime": "9.734 seconds",
"uptimeMillis": 9734,
"properties": {
"parent": "1915ce41",
"rest": "false",
"description": null,
"id": "hello",
"group": "hello-group"
},
"status": "Started"
}
]
The details of a single route are available by its id:
curl -XGET -s http://localhost:4001/actuator/camelroutes/{id}/detail
NOTE: the output below is for id = route1
{
"id": "route1",
"uptime": "2 minutes",
"uptimeMillis": 164554,
"properties": {
"parent": "64beb2b7",
"rest": "false",
"description": null,
"id": "route1"
},
"status": "Started",
"details": {
"deltaProcessingTime": -4,
"exchangesInflight": 0,
"exchangesTotal": 82,
"externalRedeliveries": 0,
"failuresHandled": 0,
"firstExchangeCompletedExchangeId": "ID-APMGJGH67551C6-1554502425015-0-3",
"firstExchangeCompletedTimestamp": "2019-04-05T22:13:49.139+0000",
"lastExchangeCompletedExchangeId": "ID-APMGJGH67551C6-1554502425015-0-590",
"lastExchangeCompletedTimestamp": "2019-04-05T22:16:30.879+0000",
"lastProcessingTime": 10,
"maxProcessingTime": 408,
"meanProcessingTime": 21,
"minProcessingTime": 8,
"redeliveries": 0,
"totalProcessingTime": 1769,
"hasRouteController": false
}
}