This is a dummy project for studying Kafka, using a few small examples to cover some topics:
- log compaction;
- ordered consumption;
- message schemas;
- schema formats (Avro, JSON, Protobuf, etc.);
- replay, forward, and rewind of the messages in a topic;
- cluster management (creation, deletion and scaling);
- kSQL use cases;
- alternatives to Docker (extra topic);
- and more ...
We will use the CoinGecko API to give the project some context and make it more interesting.
This project uses Docker and docker-compose to create local environments, and a Makefile to automate some tasks.
The Kafka stack is a set of servers for delivering and receiving streams or messages, with many features and assets:
- Broker: the Kafka broker itself, where messages are pushed and pulled.
- ZooKeeper: it manages the metadata Kafka needs; there is already a discussion in the community about stripping it out.
- Schema Registry: where producers register schemas (Avro, JSON Schema, Protobuf) and consumers load them. Schema Registry works as an HTTP API and uses Kafka as its storage for the schemas (see the producer sketch below).
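As an illustration of that flow, here is a minimal sketch of a producer registering an Avro schema and serializing messages with it. It assumes the confluent-kafka Python package, a broker on localhost:9092, the registry on localhost:8081, and a hypothetical `persons` topic:

```python
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

schema_str = """
{
  "type": "record",
  "name": "Person",
  "fields": [
    {"name": "first_name", "type": "string"},
    {"name": "last_name", "type": "string"}
  ]
}
"""

# The serializer registers the schema in the registry and prefixes every
# serialized message with the schema id (see the wire format below).
sr_client = SchemaRegistryClient({"url": "http://localhost:8081"})
producer = SerializingProducer({
    "bootstrap.servers": "localhost:9092",
    "value.serializer": AvroSerializer(sr_client, schema_str),
})
producer.produce(topic="persons", value={"first_name": "Arun", "last_name": "Kurian"})
producer.flush()
```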
Below is an example of a simple serialized message.
schema:
```
{
  string first_name = 1;
  string last_name = 2;
}
```

message:
```
{
  first_name: "Arun",
  last_name: "Kurian"
}
```

bytes:
```
124Arun226Kurian
```
In the case of `124Arun`, `1` stands for the field identifier, `2` for the data type (string), and `4` is the length of the text.
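To make that layout concrete, here is a tiny sketch that reproduces those bytes. It follows the simplified layout described above (field id, type id, length, then the text), not the real Protobuf wire format:

```python
# Simplified illustration: <field id><type id><length> followed by the UTF-8 text.
# Type id 2 marks a length-delimited (string) field, as in the example above.
def encode_field(field_id: int, value: str) -> bytes:
    data = value.encode("utf-8")
    return f"{field_id}2{len(data)}".encode() + data

message = encode_field(1, "Arun") + encode_field(2, "Kurian")
print(message)  # b'124Arun226Kurian'
```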
Note: Confluent Schema Registry adds a header to the raw message; this header is composed of 5 bytes prefixing the payload.
byte 0 | bytes 1-4 |
---|---|
magic byte (unsigned char) | schema id (unsigned int, big-endian) |
The schema_id can be retrieved with the following sample code:
```python
import struct

value = b'\x00\x00\x00\x00\x01\x0485'
magic, schema_id = struct.unpack('>bI', value[:5])  # magic byte + big-endian schema id
print(magic, schema_id)  # 0 1
```
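With that id in hand, the schema itself can be fetched back from the registry. A minimal sketch, assuming the confluent-kafka Python package and the registry on localhost:8081:

```python
from confluent_kafka.schema_registry import SchemaRegistryClient

sr_client = SchemaRegistryClient({"url": "http://localhost:8081"})
schema = sr_client.get_schema(schema_id)  # schema_id parsed in the snippet above
print(schema.schema_str)                  # the registered schema definition
```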
subjects:
```sh
curl localhost:8081/subjects | jq '.'
```
The schemas endpoint responds with our Avro (in this example) schemas, i.e. the descriptions of the fields:
```sh
curl localhost:8081/schemas | jq '.'
```
AWS Glue can be used as a Schema Registry too.
Note: the Glue Schema Registry adds a header to the raw message; this header is composed of 18 bytes prefixing the payload.
byte 0 | byte 1 | bytes 2-17 |
---|---|---|
8-bit version number | compression flag | 128-bit (16-byte) schema version UUID |
Created without the schema registry:
```
b{'Scope': 'dev', 'Name': 'testevent', 'Version': 'v0', 'Payload': {'required_field': 'valor do required_field', 'struct_field': {'text_field': 'valor do text_field'}}}
```
Created by the aws_glue_schema_registry producer:
```
b\x03\x00\x13\xc0d\x87W\xd3E\x15\xa9\x18\xb6\x8a\x0f\x7f\xa0\xf0{"Scope":"dev","Version":"v0","Payload":{"required_field":"valor do required_field","struct_field":{"text_field":"valor do text_field"}},"Name":"testevent"}
```
```python
import uuid

value = b'\x03\x00\x13\xc0d\x87W\xd3E\x15\xa9\x18\xb6\x8a\x0f\x7f\xa0\xf0{"Scope":"dev","Version":"v0","Payload":{"required_field":"valor do required_field","struct_field":{"text_field":"valor do text_field"}},"Name":"testevent"}'
uuid.UUID(bytes=value[2:18])
# UUID('13c06487-57d3-4515-a918-b68a0f7fa0f0')
# ^ matches a schema version registered in Glue
```
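For completeness, a hedged sketch of looking that schema version up in Glue from the extracted UUID, assuming boto3 and configured AWS credentials:

```python
import uuid
import boto3

glue = boto3.client("glue")
version_id = str(uuid.UUID(bytes=value[2:18]))  # value from the snippet above
resp = glue.get_schema_version(SchemaVersionId=version_id)
print(resp["DataFormat"], resp["SchemaDefinition"])
```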
...
...
It is an integration of sources and sinks with Kafka in a "plug and play" fashion, based on declarative configs. For example: MySQL table -> Kafka topic -> Kibana index.
https://github.com/confluentinc/kafka-connect-storage-cloud/
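As a sketch of the "plug and play" idea, the S3 sink connector from the repository above could be registered through Kafka Connect's REST API. This assumes Connect on localhost:8083; the connector, topic and bucket names are placeholders:

```python
import json
import requests

connector = {
    "name": "s3-sink-example",                    # hypothetical connector name
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "1",
        "topics": "coins",                        # placeholder topic
        "s3.bucket.name": "my-example-bucket",    # placeholder bucket
        "s3.region": "us-east-1",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "flush.size": "100",
    },
}
resp = requests.post(
    "http://localhost:8083/connectors",
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
print(resp.status_code, resp.json())
```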
- Products
  - id
  - name
- Clicks
  - id (uuid)
  - product_id
  - visit_id
- Orders
  - id
  - product_id
  - click_id
- Visits
  - id
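Just as a sketch, these entities could be represented as Python dataclasses for producing and consuming; the field types are assumptions, only the field names come from the model above:

```python
from dataclasses import dataclass
import uuid


@dataclass
class Product:
    id: str
    name: str


@dataclass
class Click:
    id: uuid.UUID
    product_id: str
    visit_id: str


@dataclass
class Order:
    id: str
    product_id: str
    click_id: uuid.UUID


@dataclass
class Visit:
    id: str
```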