confluentinc/kafka-connect-storage-cloud

Request for Feature: Confirming Completion and Handling Old Data in S3 Kafka Connector

vibhormishra-sg opened this issue · 0 comments

I have the following connector config, which flushes data to S3 every minute:

  "tasks.max": "3",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "path.format": "YYYY/MM/dd/HH/mm",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "timestamp.extractor": "Record",
  "partition.duration.ms": "60000",  # 1 min
  "flush.size": "1000",
  "rotate.interval.ms": "60000",  # 1 min

The Kafka topic has three partitions, and the connector runs three tasks, one per partition. The goal is to write data to an S3 bucket every minute using the path.format="YYYY/MM/dd/HH/mm" setting. Each task writes in parallel to the same S3 subfolder, such as s3=bucket/topic/2024/1/24/10/29. However, two tasks may finish writing to a subfolder and move on to the next timestamp while the third is still writing to the previous one.
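To make the race concrete, here is a minimal Python sketch of how a record timestamp maps to a minute folder under this path.format. It assumes UTC and ignores any bucket or top-level directory prefix. The helper names `to_ms` and `minute_folder` and the per-partition timestamps are made up for illustration and are not part of the connector. Note that the `MM` pattern zero-pads the month, so the sketch prints `01` rather than `1`.

```python
from datetime import datetime, timezone

UTC = timezone.utc


def to_ms(dt: datetime) -> int:
    """Convert a timezone-aware datetime to epoch milliseconds."""
    return int(dt.timestamp() * 1000)


def minute_folder(topic: str, record_ts_ms: int) -> str:
    """Folder a record would land in for path.format=YYYY/MM/dd/HH/mm (UTC assumed)."""
    ts = datetime.fromtimestamp(record_ts_ms // 1000, tz=UTC)
    return f"{topic}/" + ts.strftime("%Y/%m/%d/%H/%M")


# Hypothetical snapshot: timestamp of the record each partition's task is writing.
current_record = {
    0: to_ms(datetime(2024, 1, 24, 10, 30, 5, tzinfo=UTC)),
    1: to_ms(datetime(2024, 1, 24, 10, 30, 10, tzinfo=UTC)),
    2: to_ms(datetime(2024, 1, 24, 10, 29, 40, tzinfo=UTC)),  # still one minute behind
}

for partition, ts_ms in current_record.items():
    print(partition, minute_folder("topic", ts_ms))
```

In this snapshot, partitions 0 and 1 are already writing to the 10/30 folder while partition 2 is still writing to 10/29, so a reader that looks only at the newest folder cannot tell that 10/29 is incomplete.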

I need a mechanism to confirm that all writers/parallel processes have finished writing data to a particular S3 bucket folder before the downstream orchestrator moves on to the next subfolder. I am considering waiting an extra minute as a buffer, but explicit confirmation from the writers would be more reassuring.
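One possible shape for such a confirmation is a per-partition watermark check, sketched below in Python. This is not an existing connector feature. It assumes the orchestrator can somehow learn the latest record timestamp each partition has written to S3, and how it obtains that is outside this sketch. The function name `folder_complete` and its parameters are hypothetical.

```python
from datetime import datetime, timezone
from typing import Mapping

UTC = timezone.utc


def to_ms(dt: datetime) -> int:
    """Convert a timezone-aware datetime to epoch milliseconds."""
    return int(dt.timestamp() * 1000)


def folder_complete(folder_start_ms: int,
                    partition_duration_ms: int,
                    latest_ts_by_partition: Mapping[int, int],
                    expected_partitions: int) -> bool:
    """True when every partition has written past the end of the folder's window."""
    # A partition that has not reported yet could still write into this folder.
    if len(latest_ts_by_partition) < expected_partitions:
        return False
    folder_end_ms = folder_start_ms + partition_duration_ms
    # The slowest partition decides: the watermark is the minimum across partitions.
    watermark_ms = min(latest_ts_by_partition.values())
    return watermark_ms >= folder_end_ms


folder_start = to_ms(datetime(2024, 1, 24, 10, 29, tzinfo=UTC))
progress = {
    0: to_ms(datetime(2024, 1, 24, 10, 30, 5, tzinfo=UTC)),
    1: to_ms(datetime(2024, 1, 24, 10, 30, 10, tzinfo=UTC)),
    2: to_ms(datetime(2024, 1, 24, 10, 29, 40, tzinfo=UTC)),
}
print(folder_complete(folder_start, 60_000, progress, 3))  # partition 2 is behind

progress[2] = to_ms(datetime(2024, 1, 24, 10, 30, 1, tzinfo=UTC))
print(folder_complete(folder_start, 60_000, progress, 3))  # all partitions are past 10:30
```

The check relies on timestamps being roughly increasing within each partition, and an idle partition would stall the watermark indefinitely, so a time-based fallback such as the one-minute buffer would probably still be needed alongside it.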

Additionally, there is a concern about old data. If the orchestrator has already processed all data from s3=bucket/topic/2024/1/24/10/29 and an older record (e.g., 5 minutes old) then arrives in the topic, the connector writes it to an older folder (s3=bucket/topic/2024/1/24/10/24). The orchestrator may never become aware of this late data. Would it make sense to write such late data to a different folder in the S3 bucket instead?
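The routing rule being asked about could look roughly like the Python sketch below. It only illustrates the idea: nothing in this issue indicates the connector supports it today, and in practice it would likely need a custom partitioner or a downstream step. The `late` prefix, the function name `route_folder`, and the `processed_through_ms` cutoff (the end of the last folder the orchestrator has consumed) are all hypothetical, and UTC is assumed.

```python
from datetime import datetime, timezone

UTC = timezone.utc


def to_ms(dt: datetime) -> int:
    """Convert a timezone-aware datetime to epoch milliseconds."""
    return int(dt.timestamp() * 1000)


def route_folder(topic: str,
                 record_ts_ms: int,
                 processed_through_ms: int,
                 late_prefix: str = "late") -> str:
    """Pick the normal minute folder, or a separate late-data folder for old records."""
    ts = datetime.fromtimestamp(record_ts_ms // 1000, tz=UTC)
    minute = ts.strftime("%Y/%m/%d/%H/%M")
    # Records older than the cutoff would land in a folder that was already processed.
    if record_ts_ms < processed_through_ms:
        return f"{topic}/{late_prefix}/{minute}"
    return f"{topic}/{minute}"


# The orchestrator has consumed everything up to the end of the 10:29 folder.
cutoff = to_ms(datetime(2024, 1, 24, 10, 30, tzinfo=UTC))

late_record = to_ms(datetime(2024, 1, 24, 10, 24, 30, tzinfo=UTC))
fresh_record = to_ms(datetime(2024, 1, 24, 10, 30, 10, tzinfo=UTC))

print(route_folder("topic", late_record, cutoff))   # topic/late/2024/01/24/10/24
print(route_folder("topic", fresh_record, cutoff))  # topic/2024/01/24/10/30
```

Keeping the original minute inside the late path preserves the event time, so the orchestrator could scan only the `late` prefix for records it missed.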