confluentinc/kafka-connect-storage-cloud

Kafka S3 Connector immediately aborts the MultipartUpload

martinbolco opened this issue · 1 comment

I installed Kafka Connect with the S3 connector plugin on an OpenShift cluster. The pod is running and the initial request to get the bucket ACL succeeds. However, when we produce a message to the topic it is not uploaded to the bucket; the task fails with org.apache.kafka.connect.errors.ConnectException: Part upload failed.

Connector configuration:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: "lkh-minio-connector"
  namespace: "openshift-operators"
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  config:
    connector.class: io.confluent.connect.s3.S3SinkConnector
    tasks.max: '1'
    topics: kafka-test-backup-topic2
    s3.region: eu-central
    s3.bucket.name: lkh-kafka-connect-backup-test
    s3.part.size: '5242880'
    flush.size: '1'
    store.url: <link to minio backend url>
    storage.class: io.confluent.connect.s3.storage.S3Storage
    format.class: io.confluent.connect.s3.format.bytearray.ByteArrayFormat
    partitioner.class: io.confluent.connect.storage.partitioner.DefaultPartitioner
    behavior.on.null.values: ignore
    timezone: 'UTC'
    errors.tolerance: all
    errors.log.enable: true
    errors.log.include.messages: true
    aws.access.key.id: ***
    aws.secret.access.key: ***

Kafka Connect logs:

2023-07-17 12:12:00,219 INFO [lkh-minio-connector|task-0] Creating S3 output stream. (io.confluent.connect.s3.storage.S3Storage) [task-thread-lkh-minio-connector-0]
2023-07-17 12:12:00,224 INFO [lkh-minio-connector|task-0] Create S3OutputStream for bucket 'lkh-kafka-connect-backup-test' key 'topics/kafka-test-backup-topic2/partition=0/kafka-test-backup-topic2+0+0000000000.bin' (io.confluent.connect.s3.storage.S3OutputStream) [task-thread-lkh-minio-connector-0]
2023-07-17 12:12:00,226 INFO [lkh-minio-connector|task-0] Starting commit and rotation for topic partition kafka-test-backup-topic2-0 with start offset {partition=0=0} (io.confluent.connect.s3.TopicPartitionWriter) [task-thread-lkh-minio-connector-0]
2023-07-17 12:12:00,361 WARN [lkh-minio-connector|task-0] Aborting multi-part upload with id 'MmVlNDRlMGItNzkwOC00YWU1LTk2ZGEtN2NiYmM2YWY0YzVhLjU3MmEyZTU3LTBjN2MtNGQyYi1hZDU1LTg0YjlmMTNmNGNhMA' (io.confluent.connect.s3.storage.S3OutputStream) [task-thread-lkh-minio-connector-0]
2023-07-17 12:12:00,385 ERROR [lkh-minio-connector|task-0] Multipart upload failed to complete for bucket 'lkh-kafka-connect-backup-test' key 'topics/kafka-test-backup-topic2/partition=0/kafka-test-backup-topic2+0+0000000000.bin'. Reason: Part upload failed:  (io.confluent.connect.s3.storage.S3OutputStream) [task-thread-lkh-minio-connector-0]
2023-07-17 12:12:00,387 ERROR [lkh-minio-connector|task-0] WorkerSinkTask{id=lkh-minio-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Part upload failed:  (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-lkh-minio-connector-0]
org.apache.kafka.connect.errors.ConnectException: Part upload failed: 
	at io.confluent.connect.s3.util.S3ErrorUtils.throwConnectException(S3ErrorUtils.java:84)
	at io.confluent.connect.s3.format.S3RetriableRecordWriter.commit(S3RetriableRecordWriter.java:62)
	at java.base/java.util.Optional.ifPresent(Optional.java:178)
	at io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider$1.commit(KeyValueHeaderRecordWriterProvider.java:139)
	at io.confluent.connect.s3.TopicPartitionWriter.commitFile(TopicPartitionWriter.java:669)
	at io.confluent.connect.s3.TopicPartitionWriter.commitFiles(TopicPartitionWriter.java:638)
	at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:278)
	at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:218)
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:244)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: Part upload failed: 
	at io.confluent.connect.s3.storage.S3OutputStream.uploadPart(S3OutputStream.java:151)
	at io.confluent.connect.s3.storage.S3OutputStream.commit(S3OutputStream.java:168)
	at io.confluent.connect.s3.format.bytearray.ByteArrayRecordWriterProvider$1.commit(ByteArrayRecordWriterProvider.java:82)
	at io.confluent.connect.s3.format.S3RetriableRecordWriter.commit(S3RetriableRecordWriter.java:60)
	... 19 more
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: null; S3 Extended Request ID: null; Proxy: 172.18.1.159), S3 Extended Request ID: null
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5456)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5403)
	at com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3887)
	at com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3872)
	at io.confluent.connect.s3.storage.S3OutputStream$MultipartUpload.uploadPart(S3OutputStream.java:278)
	at io.confluent.connect.s3.storage.S3OutputStream.uploadPart(S3OutputStream.java:145)
	... 22 more
2023-07-17 12:12:00,390 ERROR [lkh-minio-connector|task-0] WorkerSinkTask{id=lkh-minio-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-lkh-minio-connector-0]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.connect.errors.ConnectException: Part upload failed: 
	at io.confluent.connect.s3.util.S3ErrorUtils.throwConnectException(S3ErrorUtils.java:84)
	at io.confluent.connect.s3.format.S3RetriableRecordWriter.commit(S3RetriableRecordWriter.java:62)
	at java.base/java.util.Optional.ifPresent(Optional.java:178)
	at io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider$1.commit(KeyValueHeaderRecordWriterProvider.java:139)
	at io.confluent.connect.s3.TopicPartitionWriter.commitFile(TopicPartitionWriter.java:669)
	at io.confluent.connect.s3.TopicPartitionWriter.commitFiles(TopicPartitionWriter.java:638)
	at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:278)
	at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:218)
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:244)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
	... 11 more
Caused by: java.io.IOException: Part upload failed: 
	at io.confluent.connect.s3.storage.S3OutputStream.uploadPart(S3OutputStream.java:151)
	at io.confluent.connect.s3.storage.S3OutputStream.commit(S3OutputStream.java:168)
	at io.confluent.connect.s3.format.bytearray.ByteArrayRecordWriterProvider$1.commit(ByteArrayRecordWriterProvider.java:82)
	at io.confluent.connect.s3.format.S3RetriableRecordWriter.commit(S3RetriableRecordWriter.java:60)
	... 19 more
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: null; S3 Extended Request ID: null; Proxy: 172.18.1.159), S3 Extended Request ID: null
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5456)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5403)
	at com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3887)
	at com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3872)
	at io.confluent.connect.s3.storage.S3OutputStream$MultipartUpload.uploadPart(S3OutputStream.java:278)
	at io.confluent.connect.s3.storage.S3OutputStream.uploadPart(S3OutputStream.java:145)
	... 22 more

S3 (MinIO) logs:

Initial request to get the bucket ACL:

[REQUEST s3.GetBucketACL] [2023-07-17T14:09:53.220] [Client IP: 10.120.12.132]
GET /lkh-kafka-connect-backup-test/?acl
Proto: HTTP/1.1
Host: ***
Content-Length: 0
X-Forwarded-Proto: http
Amz-Sdk-Retry: 0/0/500
User-Agent: APN/1.0 Confluent/1.0 KafkaS3Connector/10.5.1, aws-sdk-java/1.12.268 Linux/4.18.0-305.72.1.el8_4.x86_64 OpenJDK_64-Bit_Server_VM/17.0.6+10-LTS java/17.0.6 scala/2.13.10 kotlin/1.4.10 vendor/Red_Hat,_Inc. cfg/retry-mode/legacy
X-Forwarded-For: 10.120.12.13210.243.0.12
Amz-Sdk-Invocation-Id: 50f92197-ff26-8270-bd6e-e361b04cadb9
Amz-Sdk-Request: attempt=1;max=4
Authorization:***, SignedHeaders=amz-sdk-invocation-id;amz-sdk-request;amz-sdk-retry;content-type;host;user-agent;x-amz-content-sha256;x-amz-date, Signature=f55cdf74581fcca07bac9c27e5ccb3cd582cef17e251f7f81049d6be7001c4bd
Forwarded: for=10.243.0.12;host=***;proto=http
Via: 1.1 sqdeb (squid/3.4.8)
Cache-Control: max-age=0
Content-Type: application/octet-stream
X-Amz-Content-Sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
X-Amz-Date: 20230717T120953Z
X-Forwarded-Host: ***
X-Forwarded-Port: 80
***
[RESPONSE] [2023-07-17T14:09:53.221] [ Duration 561µs  ↑ 239 B  ↓ 309 B ]
200 OK
Vary: Origin,Accept-Encoding
X-Amz-Id-2: dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8
X-Amz-Request-Id: 1772A66C0CECBFE4
X-Content-Type-Options: nosniff
X-Xss-Protection: 1; mode=block
Strict-Transport-Security: max-age=31536000; includeSubDomains
<AccessControlPolicy><Owner><ID></ID><DisplayName></DisplayName></Owner><AccessControlList><Grant><Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="CanonicalUser"><Type>CanonicalUser</Type></Grantee><Permission>FULL_CONTROL</Permission></Grant></AccessControlList></AccessControlPolicy>

Initiate multipart upload request:

[REQUEST s3.NewMultipartUpload] [2023-07-17T14:12:00.294] [Client IP: 10.120.12.132]
POST /lkh-kafka-connect-backup-test/topics/kafka-test-backup-topic2/partition%3D0/kafka-test-backup-topic2%2B0%2B0000000000.bin?uploads
Proto: HTTP/1.1
Host: ***
Amz-Sdk-Request: ttl=20230717T121250Z;attempt=1;max=4
Authorization: ***, SignedHeaders=amz-sdk-invocation-id;amz-sdk-request;amz-sdk-retry;content-length;content-type;host;user-agent;x-amz-content-sha256;x-amz-date, Signature=2eb60a1ee6b0568b7c5746336d708d37bee8f4e927f00376674e7693bf6cc2ce
Content-Type: application/octet-stream
X-Amz-Content-Sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
X-Forwarded-For: 10.120.12.13210.243.0.15
Amz-Sdk-Invocation-Id: 5332201e-49eb-aa20-adef-58995178c4da
Via: 1.1 sqdeb (squid/3.4.8)
X-Forwarded-Host: ***
X-Forwarded-Port: 80
Amz-Sdk-Retry: 0/0/500
Cache-Control: max-age=0
Content-Length: 0
Forwarded: for=10.243.0.15;host=***;proto=http
User-Agent: APN/1.0 Confluent/1.0 KafkaS3Connector/10.5.1, aws-sdk-java/1.12.268 Linux/4.18.0-305.72.1.el8_4.x86_64 OpenJDK_64-Bit_Server_VM/17.0.6+10-LTS java/17.0.6 scala/2.13.10 kotlin/1.4.10 vendor/Red_Hat,_Inc. cfg/retry-mode/legacy
X-Amz-Date: 20230717T121200Z
X-Forwarded-Proto: http
***
[RESPONSE] [2023-07-17T14:12:00.298] [ Duration 4.442ms  ↑ 239 B  ↓ 411 B ]
200 OK
X-Content-Type-Options: nosniff
Accept-Ranges: bytes
Server: MinIO
Strict-Transport-Security: max-age=31536000; includeSubDomains
Vary: Origin,Accept-Encoding
X-Amz-Id-2: dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8
X-Amz-Request-Id: 1772A689A3186949
Content-Length: 411
Content-Type: application/xml
X-Xss-Protection: 1; mode=block
<?xml version="1.0" encoding="UTF-8"?>
<InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Bucket>lkh-kafka-connect-backup-test</Bucket><Key>topics/kafka-test-backup-topic2/partition=0/kafka-test-backup-topic2+0+0000000000.bin</Key><UploadId>MmVlNDRlMGItNzkwOC00YWU1LTk2ZGEtN2NiYmM2YWY0YzVhLjU3MmEyZTU3LTBjN2MtNGQyYi1hZDU1LTg0YjlmMTNmNGNhMA</UploadId></InitiateMultipartUploadResult>

Multipart upload abort request (issued immediately after the multipart upload was initiated):

[REQUEST s3.AbortMultipartUpload] [2023-07-17T14:12:00.370] [Client IP: 10.120.12.132]
DELETE /lkh-kafka-connect-backup-test/topics/kafka-test-backup-topic2/partition%3D0/kafka-test-backup-topic2%2B0%2B0000000000.bin?uploadId=MmVlNDRlMGItNzkwOC00YWU1LTk2ZGEtN2NiYmM2YWY0YzVhLjU3MmEyZTU3LTBjN2MtNGQyYi1hZDU1LTg0YjlmMTNmNGNhMA
Proto: HTTP/1.1
Host: ***
User-Agent: APN/1.0 Confluent/1.0 KafkaS3Connector/10.5.1, aws-sdk-java/1.12.268 Linux/4.18.0-305.72.1.el8_4.x86_64 OpenJDK_64-Bit_Server_VM/17.0.6+10-LTS java/17.0.6 scala/2.13.10 kotlin/1.4.10 vendor/Red_Hat,_Inc. cfg/retry-mode/legacy
Via: 1.1 sqdeb (squid/3.4.8)
X-Amz-Content-Sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
X-Forwarded-Port: 80
Amz-Sdk-Invocation-Id: c0e79488-8ca7-3e72-dc62-fbfcb143ff19
Amz-Sdk-Retry: 0/0/500
Cache-Control: max-age=0
Content-Type: application/octet-stream
X-Forwarded-For: 10.120.12.13210.243.0.15
X-Forwarded-Host: ***
Amz-Sdk-Request: ttl=20230717T120543Z;attempt=1;max=4
Authorization: ***, SignedHeaders=amz-sdk-invocation-id;amz-sdk-request;amz-sdk-retry;content-type;host;user-agent;x-amz-content-sha256;x-amz-date, Signature=ddc7175736463b6e78f8a82eb4ca290c0bd1b67897ef4fd3cf1972b231c9a87d
X-Forwarded-Proto: http
Content-Length: 0
Forwarded: for=10.243.0.15;host=***;proto=http
X-Amz-Date: 20230717T121200Z
***
[RESPONSE] [2023-07-17T14:12:00.371] [ Duration 950µs  ↑ 239 B  ↓ 0 B ]
204 No Content
Accept-Ranges: bytes
Server: MinIO
Strict-Transport-Security: max-age=31536000; includeSubDomains
X-Content-Type-Options: nosniff
Content-Length: 0
Vary: Origin,Accept-Encoding
X-Amz-Id-2: dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8
X-Amz-Request-Id: 1772A689A7A5004A
X-Xss-Protection: 1; mode=block

From the Kafka Connect logs it appears the part upload fails with a 400 Bad Request, but there is no further information about that request, and the logs on our MinIO instance do not capture any other calls (only the initiate-multipart-upload and abort-multipart-upload requests shown above). The UploadPart request itself never seems to reach MinIO.
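
One way to narrow this down is to run the same multipart-upload sequence directly against the MinIO endpoint with the aws-sdk-java v1 client the connector uses, bypassing the worker's proxy. The sketch below is only illustrative; the endpoint URL, credentials, and object key are placeholders, not values from this issue.

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.*;
import java.io.ByteArrayInputStream;
import java.util.Collections;

public class MinioMultipartCheck {
    public static void main(String[] args) {
        // Talk to MinIO directly (no proxy), using path-style addressing as MinIO typically expects.
        AmazonS3 s3 = AmazonS3ClientBuilder.standard()
                .withEndpointConfiguration(
                        new AwsClientBuilder.EndpointConfiguration("http://minio.example:9000", "eu-central"))
                .withCredentials(new AWSStaticCredentialsProvider(
                        new BasicAWSCredentials("ACCESS_KEY", "SECRET_KEY")))
                .withPathStyleAccessEnabled(true)
                .build();

        String bucket = "lkh-kafka-connect-backup-test";
        String key = "diagnostics/multipart-check.bin";

        // Same sequence the connector performs: initiate -> upload part -> complete.
        InitiateMultipartUploadResult init =
                s3.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, key));

        byte[] payload = "multipart-check".getBytes();
        UploadPartResult part = s3.uploadPart(new UploadPartRequest()
                .withBucketName(bucket)
                .withKey(key)
                .withUploadId(init.getUploadId())
                .withPartNumber(1)
                .withInputStream(new ByteArrayInputStream(payload))
                .withPartSize(payload.length)
                .withLastPart(true));

        s3.completeMultipartUpload(new CompleteMultipartUploadRequest(
                bucket, key, init.getUploadId(), Collections.singletonList(part.getPartETag())));

        System.out.println("UploadPart and CompleteMultipartUpload succeeded for " + key);
    }
}

If this succeeds while the connector fails, the 400 is being returned by something between the worker and MinIO (note the squid Via/Forwarded headers and the "Proxy: 172.18.1.159" field in the exception) rather than by MinIO itself.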

The issue was caused by the proxy settings: the 400 Bad Request came from the HTTP proxy in front of MinIO (172.18.1.159 in the stack trace), which is consistent with the UploadPart request being rejected before it ever reached the bucket.
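
For reference, if worker traffic does have to go through a proxy, the S3 connector exposes explicit proxy settings that can be added to the config block above instead of relying on JVM-wide proxy properties; the values here are placeholders:

    s3.proxy.url: http://<proxy-host>:<proxy-port>
    s3.proxy.user: <proxy-user>          # only needed if the proxy requires authentication
    s3.proxy.password: <proxy-password>

Alternatively, the MinIO endpoint can be excluded from the proxy entirely (for example via the JVM's http.nonProxyHosts system property or the worker's no_proxy environment), so that UploadPart requests go straight to the store.url backend.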