Uses the IBM i journal as a source of CDC events. See https://bitbucket.org/jhc-systems/journal-parsing/ for the journal fetching and decoding.
GRTOBJAUT OBJ(<JRNLIB>) OBJTYPE(*LIB) USER(<CDC_USER>) AUT(*EXECUTE)
GRTOBJAUT OBJ(<JRNLIB>/*ALL) OBJTYPE(*JRNRCV) USER(<CDC_USER>) AUT(*USE)
GRTOBJAUT OBJ(<JRNLIB>/<JRN>) OBJTYPE(*JRN) USER(<CDC_USER>) AUT(*USE *OBJEXIST)
GRTOBJAUT OBJ(<PROJECT_LIB>) OBJTYPE(*LIB) USER(<CDC_USER>) AUT(*EXECUTE)
GRTOBJAUT OBJ(<PROJECT_LIB>/*ALL) OBJTYPE(*FILE) USER(<CDC_USER>) AUT(*USE)
Where:
- <JRNLIB> is the library where the journal and receivers reside
- <JRN> is the journal name
- <PROJECT_LIB> is the Figaro database library
- <CDC_USER> is the username of the CDC service account
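The grants above can be generated from a script so that the placeholders are filled in consistently. This is only a minimal sketch: the function name `print_grants` and the sample values are hypothetical, and the script only prints the CL commands for review rather than running them on the IBM i.

```shell
#!/bin/sh
# Print the five GRTOBJAUT commands listed above.
# Arguments: journal library, journal name, project library, CDC user.
print_grants() {
    jrnlib=$1 jrn=$2 project_lib=$3 cdc_user=$4
    printf 'GRTOBJAUT OBJ(%s) OBJTYPE(*LIB) USER(%s) AUT(*EXECUTE)\n' "$jrnlib" "$cdc_user"
    printf 'GRTOBJAUT OBJ(%s/*ALL) OBJTYPE(*JRNRCV) USER(%s) AUT(*USE)\n' "$jrnlib" "$cdc_user"
    printf 'GRTOBJAUT OBJ(%s/%s) OBJTYPE(*JRN) USER(%s) AUT(*USE *OBJEXIST)\n' "$jrnlib" "$jrn" "$cdc_user"
    printf 'GRTOBJAUT OBJ(%s) OBJTYPE(*LIB) USER(%s) AUT(*EXECUTE)\n' "$project_lib" "$cdc_user"
    printf 'GRTOBJAUT OBJ(%s/*ALL) OBJTYPE(*FILE) USER(%s) AUT(*USE)\n' "$project_lib" "$cdc_user"
}

# Hypothetical values, for illustration only.
print_grants MYLIB MYJRN FIGLIB CDCUSER
```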
The following environment variables are mandatory and have no default values:
DEBEZIUM_BOOTSTRAP_SERVERS
SCHEMA_REGISTRY_URL
DEBEZIUM_REST_ADVERTISED_HOST_NAME
It is highly recommended that PARTITIONS and REPLICATION_FACTOR are increased for production:
PARTITIONS=3
REPLICATION_FACTOR=3
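Because the mandatory variables have no defaults, a start-up script can check them before launching the connector. The following is a sketch only: the function name `check_required_env` and all of the example values are assumptions, not part of the project.

```shell
#!/bin/sh
# Report every mandatory variable that is unset or empty.
# Returns 0 when all are present, 1 otherwise.
check_required_env() {
    missing=0
    for name in DEBEZIUM_BOOTSTRAP_SERVERS SCHEMA_REGISTRY_URL DEBEZIUM_REST_ADVERTISED_HOST_NAME; do
        # Indirect lookup of the variable whose name is in $name.
        eval "value=\${$name:-}"
        if [ -z "$value" ]; then
            echo "missing mandatory variable: $name" >&2
            missing=1
        fi
    done
    return "$missing"
}

# Hypothetical values, for illustration only.
export DEBEZIUM_BOOTSTRAP_SERVERS=kafka:9092
export SCHEMA_REGISTRY_URL=http://schema-registry:8081
export DEBEZIUM_REST_ADVERTISED_HOST_NAME=connect
export PARTITIONS=3
export REPLICATION_FACTOR=3

check_required_env && echo "environment looks complete"
```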
- TODO integrate with exit program to prevent journal loss https://github.com/jhc-systems/debezium-ibmi-exitpgm
- Limited support for table changes: the journal entries for table changes are not documented, so the connector fetches the table structure at runtime and refetches it when a table change is detected
- No support for remote journals or failover
- No support for CLOB, XML and similar large text or binary (BLOB) columns
Capture your CA certificate, which is normally at the bottom of the output of:
openssl s_client -showcerts -connect <host>:9471
Copy the last certificate block, from -----BEGIN CERTIFICATE----- to -----END CERTIFICATE-----, and save it to iseries-cert.pem
If using Docker, simply mount your certs at /var/tls
If running natively, import the cert:
keytool -import -noprompt -alias iseries-cert -storepass changeit -keystore /usr/lib/jvm/java-1.17.0-openjdk-amd64/lib/security/cacerts -file iseries-cert.pem
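Copying the last certificate by hand is easy to get wrong, so the extraction can be scripted. The sketch below assumes the certificate you want is the last PEM block in the `openssl` output, as described above; the function name `last_cert` is hypothetical.

```shell
#!/bin/sh
# Print the last PEM certificate block found on stdin, marker lines included.
last_cert() {
    awk '
        /-----BEGIN CERTIFICATE-----/ { block = ""; inside = 1 }
        inside                        { block = block $0 "\n" }
        /-----END CERTIFICATE-----/   { inside = 0; last = block }
        END                           { printf "%s", last }
    '
}

# Usage (replace <host> as in the command above):
#   openssl s_client -showcerts -connect <host>:9471 </dev/null | last_cert > iseries-cert.pem
```

Redirecting stdin from /dev/null makes `openssl s_client` exit once the handshake output has been printed instead of waiting for input.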
If the journal is deleted before it is read, the connector logs an error, "Lost journal at position xxx", and resets to the beginning of the journal.
At this point a resync is really needed; this does not happen automatically.
- TODO: support ad hoc snapshotting
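Since the resync is manual, it can help to watch the connector log for the error message quoted above. This is a rough sketch: the function name `count_lost_journal` and the log file name in the usage comment are assumptions, and only the message text comes from this document.

```shell
#!/bin/sh
# Count log lines on stdin that report a lost journal position.
# grep -c exits non-zero when the count is 0, so the status is normalised.
count_lost_journal() {
    grep -c 'Lost journal at position' || true
}

# Usage, assuming the connector log is written to a file named connect.log:
#   lost=$(count_lost_journal < connect.log)
#   [ "$lost" -gt 0 ] && echo "journal entries were lost: a resync is needed" >&2
```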
Recommended minimum memory 1GB
This can either be configured with cloud resources or using JAVA_OPTIONS=-Xmx1g
DEBEZIUM_
producer.interceptor.classes=brave.kafka.interceptor.TracingProducerInterceptor
consumer.interceptor.classes=brave.kafka.interceptor.TracingConsumerInterceptor
producer.zipkin.sender.type=KAFKA
producer.zipkin.local.service_name=ksql
producer.zipkin.bootstrap.servers=mskafka:9092
https://bitbucket.org/jhc-systems/kafka-kubernetes/src/master/runtime/source/
A new connector configuration, connector-name.json, can be submitted with:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '@./connector-name.json'
file connector-name.json:
{
"name": "connector-name",
"config": {
"connector.class": "io.debezium.connector.db2as400.As400RpcConnector",
"schema": "SCHEMA",
"sanitize.field.names": "true",
"tasks.max": "1",
"hostname": "ibmiserver",
"dbname": "database name",
"user": "xxx",
"password": "xxx",
"port": "",
"secure": true,
"poll.interval.ms": "2000",
"transforms": "unwrap",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"table.include.list": "SCHEMA.TABLE1",
"snapshot.mode": "initial",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"snapshot.max.threads": 4
}
}
Note that dbname can be blank; it is used as part of the JDBC connection string: jdbc://hostname/dbname
Optional:
"socket timeout": "300000",
"keep alive": "true",
"thread used": "false"
The above settings help with connections that are blocked by a firewall or dropped due to VPN issues.
Unusually, all our tables have an incorrect CCSID, and the data is forced into the tables with the wrong encoding.
This issue should really be corrected and the data translated, but with thousands of tables and many clients all configured incorrectly that is a huge job with significant risk. Instead there is an additional pair of settings: from_ccsid, which is the CCSID on the table, and to_ccsid, which is the CCSID to use in its place. The mapping applies to the entire system and all tables.
Existing connectors can be listed with
curl -i -X GET -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/
which returns
[connector-name]
A connector can be deleted with
curl -i -X DELETE -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/connector-name
and its details fetched with
curl -i -X GET -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/connector-name
The configuration can be updated with
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/connector-name/config/ -d "@connector-name-config.json"
Notes
- the connector-name in the URL must match the name in the JSON file
- the update file only contains the inner config
{
"connector.class": "io.debezium.connector.db2as400.As400RpcConnector",
"schema": "SCHEMA",
"sanitize.field.names": "true",
"tasks.max": "1",
"hostname": "ibmiserver",
"dbname": "database name",
"user": "xxx",
"password": "xxx",
"port": "",
"poll.interval.ms": "2000",
"transforms": "unwrap",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"table.include.list": "SCHEMA.TABLE1,SCHEMA.TABLE2",
"snapshot.mode": "initial",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter"
}
Here we've added TABLE2 to the list.
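The list, fetch, delete and update calls shown in this section can be wrapped in a few shell functions to avoid retyping the URLs. This is a sketch under assumptions: the function names and the `CONNECT_URL` variable are hypothetical, and the default base URL is the one used in the commands above.

```shell
#!/bin/sh
# Base URL of the Kafka Connect REST API; override to target another host.
CONNECT_URL=${CONNECT_URL:-http://localhost:8083}

# Build the URL for one connector, or for the collection when no name is given.
connector_url() {
    printf '%s/connectors/%s' "${CONNECT_URL%/}" "${1:-}"
}

list_connectors()  { curl -sS -H "Accept:application/json" "$(connector_url)"; }
show_connector()   { curl -sS -H "Accept:application/json" "$(connector_url "$1")"; }
delete_connector() { curl -sS -X DELETE "$(connector_url "$1")"; }

# PUT a file holding only the inner "config" object to <name>/config.
update_connector() {
    curl -sS -X PUT -H "Accept:application/json" -H "Content-Type:application/json" \
        "$(connector_url "$1")/config" -d "@$2"
}

# Usage:
#   update_connector connector-name connector-name-config.json
```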
Prometheus stats are available on port 7071.
Sample Prometheus stats are in
- metrics/prometheus
Sample Grafana charts are in
- metrics/grafana.txt
See upstream project: https://bitbucket.org/jhc-systems/journal-parsing/src/master/
dspfd MYTABLE
File is currently journaled . . . . . . . . : Yes
Current or last journal . . . . . . . . . . : MYJRN
Library . . . . . . . . . . . . . . . . . : MYLIB
Journal images . . . . . . . . . . . . . . : IMAGES *BOTH
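A saved copy of the DSPFD listing can be checked automatically against the values shown in the sample above. This is only a sketch: the function name `check_journaling` is hypothetical, how the listing is exported to text is left open, and treating "Yes" and "IMAGES *BOTH" as the required values is an assumption based on that sample.

```shell
#!/bin/sh
# Check a DSPFD listing read from stdin.
# Returns 0 when the file is journaled with both images,
# 1 when it is not journaled, 2 when the images are not *BOTH.
check_journaling() {
    # Read the listing once so it can be searched twice.
    listing=$(cat)
    printf '%s\n' "$listing" | grep -Eq 'File is currently journaled[ .]*: *Yes' || {
        echo "file is not journaled" >&2
        return 1
    }
    printf '%s\n' "$listing" | grep -Eq 'Journal images[ .]*: *IMAGES \*BOTH' || {
        echo "journal images are not *BOTH" >&2
        return 2
    }
}

# Usage, assuming the listing was saved to a file named mytable-dspfd.txt:
#   check_journaling < mytable-dspfd.txt && echo "journaling looks as expected"
```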
https://lucid.app/lucidchart/invitations/accept/inv_b0dba11e-fb73-4bfc-9efd-1c14d7ef2642
main class
org.apache.kafka.connect.cli.ConnectDistributed
runtime argument of the configuration e.g. for
- local kafka
src/test/resources/protobuf.properties
- remote confluent
src/test/resources/confluent.properties
Logging - vm args -Dlogback.configurationFile=src/test/resources/logback.xml
https://bitbucket.org/jhc-systems/kafka-kubernetes/src/master/docker/
Configure the IP addresses in conf/local.env, src/test/resources/protobuf.properties, and src/test/resources/confluent.properties to be your IP address. If running using localhost, you can use 0.0.0.0 for each of these.
To run in VS Code, configure the following launch.json file, and run from the Run and Debug extension.
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "java",
"name": "Launch",
"request": "launch",
"mainClass": "org.apache.kafka.connect.cli.ConnectDistributed",
"projectName": "debezium-connector-ibmi",
"env": {},
"args": "src/test/resources/protobuf.properties",
"vmArgs": "-Dlogback.configurationFile=src/test/resources/logback.xml"
}
]
}
New configuration parameter secure; this defaults to true.
Fixes data loss bugs:
- when receivers reset
- after receiving a continuation offset, the first entry is occasionally lost (when the next request doesn't contain a continuation offset)
Additional configuration parameters required for Avro in the submitted JSON:
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
optional parameter
"snapshot.max.threads": 4
mvn compile jib:dockerBuild