neo4j-contrib/neo4j-streams

Issue with Neo4j kafka sink connector in combination with Avro schema nesting

ThomasBorghs opened this issue · 12 comments

(edited, added full stacktrace)

Hi,

I'm trying to ingest Kafka messages encoded in Avro into neo4j using the kafka connect neo4j sink.

I'm getting this error:

Caused by: org.apache.kafka.connect.errors.DataException: org.neo4j.driver.internal.value.FloatValue is not a supported data type.

This error seems to be caused by a "double" datatype in my Avro schema:

Caused by: org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'latitude'

Stacktrace:

[2021-11-09 16:52:53,416] ERROR ErrorData(originalTopic=rtim-wagon-gps, timestamp=1603019642492, partition=8, offset=295990, exception=org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'networkLocation', key=338849101419|3187547f-4a5a-4f02-acf0-7bd617712fd6, value=Struct{parentMessageUuid=f0a2724f-656b-43fd-a1fa-22273e4335d7,wagonNumber=338849101419,measurementSource=WAGON_GPS,measurementTime=2020-10-18T13:10:00+02:00,measurementType=WAGON_GPS,messageTime=2020-, executingClass=class streams.kafka.connect.sink.Neo4jSinkTask) (streams.kafka.connect.sink.Neo4jSinkTask)
org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'networkLocation'
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:259)
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convert(AbstractConverter.java:78)
	at streams.kafka.connect.utils.ConnectExtensionFunctionsKt.convertData(ConnectExtensionFunctions.kt:24)
	at streams.kafka.connect.utils.ConnectExtensionFunctionsKt.convertData$default(ConnectExtensionFunctions.kt:23)
	at streams.kafka.connect.utils.ConnectExtensionFunctionsKt.toStreamsSinkEntity(ConnectExtensionFunctions.kt:19)
	at streams.kafka.connect.sink.EventBuilder.build(EventBuilder.kt:28)
	at streams.kafka.connect.sink.Neo4jSinkTask.put(Neo4jSinkTask.kt:41)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'coordinates'
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:259)
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convert(AbstractConverter.java:78)
	at streams.kafka.connect.sink.converters.MapValueConverter.setStructField(MapValueConverter.kt:56)
	at streams.kafka.connect.sink.converters.MapValueConverter.setStructField(MapValueConverter.kt:9)
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:244)
	... 17 more
Caused by: org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'latitude'
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertMap(AbstractConverter.java:159)
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convert(AbstractConverter.java:80)
	at streams.kafka.connect.sink.converters.MapValueConverter.setMap(MapValueConverter.kt:62)
	at streams.kafka.connect.sink.converters.MapValueConverter.setStructField(MapValueConverter.kt:57)
	at streams.kafka.connect.sink.converters.MapValueConverter.setStructField(MapValueConverter.kt:9)
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:244)
	... 21 more
Caused by: org.apache.kafka.connect.errors.DataException: org.neo4j.driver.internal.value.FloatValue is not a supported data type.
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertMap(AbstractConverter.java:151)
	... 26 more

I believe the schema is parsed incorrectly: a Double value gets converted into an org.neo4j.driver.internal.value.FloatValue, but the AbstractConverter class cannot handle that type and can only process java.lang.Float.
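
To illustrate the mismatch, a minimal sketch using the Neo4j Java driver directly (assuming the 4.x driver on the classpath; this is not the connector's code):

import org.neo4j.driver.Values

fun main() {
    // The driver wraps a plain Double in its own value type:
    val wrapped = Values.value(51.17389)
    println(wrapped.javaClass.name)  // prints org.neo4j.driver.internal.value.FloatValue
    // A converter that only recognises plain java.lang types rejects this wrapper,
    // which matches the "FloatValue is not a supported data type" error above.
}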

I'm deploying the following containers:

  • version 2.0.0 of the neo4j sink connector
  • version 6.0.0 of the community confluent platform
  • neo4j 4.1.9 (latest available docker image)

This is the Avro schema of the records on the Kafka cluster; the latitude field is configured as a double:

{
  "type": "record",
  "name": "WagonGpsMeasurement",
  "namespace": "be.lineas.rtim.model.gps",
  "fields": [
    {
      "name": "parentMessageUuid",
      "type": [
        "null",
        {
          "type": "string",
          "CustomEncoding": "NullableUUIDEncoding"
        }
      ],
      "default": null
    },
    {
      "name": "positionAccuracy",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "positionProvider",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "assignedWagonTripKey",
      "type": [
        "null",
        {
          "type": "record",
          "name": "WagonTripKey",
          "namespace": "be.lineas.rtim.model.wagonevent",
          "fields": [
            {
              "name": "start",
              "type": {
                "type": "string",
                "CustomEncoding": "InstantEncoding"
              }
            },
            {
              "name": "wagon",
              "type": "string"
            }
          ]
        }
      ],
      "default": null
    },
    {
      "name": "wagonNumber",
      "type": "string"
    },
    {
      "name": "assignedTrainId",
      "type": [
        "null",
        {
          "type": "record",
          "name": "TrainId",
          "namespace": "be.lineas.rtim.model.train",
          "fields": [
            {
              "name": "internationalTrainNumber",
              "type": "string"
            },
            {
              "name": "trainDate",
              "type": {
                "type": "string",
                "CustomEncoding": "LocalDateEncoding"
              }
            }
          ]
        }
      ],
      "default": null
    },
    {
      "name": "legUuid",
      "type": [
        "null",
        {
          "type": "string",
          "CustomEncoding": "NullableUUIDEncoding"
        }
      ],
      "default": null
    },
    {
      "name": "measurementSource",
      "type": {
        "type": "enum",
        "name": "MeasurementSource",
        "namespace": "be.lineas.rtim.model.measurement",
        "symbols": [
          "RTIS",
          "WAGON_GPS",
          "ISR"
        ]
      }
    },
    {
      "name": "measurementTime",
      "type": [
        "null",
        {
          "type": "string",
          "CustomEncoding": "NullableOffsetDateTimeEncoding"
        }
      ],
      "default": null
    },
    {
      "name": "measurementType",
      "type": {
        "type": "enum",
        "name": "MeasurementType",
        "namespace": "be.lineas.rtim.model.measurement",
        "symbols": [
          "DEPARTURE",
          "PASSAGE",
          "ARRIVAL",
          "WAGON_GPS",
          "LEFT_ORIGIN",
          "WAGON_ARRIVAL",
          "WAGON_DEPARTURE",
          "REACHED_DESTINATION",
          "PASSED_TROUGH",
          "BORDER_CROSSING",
          "BORDER_CROSSING_EXIT",
          "BORDER_CROSSING_ENTRY",
          "BORDER_CROSSING_ADVICE",
          "WAGON_REFUSED",
          "READY_TO_PULL",
          "PULLED",
          "DELIVERED",
          "FREE_FOR_DISPOSITION",
          "WAGON_BROKEN",
          "WAGON_REPAIRED",
          "TRANSPORT_CREATED",
          "CREATE_TRANSPORT_DOSSIER",
          "UPDATE_TRANSPORT_DOSSIER",
          "WAGON_MILEAGE_CALCULATION_RESULT",
          "DELETE_TRANSPORT_DOSSIER",
          "ELTA_FORWARDING"
        ]
      }
    },
    {
      "name": "messageTime",
      "type": [
        "null",
        {
          "type": "string",
          "CustomEncoding": "NullableOffsetDateTimeEncoding"
        }
      ],
      "default": null
    },
    {
      "name": "messageUuids",
      "type": {
        "type": "array",
        "items": "string",
        "CustomEncoding": "UUIDListEncoding"
      }
    },
    {
      "name": "networkLocation",
      "type": [
        "null",
        {
          "type": "record",
          "name": "NetworkLocation",
          "namespace": "be.lineas.rtim.model.networklocation",
          "fields": [
            {
              "name": "coordinates",
              "type": [
                "null",
                {
                  "type": "record",
                  "name": "Coordinates",
                  "namespace": "be.lineas.rtim.model.gps",
                  "fields": [
                    {
                      "name": "latitude",
                      "type": "double"
                    },
                    {
                      "name": "longitude",
                      "type": "double"
                    }
                  ]
                }
              ],
              "default": null
            },
            {
              "name": "countryIsoCode",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "isEnriched",
              "type": "boolean"
            },
            {
              "name": "locationUUID",
              "type": [
                "null",
                {
                  "type": "string",
                  "CustomEncoding": "NullableUUIDEncoding"
                }
              ],
              "default": null
            },
            {
              "name": "name",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "netCode",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "primaryLocationCode",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "uicCode",
              "type": [
                "null",
                "string"
              ],
              "default": null
            }
          ]
        }
      ],
      "default": null
    },
    {
      "name": "uuid",
      "type": {
        "type": "string",
        "CustomEncoding": "UUIDEncoding"
      }
    }
  ]
}

Hi @ThomasBorghs,

I did some tests and it seems that double-typed fields are converted correctly.
Could you please share your sink connector JSON configuration and a sample event?

Thanks in advance.

Regards,

Mauro

Hi Mauro,

Thanks for the reply! I'm having issues running the connector as an AVRO sink, though. I am also running a JSON connector on the same Kafka cluster and the same Neo4j instance without any issues.

Here is my configuration for the AVRO connector (I've left out the Neo4j credentials):

{
  "name": "neo4j-rtim-wagon-gps-sink",
  "config": {
    "topics": "rtim-wagon-gps",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "kafka.bootstrap.servers": "tst-kafka0-mp.lineas.net:9092,tst-kafka1-mp.lineas.net:9092,tst-kafka2-mp.lineas.net:9092",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry-service.schema-registry.svc.cluster.local",
    "value.converter.enhanced.avro.schema.support": "true",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "neo4j-deadletter-topic",
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j-bolt.neo4j.svc.cluster.local:7687",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cypher.rtim-wagon-gps": "MERGE (wagon:Wagon {number: event.wagonNumber})\nMERGE (wagon)-[:HAS]->(measurement:Measurement {\n  uuid: event.uuid})\nSET\nmeasurement.type = event.measurementType,\nmeasurement.wagonNumber = event.wagonNumber,\nmeasurement.measurementTime = datetime(event.measurementTime.string),\nmeasurement.messageTime = datetime(event.messageTime.string),\nmeasurement.latitude = event.networkLocation.NetworkLocation.coordinates.Coordinates.latitude.double,\nmeasurement.longitude = event.networkLocation.NetworkLocation.coordinates.Coordinates.longitude.double\n\nWITH measurement, wagon\nMATCH (consignmentNote:ConsignmentNote)-[:CONSISTS_OF]->(wagon)\n  WHERE consignmentNote.startTime <= measurement.measurementTime < consignmentNote.endTime\nSET measurement.consignmentNoteNumber = consignmentNote.consignmentNoteNumber"
  }
}
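
(The ".string", ".NetworkLocation", ".Coordinates" and ".double" accessors in that Cypher reflect my understanding of how the AvroConverter, with enhanced.avro.schema.support enabled, exposes each union member under a key named after its type; assuming that shape, a nullable field should arrive at the connector roughly like this:)

{
  "measurementTime": {
    "string": "2021-11-11T17:40:00+01:00"
  }
}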

A sample event is included below (represented as JSON):

{
  "parentMessageUuid": "fbcf1174-bdff-4de3-b284-88684e6d8c1c",
  "positionAccuracy": null,
  "positionProvider": null,
  "assignedWagonTripKey": null,
  "wagonNumber": "318847893860",
  "assignedTrainId": null,
  "legUuid": null,
  "measurementSource": "WAGON_GPS",
  "measurementTime": "2021-11-11T17:40:00+01:00",
  "measurementType": "WAGON_GPS",
  "messageTime": "2021-11-11T17:45:47+01:00",
  "messageUuids": [
    "69f9a896-e3e2-4753-b2fe-2688c9dde85f"
  ],
  "networkLocation": {
    "coordinates": {
      "latitude": 51.17389,
      "longitude": 3.82985
    },
    "countryIsoCode": null,
    "isEnriched": false,
    "locationUUID": null,
    "name": null,
    "netCode": null,
    "primaryLocationCode": null,
    "uicCode": null
  },
  "uuid": "69f9a896-e3e2-4753-b2fe-2688c9dde85f"
}

Thanks!
Thomas

Also, I've been trying to set up a unit test myself to verify this, but constructing an AVRO message has proven too tricky so far.

Hi @ThomasBorghs,

I tested your AVRO schema with the event you shared, and it seems there are some serialization issues, but not with the double-typed fields. I tried simplifying the schema to just the two fields latitude and longitude, and the Kafka Neo4j connector works fine. So I would suggest making sure the events are structured correctly according to the schema you shared.
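
Roughly, the simplified schema I tested looked like this (reconstructed from yours, with the namespace omitted):

{
  "type": "record",
  "name": "Coordinates",
  "fields": [
    { "name": "latitude", "type": "double" },
    { "name": "longitude", "type": "double" }
  ]
}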

Furthermore, I noticed that you haven't specified the Schema Registry port in the Kafka Neo4j connector JSON config:

"value.converter.schema.registry.url": "http://schema-registry-service.schema-registry.svc.cluster.local"

Ensure that Kafka Connect is able to connect to Schema Registry correctly.

Regards,

Mauro

Hi @ThomasBorghs,

Any news?

Mauro

Hi @mroiter-larus,

Thanks for the follow-up, and apologies for the delay; I'm doing this in between other work (for now it's an unfunded proof of concept at my company).

I'm going to try to do some testing this weekend; I'll get back to you ASAP.

Thomas

Hi @mroiter-larus,

I've tested with another message and schema; I get a similar error, but it seems not to be related to doubles...

The schema:

{
  "type": "record",
  "name": "record",
  "fields": [
    {
      "name": "trainId",
      "type": {
        "type": "record",
        "name": "TrainId",
        "namespace": "be.lineas.rtim.model.train",
        "fields": [
          {
            "name": "internationalTrainNumber",
            "type": "string"
          },
          {
            "name": "trainDate",
            "type": "string"
          }
        ]
      }
    },
    {
      "name": "coreId",
      "type": [
        "null",
        "string"
      ]
    }
  ]
}

Example message:

{
  "trainId": {
    "internationalTrainNumber": "46261",
    "trainDate": "2021-05-20"
  },
  "coreId": "000000046261"
}

Stacktrace:

[2021-11-18 08:18:51,964] ERROR ErrorData(originalTopic=train-linking, timestamp=1592462714046, partition=0, offset=8499, exception=org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'trainId', key=444256-2020-06-18, value=Struct{trainId=Struct{internationalTrainNumber=444256,trainDate=2020-06-18},coreId=000065933080}, executingClass=class streams.kafka.connect.sink.Neo4jSinkTask) (streams.kafka.connect.sink.Neo4jSinkTask:30)
org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'trainId'
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:259)
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convert(AbstractConverter.java:78)
	at streams.kafka.connect.utils.ConnectExtensionFunctionsKt.convertData(ConnectExtensionFunctions.kt:24)
	at streams.kafka.connect.utils.ConnectExtensionFunctionsKt.convertData$default(ConnectExtensionFunctions.kt:23)
	at streams.kafka.connect.utils.ConnectExtensionFunctionsKt.toStreamsSinkEntity(ConnectExtensionFunctions.kt:19)
	at streams.kafka.connect.sink.EventBuilder.build(EventBuilder.kt:28)
	at streams.kafka.connect.sink.Neo4jSinkTask.put(Neo4jSinkTask.kt:41)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'internationalTrainNumber'
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertMap(AbstractConverter.java:159)
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convert(AbstractConverter.java:80)
	at streams.kafka.connect.sink.converters.MapValueConverter.setMap(MapValueConverter.kt:62)
	at streams.kafka.connect.sink.converters.MapValueConverter.setStructField(MapValueConverter.kt:57)
	at streams.kafka.connect.sink.converters.MapValueConverter.setStructField(MapValueConverter.kt:9)
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:244)
	... 17 more
Caused by: org.apache.kafka.connect.errors.DataException: org.neo4j.driver.internal.value.StringValue is not a supported data type.
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertMap(AbstractConverter.java:151)
	... 22 more

This error occurs irrespective of which Cypher query I configure in the connector. It seems to me that this is related to the "internationalTrainNumber" field being nested inside the "trainId" field, or perhaps to the fact that trainId contains multiple fields, which causes the mapper to use the setMap method. I was trying to set up a unit test with a nested AVRO message on the streams project to check this, but the code structure is not that trivial. I'll keep investigating, but any help is appreciated.

Cheers,
Thomas

Hi @mroiter-larus,

I've succeeded in creating a unit test that reproduces my issue, in the Neo4jValueConverterTest. I would assume the schema I create here is valid? It's just a simple nesting of structs, following the exact message structure of the example in my previous post. I'll keep looking into the root cause, but there seems to be a mismatch between the types the schema conversion produces and the types the AbstractConverter actually supports:

    // Imports needed for this snippet (JUnit 4 and the Kafka Connect data API);
    // Neo4jValueConverter comes from the project itself.
    import org.apache.kafka.connect.data.Schema
    import org.apache.kafka.connect.data.SchemaBuilder
    import org.apache.kafka.connect.data.Struct
    import org.junit.Test

    @Test
    fun `should be able to process a nested AVRO structure`() {
        // Schemas mirroring the example message: a struct nested inside a struct
        val train_schema = SchemaBuilder.struct()
                .field("internationalTrainNumber", Schema.STRING_SCHEMA)
                .field("trainDate", Schema.STRING_SCHEMA).build()
        val my_schema = SchemaBuilder.struct()
                .field("trainId", train_schema)
                .field("coreId", Schema.STRING_SCHEMA).build()

        val trainIdStruct = Struct(train_schema)
                .put("internationalTrainNumber", "46261")
                .put("trainDate", "2021-05-20")
        val rootStruct = Struct(my_schema)
                .put("trainId", trainIdStruct)
                .put("coreId", "000000046261")

        // Fails with "StringValue is not a supported data type" (stacktrace below)
        val result = Neo4jValueConverter().convert(rootStruct) as Map<*, *>
    }
org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'trainId'

	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:259)
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convert(AbstractConverter.java:78)
	at streams.kafka.connect.sink.Neo4jValueConverterTest.should be able to process a nested AVRO structure(Neo4jValueConverterTest.kt:34)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'internationalTrainNumber'
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertMap(AbstractConverter.java:159)
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convert(AbstractConverter.java:80)
	at streams.kafka.connect.sink.converters.MapValueConverter.setMap(MapValueConverter.kt:62)
	at streams.kafka.connect.sink.converters.MapValueConverter.setStructField(MapValueConverter.kt:57)
	at streams.kafka.connect.sink.converters.MapValueConverter.setStructField(MapValueConverter.kt:9)
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:244)
	... 27 more
Caused by: org.apache.kafka.connect.errors.DataException: org.neo4j.driver.internal.value.StringValue is not a supported data type.
	at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertMap(AbstractConverter.java:151)
	... 32 more
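
For what it's worth, here is a rough sketch of the kind of unwrapping I would expect a fix to perform before values reach the AbstractConverter (a hypothetical helper under assumed behaviour, not the project's actual code):

import org.neo4j.driver.Value

// Recursively turn driver Value wrappers back into plain JVM objects
// (FloatValue -> Double, StringValue -> String, ...) so that downstream
// type checks against java.lang types succeed.
fun unwrap(input: Any?): Any? = when (input) {
    is Value -> unwrap(input.asObject())
    is Map<*, *> -> input.mapValues { unwrap(it.value) }
    is List<*> -> input.map { unwrap(it) }
    else -> input
}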

Cheers,
Thomas

@ThomasBorghs thank you so much for the test!!!

@ThomasBorghs

Great,

I'll try that test! Thanks a lot.

Mauro

Hi @mroiter-larus @conker84,

Do you have any idea when a new version including this fix might be released? Also, do you manage the release cycle of the Confluent Neo4j sink connector (https://www.confluent.io/hub/neo4j/kafka-connect-neo4j) as well, or is that considered a separate project and managed separately?

Just so I know whether I need to download your branch and build my own version in the meantime :)

Cheers,
Thomas

Hi @ThomasBorghs,

Sorry for the late reply. We haven't planned a release date yet, and we don't manage the releases on Confluent Hub directly. However, in the meantime you can certainly build your own version.
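
In case it helps, a rough sketch of the build steps, assuming the repository's standard Maven setup:

git clone https://github.com/neo4j-contrib/neo4j-streams.git
cd neo4j-streams
# Package the project, skipping tests; the Kafka Connect module's artifact
# can then be copied into Kafka Connect's plugin.path.
mvn clean package -DskipTests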

Regards,

Mauro