Containerized Apache Airflow ETL pipeline for collecting and storing stock and cryptocurrency market data, built with object-oriented design and covered by unit tests.
Below is the default architecture diagram of the MarketPipe system, illustrating the flow of data from source to storage:
- Dynamic Data Source Integration: Extend and customize by plugging in new data sources with ease. Just drop in a module, and you're set!
- Automated Data Workflows: Let Airflow do the heavy lifting with scheduled data collection, so you can focus on making strategic decisions.
- Containerized from the Start: Dockerized setup means you get up and running in no time, maintaining consistency across all environments.
- User-Centric Configuration: Tailor everything via mdp_config.json to fit your unique needs, whether it's tweaking the assets or adjusting the schedule.
- Built-In Security: We take security seriously, so you're covered from the ground up with best practices in secure data handling.
- Design by Contract with Pysertive: We use the Pysertive library to enforce preconditions, postconditions, and invariants in our code. For example, it ensures that stocks and cryptos are provided in the configuration file, which makes the application more robust and reliable.
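The configuration contract can be illustrated without depending on any particular library. The following is a minimal sketch in plain Python and is not Pysertive's actual API: the `require` decorator, the `load_assets` function, and the error messages are hypothetical names used only for illustration.

```python
import functools
from typing import Any, Callable, Dict, List


def require(predicate: Callable[..., bool], message: str) -> Callable:
    """Hypothetical precondition decorator: check `predicate` before the call."""

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            if not predicate(*args, **kwargs):
                raise ValueError(message)
            return func(*args, **kwargs)

        return wrapper

    return decorator


def _has_symbols(config: Dict[str, Any], asset_type: str) -> bool:
    """True if the config lists at least one symbol for the asset type."""
    symbols = config.get("assets", {}).get(asset_type, {}).get("symbols")
    return isinstance(symbols, list) and len(symbols) > 0


@require(lambda config: _has_symbols(config, "stocks"), "No stock symbols configured")
@require(lambda config: _has_symbols(config, "cryptos"), "No crypto symbols configured")
def load_assets(config: Dict[str, Any]) -> Dict[str, List[str]]:
    """Return the configured symbols once both preconditions hold."""
    return {name: spec["symbols"] for name, spec in config["assets"].items()}
```

Calling `load_assets` with a configuration that lacks either asset list raises a `ValueError` before any data collection starts, which is the behavior the contract is meant to guarantee.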
MarketPipe is structured with clarity and modularity in mind. Here's what you'll find inside:
MarketTrackPipe/
├── core/                                  # The heart of MarketTrackPipe's logic
│   ├── base_api.py                        # Defines the BaseApiClient abstract base class and ApiClientFactory
│   ├── crypto_api_client.py               # Retrieves cryptocurrency data
│   ├── data_processor.py                  # Orchestrates the data retrieval and storage
│   ├── __init__.py                        # Marks the directory as a Python package
│   ├── stock_api_client.py                # Retrieves stock market data
│   └── storage.py                         # Manages the storage of market data
├── custom/                                # A place for custom extensions and modules
│   ├── __init__.py                        # Marks the directory as a Python package
│   └── stock_api_client.py                # Custom stock API client
├── dags/                                  # Contains the Airflow DAG definitions
│   └── market_data_dag.py                 # Defines the DAG for processing market data
├── database_setup/                        # SQL scripts for database initialization
│   └── init.sql                           # SQL script for initializing the database
├── docker/                                # Dockerfiles for container setup
│   └── airflow/
│       ├── Dockerfile                     # Dockerfile for the Airflow service
│       └── requirements.txt               # Python dependencies for the Airflow service
├── docker-compose.yaml                    # Docker Compose configuration for services
├── mdp_config.json                        # Configuration file for the pipeline
├── requirements.txt                       # Python dependencies for the entire project
├── tests/                                 # Test cases for the application's components
│   ├── dags_test.py                       # Tests for the Airflow DAGs
│   ├── test_crypto_api_client.py          # Tests for the CryptoApiClient class
│   ├── test_stock_api_client.py           # Tests for the StockApiClient class
│   ├── test_base_api_client.py            # Tests for the BaseApiClient class (to be done)
│   ├── test_data_processor.py             # Tests for the DataProcessor class (to be done)
│   └── test_storage.py                    # Tests for the Storage class
└── utils/                                 # Utility functions and helper scripts
    ├── __init__.py
    └── market_data_processor_utils.py     # Utility functions for the MarketTrackPipe
The core directory is where the main logic of the project resides:
- base_api.py: Hosts the BaseApiClient abstract base class that serves as the foundation for all API clients. It also contains the ApiClientFactory for creating instances of API clients dynamically.
- crypto_api_client.py: Implements the CryptoApiClient class, inheriting from BaseApiClient and fetching cryptocurrency data using the CoinMarketCap API.
- data_processor.py: The DataProcessor class within orchestrates the retrieval and storage of market data, coordinating between API clients and storage mechanisms.
- stock_api_client.py: The StockApiClient class, also inheriting from BaseApiClient, pulls stock data from APIs such as Alpha Vantage and Financial Modeling Prep.
- storage.py: The Storage class handles all interactions with the database, ensuring market data is stored reliably and efficiently.
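The relationship between the abstract base class and the factory can be sketched roughly as follows. This is an illustration under assumptions, not the project's actual code: the `get_data` method name, the `create` signature, and the type check are hypothetical, and only the class names BaseApiClient and ApiClientFactory come from the project.

```python
import importlib
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class BaseApiClient(ABC):
    """Stand-in for the abstract base class; the method name is an assumption."""

    @abstractmethod
    def get_data(self, symbols: List[str]) -> Dict[str, Any]:
        """Return market data keyed by symbol."""


class ApiClientFactory:
    """Sketch of a factory that resolves a client class from its dotted names."""

    @staticmethod
    def create(module_name: str, class_name: str) -> BaseApiClient:
        # Import the module at runtime, then look the class up by name.
        module = importlib.import_module(module_name)
        client_cls = getattr(module, class_name)
        # Reject anything that does not honor the BaseApiClient interface.
        if not (isinstance(client_cls, type) and issubclass(client_cls, BaseApiClient)):
            raise TypeError(f"{class_name} must inherit from BaseApiClient")
        return client_cls()
```

With a factory of this shape, a call such as `ApiClientFactory.create("core.stock_api_client", "StockApiClient")` would be driven by the module and class values in mdp_config.json, so adding a data source does not require touching the orchestration code.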
The custom directory is designed to house custom user-defined modules:
- Place any custom API client modules here, and they will be dynamically loaded into MarketPipe based on your mdp_config.json settings.

Note: Any custom API client must inherit from the BaseApiClient class.
In the dags directory, you'll find Airflow DAGs that define the project's workflow automation:
- market_data_dag.py: Contains the DAG definition for orchestrating the market data collection and storage processes.
The database_setup directory has the SQL scripts needed to get your database up and ready:
- init.sql: This initialization script sets up the database schema, creating the necessary tables and relationships for MarketPipe.
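To make the idea of an idempotent schema setup concrete, here is a small self-contained sketch. It deliberately swaps PostgreSQL for Python's built-in sqlite3 so it runs anywhere, and the table and column names (`stock_data`, `symbol`, `price`, `collected_at`) are hypothetical; the real schema lives in init.sql and may look quite different.

```python
import sqlite3
from typing import Dict

# Hypothetical schema: table and column names are illustrative only.
SCHEMA = """
CREATE TABLE IF NOT EXISTS stock_data (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    symbol TEXT NOT NULL,
    price REAL NOT NULL,
    collected_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
"""


def init_db(conn: sqlite3.Connection) -> None:
    """Create the tables if they do not exist yet (safe to run repeatedly)."""
    conn.executescript(SCHEMA)


def store_prices(conn: sqlite3.Connection, prices: Dict[str, float]) -> int:
    """Insert one row per symbol and return the number of rows written."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "INSERT INTO stock_data (symbol, price) VALUES (?, ?)",
            list(prices.items()),
        )
    return len(prices)
```

Using `CREATE TABLE IF NOT EXISTS` means the initialization step can run on every container start without failing when the tables are already present.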
The docker directory includes Dockerfiles and related configurations for building the project's containers:
- airflow/Dockerfile: Defines the Airflow service's container, specifying the required Python environment and dependencies.
- airflow/requirements.txt: Lists all Python packages needed by the Airflow service.
The docker-compose.yaml file orchestrates the setup of all related services, ensuring that each container is configured correctly and linked properly for the pipeline to function.
The tests directory provides a suite of unit tests that ensure each component of MarketPipe is functioning as expected:
- dags_test.py: Tests the Airflow DAGs to verify that they execute without errors.
- test_data_processor.py: Validates the DataProcessor class and ensures data is processed accurately. (to be done)
- test_base_api_client.py: Tests the BaseApiClient abstract base class to ensure it functions as expected. (to be done)
- test_stock_api_client.py: Tests the StockApiClient class to ensure it retrieves stock data correctly.
- test_crypto_api_client.py: Tests the CryptoApiClient class to ensure it retrieves cryptocurrency data correctly.
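A client test of this kind usually replaces the network call with a mock so it runs offline. The sketch below shows the pattern with `unittest` and `unittest.mock`; the `StockApiClient` here is a simplified stand-in with an injected `http_get` callable, which is an assumption made for illustration and not the real class's constructor.

```python
import unittest
from unittest.mock import MagicMock


class StockApiClient:
    """Simplified stand-in; the real class and its constructor may differ."""

    def __init__(self, http_get):
        self._http_get = http_get  # injected callable, so tests need no network

    def get_data(self, symbols):
        # One lookup per symbol; keep only the price field of each response.
        return {symbol: self._http_get(symbol)["price"] for symbol in symbols}


class TestStockApiClient(unittest.TestCase):
    def test_get_data_returns_price_per_symbol(self):
        fake_get = MagicMock(return_value={"price": 100.0})
        client = StockApiClient(http_get=fake_get)

        result = client.get_data(["AAPL", "GOOG"])

        self.assertEqual(result, {"AAPL": 100.0, "GOOG": 100.0})
        self.assertEqual(fake_get.call_count, 2)
```

Saved in a file under tests/, a case like this can be run with `python -m unittest`.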
The utils directory contains helper scripts and utility functions that aid various aspects of the pipeline:
- market_data_processor_utils.py: Offers utility functions for common data processing tasks within MarketPipe.
By understanding each component and how they fit into the MarketPipe ecosystem, contributors can effectively navigate the project and add value through their enhancements and features.
- Docker
- pre-commit (Developer)
- Clone the repository:
  git clone https://github.com/abeltavares/marketpipe.git
- Environment Setup: Configure your .env from .env.example.
- Fire It Up:
  docker-compose up
After setting up the workflow, you can access the Apache Airflow web UI to monitor the status of the tasks and the overall workflow.
To access the data stored in the PostgreSQL database, you can use the psql command-line tool to run SQL queries directly. Find the database credentials and connection information in the .env file, then connect from your terminal, replacing [host] with the name of the database container and [user] with the database user:
docker exec -it [host] psql -U [user] -d market_data
Make it yours by tweaking the mdp_config.json:
{
    "owner": "airflow",
    "email_on_failure": false,
    "email_on_retry": false,
    "retries": 0,
    "clients": {
        "stocks": {
            "module": "core.stock_api_client",
            "class": "StockApiClient"
        },
        "cryptos": {
            "module": "core.crypto_api_client",
            "class": "CryptoApiClient"
        }
    },
    "assets": {
        "stocks": {
            "symbols": [
                "AAPL",
                "GOOG",
                "MSFT"
            ],
            "schedule_interval": "29 22 * * *"
        },
        "cryptos": {
            "symbols": [
                "BTC",
                "ETH"
            ],
            "schedule_interval": "29 22 * * *"
        }
    }
}
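As a rough illustration of how these settings might be consumed, the sketch below splits a configuration of this shape into Airflow-style default arguments and a per-asset schedule. The helper names `build_default_args` and `schedules_by_asset` are hypothetical, and how market_data_dag.py actually reads the file is an assumption. The cron expression `29 22 * * *` means once a day at 22:29.

```python
import json
from typing import Any, Dict

# Abbreviated copy of the example configuration above ("clients" omitted).
EXAMPLE_CONFIG = """
{
    "owner": "airflow",
    "email_on_failure": false,
    "email_on_retry": false,
    "retries": 0,
    "assets": {
        "stocks": {"symbols": ["AAPL", "GOOG", "MSFT"], "schedule_interval": "29 22 * * *"},
        "cryptos": {"symbols": ["BTC", "ETH"], "schedule_interval": "29 22 * * *"}
    }
}
"""

# Top-level keys that correspond to standard Airflow default_args entries.
DEFAULT_ARG_KEYS = ("owner", "email_on_failure", "email_on_retry", "retries")


def build_default_args(config: Dict[str, Any]) -> Dict[str, Any]:
    """Collect the top-level task defaults into one dictionary."""
    return {key: config[key] for key in DEFAULT_ARG_KEYS}


def schedules_by_asset(config: Dict[str, Any]) -> Dict[str, str]:
    """Map each asset type to its cron schedule."""
    return {asset: spec["schedule_interval"] for asset, spec in config["assets"].items()}


config = json.loads(EXAMPLE_CONFIG)
print(build_default_args(config))
print(schedules_by_asset(config))
```

Because each asset type carries its own `schedule_interval`, stocks and cryptos can be collected on different schedules by editing only the configuration file.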
To add a new API client:
- Create a new Python file under the custom/ directory.
- Implement an API client class extending BaseApiClient.
- Update mdp_config.json to register your new client class.
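The three steps above could look roughly like this. Everything specific here is an assumption made for illustration: the file name custom/my_stock_api_client.py, the class name `MyStockApiClient`, the `get_data` method, and the placeholder payload. The `BaseApiClient` defined in the sketch is a stand-in so the example runs on its own; in the project you would import the real one from core/base_api.py.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class BaseApiClient(ABC):
    """Stand-in so the sketch runs alone; the real class lives in core/base_api.py."""

    @abstractmethod
    def get_data(self, symbols: List[str]) -> Dict[str, Any]:
        """Return market data keyed by symbol (method name is an assumption)."""


class MyStockApiClient(BaseApiClient):
    """Hypothetical custom client, e.g. saved as custom/my_stock_api_client.py."""

    def get_data(self, symbols: List[str]) -> Dict[str, Any]:
        # Placeholder payload; a real client would call its data provider here.
        return {symbol: {"symbol": symbol, "source": "my-provider"} for symbol in symbols}


# Matching entry for the "clients" section of mdp_config.json,
# following the module/class pattern of the default configuration.
CLIENT_ENTRY = {
    "stocks": {
        "module": "custom.my_stock_api_client",
        "class": "MyStockApiClient",
    }
}
```

Only the `module` and `class` values need to change in the configuration; the asset symbols and schedule stay where they are.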
- Front-End Dashboard: Developing a sleek, user-friendly dashboard to visualize data trends and insights.
- Broader Data Source Support: Expanding beyond financial data into other use cases.
- Cloud Infrastructure Integration: Integrate cloud infrastructure provisioning.
- Advanced Data Storage Solutions: Integrate ingestion of data into configurable lakes and data warehouses.
- Add tests for the DataProcessor class
- Add tests for the BaseApiClient class
This project is open to contributions. If you have any suggestions or improvements, please feel free to create a pull request.
The codebase of this project follows the black code style. To ensure consistent formatting, the pre-commit hook is set up to run the black formatter before each commit.
Additionally, a GitHub Action is configured to automatically run the black formatter on every pull request, ensuring that the codebase remains formatted correctly.
Please make sure to run pip install pre-commit and pre-commit install as mentioned in the setup instructions to enable the pre-commit hook on your local development environment.
Contributors are encouraged to follow the black code style guidelines when making changes to the codebase.
Please make sure to check out our Contribution Guidelines before making a pull request.
This project is licensed under the MIT License.