A high-performance Python package for efficiently loading large CSV datasets into PostgreSQL databases. Features chunked processing, automatic resume capability, and comprehensive error handling.
- 🚀 High Performance: Optimized chunk-based processing for handling large datasets efficiently
- 🔄 Resume Capability: Automatically resume interrupted imports from the last successful position
- 🛡️ Error Resilience: Comprehensive error handling with detailed logging and failed row tracking
- 🔍 Data Validation: Preview data before import and validate row structure
- 📊 Progress Tracking: Real-time progress updates with ETA and processing speed
- 🔄 Duplicate Handling: Smart handling of duplicate records
- 🔌 Connection Pooling: Efficient database connection management
- 📝 Detailed Logging: Comprehensive logging of all operations and errors
```bash
pip install bulkflow
```
- Create a database configuration file (`db_config.json`):

```json
{
    "dbname": "your_database",
    "user": "your_username",
    "password": "your_password",
    "host": "localhost",
    "port": "5432"
}
```
- Run the import:
```bash
bulkflow path/to/your/file.csv your_table_name
```
```
bulkflow/
├── src/
│   ├── models/      # Data models
│   ├── processors/  # Core processing logic
│   ├── database/    # Database operations
│   └── utils/       # Utility functions
```
```python
from bulkflow import process_file

db_config = {
    "dbname": "your_database",
    "user": "your_username",
    "password": "your_password",
    "host": "localhost",
    "port": "5432"
}

file_path = "path/to/your/file.csv"
table_name = "your_table_name"

process_file(file_path, db_config, table_name)
```
```bash
# Basic usage
bulkflow data.csv target_table

# Custom config file
bulkflow data.csv target_table --config my_config.json
```
BulkFlow provides comprehensive error handling:
- Failed Rows File (`failed_rows_YYYYMMDD_HHMMSS.csv`):
  - Records individual row failures
  - Includes row number, content, error reason, and timestamp
- Import State File (`import_state.json`):
  - Tracks overall import progress
  - Enables resume capability
  - Records failed chunk information
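For example, a quick way to inspect the most recent failed-rows file after a run might look like this (a minimal sketch; the exact column names in the CSV are assumptions based on the description above, so check the file header):

```python
import csv
import glob

# Pick the most recent failed-rows file produced by an import run.
failed_files = sorted(glob.glob("failed_rows_*.csv"))
if failed_files:
    with open(failed_files[-1], newline="") as f:
        # Each row is expected to describe one failed record
        # (row number, content, error reason, timestamp).
        for row in csv.DictReader(f):
            print(row)
else:
    print("No failed-rows files found.")
```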
BulkFlow automatically optimizes performance by:
- Calculating optimal chunk sizes based on available memory
- Using connection pooling for database operations
- Implementing efficient duplicate handling strategies
- Minimizing memory usage through streaming processing
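As a rough illustration of the memory-based chunking idea (not BulkFlow's actual internals; pandas and psutil are used here purely for the sketch), a loader might size its chunks from available memory and stream the file in pieces:

```python
import pandas as pd
import psutil  # assumed here only to read available memory

def estimate_chunk_size(csv_path, memory_fraction=0.05, min_rows=1_000, max_rows=100_000):
    """Heuristic: sample the file to estimate bytes per row, then size
    chunks so a single chunk uses only a small fraction of available memory."""
    sample = pd.read_csv(csv_path, nrows=1_000)
    bytes_per_row = max(sample.memory_usage(deep=True).sum() / max(len(sample), 1), 1)
    budget = psutil.virtual_memory().available * memory_fraction
    return int(min(max(budget // bytes_per_row, min_rows), max_rows))

def stream_chunks(csv_path):
    """Stream the CSV in chunks so the whole file never sits in memory."""
    chunk_size = estimate_chunk_size(csv_path)
    for chunk in pd.read_csv(csv_path, chunksize=chunk_size):
        yield chunk  # each chunk would then be bulk-inserted via a pooled connection
```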
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
- Fork the repository
- Create your feature branch (`git checkout -b feature/AmazingFeature`)
- Commit your changes (`git commit -m 'Add some AmazingFeature'`)
- Push to the branch (`git push origin feature/AmazingFeature`)
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.
- Inspired by the need for robust, production-ready data import solutions
- Built with modern Python best practices
- Designed for real-world use cases and large-scale data processing
If you encounter any issues or have questions:
- Check the Issues page
- Create a new issue if your problem isn't already listed
- Provide as much context as possible in your issue description
- Try to fix the issue yourself and submit a Pull Request if you can
Created and maintained by Chris Willingham
The majority of this project's code was generated using AI assistance, specifically:
- Cline - AI coding assistant
- Claude 3.5 Sonnet (new) - Large language model by Anthropic
- In fact... the entire project was generated by AI. I'm kinda freaking out right now!
- Even the name was generated by AI... I'm not sure if I count as the author. All hail our robot overlords!