process and transform various types of documents, including PDFs and HTML files, into different formats such as JSON and Markdown. This pipeline uses Prefect for orchestration.
- PDF to JSON Conversion: Reads PDFs, chunks the text, and writes it to a JSON file.
- HTML to Markdown Conversion: Reads HTML files, converts them to Markdown, and writes the output to a file.
- PDF to API and JSON Conversion: Reads PDFs, sends them to an API, and writes the API response to a JSON file.
Each step in the data processing pipeline is encapsulated as a Prefect task, for modularity and reusability. The pipeline is defined using YAML configuration files, which specify the flow and dependencies of tasks.
To get started with the RAG Data Pipeline, follow these steps:
-
Clone the repository:
git clone git@github.com:archit15singh/rag-data-pipeline.git cd rag-data-pipeline
-
Set up a virtual environment and install dependencies:
python -m venv venv source venv/bin/activate pip install -r requirements.txt
To run the individual pipelines, use the main.py
script.
-
Run Individual Pipelines:
python src/main.py
This will execute all individual pipelines defined in the
main.py
script.
The YAML configuration files define the flow of tasks and their dependencies. Below is an example of a pipeline configuration:
flow_name: Pipeline
parameters:
- name: pdf_path
- name: html_path
- name: json_output_path
- name: markdown_output_path
- name: api_response_output_path
tasks:
- name: read_pdf
params:
file_path: pdf_path
- name: chunk_text
params:
text: read_pdf
depends_on: read_pdf
- name: write_json
params:
data: chunk_text
output_path: json_output_path
depends_on: chunk_text
- name: read_html
params:
file_path: html_path
- name: convert_html_to_markdown
params:
html_content: read_html
depends_on: read_html
parallel: true
- name: write_file
params:
content: convert_html_to_markdown
output_path: markdown_output_path
depends_on: convert_html_to_markdown
- name: call_api
params:
file_name: pdf_path
depends_on: read_pdf
parallel: true
- name: write_api_response
params:
data: call_api
output_path: api_response_output_path
depends_on: call_api
Reads a PDF file and extracts the text.
Chunks the extracted text into smaller pieces.
Writes the chunked text into a JSON file.
Reads an HTML file and extracts its content.
Converts HTML content to Markdown format.
Writes the converted Markdown content to a file.
Simulates an API call with the PDF file.
Writes the API response to a JSON file.
The pipeline uses Prefect's built-in retry mechanisms to handle transient errors. Each task can be configured with a number of retries and a delay between retries.
Example:
@task(retries=3, retry_delay_seconds=10)
def read_pdf(file_path: str):
# Task implementation
prefect server start