/reddit-streaming

streaming eight subreddits from reddit api using kafka producer & spark structured streaming.

Primary LanguageJupyter Notebook

reddit-streaming

An attempt to stream multiple subreddits from the reddit api using kafka & spark, and store in s3 data lake as delta tables. nightly glue jobs processes raw data into clean data partitioned by year/month/day in athena.x

Build dockerfiles

Go to docker directory and run build script.

./build.sh

Run docker-compose.

docker-compose up -d --no-recreate

Access jupyterlab shell. Can also attach VSCode to the jupyterlab container.

docker exec -it jupyterlab bash

Start streaming data

Activate the virtual environment.

source redditStreaming/reddit-env/bin/activate

Go to reddit directory.

cd redditStreaming/src/reddit

Start pyspark streaming application.

python3 -m reddit_streaming.py

Common errors:

  • java gateway exited before sending port number: make sure java home is set, java -version is 1.8

Start kafka producer

Start the kafka producer.

python3 -m reddit_producer.py

Common errors:

  • stored cluster id xyz does not match: go to /cluster_config/kafka/logs/metadata.properties and change to correct cluster id

Remove untagged docker images

Remove untagged docker images.

docker rmi $(docker images | grep "^<none>" | awk "{print $3}")

Prune docker system volumes, containers & images.

docker system prune && docker volume prune && docker container prune && docker image prune

Note on versions

When changing version of spark, hadoop, jupyterlab, etc, versions must be updated in build.sh, respective *.Dockerfile, requirements.txt and reddit_streaming.py.

pyspark write stream to s3 error

Likely caused by guava jar mismatch, follow steps here: https://kontext.tech/article/689/pyspark-read-file-in-google-cloud-storage

Kafka/zookeeper broker id mismatch

wurstmeister/kafka-docker#409

If there are kafka errors, run docker-compose down, delete cluster_config/kafka/logs and cluster_config/zookeeper/data/version-2 directories, run docker-compose up -d.

S3

s3 artifact directory.

s3://aws-glue-assets-965504608278-us-east-2/scripts/

aws s3 sync

configure aws cli first.

aws configure

upload

aws s3 sync redditStreaming/src/main/python/scripts/ s3://reddit-streaming-stevenhurwitt/scripts/

download

aws s3 sync s3://reddit-streaming-stevenhurwitt/scripts/ redditStreaming/src/main/python/scripts/.

github actions

https://github.com/stevenhurwitt/reddit-streaming/actions

pipelines

  • python.yml
  • docker.yml
  • terraform.yml
  • aws.yml

python

Build wheel file.

cd redditStreaming/src/main/python/reddit && python3 setup.py bdist_wheel

docker

Docker Compose.

docker-compose down && ./build.sh && docker-compose up -d

jupyterlab shell.

docker exec -it jupyterlab bash

terraform

cd terraform && tf plan -out tfplan && tf apply tfplan s3://aws-glue-assets-965504608278-us-east-2/scripts/

enhancements

  • lambda function to backup s3 to local daily (aws s3 sync...)
  • glue function for s3 to docker postgres (aws is $6 a day??? could use a smaller instance?)
  • airflow to gracefully restart streaming and producer jobs as needed
  • could move from docker-compose local streaming app to cloud based
  • kubernetes cluster w/ raspberry pis and local pc