brook_green_supply

CSV to DeltaLake Ingestion Job

This project implements a Spark job that ingests one or more CSV files into DeltaLake, adds extra columns, and enforces basic data quality checks. The job is designed to run in a distributed environment using Apache Spark and can handle CSV files with or without headers. The ingested data is stored in Delta tables for further analysis and querying.

Features

  1. Ingests one or multiple CSV files into DeltaLake.
  2. Handles CSV files with and without headers.
  3. Adds two extra columns to the output DataFrame: ingestion_tms and batch_id.
  4. Ensures data quality by checking if the DataFrame is empty.
  5. Writes the data to Delta tables using the APPEND write mode.
  6. Provides logging for error handling and monitoring.
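
As an illustration of these features, the core of the job looks roughly like the following PySpark sketch. The column names ingestion_tms and batch_id come from the feature list above; the function name, table path, and batch-id argument are placeholders for illustration, not the exact implementation in green.py.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

def ingest_csv_to_delta(spark: SparkSession, csv_path: str, table_path: str,
                        batch_id: str, has_header: bool) -> None:
    """Read a CSV file, add audit columns, and append it to a Delta table."""
    df = (
        spark.read
        .option("header", str(has_header).lower())
        .option("inferSchema", "true")
        .csv(csv_path)
    )

    # Basic data-quality gate: do not write anything if the input is empty.
    if df.rdd.isEmpty():
        raise ValueError(f"No rows found in {csv_path}; aborting ingestion")

    # Add the two extra columns described in the feature list.
    enriched = (
        df.withColumn("ingestion_tms", F.current_timestamp())
          .withColumn("batch_id", F.lit(batch_id))
    )

    # APPEND write mode: each batch is added to the existing Delta table.
    enriched.write.format("delta").mode("append").save(table_path)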

System Architecture

The system architecture for the CSV to DeltaLake ingestion job is as follows:

See the architecture diagram included in the repository: brook_green_supply.pdf

The components of the system architecture are described below:

User: This component represents the user or entity that triggers the ingestion job. It can be an end-user or an automated process.

AWS Lambda Function: This component is responsible for triggering the AWS Batch job that runs the Spark ingestion job. It receives the user's request and initiates the job execution.
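
A minimal sketch of such a handler is shown below, using boto3. The job queue and job definition names are placeholders; the real ones are defined in your AWS account rather than in this repository.

import boto3

batch = boto3.client("batch")

def lambda_handler(event, context):
    """Trigger the AWS Batch job that runs the Spark ingestion."""
    # "csv-ingestion-queue" and "csv-to-deltalake-job" are placeholder names.
    response = batch.submit_job(
        jobName="csv-to-deltalake-ingestion",
        jobQueue="csv-ingestion-queue",
        jobDefinition="csv-to-deltalake-job",
    )
    return {"jobId": response["jobId"]}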

AWS Batch: This component is a managed service that handles the execution and orchestration of batch computing workloads. It manages the allocation of compute resources, schedules jobs, and monitors their execution.

EC2 Instances managed by ECS: This component consists of a pool of Amazon EC2 instances managed by Amazon ECS (Elastic Container Service). These instances are used to run the Spark job in a distributed environment. ECS manages the container orchestration and ensures the Spark job is executed efficiently.

Spark Job: This component represents the main Spark job responsible for ingesting the CSV files into DeltaLake. It reads the CSV files, applies transformations, handles null values and duplicates, adds extra columns, and writes the data to Delta tables.
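
The null and duplicate handling, for example, can be expressed with standard DataFrame operations; this is a sketch of the idea, while the exact columns and rules live in green.py.

from pyspark.sql import DataFrame

def clean_dataframe(df: DataFrame) -> DataFrame:
    """Drop fully-null rows and exact duplicates before enrichment."""
    # Rows where every column is null are dropped first, then exact duplicates.
    return df.dropna(how="all").dropDuplicates()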

S3 (input data): This component is an Amazon S3 bucket where the input CSV files are stored. The Spark job reads the files from this bucket.

S3 (output data): This component is an Amazon S3 bucket where the output data in Delta format is stored. The Spark job writes the processed data into this bucket for further analysis and querying.

CloudWatch: This component is a monitoring and logging service provided by AWS. It collects and stores logs from various components of the system, including the Lambda function, AWS Batch, and the Spark job. It allows for easy troubleshooting and monitoring of the system's performance.

S3 (for logs): This component is an Amazon S3 bucket used to store logs generated by the Spark job. The logs can be used for auditing, debugging, and tracking purposes.

Spark History Server: This component is a separate service provided by Spark that runs on an EC2 instance. It collects and presents logs and metrics from completed Spark applications, providing a web UI for easy monitoring and analysis.
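
For the History Server to pick up completed runs, event logging has to be enabled when the job's SparkSession is created (or in spark-defaults.conf). A sketch, where the S3 log path is only an example:

from pyspark.sql import SparkSession

# Event-log settings read by the Spark History Server; the S3 path is a placeholder.
spark = (
    SparkSession.builder
    .appName("csv-to-deltalake")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "s3a://my-spark-logs-bucket/event-logs/")
    .getOrCreate()
)

The History Server is then pointed at the same location via spark.history.fs.logDirectory.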

System Requirements

Python 3
PySpark 3.3.1
DeltaLake 1.2.1
Docker (optional, for containerization)
AWS Account (optional, for deployment on AWS)

For more detailed information about the system architecture, refer to the diagram included in the repository: system_architecture.png

Installation and Setup

Clone the repository:

git clone <repository_url>

Install the required Python packages:

pip install pyspark==3.3.1 delta-spark==1.2.1 python-dotenv py4j
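
To confirm that the packages work together, you can build a Delta-enabled SparkSession with the helper shipped in delta-spark. This is only a quick sanity check, not part of the job itself:

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder
    .appName("delta-install-check")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip adds the Delta Lake jars to the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()
print(spark.version)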

Set up the environment variables:

Rename the .env.example file to .env.

Update the environment variables in the .env file according to your file paths and output locations.
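
The job can then read these variables with python-dotenv, roughly as follows (the variable names are the ones used in the Docker example further below):

import os
from dotenv import load_dotenv

# Load the variables defined in .env into the process environment.
load_dotenv()

file_with_header_path = os.environ["FILE_WITH_HEADER_PATH"]
file_without_header_path = os.environ["FILE_WITHOUT_HEADER_PATH"]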

Usage

Prepare your CSV files:

Ensure your CSV files are in the correct format and located in the specified file paths. If your CSV files have headers, make sure they match the defined schema in the green.py script.
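
Handling both cases usually comes down to supplying an explicit schema and toggling the header option, as in this sketch; the column names here are only examples, the authoritative schema is the one defined in green.py, and an existing Delta-enabled spark session is assumed.

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Example columns only; the real schema is defined in green.py.
schema = StructType([
    StructField("meter_id", StringType(), True),
    StructField("reading", DoubleType(), True),
])

# File with a header row: column names in the file are expected to match the schema.
df_with_header = (spark.read.option("header", "true")
                  .schema(schema).csv("/data/file_with_header.csv"))

# File without a header row: column names come entirely from the schema.
df_without_header = (spark.read.option("header", "false")
                     .schema(schema).csv("/data/file_without_header.csv"))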

Run the Spark job locally:

Execute the following command:

python green.py

The job will read the CSV files, perform data transformations, and write the data to Delta tables.
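
Once the run completes, the result can be inspected directly from the Delta table in a Delta-enabled session; the output path below is a placeholder for whatever you configured in .env:

# Read the ingested data back from the Delta table and spot-check the audit columns.
result = spark.read.format("delta").load("/path/to/delta/output")
result.select("batch_id", "ingestion_tms").show(5)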

Run the Spark job in a Docker container:

Build the Docker image:

docker build -t csv-to-deltalake .

Run the Docker container:

docker run -v /path/to/csv/files:/data -e FILE_WITH_HEADER_PATH=/data/file_with_header.csv -e FILE_WITHOUT_HEADER_PATH=/data/file_without_header.csv -e GEN_FILE_WITH_HEADER=/data/gen_files_with_header.csv -e GEN_FILE_WITHOUT_HEADER=/data/gen_files_without_header.csv csv-to-deltalake

The job will run inside the Docker container, reading the CSV files and writing the data to Delta tables.

Testing

Unit tests have been provided to ensure the functionality of the Spark job. To run the tests, execute the following command:

python test_green.py
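
The tests follow the usual PySpark pattern of a small local SparkSession plus in-memory DataFrames. The snippet below is a simplified sketch of that pattern; the actual test cases and assertions live in test_green.py.

import unittest

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

class GreenJobTests(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # A local session is enough for unit-testing DataFrame logic.
        cls.spark = (SparkSession.builder
                     .master("local[1]")
                     .appName("green-tests")
                     .getOrCreate())

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

    def test_audit_columns_are_added(self):
        df = self.spark.createDataFrame([(1, "a")], ["id", "value"])
        enriched = (df.withColumn("ingestion_tms", F.current_timestamp())
                      .withColumn("batch_id", F.lit("test-batch")))
        self.assertIn("ingestion_tms", enriched.columns)
        self.assertIn("batch_id", enriched.columns)

    def test_empty_dataframe_is_detected(self):
        empty = self.spark.createDataFrame([], "id INT, value STRING")
        self.assertTrue(empty.rdd.isEmpty())

if __name__ == "__main__":
    unittest.main()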

Tech Debt

Deploy the job on AWS:

  1. Create an AWS Lambda function to trigger the AWS Batch job.
  2. Create an AWS Batch job definition and job queue (see the sketch below).
  3. Set up the necessary AWS resources, including S3 buckets and AWS Glue Catalog.
  4. Update the environment variables in the .env file with the AWS resource details.
  5. Trigger the Lambda function to start the AWS Batch job.
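
As a starting point for step 2, the Batch job definition can be registered with boto3. The names, image URI, and resource sizes below are placeholders to be replaced with your own resources.

import boto3

batch = boto3.client("batch")

# Placeholder names and image URI; substitute your own account and repository.
batch.register_job_definition(
    jobDefinitionName="csv-to-deltalake-job",
    type="container",
    containerProperties={
        "image": "<account_id>.dkr.ecr.<region>.amazonaws.com/csv-to-deltalake:latest",
        "resourceRequirements": [
            {"type": "VCPU", "value": "4"},
            {"type": "MEMORY", "value": "8192"},
        ],
    },
)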