snowflakedb/snowflake-kafka-connector

SNOW-989387 Connectors errored out after updating to v2.1.2

Closed this issue · 18 comments

Updated to v2.1.2 from v2.1.0 and the connectors are erroring out with the following error:

com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException: [SF_KAFKA_CONNECTOR] Exception: Failure in Streaming Channel Offset Migration Response
Error Code: 5023
Detail: Streaming Channel Offset Migration from Source to Destination Channel has no/invalid response, please contact Snowflake Support
Message: Migrating OffsetToken for a SourceChannel:MyEvent_MyEvent_0 in table:MY_DB.MY_SCHEMA.MY_TABLE failed due to exceptionMessage:JDBC driver internal error: exception creating result java.lang.NoClassDefFoundError: Could not initialize class net.snowflake.client.jdbc.internal.apache.arrow.memory.RootAllocator at net.snowflake.client.jdbc.SnowflakeResultSetSerializableV1.create(SnowflakeResultSetSerializableV1.java:586). and stackTrace:[net.snowflake.client.jdbc.SnowflakeStatementV1.executeQueryInternal(SnowflakeStatementV1.java:268), net.snowflake.client.jdbc.SnowflakePreparedStatementV1.executeQuery(SnowflakePreparedStatementV1.java:117), com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.migrateStreamingChannelOffsetToken(SnowflakeConnectionServiceV1.java:1035), com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.<init>(TopicPartitionChannel.java:287), com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.createStreamingChannelForTopicPartition(SnowflakeSinkServiceV2.java:254), com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.lambda$startPartitions$1(SnowflakeSinkServiceV2.java:222), java.base/java.lang.Iterable.forEach(Iterable.java:75), com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.startPartitions(SnowflakeSinkServiceV2.java:217), com.snowflake.kafka.connector.SnowflakeSinkTask.open(SnowflakeSinkTask.java:259), org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:644), org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:73), org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:741), org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:324), org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:473), org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478), org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389), org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:559), org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1288), org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1247), org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227), org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:479), org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:331), org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237), org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206), org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204), org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259), org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181), java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539), java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264), java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136), java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635), java.base/java.lang.Thread.run(Thread.java:833)]
	at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:367)
	at com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.migrateStreamingChannelOffsetToken(SnowflakeConnectionServiceV1.java:1077)
	at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.<init>(TopicPartitionChannel.java:287)
	at com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.createStreamingChannelForTopicPartition(SnowflakeSinkServiceV2.java:254)
	at com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.lambda$startPartitions$1(SnowflakeSinkServiceV2.java:222)
	at java.base/java.lang.Iterable.forEach(Iterable.java:75)
	at com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.startPartitions(SnowflakeSinkServiceV2.java:217)
	at com.snowflake.kafka.connector.SnowflakeSinkTask.open(SnowflakeSinkTask.java:259)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:644)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:73)
	at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:741)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:324)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:473)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:559)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1288)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1247)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:479)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:331)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Hi @cchandurkar,

That issue is related to a JDK 21 compatibility problem that was only fixed in the JDBC driver version 3.14.4. Please refer to the JDBC issue 1512.

The only workaround for now is to run on a JDK version lower than 21.
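
For example, on a plain Apache Kafka installation, kafka-run-class.sh uses JAVA_HOME when it's set, so one way to pin the Connect worker to an older JDK is roughly the following (paths are illustrative; adjust for your environment):

export JAVA_HOME=/usr/lib/jvm/java-11-openjdk
bin/connect-distributed.sh config/connect-distributed.properties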

Thanks for the prompt response, @sfc-gh-wfateem.

I can confirm that I'm on openjdk 17 and not 21.

openjdk 17.0.8 2023-07-18 LTS
OpenJDK Runtime Environment (Red_Hat-17.0.8.0.7-1) (build 17.0.8+7-LTS)
OpenJDK 64-Bit Server VM (Red_Hat-17.0.8.0.7-1) (build 17.0.8+7-LTS, mixed mode, sharing)

Looks like I might have skimmed your error stack too quickly. That's a slightly different issue:
JDBC Driver Compatibility Issue With JDK 16 and Later: Exception creating result java.lang.ExceptionInInitializerError at net.snowflake.client.jdbc.internal.apache.arrow.memory.UnsafeAllocationManager

But I would have expected the same issue in v2.1.0. I'll need to look into this a bit more closely, but in the meantime, can you try the following and then restart your connector (if you're running a standalone Kafka Connect server) or restart your Kafka Connect cluster (if it's running in distributed mode)?
export KAFKA_OPTS='--add-opens java.base/java.nio=ALL-UNNAMED $KAFKA_OPTS'

@cchandurkar that won't work. We basically need to get that option into the kafka-run-class.sh script so that it's positioned right after the java command, so a workaround is to do this instead:
export KAFKA_HEAP_OPTS='--add-opens java.base/java.nio=ALL-UNNAMED'

That at least works for me on an Apache Kafka 3.4 environment. I don't believe that script changed a whole lot between versions.

This is caused by the changes in #750, which address a significant issue. That change requires submitting a query through the JDBC driver and processing the results, which is what triggers this problem.
More information on that issue can be found here:
https://community.snowflake.com/s/article/JDBC-Driver-Compatibility-Issue-With-JDK-16-and-Later

Workaround for JDK 16:
export KAFKA_OPTS="-Djdk.module.illegalAccess=permit $KAFKA_OPTS"

Workaround for JDK 17 and higher:
export KAFKA_HEAP_OPTS="--add-opens java.base/java.nio=ALL-UNNAMED $KAFKA_HEAP_OPTS"

(Use double quotes so any existing value of the variable is preserved.)
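
A quick way to confirm the flag actually reached the Connect JVM (a sketch, assuming a single worker process started via the ConnectDistributed main class and jcmd from the same JDK on the PATH; use ConnectStandalone for a standalone worker):

jcmd $(pgrep -f ConnectDistributed) VM.command_line | grep -- --add-opens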

@sfc-gh-wfateem hey, thanks. That worked 🙌

Hey @cchandurkar,
Glad that worked out for you.
I'm just going to reopen the case so I can see if there's a bit more we can do to handle this gracefully. I imagine others are going to run into this problem.

Hi @sfc-gh-wfateem ,
we updated the Snowflake Connector from version 2.0.1 to version 2.1.2, and we are also experiencing a similar issue. However, we are using OpenJDK 11 (Kafka Connect Docker image - confluentinc/cp-kafka-connect:7.4.0.amd64). In our case, a restart of the failed process helps.

Error log:

com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException: [SF_KAFKA_CONNECTOR] Exception: Failure in Streaming Channel Offset Migration Response
Error Code: 5023
Detail: Streaming Channel Offset Migration from Source to Destination Channel has no/invalid response, please contact Snowflake Support
Message: Snowflake experienced a transient exception, please retry the migration request.
	at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:367)
	at com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.migrateStreamingChannelOffsetToken(SnowflakeConnectionServiceV1.java:1062)
	at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.<init>(TopicPartitionChannel.java:287)
	at com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.createStreamingChannelForTopicPartition(SnowflakeSinkServiceV2.java:254)
	at com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.lambda$startPartitions$1(SnowflakeSinkServiceV2.java:222)
	at java.base/java.lang.Iterable.forEach(Iterable.java:75)
	at com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.startPartitions(SnowflakeSinkServiceV2.java:217)
	at com.snowflake.kafka.connector.SnowflakeSinkTask.open(SnowflakeSinkTask.java:259)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:644)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:73)
	at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:741)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:322)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:471)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:474)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:385)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:557)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1272)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1236)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:479)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:331)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Open JDK version:

openjdk version "11.0.18" 2023-01-17 LTS
OpenJDK Runtime Environment Zulu11.62+17-CA (build 11.0.18+10-LTS)
OpenJDK 64-Bit Server VM Zulu11.62+17-CA (build 11.0.18+10-LTS, mixed mode)

Is there a similar fix for OpenJDK version 11 as there is for versions 16 and 17+?

@lsimac you're experiencing a different issue. The change causing this is the same, but your problem is unrelated to the JDK. The Kafka Connector is running a SQL statement and that looks like it might have failed.
Snowflake experienced a transient exception, please retry the migration request.
Which might explain why it resolved when you restarted the process.
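
Incidentally, if only individual tasks end up in a FAILED state, you can usually restart just those through the Kafka Connect REST API instead of bouncing the whole worker. A minimal sketch (host, connector name, and task id are placeholders):

curl -X POST http://localhost:8083/connectors/my-snowflake-sink/tasks/0/restart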

Is it resolved once and for all, or are you continuously running into this problem? If it's the latter, then can I ask you to open a support ticket with us so we can take a closer look? We'll need more information from you to be able to understand what happened in your particular scenario.

@sfc-gh-wfateem , Thank you for the support. Currently, it seems like the issue is resolved permanently. However, we are in the process of migrating our connectors to Snowpipe Streaming, so we will monitor them to see if the issue reappears. If it does, we will open a support ticket.

Howdy --- We are using the managed Confluent Cloud connector, and seem to be having the exact same issues as detailed above. We have an open support issue with Confluent at the moment.

Version is 2.1.2 as well.

Any suggestions?

Hey @devin-hc,
There are two issues that have been raised here. The main one being addressed is JDK related:
Message: Migrating OffsetToken for a SourceChannel:MyEvent_MyEvent_0 in table:MY_DB.MY_SCHEMA.MY_TABLE failed due to exceptionMessage:JDBC driver internal error: exception creating result java.lang.NoClassDefFoundError: Could not initialize class net.snowflake.client.jdbc.internal.apache.arrow.memory.RootAllocator

The other one is going to be situational for every account and might be transient:
Message: Snowflake experienced a transient exception, please retry the migration request.

If you're using Confluent Cloud then I doubt you're running into the first issue, because I believe that environment uses JDK 11.

Do you have the exact error you're running into?

Hi @sfc-gh-wfateem Thank you for the response. Since Friday (when I think Confluent upgraded to 2.1.2) we've been getting a variety of errors:

"message": "[SF_KAFKA_CONNECTOR] Exception: Failure in Streaming Channel Offset Migration Response
Error Code: 5023
Detail: Streaming Channel Offset Migration from Source to Destination Channel has no/invalid response, please contact Snowflake Support
Message: Snowflake experienced a transient exception, please retry the migration request.",

And

        "message": "Open channel request failed: HTTP Status: 400 ErrorBody: 
        {\n  \"status_code\" : 8,\n  \"message\" : \"The supplied database does not exist or is not authorized.\"\n}.",
        "rootCause": "HTTP Status: 400 ErrorBody: 
        {\n  \"status_code\" : 8,\n  \"message\" : \"The supplied database does not exist or is not authorized.\"\

Those are the two recurring ones. Data is still flowing in; it's just that some of the tasks will sit in a failure state and backlogged messages will sometimes build up.

Confluent support has hooked in their engineering folks, so that's a good sign.

FWIW, on the Snowflake side, we haven't changed permissioning, roles, etc. Also confirmed/double checked the applicable network policies/rules. Problem isn't there.

@devin-hc the first issue might have been a transient problem, it just depends if that's something you consistently run into or if it was just a one time problem. It's something we would have to look into for you.

The second problem is most likely caused by a connector issue that we addressed recently in PR #744. As a workaround, you can set the following Kafka connector configuration:
enable.streaming.client.optimization=false
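
For a self-managed connector, that property goes into the connector configuration alongside the other sink settings, for example (the connector name, topic, and ingestion method shown here are illustrative placeholders, not your actual values):

name=my-snowflake-sink
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
topics=MyEvent
snowflake.ingestion.method=SNOWPIPE_STREAMING
enable.streaming.client.optimization=false

For the managed Confluent Cloud connector, the override would need to be applied on Confluent's side.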

The fix should be included in the next release which I believe will be sometime in January.


Thank you for the followup/info, @sfc-gh-wfateem !

I'll add your note about the enable.streaming.client.optimization param to our open Confluent ticket.

Should I open a Snowflake support case about the first issue? Both are popping up pretty consistently now.

@devin-hc yes, if that first error is consistently occurring for you then please go ahead and open a case with us.
Please mention the Snowflake account, and when providing the error, please mention the timestamp and what timezone that's in.

@sfc-gh-wfateem

Thank you again for the help with this. Our issue is resolved (including the 400s) without looping in Snowflake support.
I pointed Confluent's CS folks at this thread/your replies. From them (after I asked what the fix was):

Thanks for the confirmation and your patience on this.
 
We override the config (enable.streaming.client.optimization=false) to temporarily work around the issue due to the code freeze for the holidays.
 
We will deploy a fix on this after the code freeze. Please let me know if you have further questions. 

Appreciate the help!

This is fixed by #774, which has been merged.
I'll close the issue now; feel free to reopen it if you have any questions or issues.