apache/seatunnel

[Bug] [Source Oracle-CDC Sink Starrocks] Starrocks已经读到行了,但是读不到Oracel cdc的数据

Closed this issue · 1 comments

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

image
cdc的行已经读到了,但数据都是null,导致写不进starrocks

SeaTunnel Version

2.3.8

SeaTunnel Config

{
    "sink" : [
        {
            "base-url" : "jdbc:mysql://ip:19030",
            "enable_upsert_delete" : true,
            "password" : "******",
            "database" : "tmp",
            "batch_max_rows" : 1024,
            "starrocks.config" : {
                "format" : "JSON",
                "strip_outer_array" : true
            },
            "data_save_mode" : "DROP_DATA",
            "nodeUrls" : [
                "ip:8030"
            ],
            "plugin_name" : "StarRocks",
            "table" : "tmp_customers",
            "username" : "******"
        }
    ],
    "source" : [
        {
            "base-url" : "jdbc:oracle:thin:@ip:11521:HELOWIN",
            "password" : "******",
            "startup.mode" : "initial",
            "table-names" : [
                "HELOWIN.FLINKUSER.CUSTOMERS"
            ],
            "database-names" : [
                "HELOWIN"
            ],
            "schema-names" : [
                "FLINKUSER"
            ],
            "result_table_name" : "customers",
            "source.reader.close.timeout" : 120000,
            "plugin_name" : "Oracle-CDC",
            "username" : "******"
        }
    ],
    "env" : {
        "job.mode" : "STREAMING",
        "parallelism" : 1,
        "checkpoint.interval" : 3000
    }
}

Running Command

/data/apache-seatunnel-2.3.8/bin/seatunnel.sh -c /data/realtime/2024/12/13/1f79d5f49a44438a88ebc3b4a040fbcf.json

Error Exception

org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException: ErrorCode:[STARROCKS-01], ErrorDescription:[Flush batch data to sink connector failed] - Failed to flush data to StarRocks 
tmp/tmp_customers
too many filtered rows
Error: NULL value in non-nullable column 'customer_id'. Row: [NULL, NULL, NULL, NULL, 0]
Error: NULL value in non-nullable column 'customer_id'. Row: [NULL, NULL, NULL, NULL, 0]
Error: NULL value in non-nullable column 'customer_id'. Row: [NULL, NULL, NULL, NULL, 0]
Error: NULL value in non-nullable column 'customer_id'. Row: [NULL, NULL, NULL, NULL, 0]
Error: NULL value in non-nullable column 'customer_id'. Row: [NULL, NULL, NULL, NULL, 0]


        at org.apache.seatunnel.connectors.seatunnel.starrocks.client.StarRocksStreamLoadVisitor.doStreamLoad(StarRocksStreamLoadVisitor.java:130) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.starrocks.client.StarRocksSinkManager.flush(StarRocksSinkManager.java:85) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.starrocks.client.StarRocksSinkManager.close(StarRocksSinkManager.java:72) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.starrocks.sink.StarRocksSinkWriter.close(StarRocksSinkWriter.java:83) ~[?:?]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.close(SinkFlowLifeCycle.java:154) ~[seatunnel-starter.jar:2.3.8]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$close$5(SeaTunnelTask.java:330) ~[seatunnel-starter.jar:2.3.8]
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) ~[?:?]
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) ~[?:?]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
        at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290) ~[?:?]
        at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746) ~[?:?]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) ~[?:?]
        at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) ~[?:?]
        at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) ~[?:?]
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) ~[?:?]
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) ~[?:?]

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

解决了,两边字段名大小写不一致