amazon-kinesis-data-processor-aws-fargate

Sample code for the AWS Big Data Blog post "Building a scalable streaming data processor with Amazon Kinesis Data Streams on AWS Fargate"


Building a scalable streaming data processor with Amazon Kinesis Data Streams on AWS Fargate

Florian Mair, Solutions Architect

Data is ubiquitous in businesses today, and the volume and speed of incoming data are constantly increasing. To derive insights from data, it's essential to deliver it to a data lake or a data store and analyze it. Real-time or near-real-time delivery can be cost-prohibitive, so an efficient processing architecture is key, and it becomes even more important as data volume and velocity grow.

In this post, we show you how to build a scalable producer and consumer application for Amazon Kinesis Data Streams running on AWS Fargate. Kinesis Data Streams is a fully managed and scalable data stream that enables you to ingest, buffer, and process data in real time. AWS Fargate is a serverless compute engine for containers that works with AWS container orchestration services like Amazon Elastic Container Service (Amazon ECS), which allows us to easily run, scale, and secure containerized applications.

This solution also uses the Amazon Kinesis Producer Library (KPL) and Amazon Kinesis Client Library (KCL) to ingest data into the stream and to process it. The KPL helps you optimize shard utilization in your data stream by letting you configure aggregation and batching as data is produced into your data stream. The KCL helps you write robust and scalable consumers that can keep up with fluctuating data volumes being sent to your data stream.

The sample code for this post is available in a GitHub repo, which also includes an AWS CloudFormation template to get you started.

What is data streaming?

Before we look into the details of data streaming architectures, let's get started with a brief overview of data streaming. Streaming data is data that is generated continuously by a large number of sources that transmit the data records simultaneously in small packages. You can use data streaming for many use cases, such as log processing, clickstream analysis, device geo-location, social media data processing, and financial trading.

A data streaming application consists of two layers: the storage layer and the processing layer. For stream storage, AWS offers the managed services Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK), but you can also run stream storage like Apache Kafka or Apache Flume on Amazon Elastic Compute Cloud (Amazon EC2) or Amazon EMR. The processing layer consumes the data from the storage layer and runs computations on that data. This could be an Apache Flink application running fully managed on Amazon Kinesis Data Analytics for Apache Flink, an application running a stream processing framework like Apache Spark Streaming or Apache Storm, or a custom application using the Kinesis API or the KCL. For this post, we use Kinesis Data Streams as the storage layer and the containerized KCL application on AWS Fargate as the processing layer.

Streaming data processing architecture

This section gives a brief introduction to the solution's architecture, as shown in the following diagram.

The architecture consists of four components:

  • Producer group (data ingestion)

  • Stream storage

  • Consumer group (stream processing)

  • Kinesis Data Streams auto scaling

Data ingestion

For ingesting data into the data stream, you use the KPL, which aggregates, compresses, and batches data records to make the ingestion more efficient. In this architecture, the KPL increased the per-shard throughput up to 100 times, compared to ingesting the records with the PutRecord API (more on this in the Monitoring your stream and applications section). This is because the records are smaller than 1 KB each and the example code uses the KPL to buffer and send a collection of records in one HTTP request.

Record buffering can consume enough memory to crash the producer application; therefore, we recommend handling back-pressure. A sample for handling back-pressure is available in the KPL GitHub repo.

Not every use case is suited for using the KPL for ingestion. Due to batching and aggregation, the KPL has to buffer records, and therefore introduces some additional per-record latency. For a large number of small producers (such as mobile applications), you should use the PutRecords API to batch records or implement a proxy that handles aggregation and batching.
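As a point of reference, a minimal PutRecords batching sketch with boto3 could look like the following; the stream name, Region, and batch contents are assumed placeholders, not values from the sample repo.

```python
import json
import uuid

import boto3

kinesis = boto3.client("kinesis", region_name="eu-central-1")  # assumed Region
STREAM_NAME = "data-stream"  # assumed stream name

def send_batch(payloads):
    """Send up to 500 records in a single PutRecords call and retry partial failures."""
    entries = [
        {
            "Data": json.dumps(p).encode("utf-8"),
            "PartitionKey": str(uuid.uuid4()),  # random key spreads records across shards
        }
        for p in payloads
    ]
    response = kinesis.put_records(StreamName=STREAM_NAME, Records=entries)
    # PutRecords can fail for individual records; collect and resend those.
    failed = [e for e, r in zip(entries, response["Records"]) if "ErrorCode" in r]
    if failed:
        kinesis.put_records(StreamName=STREAM_NAME, Records=failed)

send_batch([{"data": f"record {i}"} for i in range(100)])
```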

In this post, you set up a simple HTTP endpoint that receives data records and processes them using the KPL. The producer application runs in a Docker container, which is orchestrated by Amazon ECS on AWS Fargate. A target tracking scaling policy manages the number of parallel running data ingestion containers. It adjusts the number of running containers so you maintain an average CPU utilization of 65%.
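The following is a simplified sketch of such an HTTP endpoint, assuming Flask and boto3 and writing each record with a plain PutRecord call; the actual producer in the repo adds KPL-style aggregation and batching on top of this idea, and the stream name is a placeholder.

```python
import json
import uuid

import boto3
from flask import Flask, jsonify, request

app = Flask(__name__)
kinesis = boto3.client("kinesis")
STREAM_NAME = "data-stream"  # placeholder; the real value comes from the environment

@app.route("/", methods=["POST"])
def ingest():
    body = request.get_json(force=True)
    if "data" not in body:
        # Mirrors the requirement that every record carries a "data" element.
        return jsonify({"error": "missing 'data' element"}), 400
    kinesis.put_record(
        StreamName=STREAM_NAME,
        Data=json.dumps(body).encode("utf-8"),
        PartitionKey=str(uuid.uuid4()),
    )
    return jsonify({"status": "accepted"}), 200

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=80)
```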

Stream storage: Kinesis Data Streams

As mentioned earlier, you can run a variety of streaming platforms on AWS. However, for the data processor in this post, you use Kinesis Data Streams. Kinesis Data Streams is a data store that holds data for 24 hours by default, configurable up to 365 days. Kinesis Data Streams is designed to be highly available and redundant by storing data across three Availability Zones in the specified Region.

The stream consists of one or more shards, which are uniquely identified sequences of data records in a stream. One shard supports a maximum read throughput of 2 MB/s (and up to five read transactions per second) and a maximum write throughput of 1 MB/s (and up to 1,000 records per second). Consumers with dedicated throughput (enhanced fan-out) support up to 2 MB/s of data egress per consumer, per shard.
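As a quick sanity check, you can derive the number of shards a workload needs from these limits; the record size and record rate below are assumed example values.

```python
import math

# Assumed example workload: 2 KB records arriving at 3,000 records per second.
record_size_kb = 2
records_per_second = 3_000

ingress_mb_per_second = record_size_kb * records_per_second / 1024
shards_for_bytes = math.ceil(ingress_mb_per_second / 1.0)    # 1 MB/s write limit per shard
shards_for_records = math.ceil(records_per_second / 1_000)   # 1,000 records/s limit per shard

required_shards = max(shards_for_bytes, shards_for_records)
print(required_shards)  # 6: here the byte limit, not the record limit, is the bottleneck
```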

Each record written to Kinesis Data Streams has a partition key, which is used to group data by shard. In this example, the data stream starts with five shards. You use randomly generated partition keys for the records because records don't have to be in a specific shard. Kinesis Data Streams assigns a sequence number to each data record, which is unique within the partition key. Sequence numbers generally increase over time, so you can identify which record was written to the stream before or after another.
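Behind the scenes, Kinesis maps each partition key to a 128-bit hash key (the MD5 hash of the key), and each shard owns a contiguous range of that key space. The following sketch illustrates why randomly generated keys spread records evenly; the shard index calculation assumes five equally split shards and is for illustration only.

```python
import hashlib
import uuid

NUM_SHARDS = 5  # matches the initial shard count in this example

def hash_key(partition_key: str) -> int:
    """128-bit hash key that Kinesis derives from a partition key via MD5."""
    return int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)

for _ in range(3):
    pk = str(uuid.uuid4())
    # Approximate shard index, assuming the hash key space is split evenly.
    shard_index = hash_key(pk) * NUM_SHARDS // 2**128
    print(pk, "->", f"shardId-{shard_index:012d}")
```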

Stream processing: KCL application on AWS Fargate

This post shows you how to use custom consumers, specifically enhanced fan-out consumers, with the KCL. Enhanced fan-out consumers have a dedicated throughput of 2 MB/s and use a push model instead of a pull model to get data. Records are pushed to the consumer from the Kinesis Data Streams shards over HTTP/2, which also reduces the latency for record processing. If you have more than one instance of a consumer, each instance has a 2 MB/s fan-out pipe to each shard, independent of any other consumers. You can use enhanced fan-out consumers with the AWS SDK or the KCL.
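If you use the AWS SDK directly instead of the KCL, an enhanced fan-out subscription looks roughly like the following boto3 sketch; the stream ARN, consumer name, and shard ID are placeholders, and a production consumer would wait for the consumer to become ACTIVE, renew each subscription after 5 minutes, and resume from continuation sequence numbers.

```python
import boto3

kinesis = boto3.client("kinesis")

# Placeholder ARN for illustration.
STREAM_ARN = "arn:aws:kinesis:eu-central-1:123456789012:stream/data-stream"

# Register a dedicated-throughput (enhanced fan-out) consumer once per application.
consumer = kinesis.register_stream_consumer(
    StreamARN=STREAM_ARN, ConsumerName="sample-efo-consumer"
)["Consumer"]

# Subscribe to one shard; records are then pushed to the client over HTTP/2.
response = kinesis.subscribe_to_shard(
    ConsumerARN=consumer["ConsumerARN"],
    ShardId="shardId-000000000000",
    StartingPosition={"Type": "LATEST"},
)
for event in response["EventStream"]:
    for record in event["SubscribeToShardEvent"]["Records"]:
        print(record["SequenceNumber"], record["Data"])
```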

For the producer application, this example uses the KPL, which aggregates and batches records. For the consumer to be able to process these records, the application needs to deaggregate them. To do this, you can use the KCL or the Kinesis Producer Library Deaggregation Modules for AWS Lambda (with support for Java, Node.js, Python, and Go). The KCL is a Java library, but it also supports other languages via a MultiLangDaemon. The MultiLangDaemon uses STDIN and STDOUT to communicate with the record processor, so be aware of the logging limitations this imposes. For this sample application, you use enhanced fan-out consumers with the KCL for Python 2.0.1.

Due to the STDOUT limitation, the record processor logs data records to a file that is written to the container logs and published to Amazon CloudWatch. If you create your own record processor, make sure it handles all exceptions; otherwise, records may be skipped.
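To make this concrete, a minimal record processor modeled on the amazon_kclpy sample could look like the sketch below; it logs to the same file path used by this solution and checkpoints after every batch, which is a simplification of the real processor.

```python
import logging

from amazon_kclpy import kcl
from amazon_kclpy.v2 import processor

# Log to a file, never to STDOUT: STDOUT is reserved for the MultiLangDaemon protocol.
logging.basicConfig(filename="app/logs/record_processor.log", level=logging.INFO)
log = logging.getLogger("record_processor")

class RecordProcessor(processor.RecordProcessorBase):
    def initialize(self, initialize_input):
        log.info("Initialized record processor for shard %s", initialize_input.shard_id)

    def process_records(self, process_records_input):
        try:
            for record in process_records_input.records:
                # binary_data contains the deaggregated, decoded user record.
                log.info("Received record: %s", record.binary_data)
            # Checkpoint so a restarted worker does not reprocess this batch.
            process_records_input.checkpointer.checkpoint()
        except Exception:
            # Log instead of silently dropping the batch; skipped records are hard to debug.
            log.exception("Failed to process record batch")

    def shutdown(self, shutdown_input):
        if shutdown_input.reason == "TERMINATE":
            shutdown_input.checkpointer.checkpoint()

if __name__ == "__main__":
    kcl.KCLProcess(RecordProcessor()).run()
```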

The KCL creates an Amazon DynamoDB table to keep track of consumer progress. For example, if your stream has four shards and you have one consumer instance, your instance runs a separate record processor for each shard. If the consumer scales to two instances, the KCL rebalances the record processors and runs two record processors on each instance. For more information, see Using the Kinesis Client Library.

A target tracking scaling policy manages the number of parallel running data processor containers. It adjusts the number of running containers to maintain an average CPU utilization of 65%.
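The sample deployment creates this policy for you, but for reference, an equivalent target tracking configuration can be expressed with the Application Auto Scaling API as in the following sketch; the cluster and service names and the capacity bounds are placeholders.

```python
import boto3

autoscaling = boto3.client("application-autoscaling")

# Placeholder ECS cluster and service names.
RESOURCE_ID = "service/streaming-cluster/consumer-service"

autoscaling.register_scalable_target(
    ServiceNamespace="ecs",
    ResourceId=RESOURCE_ID,
    ScalableDimension="ecs:service:DesiredCount",
    MinCapacity=1,
    MaxCapacity=10,
)

autoscaling.put_scaling_policy(
    PolicyName="consumer-cpu-target-tracking",
    ServiceNamespace="ecs",
    ResourceId=RESOURCE_ID,
    ScalableDimension="ecs:service:DesiredCount",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 65.0,  # keep average CPU utilization around 65%
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "ECSServiceAverageCPUUtilization"
        },
    },
)
```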

Container configuration

The base layer of the container is Amazon Linux 2 with Python 3 and Java 8. Although you use KCL for Python, you need Java because the record processor communicates with the MultiLangDaemon of the KCL.

During the Docker image build, the Python library for the KCL (version 2.0.1 of amazon_kclpy) is installed, and the sample application (release 2.0.1) from the KCL for Python GitHub repo is cloned. This allows you to use helper tools (samples/amazon_kclpy_helper.py) so you can focus on developing the record processor. The KCL is configured via a properties file (record_processor.properties).

For logging, you have to distinguish between logging of the MultiLangDaemon and logging of the record processor. The logging configuration for the MultiLangDaemon is specified in logback.xml, whereas the record processor has its own logger. The record processor logs to a file rather than to STDOUT, because the MultiLangDaemon uses STDOUT for communication and would otherwise throw an unrecognized message error.

Logs written to a file (app/logs/record_processor.log) are attached to the container logs by a subprocess that runs in the container entry point script (run.sh). The entry point script also runs a set_properties script, which uses environment variables to set the AWS Region, stream name, and application name dynamically. If you want to change other properties as well, you can extend this script.
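As an illustration, a minimal set_properties-style step could look like the following sketch; the environment variable names are assumptions, and the actual script in the repo may differ.

```python
import os

# Hypothetical sketch: copy selected environment variables into the KCL
# properties file before the MultiLangDaemon starts.
PROPERTIES_FILE = "record_processor.properties"

overrides = {
    "regionName": os.environ["AWS_REGION"],        # assumed variable names
    "streamName": os.environ["STREAM_NAME"],
    "applicationName": os.environ["APPLICATION_NAME"],
}

with open(PROPERTIES_FILE, "a") as properties:
    for key, value in overrides.items():
        properties.write(f"{key} = {value}\n")
```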

The container gets its permissions (such as to read from Kinesis Data Streams and write to DynamoDB) by assuming the role ECSTaskConsumerRole01. This sample deployment uses 2 vCPU and 4 GB memory to run the container.

Prerequisites

For this walkthrough, you must have an AWS account.

Solution overview

In this post, we walk through the following steps:

  1. Deploying the CDK Stack template.

  2. Sending records to Kinesis Data Streams.

  3. Monitoring your stream and applications.

Deploying the CDK Stack

  1. Clone the repository
  2. cd cdk
  3. npm install
  4. cdk deploy

The following Regions are supported:

  • US East (Ohio)

  • US West (N. California)

  • US West (Oregon)

  • Asia Pacific (Singapore)

  • Asia Pacific (Sydney)

  • Europe (Frankfurt)

  • Europe (Ireland)

Sending records to Kinesis Data Streams

You have several options to send records to Kinesis Data Streams. You can do it from the CLI or from any API client that can send REST requests. To do a POST request via curl, run the following command and replace ALB_ENDPOINT with the DNS name of your Application Load Balancer. You can find it on the CloudFormation stack's Outputs tab. Make sure the request body contains a JSON element named "data"; otherwise, the application can't process the record.

curl --location --request POST '<ALB_ENDPOINT>' --header 'Content-Type: application/json' --data-raw '{"data":"This is a testing record"}'
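Alternatively, you can send the same request from Python; this sketch assumes the requests library and uses the same placeholder endpoint.

```python
import requests

# Replace with the DNS name of your Application Load Balancer (see the stack outputs).
ENDPOINT = "http://<ALB_ENDPOINT>"

response = requests.post(ENDPOINT, json={"data": "This is a testing record"})
print(response.status_code, response.text)
```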

Your Application Load Balancer is the entry point for your data records, so all traffic has to pass through it. Application Load Balancers automatically scale to the appropriate size based on traffic by adding or removing differently sized load balancer nodes.

Monitoring your stream and applications

The CloudFormation template creates a CloudWatch dashboard. You can find it on the CloudWatch console or by choosing the link on the stack's Outputs tab on the CloudFormation console. The following screenshot shows the dashboard.

This dashboard shows metrics for the producer, consumer, and stream. The metric Consumer Behind Latest gives you the offset between current time and when the last record was written to the stream. An increase in this metric means that your consumer application can't keep up with the rate records are ingested. For more information, see Consumer Record Processing Falling Behind.

The dashboard also shows you the average CPU utilization for the consumer and producer applications, the number of PutRecords API calls to ingest data into Kinesis Data Streams, and how many user records are ingested.

Without the KPL, one PutRecord call would correspond to one user record; in this architecture, however, you should see a significantly higher number of user records than PutRecords calls. The ratio between UserRecords and PutRecords operations strongly depends on KPL configuration parameters. For example, if you increase the value of RecordMaxBufferedTime, data records are buffered longer at the producer, so more records can be aggregated, but the latency for ingestion increases.

All applications publish logs to their respective log groups in CloudWatch. To see the scaling behavior of Kinesis Data Streams, you can check either the CloudWatch logs of the Auto Scaling Application or the data stream metrics.
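For context, resizing the stream comes down to a single API call, typically UpdateShardCount, which the auto scaling application in this architecture automates; the following sketch shows what such a call can look like with boto3, with a placeholder stream name and target shard count.

```python
import boto3

kinesis = boto3.client("kinesis")

# UNIFORM_SCALING splits or merges shards evenly to reach the target count.
kinesis.update_shard_count(
    StreamName="data-stream",   # placeholder stream name
    TargetShardCount=10,        # placeholder target
    ScalingType="UNIFORM_SCALING",
)
```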

Cleaning up

Run the command cdk destroy from the cdk directory.

Conclusion

In this post, you saw how to run the KCL for Python on AWS Fargate to consume data from Kinesis Data Streams. The post also showed you how to scale the data production layer (KPL), data storage layer (Kinesis Data Streams), and the stream processing layer (KCL). You can build your own data streaming solution by deploying the sample code from the GitHub repo. To get started with Kinesis Data Streams, see Getting Started with Amazon Kinesis Data Streams.

License

This library is licensed under the MIT-0 License. See the LICENSE file.