snowflakedb/snowflake-kafka-connector

SCHEMA EVOLUTION: Converting a String to Number Column

Closed this issue · 3 comments

Hi,

I'm explicitly making a Date fields in my JDBC connector to be loaded as STRING in Topics. When using Snowflake Sink connector with schema evolution mode, it is creating Table for me that is different from what is specified in Topics. It is converting that String field into a Number (original value is a DATE), so when DATE gets inserted to the NUMBER field, it fails! Any suggestions?

ERROR LOG

        due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:624)\n\tat
        org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:342)\n\tat
        org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:242)\n\tat
        org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:211)\n\tat
        org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat
        org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat
        org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)\n\tat
        java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat
        java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat
        java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: org.apache.kafka.connect.errors.DataException:
        Error inserting Records using Streaming API with msg:The given row cannot
        be converted to the internal format due to invalid value: Value cannot be
        ingested into Snowflake column DATECOLUMNASSTRING of type NUMBER, Row Index: 0,
        reason: Not a valid number\n\tat com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.handleInsertRowsFailures(TopicPartitionChannel.java:744)\n\tat
        com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertBufferedRecords(TopicPartitionChannel.java:512)\n\tat
        com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertRecordToBuffer(TopicPartitionChannel.java:338)\n\tat
        com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.insert(SnowflakeSinkServiceV2.java:299)\n\tat
        com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.insert(SnowflakeSinkServiceV2.java:267)\n\tat
        com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:301)\n\tat
        org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:593)\n\t...
        11 more\nCaused by: net.snowflake.ingest.utils.SFException: The given row
        cannot be converted to the internal format due to invalid value: Value cannot
        be ingested into Snowflake column DATECOLUMNASSTRING of type NUMBER, Row Index:
        0, reason: Not a valid number\n\tat net.snowflake.ingest.streaming.internal.DataValidationUtil.valueFormatNotAllowedException(DataValidationUtil.java:884)\n\tat
        net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseBigDecimal(DataValidationUtil.java:557)\n\tat
        net.snowflake.ingest.streaming.internal.ParquetValueParser.getSb16Value(ParquetValueParser.java:327)\n\tat
        net.snowflake.ingest.streaming.internal.ParquetValueParser.parseColumnValueToParquet(ParquetValueParser.java:163)\n\tat
        net.snowflake.ingest.streaming.internal.ParquetRowBuffer.addRow(ParquetRowBuffer.java:203)\n\tat
        net.snowflake.ingest.streaming.internal.ParquetRowBuffer.addRow(ParquetRowBuffer.java:148)\n\tat
        net.snowflake.ingest.streaming.internal.AbstractRowBuffer.insertRows(AbstractRowBuffer.java:306)\n\tat
        net.snowflake.ingest.streaming.internal.ParquetRowBuffer.insertRows(ParquetRowBuffer.java:35)\n\tat
        net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestChannelInternal.insertRows(SnowflakeStreamingIngestChannelInternal.java:362)\n\tat
        net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestChannelInternal.insertRow(SnowflakeStreamingIngestChannelInternal.java:328)\n\tat
        com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel$InsertRowsApiResponseSupplier.get(TopicPartitionChannel.java:628)\n\tat
        com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel$InsertRowsApiResponseSupplier.get(TopicPartitionChannel.java:580)\n\tat
        dev.failsafe.Functions.lambda$toCtxSupplier$11(Functions.java:243)\n\tat dev.failsafe.Functions.lambda$get$0(Functions.java:46)\n\tat
        dev.failsafe.internal.FallbackExecutor.lambda$apply$0(FallbackExecutor.java:51)\n\tat
        dev.failsafe.SyncExecutionImpl.executeSync(SyncExecutionImpl.java:182)\n\tat
        dev.failsafe.FailsafeExecutor.call(FailsafeExecutor.java:438)\n\tat dev.failsafe.FailsafeExecutor.get(FailsafeExecutor.java:115)\n\tat
        com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertRowsWithFallback(TopicPartitionChannel.java:574)\n\tat
        com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertBufferedRecords(TopicPartitionChannel.java:500)\n\t...
        16 more\n"```

@sfc-gh-tzhang hi, can you comment on this

@imrohankataria Are you using JSON or AVRO? And not sure if I fully understand your question, do you mean the field has a type of String but schema evolution creates a column as number?

Closing due to inactivity