confluentinc/kafka-connect-storage-cloud

Prevent clearing topic-partitions that are still assigned during a rebalance

SatyaKuppam opened this issue · 4 comments

Problem

To decrease the impact of rebalances during rolling bounces of k8s pods, we changed the partition.assignment.strategy from the default RangeAssignor to CooperativeStickyAssignor. After this change we encountered NPEs and the S3SinkTask goes into an unrecoverable state. We did not find the same issue with StickyAssignor however.

Example of an NPE (this is with v10.0.7):

"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask\
      \ due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)\n\
      \tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)\n\
      \tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\
      \tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\
      \tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)\n\
      \tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)\n\t\
      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\
      \tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\t\
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\
      \tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\
      \tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.lang.NullPointerException\n\
      \tat io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:225)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)\n\
      \t... 10 more\n"

Possible Fix in #648

@pbadani looping you on this, since you are the last committed user.

BDeus commented

Same for me, had to revert to previous Assignor.
Furthermore CooperativeStickyAssignor will became the defaut assignor one day (KIP about it https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248)

Same for me, had to revert to previous Assignor.
Furthermore CooperativeStickyAssignor will became the defaut assignor one day (KIP about it https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248)

Hey @BDeus, thanks for confirming. We are using the patch provided in this PR and it working fine with CooperativeStickyAssignor, we haven't seen issues as yet.