Disclaimer: This work is inspired by the Debezium ExtractNewRecordState SMT. We take the opportunity to thank the whole Debezium team for the quality of their work and for making it available to all.
Qlik Replicate change events are composed of 3 parts:

- `headers`, which includes:
  - `operation`: The SQL operation: `INSERT`, `UPDATE`, `DELETE` or `REFRESH` (i.e. initial loading)
  - `changeSequence`: A monotonically increasing change sequence that is common to all change tables of a task
  - `timestamp`: The original change UTC timestamp
  - `streamPosition`: The source stream offset position
  - `transactionId`: The ID of the transaction that the change record belongs to
  - `changeMask`: Hexadecimal string to detect which columns have changed (internal purpose)
  - `columnMask`: Hexadecimal string to present which columns are present (internal purpose)
  - `transactionEventCounter`: The sequence number of the current operation in the transaction
  - `transactionLastEvent`: Flag to mark the last change of a transaction
- `beforeData`: The database record before the change
- `data`: The database record after the change
For example, an `UPDATE` change event looks like this:
```json
{
  "data": {
    "id": 1,
    "email": "foo@bar.com"
  },
  "beforeData": {
    "id": 1,
    "email": "john@doe.com"
  },
  "headers": {
    "operation": "UPDATE",
    "changeSequence": "100",
    "timestamp": 1000,
    "streamPosition": "1",
    "transactionId": "1",
    "changeMask": "1",
    "columnMask": "1",
    "transactionEventCounter": 1,
    "transactionLastEvent": true
  }
}
```
The structure provides all the information Qlik Replicate has.
However, the rest of the Kafka ecosystem usually expects the data in a simpler format like:
```json
{
  "id": 1,
  "email": "john@doe.com"
}
```
Download the latest jar from the release page.
The SMT jar should be present in the `plugin.path` with the other Kafka connector/SMT jars.
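For example, assuming the jar is copied to `/usr/share/kafka/plugins` (an illustrative path, adjust to your installation), the Connect worker properties would contain:

```properties
# Connect worker configuration (example path, not prescribed by this project)
plugin.path=/usr/share/kafka/plugins
```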
The `ExtractNewRecordState` SMT is configured in your Kafka Connect sink connector's configuration.
Given the following Qlik Replicate change event message:
```json
{
  "data": null,
  "beforeData": {
    "id": 1,
    "email": "john@doe.com"
  },
  "headers": {
    "operation": "DELETE",
    "changeSequence": "100",
    "timestamp": 1000,
    "streamPosition": "1",
    "transactionId": "1",
    "changeMask": "1",
    "columnMask": "1",
    "transactionEventCounter": 1,
    "transactionLastEvent": true
  }
}
```
We can use the following configuration:
"transforms": "extract",
"transforms.extract.type": "com.michelin.kafka.connect.transforms.qlik.replicate.ExtractNewRecordState",
"transforms.extract.delete.mode": "soft",
"transforms.extract.add.fields": "operation,timestamp",
"transforms.extract.add.headers": "transactionId"
- `delete.mode=soft`: the SMT adds a `__deleted` field and sets it to `true` whenever a delete operation is processed.
- `add.fields=operation,timestamp`: adds change event metadata (coming from the `headers` property). It will add `__operation` and `__timestamp` fields to the simplified Kafka record.
- `add.headers=transactionId`: same as `add.fields` but with the Kafka record headers.
We will obtain the following Kafka message:
```json
{
  "id": "1",
  "email": "john@doe.com",
  "__deleted": "true",
  "__operation": "DELETE",
  "__timestamp": "1000"
}
```
and a Kafka record header `__transactionId` with value `"1"`.
The `ExtractNewRecordState` SMT (Single Message Transformation) extracts the `data` field from the Qlik Replicate change event contained in a Kafka record.
The value of the `data` field will replace the original message content to build a simpler Kafka record.
When a record is updated, `beforeData` will represent the previous state and `data` the current one.
By extracting the `data` field, the SMT exposes the updated record.
If you plan to sink the updated record into a database, please use upsert semantics (INSERT OR UPDATE).
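With the Confluent JDBC sink connector, for example, this translates to something like the following sketch (assuming the primary key is an `id` column carried in the record value):

```json
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "id"
```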
By comparing the `beforeData` and `data` properties, one is able to detect source table modifications (added/removed columns).
The SMT will automatically build the new schema corresponding to the new table structure.
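For instance, a hypothetical event (headers omitted for brevity) emitted after a `phone` column has been added to the source table might look like this; the SMT would derive the new schema from the extra field in `data`:

```json
{
  "beforeData": { "id": 1, "email": "john@doe.com" },
  "data": { "id": 1, "email": "john@doe.com", "phone": "+33102030405" }
}
```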
A database `DELETE` operation generates a change event like this:
```json
{
  "beforeData": null,
  "data": {
    "id": 1,
    "email": "john@doe.com"
  },
  "headers": {
    "operation": "DELETE",
    "changeSequence": "100",
    "timestamp": 1000,
    "streamPosition": "1",
    "transactionId": "1",
    "changeMask": "1",
    "columnMask": "1",
    "transactionEventCounter": 1,
    "transactionLastEvent": true
  }
}
```
The change event `data` has captured the record state and `headers.operation` is set to `DELETE`.
The `ExtractNewRecordState` SMT proposes 3 options:

- Produce a tombstone record (use case: the JDBCSinkConnector can be configured to convert tombstones into actual SQL delete statements on the sink repository; see the sketch after this list).
- Skip the record to prevent any downstream delete management.
- Do a soft delete (aka logical delete): the SMT will add a `__deleted` field to each message and toggle it to `true` whenever a delete operation is processed.
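For the first option, a Confluent JDBC sink connector could be configured to turn the tombstones into SQL deletes; a sketch, assuming the record key carries the primary key as that connector requires:

```json
"transforms.extract.delete.mode": "apply",
"delete.enabled": "true",
"pk.mode": "record_key"
```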
Qlik Replicate can wrap the change event data in an envelope like this:
```json
{
  "magic": "atMSG",
  "type": "DT",
  "headers": null,
  "messageSchemaId": null,
  "messageSchema": null,
  "message": {
    "data": {
      "id": 1,
      "email": "foo@bar.com"
    },
    "beforeData": {
      "id": 1,
      "email": "john@doe.com"
    },
    "headers": {
      "operation": "UPDATE",
      "changeSequence": "100",
      "timestamp": 1000,
      "streamPosition": "1",
      "transactionId": "1",
      "changeMask": "1",
      "columnMask": "1",
      "transactionEventCounter": 1,
      "transactionLastEvent": true
    }
  }
}
```
The SMT detects the presence of the envelope and unwraps it automatically to produce a simple message.
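For instance, the envelope example above would be reduced to the content of `message.data`:

```json
{
  "id": 1,
  "email": "foo@bar.com"
}
```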
Qlik Replicate relies on JSON to represent a change event. Using schemas is highly recommended when you're building production-grade applications, but if you need a quick and dirty test, the SMT can process messages both with and without schemas.
Note: Schemas are mandatory for some components, such as the `JdbcSinkConnector`.
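For a quick schemaless test, the standard Kafka Connect JSON converter settings (not specific to this SMT) can be used:

```json
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
```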
| Property name | Default | Description |
|---|---|---|
| `delete.mode` | `apply` | Delete events will produce a tombstone. This property permits choosing between 3 modes:<br>- `apply`: a tombstone will be produced (mainly to trigger the JDBCSinkConnector delete support).<br>- `skip`: the Kafka record will be dropped and not passed to the downstream processing components.<br>- `soft`: adds a `__deleted` field to each message. When an insert or update is processed, the `__deleted` field is set to `false`. When a delete occurs, the SMT will use the `beforeData` property and set the `__deleted` field to `true`. |
| `add.fields` | `""` | Adds a field based on change event metadata (coming exclusively from the `headers` property). The field name will be prefixed by `__` (double underscores). The field value will be converted to string. |
| `add.headers` | `""` | Adds a header based on change event metadata (coming exclusively from the `headers` property). The header name will be prefixed by `__` (double underscores). |
Qlik Replicate uses microsecond precision when representing a timestamp.
As of now, the `TimestampConverter` SMT only supports millisecond precision.
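For instance, the microsecond value 1577836800000000 represents 2020-01-01T00:00:00Z, but interpreted with millisecond precision it would point to roughly the year 51970.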
The `ReplicateTimestampConverter` SMT is a duplication of `TimestampConverter` which uses microseconds as the default timestamp precision rather than milliseconds.
Note: This SMT is very specific to Qlik Replicate and should be removed once `TimestampConverter` supports microseconds.
You will most likely want to chain the Qlik Replicate state extraction and the date conversion. Here is a sample configuration:
"transforms": "extract,convertBirthDate, convertCreationDate",
"transforms.extract.type": "com.michelin.kafka.connect.transforms.qlik.replicate.ExtractNewRecordState",
"transforms.convertBirthDate.type": "com.michelin.kafka.connect.transforms.qlik.replicate.ReplicateTimestampConverter$Value",
"transforms.convertBirthDate.field": "person_birthday",
"transforms.convertBirthDate.target.type": "Timestamp",
"transforms.convertCreationDate.type": "com.michelin.kafka.connect.transforms.qlik.replicate.ReplicateTimestampConverter$Value",
"transforms.convertCreationDate.field": "person_creation_date",
"transforms.convertCreationDate.target.type": "Date"
Notice that each date/time field conversion has to be defined independently.
See the official documentation: https://docs.confluent.io/current/connect/transforms/timestampconverter.html
The release process relies on the Maven Release plugin.
Run `./mvnw release:prepare`; it will ask you for the version name, the SCM tag name and the next development version name.
Once finished, you obtain a tag and the pom.xml has been updated.
If everything looks good, run `./mvnw release:perform` to confirm the changes, otherwise run `./mvnw release:rollback`.
Once done, go to the GitHub Release page: you'll see a Draft release that was automatically generated when the sources were tagged.
Select the tag you just made and define a name for the release (the tag version is perfect).
The changelog is already generated via Release Drafter.
If everything is ok, publish the release. The artifact will be automatically built and attached to the release.
Kafka Connect SMT for Qlik Replicate is released under the MIT License. See the bundled LICENSE file for details.