Pravega Data Importer is a collection of Apache Flink applications for importing stream data to Pravega from various sources.
It provides the following Flink jobs:
- stream-mirroring: Continuously copy a Pravega Stream to another Pravega Stream, even on a different Pravega cluster
- kafka-stream-mirroring: Continuously copy a Kafka topic to a Pravega Stream
Each job uses Flink checkpoints to provide exactly-once guarantees, ensuring that events are never missed or duplicated. The jobs automatically recover from failures and resume where they left off, and they can run with parallelism to handle high-volume streams with multiple segments.
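As background, the sketch below shows roughly how a Flink job turns on exactly-once checkpointing. It is illustrative only (the interval and the job body are placeholders), not the importer's actual code:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Snapshot operator state every 10 seconds; on failure, Flink restores the
        // last snapshot and replays the source from that point, so events are
        // neither lost nor emitted twice.
        env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);
        // ... sources, transformations, and sinks would be defined here ...
        env.execute("checkpointing-sketch");
    }
}
```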
To learn more about Pravega, visit http://pravega.io
- Java JDK 11+. On Ubuntu, this can be installed with `sudo apt-get install openjdk-11-jdk`.
- A Pravega installation
- A Flink installation
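To confirm the JDK is installed and on your `PATH`:

```
java -version
```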
This Flink job continuously copies a Pravega Stream to another Pravega Stream. It uses Flink checkpoints to provide exactly-once guarantees, ensuring that events are never missed or duplicated. It automatically recovers from failures and resumes where it left off, and it can use parallelism for high-volume streams with multiple segments. It marks the associated input and output Pravega Streams with the stream tag `mirror`.
First, build the project:

```
./gradlew clean build installDist -x test
```
Make sure you have a Flink cluster running. To start one locally, run the following command from the Flink installation directory:

```
bin/start-cluster.sh
```
Make sure the associated stream scopes have already been created in the input and output Pravega clusters. In the Pravega CLI on both the source and destination clusters:

```
scope create <scope-name>
```
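For the example below, that means running `scope create source` in a CLI connected to the source cluster and `scope create destination` in a CLI connected to the destination cluster.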
The `help` command in the Pravega CLI gives more information on how to use it.
Once the Gradle build is complete, navigate to this directory:

```
cd pravega-data-importer/build/install/pravega-data-importer/bin
```
Running the following help commands in the above directory shows the parameters needed to run the script:

```
pravega-data-importer --help
pravega-data-importer stream-mirroring --help
```
This example takes the Pravega Stream `source/sourceStream` running at `localhost:9090` as input, and outputs to the Pravega Stream `destination/destinationStream` running at `localhost:9990`.
```
pravega-data-importer stream-mirroring \
  input-controller=tcp://localhost:9090 \
  input-stream=source/sourceStream \
  input-startAtTail=false \
  output-stream=destination/destinationStream \
  output-controller=tcp://127.0.0.1:9990 \
  flinkHost=localhost \
  flinkPort=8081
```
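Once the command returns, the submitted job should appear as RUNNING in the Flink web UI (http://localhost:8081 by default, matching `flinkPort` above) and in the Flink CLI, run from the Flink installation directory:

```
bin/flink list
```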
To test whether the job is working, start an instance of `pravega-cli` and connect to the destination cluster. The destination scope and stream should be visible using the `scope list` and `stream list <scope-name>` commands.
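An illustrative session might look like this (prompt and output formatting vary by pravega-cli version):

```
> scope list
destination
> stream list destination
destinationStream
```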
To see the data importer in action, you can use the `consoleWriter` functionality provided in `pravega-samples` after your Flink job has been submitted.
After building `pravega-samples`, navigate to its install directory and run the writer:

```
cd pravega-samples/pravega-client-examples/build/install/pravega-client-examples
bin/consoleWriter -scope <source> -name <sourceStream>
```
After submitting some input to the `consoleWriter`, the same output should appear in the destination stream, showing that the job is working.
This Flink job continuously copies a Kafka topic to a Pravega Stream. It uses Flink checkpoints to provide exactly-once guarantees, ensuring that events are never missed or duplicated. It can use parallelism for high-volume streams with multiple segments. It marks the associated output Pravega Stream with the stream tag `kafka-mirror`.
First, build the project:

```
./gradlew clean build installDist -x test
```
Make sure you have a Flink cluster running. To start one locally, run the following command from the Flink installation directory:

```
bin/start-cluster.sh
```
Make sure the associated stream scope has already been created in the output Pravega cluster. In the Pravega CLI:

```
scope create <destination-scope>
```
The `help` command in the Pravega CLI gives more information on how to use it.
Once the Gradle build is complete, navigate to this directory:

```
cd pravega-data-importer/build/install/pravega-data-importer/bin
```
Running the following help commands in the above directory shows the parameters needed to run the script:

```
pravega-data-importer --help
pravega-data-importer kafka-stream-mirroring --help
```
This example takes the Kafka topic `test-input` from a Kafka cluster running at `localhost:9092` as input, and outputs to the Pravega Stream `destination/from-kafka` running at `localhost:9090`.
```
pravega-data-importer kafka-stream-mirroring \
  input-topic=test-input \
  bootstrap-servers=localhost:9092 \
  output-stream=destination/from-kafka \
  output-controller=tcp://127.0.0.1:9090 \
  isStreamOrdered=true
```
To test whether the job is working, start an instance of `pravega-cli` and connect to the destination cluster. The destination scope and stream should be visible using the `scope list` and `stream list <scope-name>` commands.
To write to a local Kafka cluster, install Kafka and follow the instructions on the Kafka Quickstart page. Once Kafka is running at `localhost:9092`, this command creates a `test-input` topic:

```
bin/kafka-topics.sh --create --topic test-input --bootstrap-server localhost:9092
```
The data importer can extract many pieces of a Kafka message, including the key, value, headers, partition number, topic, timestamp, and timestamp type.
With the following command, key/value pairs can be written:

```
bin/kafka-console-producer.sh --topic test-input --bootstrap-server localhost:9092 \
  --property parse.key=true --property key.separator=":"
```
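For example, with `key.separator=":"`, typing the lines below into the producer publishes two records whose keys are `key1` and `key2`:

```
key1:value1
key2:value2
```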
On the destination Pravega side, the destination stream should contain `PravegaRecord` Java objects holding the input, as well as the other information extracted from the Kafka messages mentioned above.
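For a programmatic check, the sketch below reads events back from the destination stream with the standard Pravega client. It assumes the scope and stream names from the example above, that the importer's `PravegaRecord` class is on the classpath, and that events deserialize with `JavaSerializer`; adjust the serializer if that assumption does not hold for your build.

```java
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.impl.JavaSerializer;
import java.net.URI;
// Also requires the data importer's PravegaRecord class on the classpath
// (its package is omitted here; this is an assumption of the sketch).

public class ReadMirroredRecords {
    public static void main(String[] args) {
        ClientConfig config = ClientConfig.builder()
                .controllerURI(URI.create("tcp://127.0.0.1:9090"))
                .build();
        // Create a reader group over the destination stream from the example.
        try (ReaderGroupManager rgm = ReaderGroupManager.withScope("destination", config)) {
            rgm.createReaderGroup("mirror-check", ReaderGroupConfig.builder()
                    .stream(Stream.of("destination", "from-kafka"))
                    .build());
        }
        try (EventStreamClientFactory factory =
                     EventStreamClientFactory.withScope("destination", config);
             // JavaSerializer is an assumption; use whatever serializer matches
             // how the importer writes PravegaRecord events.
             EventStreamReader<PravegaRecord> reader = factory.createReader(
                     "reader-1", "mirror-check",
                     new JavaSerializer<>(), ReaderConfig.builder().build())) {
            EventRead<PravegaRecord> event;
            // readNextEvent blocks up to the timeout (ms); a null event means
            // no more events are currently available.
            while ((event = reader.readNextEvent(2000)).getEvent() != null) {
                System.out.println(event.getEvent());
            }
        }
    }
}
```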