
Amazon Kinesis Data Analytics Flink – Starter Kit


🚨 August 30, 2023: Amazon Kinesis Data Analytics has been renamed to Amazon Managed Service for Apache Flink.


🚨 This example refers to an old Apache Flink version (1.11) and managed service runtime. For newer examples, refer to the new Blueprints repository and the general Amazon Managed Service for Apache Flink examples.


Amazon Kinesis Data Analytics Flink Starter Kit helps you with the development of Flink Application with Kinesis Stream as a source and Amazon S3 as a sink. This demonstrates the use of Session Window with AggregateFunction.
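The core idea the kit demonstrates — aggregating the events that fall into one session window — follows the accumulate/merge shape of Flink's AggregateFunction. The following is a minimal sketch of that pattern in plain Java (written without the Flink dependency so it runs standalone; the class, field, and method names are illustrative and are not taken from the starter kit's source):

```java
// Sketch of the accumulator pattern behind Flink's AggregateFunction:
// createAccumulator -> add (once per event) -> getResult (when the window fires),
// with merge used when two session windows are merged by a bridging event.
// All names here are illustrative, not the starter kit's real classes.
public class SessionAggregateSketch {

    // Accumulator: running count and sum of event values in one session.
    public static final class Acc {
        public long count;
        public double sum;
    }

    public static Acc createAccumulator() {
        return new Acc();
    }

    // Called once per event that falls into the session window.
    public static Acc add(double value, Acc acc) {
        acc.count++;
        acc.sum += value;
        return acc;
    }

    // Called when the session window fires (no event for the gap duration).
    public static double getResult(Acc acc) {
        return acc.count == 0 ? 0.0 : acc.sum / acc.count;
    }

    // Called when two session windows merge.
    public static Acc merge(Acc a, Acc b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    public static void main(String[] args) {
        Acc acc = createAccumulator();
        for (double v : new double[]{2.0, 4.0, 6.0}) {
            acc = add(v, acc);
        }
        System.out.println(getResult(acc)); // average of the session: 4.0
    }
}
```

In the real application the same four operations are supplied to Flink via an AggregateFunction implementation applied to a session window on the Kinesis source stream.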



Architecture

The architecture of this starter kit is shown in the diagram below.

Application Overview

Pre-requisites

  1. JDK 11
  2. An IDE, e.g. Eclipse, Spring Tools, or IntelliJ IDEA
  3. Apache Maven
  4. AWS CLI
  5. This starter kit was tested with Apache Flink version 1.11.1

AWS Service Requirements

The following AWS services are required to deploy this starter kit:

  1. 1 Amazon S3 Bucket
  2. 1 Amazon Kinesis Data Stream
  3. 1 Amazon Kinesis Data Analytics Flink Application
  4. 1 IAM role with 4 policies

Build Instructions

Build Apache Flink Application

  1. Clone this starter kit to your machine.
  2. The project has a Maven nature, so you can import it into your IDE.
  3. Build the jar file using one of the options below:
    1. Using standalone Maven: go to the project home directory and run the command mvn -X clean install
    2. From Eclipse or STS: run a Maven build with the goals -X clean install. Navigation: right-click the project --> Run As --> Maven Build (Option 4)
  4. The build process generates a jar file named amazon-kinesis-data-analytics-flink-starter-kit-1.0.jar.
  5. Note: the jar file is around 46 MB.

Deployment Instructions

You can deploy the Starter Kit using either AWS CLI or AWS Console.

Flink Application Properties

  1. The Starter Kit requires the following properties:

     | Key | Value | Description |
     | --- | --- | --- |
     | region | us-east-1 | AWS region |
     | input_stream_name | kda_flink_starter_kit_kinesis_stream | Input Kinesis Data Stream name |
     | session_time_out_in_minutes | 10 | Session window timeout in minutes |
     | stream_initial_position | TRIM_HORIZON | Starting position in the stream; refer to the documentation for more details |
     | s3_output_path | s3a://<bucket_name>/kda_flink_starter_kit_output | S3 path for the Flink application output |
     | bucket_check_interval_in_seconds | 2 | Interval for checking time-based rolling policies |
     | rolling_interval_in_seconds | 2 | Maximum time a part file can stay open before it must roll |
     | inactivity_interval_in_seconds | 2 | Interval of allowed inactivity after which a part file must roll |
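When deploying via the CLI, these properties are supplied as a property group inside the application definition JSON (flink_starter_kit_def_stream_position_trim_horizon.json in this kit). Assuming the standard kinesisanalyticsv2 CreateApplication input shape and the FlinkAppProperties group ID used in the console steps, the relevant fragment looks roughly like this:

```json
{
  "ApplicationConfiguration": {
    "EnvironmentProperties": {
      "PropertyGroups": [
        {
          "PropertyGroupId": "FlinkAppProperties",
          "PropertyMap": {
            "region": "us-east-1",
            "input_stream_name": "kda_flink_starter_kit_kinesis_stream",
            "session_time_out_in_minutes": "10",
            "stream_initial_position": "TRIM_HORIZON",
            "s3_output_path": "s3a://<bucket_name>/kda_flink_starter_kit_output",
            "bucket_check_interval_in_seconds": "2",
            "rolling_interval_in_seconds": "2",
            "inactivity_interval_in_seconds": "2"
          }
        }
      ]
    }
  }
}
```

The file shipped with the kit contains additional sections (runtime, code location, logging); check it against this fragment rather than replacing it wholesale.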

Using AWS CLI

  1. Log in to the AWS console and go to S3, then select the bucket you will use. If you do not have one, create a new bucket and open it.

  2. Create a folder with name kda_flink_starter_kit_jar

  3. Create a folder with name kda_flink_starter_kit_output

  4. Open command prompt on your Laptop / MacBook

  5. Upload Flink Application Jar file to S3 bucket

    aws s3 cp amazon-kinesis-data-analytics-flink-starter-kit-1.0.jar s3://bucket_name/kda_flink_starter_kit_jar/
  6. Create Kinesis Stream

    aws kinesis create-stream --stream-name kda_flink_starter_kit_kinesis_stream --shard-count 4
  7. Create IAM policies. On your terminal, navigate to folder /amazon-kinesis-data-analytics-flink-starter-kit/src/main/resources

    1. Policy for CloudWatch Logs

      aws iam create-policy --policy-name flink_starter_kit_iam_policy_cloudwatch_logs \
      --policy-document file://flink_starter_kit_iam_policy_cloudwatch_logs.json
    2. Policy for CloudWatch

      aws iam create-policy --policy-name flink_starter_kit_iam_policy_cloudwatch \
      --policy-document file://flink_starter_kit_iam_policy_cloudwatch.json
    3. Policy for Kinesis Data Stream

      aws iam create-policy --policy-name flink_starter_kit_iam_policy_kinesis \
      --policy-document file://flink_starter_kit_iam_policy_kinesis.json
    4. Policy for S3

      aws iam create-policy --policy-name flink_starter_kit_iam_policy_s3 \
      --policy-document file://flink_starter_kit_iam_policy_s3.json
  8. Create an IAM role

    aws iam create-role --role-name flink_starter_kit_role --assume-role-policy-document file://flink_starter_kit_assume-role-policy-document.json
  9. Attach policies to IAM role flink_starter_kit_role. Replace <1234567890> with your AWS Account Id before running the commands.

    1. Policy for CloudWatch Logs

      aws iam attach-role-policy --role-name flink_starter_kit_role \
      --policy-arn arn:aws:iam::<1234567890>:policy/flink_starter_kit_iam_policy_cloudwatch_logs
    2. Policy for CloudWatch

      aws iam attach-role-policy --role-name flink_starter_kit_role \
      --policy-arn arn:aws:iam::<1234567890>:policy/flink_starter_kit_iam_policy_cloudwatch
    3. Policy for Kinesis

      aws iam attach-role-policy --role-name flink_starter_kit_role \
      --policy-arn arn:aws:iam::<1234567890>:policy/flink_starter_kit_iam_policy_kinesis
    4. Policy for S3

      aws iam attach-role-policy --role-name flink_starter_kit_role \
      --policy-arn arn:aws:iam::<1234567890>:policy/flink_starter_kit_iam_policy_s3
  10. Open flink_starter_kit_def_stream_position_trim_horizon.json and update the following values:

    1. AWS account number in attributes ServiceExecutionRole and LogStreamARN
    2. S3 bucket name for attribute BucketARN
    3. S3 bucket name for parameter s3_output_path under PropertyMaps
  11. Create Log group in CloudWatch Logs

    aws logs create-log-group --log-group-name /aws/kinesis-analytics/kda_flink_starter_kit
  12. Create a Log stream under the Log group

    aws logs create-log-stream --log-group-name /aws/kinesis-analytics/kda_flink_starter_kit \
    --log-stream-name kda_flink_starter_kit
  13. Run this command to create Kinesis Data Analytics Flink application

    aws kinesisanalyticsv2 create-application \
    --cli-input-json file://flink_starter_kit_def_stream_position_trim_horizon.json
  14. Run this command to start the application

    aws kinesisanalyticsv2 start-application \
    --cli-input-json file://flink_starter_kit_start_configuration.json

Using AWS Console

  1. Login to AWS Console
  2. Choose or create an S3 bucket to be used to run this Quick Start
  3. Go to the S3 bucket, create a folder called kda_flink_starter_kit
  4. Go to the folder and upload the Jar generated in the previous section
  5. Create following IAM policies
    1. IAM policy with name flink_starter_kit_iam_policy_s3 using Policy summary sample
    2. IAM policy with name flink_starter_kit_iam_policy_kinesis using Policy summary sample
    3. IAM policy with name flink_starter_kit_iam_policy_cloudwatch using Policy summary sample
    4. IAM policy with name flink_starter_kit_iam_policy_cloudwatch_logs using Policy summary sample
  6. Create an IAM role with name kda_flink_starter_kit and attach the above policies
  7. Create a Kinesis Data Stream
    1. Name = kda_flink_starter_kit_kinesis_stream
    2. Number of shards = 6
  8. Create a Kinesis Data Analytics application as follows:
    1. Application name = amazon_kda_flink_starter_kit
    2. Runtime = Apache Flink. Select version 1.8
  9. Click on Configure
    1. Amazon S3 bucket = Choose the bucket you selected in Step # 2
    2. Path to Amazon S3 object = the key (prefix) of amazon-kinesis-data-analytics-flink-starter-kit-1.0.jar
    3. Under section Access to application resources select Choose from IAM roles that Kinesis Data Analytics can assume
    4. IAM role = Choose the IAM role created above
    5. Use the jar file generated in the build step above
    6. Select the Runtime as Flink 1.8
    7. IAM role = the IAM role created above
    8. Snapshot = Enable
    9. Monitoring -> Monitoring metrics level = Parallelism
    10. Monitoring -> Monitoring with CloudWatch Logs -> Enable, Monitoring log level = Info
    11. Scaling -> Parallelism = 10, Parallelism per KPU = 1
  10. Under Properties, click on Add group and provide the Group ID as FlinkAppProperties. Create properties defined in the section Flink Application Properties

Testing Instructions

You can use the Amazon Kinesis Data Analytics Flink – Benchmarking Utility to generate sample data, test the Apache Flink Session Window, and validate the architecture of this starter kit.


Future Releases

The future releases of this starter kit will include the following features:

  1. Example(s) for event-time / processing-time based streaming applications. Refer to the Apache Flink documentation for more details.

Contributions are welcome; refer to CONTRIBUTING.md for more details.


License Summary

This sample code is made available under the MIT-0 license. See the LICENSE file.