( Impact on BC/DR - Data loss ) - High priority: nodes with a field whose value is a 2D point throw an exception and cannot be successfully sunk to Neo4j
yhhongyang opened this issue · 1 comment
yhhongyang commented
Description
In the source instance, creating a node with a field whose value is a 2D point generates a Kafka payload like this:
{
  "payload": {
    "id": "11",
    "before": null,
    "after": {
      "properties": {
        "address": {
          "crs": "wgs-84",
          "latitude": 56.7,
          "longitude": 12.78,
          "height": null
        }
      },
      "labels": [
        "Location"
      ]
    },
    "type": "node"
  },
  "schema": {
    "properties": {
      "address": "PointValue"
    },
    "constraints": []
  }
}
Because the value of height is null, the sink instance cannot deserialize the map into a valid PointValue, and it throws an exception like this:
2023-12-22 07:58:36.476+0000 INFO [neo4j/2b281484] [Sink] Registering the Streams Sink procedures
org.neo4j.exceptions.InvalidArgumentException: Cannot assign NO_VALUE to field height
at org.neo4j.values.storable.PointValue$PointBuilder.assignFloatingPoint(PointValue.java:667)
at org.neo4j.values.storable.PointValue$PointBuilder.assign(PointValue.java:610)
at org.neo4j.values.storable.PointValue.lambda$fromMap$0(PointValue.java:413)
at org.neo4j.values.virtual.MapValue$MapWrappingMapValue.foreach(MapValue.java:127)
at org.neo4j.values.storable.PointValue.fromMap(PointValue.java:413)
at streams.utils.StreamsTransactionEventDeserializer.convertPoints(JSONUtils.kt:239)
at streams.utils.StreamsTransactionEventDeserializer.deserialize(JSONUtils.kt:226)
at streams.utils.StreamsTransactionNodeEventDeserializer.deserialize(JSONUtils.kt:200)
at streams.utils.StreamsTransactionNodeEventDeserializer.deserialize(JSONUtils.kt:181)
at com.fasterxml.jackson.databind.ObjectMapper._convert(ObjectMapper.java:4444)
at com.fasterxml.jackson.databind.ObjectMapper.convertValue(ObjectMapper.java:4390)
at streams.utils.JSONUtils.asStreamsTransactionEvent(JSONUtils.kt:388)
at streams.utils.SchemaUtils.toStreamsTransactionEvent(SchemaUtils.kt:36)
at streams.service.sink.strategy.SourceIdIngestionStrategy.mergeNodeEvents(SourceIdIngestionStrategy.kt:72)
.......
Perhaps, on the source side, the Kafka payload should omit the height field when a point is 2D.
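One way to realize that suggestion on the source side might be to drop null coordinate entries when building the point map for the payload. A minimal, self-contained sketch; the `buildPointPayload` helper is hypothetical and not part of neo4j-streams:

```kotlin
// Hypothetical source-side helper: build the payload map for a point,
// omitting coordinates that are null (e.g. "height" for a 2D wgs-84 point).
fun buildPointPayload(crs: String, coordinates: Map<String, Double?>): Map<String, Any> {
    val present = coordinates.filterValues { it != null }.mapValues { it.value as Any }
    return mapOf<String, Any>("crs" to crs) + present
}

fun main() {
    val payload = buildPointPayload(
        "wgs-84",
        mapOf("latitude" to 56.7, "longitude" to 12.78, "height" to null)
    )
    // The 2D point payload no longer carries a null "height" field,
    // so the sink never passes NO_VALUE to PointValue's builder.
    println(payload)
}
```

With a payload shaped like this, `PointValue.fromMap` on the sink side would only ever see the coordinates that actually have values.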
Expected Behavior (Mandatory)
Nodes that have a field whose value is a 2D point can be successfully sunk to another Neo4j instance.
Actual Behavior (Mandatory)
Currently, the sink instance throws exceptions like the following when it handles a Kafka event that contains a 2D point:
to missing (therefore NULL) value for creator parameter start which is a non-nullable type
at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: streams.events.RelationshipPayload["start"]), key="431-0", value={"meta":{"timestamp":1703232298105,"username":"neo4j","txId":431,"txEventId":0,"txEventsCount":1,"operation":"created","source":{"hostname":"dhcp-9-245-199-179.e2y-cn.ibmmobiledemo.com"}},"payload":{", executingClass=class streams.kafka.KafkaAutoCommitEventConsumer)
at streams.service.errors.KafkaErrorService.report(KafkaErrorService.kt:37) ~[neo4j-streams-local-4.1.3.jar:?]
at streams.kafka.KafkaAutoCommitEventConsumer.executeAction(KafkaAutoCommitEventConsumer.kt:98) ~[neo4j-streams-local-4.1.3.jar:?]
at streams.kafka.KafkaAutoCommitEventConsumer.readSimple(KafkaAutoCommitEventConsumer.kt:89) ~[neo4j-streams-local-4.1.3.jar:?]
at streams.kafka.KafkaAutoCommitEventConsumer.read(KafkaAutoCommitEventConsumer.kt:119) ~[neo4j-streams-local-4.1.3.jar:?]
at streams.kafka.KafkaEventSink$createJob$1.invokeSuspend(KafkaEventSink.kt:163) [neo4j-streams-local-4.1.3.jar:?]
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) [neo4j-streams-local-4.1.3.jar:?]
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108) [neo4j-streams-local-4.1.3.jar:?]
at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:115) [neo4j-streams-local-4.1.3.jar:?]
at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:103) [neo4j-streams-local-4.1.3.jar:?]
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584) [neo4j-streams-local-4.1.3.jar:?]
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793) [neo4j-streams-local-4.1.3.jar:?]
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697) [neo4j-streams-local-4.1.3.jar:?]
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684) [neo4j-streams-local-4.1.3.jar:?]
The root cause of this error is:
2023-12-22 07:58:36.476+0000 INFO [neo4j/2b281484] [Sink] Registering the Streams Sink procedures
org.neo4j.exceptions.InvalidArgumentException: Cannot assign NO_VALUE to field height
at org.neo4j.values.storable.PointValue$PointBuilder.assignFloatingPoint(PointValue.java:667)
at org.neo4j.values.storable.PointValue$PointBuilder.assign(PointValue.java:610)
at org.neo4j.values.storable.PointValue.lambda$fromMap$0(PointValue.java:413)
at org.neo4j.values.virtual.MapValue$MapWrappingMapValue.foreach(MapValue.java:127)
at org.neo4j.values.storable.PointValue.fromMap(PointValue.java:413)
at streams.utils.StreamsTransactionEventDeserializer.convertPoints(JSONUtils.kt:239)
at streams.utils.StreamsTransactionEventDeserializer.deserialize(JSONUtils.kt:226)
at streams.utils.StreamsTransactionNodeEventDeserializer.deserialize(JSONUtils.kt:200)
at streams.utils.StreamsTransactionNodeEventDeserializer.deserialize(JSONUtils.kt:181)
at com.fasterxml.jackson.databind.ObjectMapper._convert(ObjectMapper.java:4444)
at com.fasterxml.jackson.databind.ObjectMapper.convertValue(ObjectMapper.java:4390)
at streams.utils.JSONUtils.asStreamsTransactionEvent(JSONUtils.kt:388)
at streams.utils.SchemaUtils.toStreamsTransactionEvent(SchemaUtils.kt:36)
at streams.service.sink.strategy.SourceIdIngestionStrategy.mergeNodeEvents(SourceIdIngestionStrategy.kt:72)
at streams.service.StreamsSinkService.writeWithStrategy(StreamsSinkService.kt:32)
at streams.service.StreamsSinkService.writeForTopic(StreamsSinkService.kt:40)
at streams.kafka.KafkaEventSink$createJob$1$timeMillis$1.invoke(KafkaEventSink.kt:167)
at streams.kafka.KafkaEventSink$createJob$1$timeMillis$1.invoke(KafkaEventSink.kt:163)
at streams.kafka.KafkaAutoCommitEventConsumer.executeAction(KafkaAutoCommitEventConsumer.kt:95)
at streams.kafka.KafkaAutoCommitEventConsumer.readSimple(KafkaAutoCommitEventConsumer.kt:89)
at streams.kafka.KafkaAutoCommitEventConsumer.read(KafkaAutoCommitEventConsumer.kt:119)
at streams.kafka.KafkaEventSink$createJob$1.invokeSuspend(KafkaEventSink.kt:163)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:115)
at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:103)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)
Currently used versions
- OS:
- Neo4j: 4.4.28
- Neo4j-Streams: 4.1.3
yhhongyang commented
Possible solution:
Perhaps add more data classes for the point representation, so that 2D and 3D points deserialize to distinct types:
data class StreamsPointCartesian(override val crs: String, val x: Double, val y: Double): StreamsPoint()
data class StreamsPointCartesian3D(override val crs: String, val x: Double, val y: Double, val z: Double): StreamsPoint()
data class StreamsPointWgs(override val crs: String, val latitude: Double, val longitude: Double): StreamsPoint()
data class StreamsPointWgs3D(override val crs: String, val latitude: Double, val longitude: Double, val height: Double): StreamsPoint()
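With those variants in place, the sink could pick the right one from the payload map based on the CRS and on which coordinate fields are actually present. A rough, self-contained sketch under that assumption; the sealed base class and the `pointFrom` factory shown here are illustrative, not the actual neo4j-streams types:

```kotlin
sealed class StreamsPoint { abstract val crs: String }

data class StreamsPointCartesian(override val crs: String, val x: Double, val y: Double) : StreamsPoint()
data class StreamsPointCartesian3D(override val crs: String, val x: Double, val y: Double, val z: Double) : StreamsPoint()
data class StreamsPointWgs(override val crs: String, val latitude: Double, val longitude: Double) : StreamsPoint()
data class StreamsPointWgs3D(override val crs: String, val latitude: Double, val longitude: Double, val height: Double) : StreamsPoint()

// Hypothetical factory: fall back to the 2D variant when the third
// coordinate ("height" or "z") is null or absent in the payload map.
fun pointFrom(map: Map<String, Any?>): StreamsPoint {
    val crs = map["crs"] as String
    fun num(key: String): Double? = (map[key] as? Number)?.toDouble()
    return if (crs.startsWith("wgs-84")) {
        val lat = num("latitude")!!
        val lon = num("longitude")!!
        val height = num("height")
        if (height != null) StreamsPointWgs3D(crs, lat, lon, height)
        else StreamsPointWgs(crs, lat, lon)
    } else {
        val x = num("x")!!
        val y = num("y")!!
        val z = num("z")
        if (z != null) StreamsPointCartesian3D(crs, x, y, z)
        else StreamsPointCartesian(crs, x, y)
    }
}
```

Under this sketch, the payload above (with `"height": null`) would map to a `StreamsPointWgs` instead of failing inside `PointValue.fromMap` with `Cannot assign NO_VALUE to field height`.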