Uses the IBM i journal as a source of CDC events; see https://bitbucket.org/jhc-systems/journal-parsing/ for the journal fetch/decoding.
GRTOBJAUT OBJ(<LIBRARY>) OBJTYPE(*LIB) USER(<CDC_USER>) AUT(*EXECUTE)
GRTOBJAUT OBJ(<LIBRARY>/*ALL) OBJTYPE(*JRNRCV) USER(<CDC_USER>) AUT(*USE)
GRTOBJAUT OBJ(<LIBRARY>/<JOURNAL>) OBJTYPE(*JRN) USER(<CDC_USER>) AUT(*USE *OBJEXIST)
GRTOBJAUT OBJ(<FIGARO_LIBRARY>) OBJTYPE(*LIB) USER(<CDC_USER>) AUT(*EXECUTE)
GRTOBJAUT OBJ(<FIGARO_LIBRARY>/*ALL) OBJTYPE(*FILE) USER(<CDC_USER>) AUT(*USE)
Where:
- <LIBRARY> is the library where the journal and receivers reside
- <JOURNAL> is the journal name
- <FIGARO_LIBRARY> is the Figaro database library
- <CDC_USER> is the username of the CDC service account
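For example, to grant the service account CDCUSER access to a journal MYJRN in library MYLIB and a Figaro database library FIGDATA (all names hypothetical):

GRTOBJAUT OBJ(MYLIB) OBJTYPE(*LIB) USER(CDCUSER) AUT(*EXECUTE)
GRTOBJAUT OBJ(MYLIB/*ALL) OBJTYPE(*JRNRCV) USER(CDCUSER) AUT(*USE)
GRTOBJAUT OBJ(MYLIB/MYJRN) OBJTYPE(*JRN) USER(CDCUSER) AUT(*USE *OBJEXIST)
GRTOBJAUT OBJ(FIGDATA) OBJTYPE(*LIB) USER(CDCUSER) AUT(*EXECUTE)
GRTOBJAUT OBJ(FIGDATA/*ALL) OBJTYPE(*FILE) USER(CDCUSER) AUT(*USE)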
The following environment variables are mandatory configuration and have no default values:
DEBEZIUM_BOOTSTRAP_SERVERS
SCHEMA_REGISTRY_URL
DEBEZIUM_REST_ADVERTISED_HOST_NAME
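For example, when starting the runtime container (the image name, host names, and ports here are illustrative):

docker run -d -p 8083:8083 \
  -e DEBEZIUM_BOOTSTRAP_SERVERS=kafka:9092 \
  -e SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
  -e DEBEZIUM_REST_ADVERTISED_HOST_NAME=connect \
  <runtime-image>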
It is highly recommended that the partitions and replication factor are increased for production, e.g.:
PARTITIONS=3
REPLICATION_FACTOR=3
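Assuming these drive the topics the runtime creates at startup, the resulting settings can be verified with the standard topic tooling (broker address and topic name illustrative):

kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic connect-offsets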
If the journal receiver is deleted before it has been read, the connector will log an error: "Lost journal at position xxx" and reset to the beginning of the journal.
At this point the data really needs to be resynced; this does not happen automatically.
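One way to resync (a sketch; connector names are illustrative): delete the connector and re-register it under a new name, so that no stale offsets are reused and the initial snapshot runs again:

curl -i -X DELETE http://localhost:8083/connectors/connector-name
curl -i -X POST -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '@./connector-name-v2.json'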
- TODO: needs to support ad-hoc snapshotting
Recommended minimum memory is 1GB.
This can be configured either with cloud resource limits or with JAVA_OPTIONS=-Xmx1g
Additional Kafka Connect worker properties can be supplied as environment variables prefixed with DEBEZIUM_. For example, the following worker properties enable Zipkin tracing:
producer.interceptor.classes=brave.kafka.interceptor.TracingProducerInterceptor
consumer.interceptor.classes=brave.kafka.interceptor.TracingConsumerInterceptor
producer.zipkin.sender.type=KAFKA
producer.zipkin.local.service_name=ksql
producer.zipkin.bootstrap.servers=mskafka:9092
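Supplied as environment variables this might look as follows; the exact name mapping (upper-casing, '_' in place of '.') is an assumption about the runtime image's startup script:

DEBEZIUM_PRODUCER_INTERCEPTOR_CLASSES=brave.kafka.interceptor.TracingProducerInterceptor
DEBEZIUM_CONSUMER_INTERCEPTOR_CLASSES=brave.kafka.interceptor.TracingConsumerInterceptor
DEBEZIUM_PRODUCER_ZIPKIN_SENDER_TYPE=KAFKA
DEBEZIUM_PRODUCER_ZIPKIN_LOCAL_SERVICE_NAME=ksql
DEBEZIUM_PRODUCER_ZIPKIN_BOOTSTRAP_SERVERS=mskafka:9092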
The runtime image source is at https://bitbucket.org/jhc-systems/kafka-kubernetes/src/master/runtime/source/
A new connector is registered by submitting its configuration (here in the file connector-name.json) to the Connect REST API:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '@./connector-name.json'

File connector-name.json:
{
  "name": "connector-name",
  "config": {
    "connector.class": "io.debezium.connector.db2as400.As400RpcConnector",
    "schema": "SCHEMA",
    "sanitize.field.names": "true",
    "tasks.max": "1",
    "hostname": "ibmiserver",
    "dbname": "database name",
    "user": "xxx",
    "password": "xxx",
    "port": "",
    "poll.interval.ms": "2000",
    "transforms": "unwrap",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "table.include.list": "SCHEMA.TABLE1",
    "snapshot.mode": "initial",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "snapshot.max.threads": 4
  }
}
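Once submitted, the connector and task state can be checked with the standard Connect status endpoint:

curl -i -X GET -H "Accept:application/json" http://localhost:8083/connectors/connector-name/status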
Note that dbname can be blank; it is used as part of the JDBC connection string: jdbc:as400://hostname/dbname
Optional:
"socket timeout": "300000",
"keep alive": "true",
"thread used": "false"
The above help with connections that can be blocked (firewalled) or dropped due to VPN issues.
Unusually, we have the incorrect CCSID on all our tables, and the data is forced into the tables with the wrong encoding.
This should really be corrected and the data translated, but with thousands of tables and many clients all configured incorrectly this is a huge job with significant risk. Instead there is an additional pair of settings: from_ccsid, the CCSID currently on the tables, and to_ccsid, the CCSID to use instead. This applies to the entire system and all tables.
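For example, adding the following to the connector config (the CCSID values here are illustrative) decodes data stored under CCSID 37 as CCSID 1141 instead:

"from_ccsid": "37",
"to_ccsid": "1141"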
Registered connectors can be listed with

curl -i -X GET -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/

returning, e.g.

[connector-name]

A connector can be deleted with

curl -i -X DELETE -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/connector-name

and its current definition fetched with

curl -i -X GET -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/connector-name
The configuration can be updated with
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/connector-name/config/ -d "@connector-name-config.json"
where the update file contains only the inner config as a flat map, and the connector name is given in the URL path:
{
  "name": "connector-name",
  "connector.class": "io.debezium.connector.db2as400.As400RpcConnector",
  "schema": "SCHEMA",
  "sanitize.field.names": "true",
  "tasks.max": "1",
  "hostname": "ibmiserver",
  "dbname": "internaldbname",
  "user": "xxx",
  "password": "xxx",
  "port": "",
  "poll.interval.ms": "2000",
  "transforms": "unwrap",
  "transforms.unwrap.delete.handling.mode": "rewrite",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "table.include.list": "SCHEMA.TABLE1,SCHEMA.TABLE2",
  "snapshot.mode": "initial"
}
Here we've added TABLE2 to the list.
Prometheus stats are available on port 7071.
Sample Prometheus stats are in
- metrics/prometheus
Sample Grafana charts are in
- metrics/grafana.txt
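A minimal Prometheus scrape configuration for this endpoint (job name and target host are illustrative):

scrape_configs:
  - job_name: 'debezium-connect'
    static_configs:
      - targets: ['connect:7071']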
See upstream project: https://bitbucket.org/jhc-systems/journal-parsing/src/master/
To check that a table is journaled, run DSPFD against it:

dspfd MYTABLE
File is currently journaled . . . . . . . . : Yes
Current or last journal . . . . . . . . . . : MYJRN
Library . . . . . . . . . . . . . . . . . : MYLIB
Journal images . . . . . . . . . . . . . . : IMAGES *BOTH
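If the table is not yet journaled, journaling can be started with STRJRNPF, here using the names from the DSPFD output above:

STRJRNPF FILE(MYLIB/MYTABLE) JRN(MYLIB/MYJRN) IMAGES(*BOTH)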
https://lucid.app/lucidchart/invitations/accept/inv_b0dba11e-fb73-4bfc-9efd-1c14d7ef2642
To run locally, the main class is

org.apache.kafka.connect.cli.ConnectDistributed

with the worker configuration file as its runtime argument, e.g. for
- local Kafka: src/test/resources/protobuf.properties
- remote Confluent: src/test/resources/confluent.properties

Logging is configured with the VM arg -Dlogback.configurationFile=src/test/resources/logback.xml, as shown in the sketch below.
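Put together, a local run from the command line might look like this (a sketch: the classpath is an assumption and must contain the built connector plus the Kafka Connect runtime, e.g. after mvn dependency:copy-dependencies):

java -Dlogback.configurationFile=src/test/resources/logback.xml \
  -cp "target/classes:target/dependency/*" \
  org.apache.kafka.connect.cli.ConnectDistributed \
  src/test/resources/protobuf.properties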
https://bitbucket.org/jhc-systems/kafka-kubernetes/src/master/docker/
Configure the IP addresses in conf/local.env, src/test/resources/protobuf.properties, and src/test/resources/confluent.properties to be your IP address. If running using localhost, you can use 0.0.0.0 for each of these.
To run in VS Code, configure the following launch.json file, and run from the Run and Debug extension.
{
  // Use IntelliSense to learn about possible attributes.
  // Hover to view descriptions of existing attributes.
  // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
  "version": "0.2.0",
  "configurations": [
    {
      "type": "java",
      "name": "Launch",
      "request": "launch",
      "mainClass": "org.apache.kafka.connect.cli.ConnectDistributed",
      "projectName": "debezium-connector-ibmi",
      "env": {},
      "args": "src/test/resources/protobuf.properties",
      "vmArgs": "-Dlogback.configurationFile=src/test/resources/logback.xml"
    }
  ]
}
Additional configuration parameters are required for Avro in the submitted JSON:
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
Optional parameter:
"snapshot.max.threads": 4
The Docker image is built with

mvn compile jib:dockerBuild