This is a fork of the Apache Flink Kinesis connector adding Enhanced Fanout support for Flink 1.8/1.11 on KDA.

Amazon Kinesis Connector for Apache Flink

This is a fork of the official Apache Flink Kinesis Connector.

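The fork backports the following features to older versions of Flink:

  • Enhanced Fanout (EFO) support for Flink 1.8/1.11
  • Table API and SQL support for KDS sources and sinks for Flink 1.11
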
Both features are already available in the official Apache Flink connector for Flink 1.12.

Quickstart

You no longer need to build the Kinesis Connector from source. Add the following dependency to your project to start using the connector.

For Flink 1.8

<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-connector-flink</artifactId>
    <version>1.6.1</version>
</dependency>

For Flink 1.11

<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-connector-flink</artifactId>
    <version>2.4.1</version>
</dependency>

Refer to the official Apache Flink documentation for more information on configuring the connector.

Note that Flink 1.11 does not support the "Available Metadata" functionality described in the upstream Table/SQL connector documentation. If you want to expose KDS metadata fields in your table definitions, consider upgrading to Flink 1.12 or higher and using the KDS connector from the upstream repository.

SQL Connector

The amazon-kinesis-sql-connector-flink module builds a fully shaded connector for use with the Flink SQL client.

Migration

If you are migrating from the Apache Flink Kinesis Connector, you should perform the following steps:

  1. Replace the dependency in your application pom.xml
  2. Migrate the prefix of packages for referenced classes
    • From: org.apache.flink.streaming.connectors.kinesis
    • To: software.amazon.kinesis.connectors.flink

For example

  • org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer becomes
  • software.amazon.kinesis.connectors.flink.FlinkKinesisConsumer.
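
The package rename in step 2 is mechanical, so it can be scripted. The following is a minimal sketch, not part of the connector: PackageMigration and migrate are hypothetical names, and the plain string replacement assumes the old prefix appears in your sources only as a package or import reference.

```java
// Hypothetical helper (not part of the connector) that rewrites the package
// prefix in Java source text, as described in migration step 2.
final class PackageMigration {
    static final String OLD_PREFIX = "org.apache.flink.streaming.connectors.kinesis";
    static final String NEW_PREFIX = "software.amazon.kinesis.connectors.flink";

    private PackageMigration() {}

    /** Returns the source text with every occurrence of the old prefix replaced. */
    static String migrate(String source) {
        return source.replace(OLD_PREFIX, NEW_PREFIX);
    }
}
```

Applied to each .java file in a project, migrate leaves text that does not contain the old prefix unchanged, so running it twice is harmless.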

Flink Versions

This connector is compatible with Flink 1.11. For a version of this connector that is compatible with Flink 1.8, use the 1.x release line. Other versions of Flink may work but are not officially supported.

Support

We will support this connector until KDA adds support for Apache Flink 1.12. After that, we will no longer provide patches or security fixes for this repository. Where possible, use the Apache Flink Kinesis connector instead of this library.

Using EFO

Two additional properties are required to enable EFO on your FlinkKinesisConsumer:

  • RECORD_PUBLISHER_TYPE: Set this parameter to EFO for your application to use an EFO consumer to access the Kinesis Data Stream data.
  • EFO_CONSUMER_NAME: Set this parameter to a string value that is unique among the consumers of this stream. Re-using a consumer name in the same Kinesis Data Stream will cause the previous consumer using that name to be terminated.

To configure a FlinkKinesisConsumer to use EFO, add the following parameters to the consumer:

consumerConfig.putIfAbsent(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
consumerConfig.putIfAbsent(ConsumerConfigConstants.EFO_CONSUMER_NAME, "efo-consumer");
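
For context, here is a minimal sketch of how consumerConfig might be assembled so that the EFO settings act as defaults. It uses only java.util.Properties; the class EfoConsumerConfig and its method are hypothetical, and the literal keys are an assumption about the string values behind ConsumerConfigConstants.RECORD_PUBLISHER_TYPE and ConsumerConfigConstants.EFO_CONSUMER_NAME. In application code, prefer the constants themselves.

```java
import java.util.Properties;

// Hypothetical helper, not part of the connector.
final class EfoConsumerConfig {
    // Assumed string values of ConsumerConfigConstants.RECORD_PUBLISHER_TYPE and
    // ConsumerConfigConstants.EFO_CONSUMER_NAME; use the constants in real code.
    static final String RECORD_PUBLISHER_TYPE = "flink.stream.recordpublisher";
    static final String EFO_CONSUMER_NAME = "flink.stream.efo.consumername";

    private EfoConsumerConfig() {}

    /** Returns a copy of the base properties with EFO settings added where absent. */
    static Properties withEfoDefaults(Properties base, String consumerName) {
        Properties consumerConfig = new Properties();
        consumerConfig.putAll(base);
        // putIfAbsent keeps any value the caller has already supplied.
        consumerConfig.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO");
        consumerConfig.putIfAbsent(EFO_CONSUMER_NAME, consumerName);
        return consumerConfig;
    }
}
```

Because putIfAbsent is used, a value that is already present in the properties, for example one supplied through runtime configuration, is kept rather than overwritten.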

Note the additional IAM permissions required to use EFO:

{
  "Sid": "AllStreams",
  "Effect": "Allow",
  "Action": [
    "kinesis:ListShards",
    "kinesis:ListStreamConsumers"
  ],
  "Resource": "arn:aws:kinesis:<region>:<account>:stream/*"
},
{
  "Sid": "Stream",
  "Effect": "Allow",
  "Action": [
    "kinesis:DescribeStreamSummary",
    "kinesis:RegisterStreamConsumer"
  ],
  "Resource": "arn:aws:kinesis:<region>:<account>:stream/<stream-name>"
},
{
  "Sid": "Consumer",
  "Effect": "Allow",
  "Action": [
    "kinesis:DescribeStreamConsumer",
    "kinesis:SubscribeToShard",
    "kinesis:DeregisterStreamConsumer"
  ],
  "Resource": [
    "arn:aws:kinesis:<region>:<account>:stream/<stream-name>/consumer/<consumer-name>",
    "arn:aws:kinesis:<region>:<account>:stream/<stream-name>/consumer/<consumer-name>:*"
  ]
}
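
The placeholders in the policy above follow a fixed pattern, so the resource ARNs can be derived from four values. The following is a minimal sketch, assuming a hypothetical helper class EfoArns that is not part of the connector; the region, account, and names in the usage note are made-up sample values.

```java
// Hypothetical helper (not part of the connector) that fills in the ARN
// placeholders used in the policy statements above.
final class EfoArns {
    private EfoArns() {}

    /** ARN of the stream, as used by the "Stream" statement. */
    static String stream(String region, String account, String streamName) {
        return String.format("arn:aws:kinesis:%s:%s:stream/%s", region, account, streamName);
    }

    /** ARN of a named consumer, as used by the first "Consumer" resource. */
    static String consumer(String region, String account, String streamName, String consumerName) {
        return stream(region, account, streamName) + "/consumer/" + consumerName;
    }

    /** Wildcard form (":*" suffix), as used by the second "Consumer" resource. */
    static String consumerWildcard(String region, String account, String streamName, String consumerName) {
        return consumer(region, account, streamName, consumerName) + ":*";
    }
}
```

For example, EfoArns.consumer("us-east-1", "123456789012", "my-stream", "efo-consumer") yields arn:aws:kinesis:us-east-1:123456789012:stream/my-stream/consumer/efo-consumer.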

For more information, refer to the official EFO documentation for the KDS connector.

Security

See CONTRIBUTING for more information.

License

This project is licensed under the Apache-2.0 License.