
Primary LanguageJavaGNU Affero General Public License v3.0AGPL-3.0

Aiven Kafka S3 Connector

Data Format

S3 Object names

S3 connector stores series of files in the specified bucket. Each object is named using pattern [<aws_s3_prefix>]<topic>-<partition>-<startoffset>[.gz]. The .gz extension is used if gzip compression is used, see output_compression below.

The connector creates one file per Kafka Connect offset.flush.interval.ms setting for partitions that have received new messages during that period. The setting defaults to 60 seconds.

Data file format

Data is stored in one record per line in S3. The format is comma separated fields specified by output_fields configuration option. If key and value fields are selected, they are written out in Base64 encoded form.

For example, output_fields of value,key,timestamp results in rows looking something like this:



S3 permissions

S3 connector needs the following permissions to the specified bucket:

  • s3:GetObject
  • s3:PutObject
  • s3:AbortMultipartUpload
  • s3:ListMultipartUploadParts
  • s3:ListBucketMultipartUploads

In case of Access Denied error see https://aws.amazon.com/premiumsupport/knowledge-center/s3-troubleshoot-403/

Connector Configuration


AWS Access Key ID for accessing S3 bucket. Mandatory.


AWS S3 Secret Access Key. Mandatory.


Name of the region for the bucket used for storing the records. Defaults to us-east-1.


Name of an existing bucket for storing the records. Mandatory.


The prefix that will be added to the file name in the bucket. Can be used for putting output files into a subdirectory.


The parameter supports templating using {{ var }} for variables that will be substituted with values.

Currently supported variables are:

  • {{ utc_date }} - the current date in UTC time zone.
  • {{ utc_year }} - the current year in UTC time zone.
  • {{ utc_month }} - the current month in UTC time zone.
  • {{ utc_day }} - the current day of the month in UTC time zone.
  • {{ local_date }} - the current date in the local time zone.
  • {{ local_year }} - the current year in the local time zone.
  • {{ local_month }} - the current month in the local time zone.
  • {{ local_day }} - the current day of the month in the local time zone.

Both dates are formatted in ISO 8601 format, e.g.: 2019-03-26.

The date of the moment when the file is being uploaded to S3 is used.

For example: some-directory/{{ utc_date }}/.


Connector class name, in this case: io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector.


Connector-specific key encoding, must be set to org.apache.kafka.connect.converters.ByteArrayConverter.


Connector-specific value encoding, must be set to org.apache.kafka.connect.converters.ByteArrayConverter.


Topics to subscribe to. See Kafka Connect documentation for details. E.g. demo_topic,another_topic.


Compression type for output files. Supported algorithms are gzip and none. Defaults to gzip.


A comma separated list of fields to include in output. Supported values are: key, offset, timestamp and value. Defaults to value.


S3 connector definition example:

curl -X POST \
    -H "Content-Type: application/json" \
    -d @- \
    https://avnadmin:password@demo-kafka.aivencloud.com:17070/connectors <<EOF
            "name": "example-s3-sink",
            "config": {
                "aws_access_key_id": "AKI...",
                "aws_secret_access_key": "SECRET_ACCESS_KEY",
                "aws_s3_bucket": "aiven-example",
                "aws_s3_prefix": "example-s3-sink/",
                "aws_s3_region": "us-east-1",
                "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
                "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
                "output_compression": "gzip",
                "output_fields": "value,key,timestamp",
                "tasks.max": 1,
                "topics": "source_topic,another_topic",
                "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"