This is a custom Flume sink for writing data to AWS S3. It uses AWS profile credentials (credentials stored in the ~/.aws/credentials file), so it works with any type of credentials, such as:
- IAM USER (static credentials: access_key, secret_key)
- ASSUMED ROLE (temporary credentials: access_key, secret_key, and session_token)
- FEDERATED TOKEN
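For example, a ~/.aws/credentials file holding a static IAM user profile and a temporary assumed-role profile might look like this (the profile names and placeholder values are illustrative):
# ~/.aws/credentials
[default]
aws_access_key_id = AKIAXXXXXXXXXXXXXXXX
aws_secret_access_key = XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
[assumed-role]
aws_access_key_id = ASIAXXXXXXXXXXXXXXXX
aws_secret_access_key = XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
aws_session_token = XXXXXXXXXXXXXXXXXXXX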
It supports various features, including:
- Batch Size
- Roll Interval
- Compression
- File Prefix
- File Suffix
It automatically partitions the data by date and hour while writing to S3. For example, a file path will look like this:
s3://customers/partition-date=28-05-2019/partition-hours=22-00/....
The default configuration values are:
defaultRollInterval = 300;
defaultBatchSize = 500;
defaultAvroSchema = "";
defaultAvroSchemaRegistryURL = "";
defaultFilePrefix = "flumeS3Sink";
defaultFileSufix = ".data";
defaultTempFile = "/tmp/flumes3sink/data/file";
defaultCompress = "false";
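Each default can be overridden through the corresponding sink property in the agent configuration; for example (both keys appear in the sample configuration below):
a1.sinks.k1.s3.rollInterval = 300
a1.sinks.k1.s3.compress = true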
There are many ways to build the jar from source; the simplest is the following (a combined shell sketch appears after these steps):
- Clone the git repo
- Run the Maven command
mvn package
- Go to the Flume installation directory and find .../conf/flume-env.sh
- Add the fully qualified jar path to FLUME_CLASSPATH in flume-env.sh
FLUME_CLASSPATH="/home/ravindrachellubani/Documents/code/git/flumes3sink/target/flume-s3-sink-1.0-SNAPSHOT-jar-with-dependencies.jar"
export FLUME_CLASSPATH
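Putting the build and installation steps together, the whole flow looks roughly like this; the repository URL and the FLUME_HOME location are assumptions, not taken from this project:
# clone and build the sink jar (repository URL assumed from the project name)
git clone https://github.com/rab4u/flumes3sink.git
cd flumes3sink
mvn package
# append the built jar to FLUME_CLASSPATH in flume-env.sh (Flume location assumed)
echo "export FLUME_CLASSPATH=\$FLUME_CLASSPATH:$(pwd)/target/flume-s3-sink-1.0-SNAPSHOT-jar-with-dependencies.jar" >> $FLUME_HOME/conf/flume-env.sh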
Here is a sample Flume agent configuration snippet for the sink:
# FLUME SINK CONF
#-----------------
# do not put a trailing / at the end of the bucket name
# the roll interval is in seconds
# the compression codec is Snappy
a1.sinks.k1.type = com.rab4u.flume.FlumeS3Sink
a1.sinks.k1.s3.bucketName = dp-cts-stream-test/after-sales/tracking
a1.sinks.k1.s3.awsRegion = eu-central-1
a1.sinks.k1.s3.filePrefix = after-sales
a1.sinks.k1.s3.FileSufix = avro
a1.sinks.k1.s3.rollInterval = 60
a1.sinks.k1.s3.tempFile = /home/ravindrachellubani/Documents/code/git/apache-flume-to-s3/after-sales-temp-file
a1.sinks.k1.s3.AvroSchema = /home/ravindrachellubani/Documents/code/git/apache-flume-to-s3/tracking.avsc
a1.sinks.k1.s3.batchSize = 500
a1.sinks.k1.s3.compress = false
# a1.sinks.k1.s3.AvroSchemaRegistryURL = "<SCHEMA REGISTRY URL>"
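For context, here is a minimal sketch of a complete agent definition around this sink, assuming a netcat source and a memory channel (both illustrative; only the k1 sink settings come from the snippet above):
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# illustrative source and channel
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
# wire the source and the S3 sink (configured as above) to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1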
- HEAP MEMORY ISSUE
Solution:
Open flume-env.sh and change the configuration below:
JAVA_OPTS="-Xms500m -Xmx1000m -Dcom.sun.management.jmxremote"
export JAVA_OPTS
- AWS Credentials Not Found
Solution:
Check whether the ~/.aws/credentials file is present. If not, create it by running the following command:
aws configure
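To confirm whether the file already exists before reconfiguring:
# check for an existing credentials file
ls -l ~/.aws/credentials
# (re)create it interactively if it is missing
aws configure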