memiiso/debezium-server-iceberg

Error while running the application

kowshikdutta opened this issue · 5 comments

Hi,
I am facing the error below while running the application.

Source: MariaDB
Sink: AWS Glue Catalog and S3
Java: V17

Attached is the "application.properties" file

Tables are getting created in the "default" database in Glue, but then the app crashes with the error below.

ERROR:
application.properties.pdf

2024-05-31 12:28:22,710 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'java.lang.NullPointerException: Cannot invoke "com.fasterxml.jackson.databind.JsonNode.asLong(long)" because the return value of "com.fasterxml.jackson.databind.JsonNode.get(String)" is null', error = 'java.lang.NullPointerException: Cannot invoke "com.fasterxml.jackson.databind.JsonNode.asLong(long)" because the return value of "com.fasterxml.jackson.databind.JsonNode.get(String)" is null': java.lang.NullPointerException: Cannot invoke "com.fasterxml.jackson.databind.JsonNode.asLong(long)" because the return value of "com.fasterxml.jackson.databind.JsonNode.get(String)" is null
at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.compareByTsThenOp(IcebergTableOperator.java:89)
at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.lambda$deduplicateBatch$0(IcebergTableOperator.java:63)
at java.base/java.util.concurrent.ConcurrentHashMap.merge(ConcurrentHashMap.java:2056)
at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.lambda$deduplicateBatch$1(IcebergTableOperator.java:62)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.deduplicateBatch(IcebergTableOperator.java:60)
at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.addToTable(IcebergTableOperator.java:141)
at io.debezium.server.iceberg.IcebergChangeConsumer.handleBatch(IcebergChangeConsumer.java:167)
at io.debezium.embedded.async.ParallelSmtAndConvertBatchProcessor.processRecords(ParallelSmtAndConvertBatchProcessor.java:56)
at io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords.doCall(AsyncEmbeddedEngine.java:1157)
at io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords.doCall(AsyncEmbeddedEngine.java:1138)
at io.debezium.embedded.async.RetryingCallable.call(RetryingCallable.java:47)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)

2024-05-31 12:28:22,711 DEBUG [io.qua.run.Application] (main) Stopping application
2024-05-31 12:28:22,711 DEBUG [io.qua.run.shu.ShutdownRecorder] (main) Attempting to gracefully shutdown.
2024-05-31 12:28:22,728 INFO [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
2024-05-31 12:28:22,728 DEBUG [io.deb.emb.asy.AsyncEmbeddedEngine] (main) Engine shutdown called.
2024-05-31 12:28:22,728 ERROR [io.deb.ser.DebeziumServer] (main) Exception while shuttting down Debezium [Error Occurred After Shutdown]: java.lang.IllegalStateException: Engine has been already shut down.
at io.debezium.embedded.async.AsyncEmbeddedEngine.close(AsyncEmbeddedEngine.java:249)
at io.debezium.server.DebeziumServer.stop(DebeziumServer.java:249)
at io.debezium.server.DebeziumServer_Observer_stop_yMpCZhmgvv79zBZyHwCy4l6x-EI.notify(Unknown Source)
at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:351)
at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:333)
at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:80)
at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:155)
at io.quarkus.arc.runtime.ArcRecorder$2.run(ArcRecorder.java:111)
at io.quarkus.runtime.StartupContext.runAllInReverseOrder(StartupContext.java:84)
at io.quarkus.runtime.StartupContext.close(StartupContext.java:73)
at io.quarkus.runner.ApplicationImpl.doStop(Unknown Source)
at io.quarkus.runtime.Application.stop(Application.java:208)
at io.quarkus.runtime.Application.stop(Application.java:155)
at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:228)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
at io.debezium.server.Main.main(Main.java:15)

It's failing here while checking the __source_ts_ms value for de-duplication.

Do you have sample events? Somehow events are being generated with NULL __source_ts_ms values. You can see them in debug mode.
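To illustrate the failure mode, here is a minimal sketch of timestamp-based de-duplication that tolerates a missing __source_ts_ms. It is not the project's IcebergTableOperator code: the class, the helper names, the use of plain maps instead of Jackson nodes, and the fallback value of 0 are all assumptions made for illustration. The real comparison also considers the operation type, which is left out here.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; events are modelled as plain maps rather than JsonNode.
class TsDedupSketch {

    // Field name taken from the discussion in this thread.
    static final String TS_FIELD = "__source_ts_ms";

    // Builds a test event; ts may be null to mimic a record that was not unwrapped.
    static Map<String, Object> event(Object id, Long ts, String op) {
        Map<String, Object> e = new HashMap<>();
        e.put("id", id);
        e.put(TS_FIELD, ts);
        e.put("__op", op);
        return e;
    }

    // Reads the timestamp, returning the fallback when the field is absent or null
    // instead of dereferencing it and throwing a NullPointerException.
    static long sourceTs(Map<String, Object> event, long fallback) {
        Object ts = event.get(TS_FIELD);
        return (ts instanceof Number) ? ((Number) ts).longValue() : fallback;
    }

    // Keeps one event per key: the one with the highest timestamp.
    // On equal timestamps the event that appears later in the batch wins.
    static Map<Object, Map<String, Object>> deduplicate(
            List<Map<String, Object>> batch, String keyField) {
        Map<Object, Map<String, Object>> latest = new LinkedHashMap<>();
        for (Map<String, Object> ev : batch) {
            latest.merge(ev.get(keyField), ev,
                (oldEv, newEv) -> sourceTs(newEv, 0L) >= sourceTs(oldEv, 0L) ? newEv : oldEv);
        }
        return latest;
    }
}
```

Whether falling back to a default is the right behaviour is a design choice: it hides the fact that the events were never unwrapped, so failing fast with a clearer error message may be preferable.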

Does it work when you run it without upsert mode? debezium.sink.iceberg.upsert=false

Hi,
I set the flag "debezium.sink.iceberg.upsert=false" and the application started working. However, I now see duplicate records, with both the before and after images. Please check the attachment. For "winyear" = 2008, the table shows both the before and after image. I would like to see only the latest image. Could you please suggest how to turn that flag on without causing the application to fail?

Also, I have attached a sample set of event data from the final Glue table. There are a lot of records where "__source_ts_ms" is NULL. They appear to be snapshots. Could you please suggest how to avoid these?

Regards,
Kowshik
sampledata.xlsx

@kowshikdutta the first section of the records (the first 22) is not unwrapped data. That's why it was created with a null __source_ts_ms.

Could it be that debezium.transforms=unwrap was not working before?

In both sets of records, you can see that source.ts_ms is populated.
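As a rough illustration of why records that were not unwrapped end up with a null __source_ts_ms, the sketch below flattens a change-event envelope the way an unwrap transform is expected to, assuming it is configured to add the op and source.ts_ms fields. The class and method names are hypothetical, and events are modelled as plain maps; this is not Debezium's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of unwrapping; not the Debezium transform itself.
class UnwrapSketch {

    // Turns an envelope {before, after, source, op} into a flat row:
    // the columns of "after" plus __op and __source_ts_ms metadata fields.
    @SuppressWarnings("unchecked")
    static Map<String, Object> unwrap(Map<String, Object> envelope) {
        Map<String, Object> flat = new HashMap<>();
        Object after = envelope.get("after");
        if (after instanceof Map) {
            flat.putAll((Map<String, Object>) after);
        }
        flat.put("__op", envelope.get("op"));
        // The nested source.ts_ms value becomes the top-level __source_ts_ms field.
        Object source = envelope.get("source");
        Object ts = (source instanceof Map)
            ? ((Map<String, Object>) source).get("ts_ms")
            : null;
        flat.put("__source_ts_ms", ts);
        return flat;
    }
}
```

A record written without this step still carries the timestamp, but only nested under source, so a lookup of the top-level __source_ts_ms field finds nothing.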

Hi,
The application is failing when I set 'debezium.sink.iceberg.upsert=true'. Without it, the updated records show up alongside the original records, so the table ends up with duplicates. Could you please advise how to make the application run with the flag set to 'debezium.sink.iceberg.upsert=true'?

@kowshikdutta could you share the last error log and config? You can also see a working example here; just leaving it here for reference.