SCHEMA EVOLUTION: Converting a String to Number Column
Closed this issue · 3 comments
imrohankataria commented
Hi,
I'm explicitly configuring a date field in my JDBC source connector to be loaded as a STRING in the topic. When I use the Snowflake sink connector with schema evolution enabled, it creates a table whose schema differs from what is specified in the topic: it converts that STRING field into a NUMBER (the original value is a DATE), so when the date value is inserted into the NUMBER column, the insert fails. Any suggestions?
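For context, here is a minimal sketch of the sink side of my setup. Property names follow the Snowflake connector documentation, but the connector name, topic, and table names are illustrative, not my real values:

```json
{
  "name": "snowflake-sink",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
    "snowflake.enable.schematization": "true",
    "topics": "my_topic",
    "snowflake.topic2table.map": "my_topic:MY_TABLE"
  }
}
```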
ERROR LOG

```
due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:624)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:342)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:242)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:211)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.connect.errors.DataException: Error inserting Records using Streaming API with msg:The given row cannot be converted to the internal format due to invalid value: Value cannot be ingested into Snowflake column DATECOLUMNASSTRING of type NUMBER, Row Index: 0, reason: Not a valid number
	at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.handleInsertRowsFailures(TopicPartitionChannel.java:744)
	at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertBufferedRecords(TopicPartitionChannel.java:512)
	at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertRecordToBuffer(TopicPartitionChannel.java:338)
	at com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.insert(SnowflakeSinkServiceV2.java:299)
	at com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.insert(SnowflakeSinkServiceV2.java:267)
	at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:301)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:593)
	... 11 more
Caused by: net.snowflake.ingest.utils.SFException: The given row cannot be converted to the internal format due to invalid value: Value cannot be ingested into Snowflake column DATECOLUMNASSTRING of type NUMBER, Row Index: 0, reason: Not a valid number
	at net.snowflake.ingest.streaming.internal.DataValidationUtil.valueFormatNotAllowedException(DataValidationUtil.java:884)
	at net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseBigDecimal(DataValidationUtil.java:557)
	at net.snowflake.ingest.streaming.internal.ParquetValueParser.getSb16Value(ParquetValueParser.java:327)
	at net.snowflake.ingest.streaming.internal.ParquetValueParser.parseColumnValueToParquet(ParquetValueParser.java:163)
	at net.snowflake.ingest.streaming.internal.ParquetRowBuffer.addRow(ParquetRowBuffer.java:203)
	at net.snowflake.ingest.streaming.internal.ParquetRowBuffer.addRow(ParquetRowBuffer.java:148)
	at net.snowflake.ingest.streaming.internal.AbstractRowBuffer.insertRows(AbstractRowBuffer.java:306)
	at net.snowflake.ingest.streaming.internal.ParquetRowBuffer.insertRows(ParquetRowBuffer.java:35)
	at net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestChannelInternal.insertRows(SnowflakeStreamingIngestChannelInternal.java:362)
	at net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestChannelInternal.insertRow(SnowflakeStreamingIngestChannelInternal.java:328)
	at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel$InsertRowsApiResponseSupplier.get(TopicPartitionChannel.java:628)
	at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel$InsertRowsApiResponseSupplier.get(TopicPartitionChannel.java:580)
	at dev.failsafe.Functions.lambda$toCtxSupplier$11(Functions.java:243)
	at dev.failsafe.Functions.lambda$get$0(Functions.java:46)
	at dev.failsafe.internal.FallbackExecutor.lambda$apply$0(FallbackExecutor.java:51)
	at dev.failsafe.SyncExecutionImpl.executeSync(SyncExecutionImpl.java:182)
	at dev.failsafe.FailsafeExecutor.call(FailsafeExecutor.java:438)
	at dev.failsafe.FailsafeExecutor.get(FailsafeExecutor.java:115)
	at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertRowsWithFallback(TopicPartitionChannel.java:574)
	at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertBufferedRecords(TopicPartitionChannel.java:500)
	... 16 more
```
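One possible workaround (my own assumption, not something confirmed in this thread): pre-create the target table in Snowflake with the intended column type before starting the connector, so schema evolution only adds missing columns instead of inferring this one's type. Table and column names below are hypothetical, matching the error message above:

```sql
-- Pre-create the table so the connector does not infer DATECOLUMNASSTRING's type.
CREATE TABLE IF NOT EXISTS MY_TABLE (
  DATECOLUMNASSTRING VARCHAR,
  RECORD_METADATA VARIANT
);

-- Let the connector's schema evolution still add any further columns it needs.
ALTER TABLE MY_TABLE SET ENABLE_SCHEMA_EVOLUTION = TRUE;
```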
imrohankataria commented
@sfc-gh-tzhang Hi, can you comment on this?
sfc-gh-tzhang commented
@imrohankataria Are you using JSON or Avro? Also, I'm not sure I fully understand your question: do you mean the field has a type of STRING, but schema evolution creates the column as NUMBER?
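To illustrate why I'm asking: with schemaless JSON, the column type can only be inferred from the value itself, so whether the date arrives quoted or as a bare number (e.g. an epoch-day encoding, a hypothetical example) determines what the connector sees:

```json
{"DATECOLUMNASSTRING": "2023-01-15"}
{"DATECOLUMNASSTRING": 19372}
```

With Avro, the type comes from the registered schema instead.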
sfc-gh-akowalczyk commented
Closing due to inactivity.