Issue with Neo4j kafka sink connector in combination with Avro schema nesting
ThomasBorghs opened this issue ยท 12 comments
(edited, added full stacktrace)
Hi,
I'm trying to ingest Kafka messages encoded in Avro into neo4j using the kafka connect neo4j sink.
I'm getting this error:
Caused by: org.apache.kafka.connect.errors.DataException: org.neo4j.driver.internal.value.FloatValue is not a supported data type.
This error seems to be caused by a "double" datatype in my Avro schema:
Caused by: org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'latitude'
Stacktrace:
[2021-11-09 16:52:53,416] ERROR ErrorData(originalTopic=rtim-wagon-gps, timestamp=1603019642492, partition=8, offset=295990, exception=org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'networkLocation', key=338849101419|3187547f-4a5a-4f02-acf0-7bd617712fd6, value=Struct{parentMessageUuid=f0a2724f-656b-43fd-a1fa-22273e4335d7,wagonNumber=338849101419,measurementSource=WAGON_GPS,measurementTime=2020-10-18T13:10:00+02:00,measurementType=WAGON_GPS,messageTime=2020-, executingClass=class streams.kafka.connect.sink.Neo4jSinkTask) (streams.kafka.connect.sink.Neo4jSinkTask)
org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'networkLocation'
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:259)
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convert(AbstractConverter.java:78)
at streams.kafka.connect.utils.ConnectExtensionFunctionsKt.convertData(ConnectExtensionFunctions.kt:24)
at streams.kafka.connect.utils.ConnectExtensionFunctionsKt.convertData$default(ConnectExtensionFunctions.kt:23)
at streams.kafka.connect.utils.ConnectExtensionFunctionsKt.toStreamsSinkEntity(ConnectExtensionFunctions.kt:19)
at streams.kafka.connect.sink.EventBuilder.build(EventBuilder.kt:28)
at streams.kafka.connect.sink.Neo4jSinkTask.put(Neo4jSinkTask.kt:41)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'coordinates'
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:259)
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convert(AbstractConverter.java:78)
at streams.kafka.connect.sink.converters.MapValueConverter.setStructField(MapValueConverter.kt:56)
at streams.kafka.connect.sink.converters.MapValueConverter.setStructField(MapValueConverter.kt:9)
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:244)
... 17 more
Caused by: org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'latitude'
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertMap(AbstractConverter.java:159)
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convert(AbstractConverter.java:80)
at streams.kafka.connect.sink.converters.MapValueConverter.setMap(MapValueConverter.kt:62)
at streams.kafka.connect.sink.converters.MapValueConverter.setStructField(MapValueConverter.kt:57)
at streams.kafka.connect.sink.converters.MapValueConverter.setStructField(MapValueConverter.kt:9)
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:244)
... 21 more
Caused by: org.apache.kafka.connect.errors.DataException: org.neo4j.driver.internal.value.FloatValue is not a supported data type.
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertMap(AbstractConverter.java:151)
... 26 more
I believe that the schema is parsed incorrectly, where a Double type is converted into a org.neo4j.driver.internal.value.FloatValue, although the AbstractConverter class is not able to handle this datatype, but can only process java.lang.Float.
I'm deploying containers
- version 2.0.0 of the neo4j sink connector
- version 6.0.0 of the community confluent platform
- neo4j 4.1.9 (latest available docker image)
This is the Avro schema of records on the Kafka cluster, where the latitude field is configured as a double
{
"type": "record",
"name": "WagonGpsMeasurement",
"namespace": "be.lineas.rtim.model.gps",
"fields": [
{
"name": "parentMessageUuid",
"type": [
"null",
{
"type": "string",
"CustomEncoding": "NullableUUIDEncoding"
}
],
"default": null
},
{
"name": "positionAccuracy",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "positionProvider",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "assignedWagonTripKey",
"type": [
"null",
{
"type": "record",
"name": "WagonTripKey",
"namespace": "be.lineas.rtim.model.wagonevent",
"fields": [
{
"name": "start",
"type": {
"type": "string",
"CustomEncoding": "InstantEncoding"
}
},
{
"name": "wagon",
"type": "string"
}
]
}
],
"default": null
},
{
"name": "wagonNumber",
"type": "string"
},
{
"name": "assignedTrainId",
"type": [
"null",
{
"type": "record",
"name": "TrainId",
"namespace": "be.lineas.rtim.model.train",
"fields": [
{
"name": "internationalTrainNumber",
"type": "string"
},
{
"name": "trainDate",
"type": {
"type": "string",
"CustomEncoding": "LocalDateEncoding"
}
}
]
}
],
"default": null
},
{
"name": "legUuid",
"type": [
"null",
{
"type": "string",
"CustomEncoding": "NullableUUIDEncoding"
}
],
"default": null
},
{
"name": "measurementSource",
"type": {
"type": "enum",
"name": "MeasurementSource",
"namespace": "be.lineas.rtim.model.measurement",
"symbols": [
"RTIS",
"WAGON_GPS",
"ISR"
]
}
},
{
"name": "measurementTime",
"type": [
"null",
{
"type": "string",
"CustomEncoding": "NullableOffsetDateTimeEncoding"
}
],
"default": null
},
{
"name": "measurementType",
"type": {
"type": "enum",
"name": "MeasurementType",
"namespace": "be.lineas.rtim.model.measurement",
"symbols": [
"DEPARTURE",
"PASSAGE",
"ARRIVAL",
"WAGON_GPS",
"LEFT_ORIGIN",
"WAGON_ARRIVAL",
"WAGON_DEPARTURE",
"REACHED_DESTINATION",
"PASSED_TROUGH",
"BORDER_CROSSING",
"BORDER_CROSSING_EXIT",
"BORDER_CROSSING_ENTRY",
"BORDER_CROSSING_ADVICE",
"WAGON_REFUSED",
"READY_TO_PULL",
"PULLED",
"DELIVERED",
"FREE_FOR_DISPOSITION",
"WAGON_BROKEN",
"WAGON_REPAIRED",
"TRANSPORT_CREATED",
"CREATE_TRANSPORT_DOSSIER",
"UPDATE_TRANSPORT_DOSSIER",
"WAGON_MILEAGE_CALCULATION_RESULT",
"DELETE_TRANSPORT_DOSSIER",
"ELTA_FORWARDING"
]
}
},
{
"name": "messageTime",
"type": [
"null",
{
"type": "string",
"CustomEncoding": "NullableOffsetDateTimeEncoding"
}
],
"default": null
},
{
"name": "messageUuids",
"type": {
"type": "array",
"items": "string",
"CustomEncoding": "UUIDListEncoding"
}
},
{
"name": "networkLocation",
"type": [
"null",
{
"type": "record",
"name": "NetworkLocation",
"namespace": "be.lineas.rtim.model.networklocation",
"fields": [
{
"name": "coordinates",
"type": [
"null",
{
"type": "record",
"name": "Coordinates",
"namespace": "be.lineas.rtim.model.gps",
"fields": [
{
"name": "latitude",
"type": "double"
},
{
"name": "longitude",
"type": "double"
}
]
}
],
"default": null
},
{
"name": "countryIsoCode",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "isEnriched",
"type": "boolean"
},
{
"name": "locationUUID",
"type": [
"null",
{
"type": "string",
"CustomEncoding": "NullableUUIDEncoding"
}
],
"default": null
},
{
"name": "name",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "netCode",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "primaryLocationCode",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "uicCode",
"type": [
"null",
"string"
],
"default": null
}
]
}
],
"default": null
},
{
"name": "uuid",
"type": {
"type": "string",
"CustomEncoding": "UUIDEncoding"
}
}
]
}
Hi @ThomasBorghs,
I did some tests and it seems that the conversion of double's type fields happens correctly.
Could you please share your JSON Sink connector configuration and a sample event?
Thanks in advance.
Regards,
Mauro
Hi Mauro,
Thanks for the reply! I'm having issues with running the connector as an AVRO sink through. I am also running a JSON connector on the same kafka cluster and the same neo4j instance without any issues.
here is my configuration for the AVRO connector (I've left out the neo4j credentials)
{
"name": "neo4j-rtim-wagon-gps-sink",
"config": {
"topics": "rtim-wagon-gps",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"kafka.bootstrap.servers": "tst-kafka0-mp.lineas.net:9092,tst-kafka1-mp.lineas.net:9092,tst-kafka2-mp.lineas.net:9092",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry-service.schema-registry.svc.cluster.local",
"value.converter.enhanced.avro.schema.support": "true",
"errors.retry.timeout": "-1",
"errors.retry.delay.max.ms": "1000",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "neo4j-deadletter-topic",
"errors.log.include.messages": true,
"neo4j.server.uri": "bolt://neo4j-bolt.neo4j.svc.cluster.local:7687",
"neo4j.encryption.enabled": false,
"neo4j.topic.cypher.rtim-wagon-gps": "MERGE (wagon:Wagon {number: event.wagonNumber})\nMERGE (wagon)-[:HAS]->(measurement:Measurement {\n uuid: event.uuid})\nSET\nmeasurement.type = event.measurementType,\nmeasurement.wagonNumber = event.wagonNumber,\nmeasurement.measurementTime = datetime(event.measurementTime.string),\nmeasurement.messageTime = datetime(event.messageTime.string),\nmeasurement.latitude = event.networkLocation.NetworkLocation.coordinates.Coordinates.latitude.double,\nmeasurement.longitude = event.networkLocation.NetworkLocation.coordinates.Coordinates.longitude.double\n\nWITH measurement, wagon\nMATCH (consignmentNote:ConsignmentNote)-[:CONSISTS_OF]->(wagon)\n WHERE consignmentNote.startTime <= measurement.measurementTime < consignmentNote.endTime\nSET measurement.consignmentNoteNumber = consignmentNote.consignmentNoteNumber"
}
}
a sample event is included below (represented as JSON):
{
"parentMessageUuid": "fbcf1174-bdff-4de3-b284-88684e6d8c1c",
"positionAccuracy": null,
"positionProvider": null,
"assignedWagonTripKey": null,
"wagonNumber": "318847893860",
"assignedTrainId": null,
"legUuid": null,
"measurementSource": "WAGON_GPS",
"measurementTime": "2021-11-11T17:40:00+01:00",
"measurementType": "WAGON_GPS",
"messageTime": "2021-11-11T17:45:47+01:00",
"messageUuids": [
"69f9a896-e3e2-4753-b2fe-2688c9dde85f"
],
"networkLocation": {
"coordinates": {
"latitude": 51.17389,
"longitude": 3.82985
},
"countryIsoCode": null,
"isEnriched": false,
"locationUUID": null,
"name": null,
"netCode": null,
"primaryLocationCode": null,
"uicCode": null
},
"uuid": "69f9a896-e3e2-4753-b2fe-2688c9dde85f"
}
Thanks!
Thomas
Also I've been trying to set up a unit test myself to verify, but setting up an AVRO message has proven to be too tricky up until now
Hi @ThomasBorghs,
i tested your AVRO schema with the event you shared and it seems there are some serialization issues, but not with the double
type fields. I tried simplifying the schema using just the two fields latitude
and longitude
and the Kafka Neo4j connector works fine. So I would suggest to ensure the events are structured correctly based on the schema you shared.
Furthermore, i noticed that you haven't specified the Schema Registry port, in the Kafka Neo4j connector JSON config:
"value.converter.schema.registry.url": "http://schema-registry-service.schema-registry.svc.cluster.local"
Ensure that Kafka Connect is able to connect to Schema Registry correctly.
Regards,
Mauro
Hi @mroiter-larus,
Thanks for the follow-up, and apologies for the delay, I'm doing this in between other work. (for now this is an unfunded proof of concept at my company)
I'm going to try and do some testing this weekend, I'll get back to you asap.
Thomas
Hi @mroiter-larus,
I've tested with another message and schema, I get a similar error but it seems not to be related to doubles ...
The schema:
{
"type": "record",
"name": "record",
"fields": [
{
"name": "trainId",
"type": {
"type": "record",
"name": "TrainId",
"namespace": "be.lineas.rtim.model.train",
"fields": [
{
"name": "internationalTrainNumber",
"type": "string"
},
{
"name": "trainDate",
"type": "string"
}
]
}
},
{
"name": "coreId",
"type": [
"null",
"string"
]
}
]
}
Example message:
{
"trainId": {
"internationalTrainNumber": "46261",
"trainDate": "2021-05-20"
},
"coreId": "000000046261"
}
Stacktrace:
[2021-11-18 08:18:51,964] ERROR ErrorData(originalTopic=train-linking, timestamp=1592462714046, partition=0, offset=8499, exception=org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'trainId', key=444256-2020-06-18, value=Struct{trainId=Struct{internationalTrainNumber=444256,trainDate=2020-06-18},coreId=000065933080}, executingClass=class streams.kafka.connect.sink.Neo4jSinkTask) (streams.kafka.connect.sink.Neo4jSinkTask:30)
org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'trainId'
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:259)
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convert(AbstractConverter.java:78)
at streams.kafka.connect.utils.ConnectExtensionFunctionsKt.convertData(ConnectExtensionFunctions.kt:24)
at streams.kafka.connect.utils.ConnectExtensionFunctionsKt.convertData$default(ConnectExtensionFunctions.kt:23)
at streams.kafka.connect.utils.ConnectExtensionFunctionsKt.toStreamsSinkEntity(ConnectExtensionFunctions.kt:19)
at streams.kafka.connect.sink.EventBuilder.build(EventBuilder.kt:28)
at streams.kafka.connect.sink.Neo4jSinkTask.put(Neo4jSinkTask.kt:41)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'internationalTrainNumber'
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertMap(AbstractConverter.java:159)
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convert(AbstractConverter.java:80)
at streams.kafka.connect.sink.converters.MapValueConverter.setMap(MapValueConverter.kt:62)
at streams.kafka.connect.sink.converters.MapValueConverter.setStructField(MapValueConverter.kt:57)
at streams.kafka.connect.sink.converters.MapValueConverter.setStructField(MapValueConverter.kt:9)
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:244)
... 17 more
Caused by: org.apache.kafka.connect.errors.DataException: org.neo4j.driver.internal.value.StringValue is not a supported data type.
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertMap(AbstractConverter.java:151)
... 22 more
This error occurs irrespective of what cypher query I configure in the connector. Seems to me like this is related to the fact that the "internationalTrainNumber" field is nested in the "trainId" field? Or maybe because the trainId contains multiple fields, which causes the mapper to use the setMap method? I was trying to set up a unit test using a nested AVRO message on the streams project to check this, but the code structure is not that trivial. I'll keep investigating though, but any help is appreciated.
Cheers,
Thomas
Hi @mroiter-larus,
I've succeeded in creating a unit test which causes my issue, in the Neo4jValueConverterTest. I would assume the schema that I create here would be valid? It's just as simple nesting of structs, following the exact message structure of the example in my previous post. I'll keep looking into what the root cause of this is, but there seems to be a mismatch in typing between the schema conversion and what types the AbstractConverter actually supports
@Test
fun `should be able to process a nested AVRO structure`() {
val train_schema = SchemaBuilder.struct()
.field("internationalTrainNumber", Schema.STRING_SCHEMA)
.field("trainDate", Schema.STRING_SCHEMA).build()
val my_schema = SchemaBuilder.struct()
.field("trainId", train_schema)
.field("coreId", Schema.STRING_SCHEMA).build()
val trainIdStruct = Struct(train_schema)
.put("internationalTrainNumber", "46261")
.put("trainDate", "2021-05-20")
val rootStruct = Struct(my_schema)
.put("trainId", trainIdStruct)
.put("coreId", "000000046261")
val result = Neo4jValueConverter().convert(rootStruct) as Map<*, *>
}
org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'trainId'
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:259)
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convert(AbstractConverter.java:78)
at streams.kafka.connect.sink.Neo4jValueConverterTest.should be able to process a nested AVRO structure(Neo4jValueConverterTest.kt:34)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'internationalTrainNumber'
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertMap(AbstractConverter.java:159)
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convert(AbstractConverter.java:80)
at streams.kafka.connect.sink.converters.MapValueConverter.setMap(MapValueConverter.kt:62)
at streams.kafka.connect.sink.converters.MapValueConverter.setStructField(MapValueConverter.kt:57)
at streams.kafka.connect.sink.converters.MapValueConverter.setStructField(MapValueConverter.kt:9)
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:244)
... 27 more
Caused by: org.apache.kafka.connect.errors.DataException: org.neo4j.driver.internal.value.StringValue is not a supported data type.
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertMap(AbstractConverter.java:151)
... 32 more
Cheers,
Thomas
@ThomasBorghs thank you so much for the test!!!
Do you guys have any idea when a new version might be released which includes this fix? Also, do you also manage the release cycle of the confluent Neo4j sink connector as well? (https://www.confluent.io/hub/neo4j/kafka-connect-neo4j), or is this considered as a separate project and managed separately?
Just to know if I need to download your branch and build my own version in the meantime :)
Cheers,
Thomas
Hi @ThomasBorghs,
sorry for the late reply. We haven't planned a release date yet. We don't manage the release on Confluent Hub directly. However, in the meantime you can build your own version for sure.
Regards,
Mauro