A Python library for building and running directed graphs of operations, with support for both synchronous and asynchronous execution. It's a lightweight alternative to LangGraph with no dependencies other than the official typing_extensions.
- Graph-based workflows: Build flexible, directed workflows where nodes are customizable tasks
- Synchronous & asynchronous support: Define both sync and async nodes without any external dependencies
- Type safety: Built-in type validation ensures type consistency throughout the workflow
- Error handling: Configurable error handling and retry policies
- Branching logic: Support for conditional branches with async conditions
- State management: Proper state persistence between nodes
- Callback support: Configurable callbacks for monitoring execution progress
- Generic types: Support for generic types in workflow state
- Cycle support: By default, cycles are allowed. Use
enforce_acyclic=Trueto enforce a DAG structure
The State class is a fundamental component that represents the data flowing through your workflow. It's designed to be explicit about what you're tracking:
@dataclass
class State(Generic[T]):
value: T = field() # Required field - represents the main data being processed
current_node: str | None = field(default=None) # Current node in execution
trajectory: list[str] = field(default_factory=list) # Execution path
errors: list[str] = field(default_factory=list) # Error historyKey design decisions:
valueis a required field because it represents the explicit data you want to track through your workflow- The type parameter
Tallows you to specify exactly what kind of data your workflow processes - Additional fields (
current_node,trajectory,errors) track workflow execution metadata - The class is immutable - all updates return a new instance via
updated()
Example of a custom state:
@dataclass
class ChatState(State[Dict[str, Any]]):
"""State for chat workflow."""
value: Dict[str, Any] = field(default_factory=dict) # Override with default
messages: List[Dict[str, Any]] = field(default_factory=list)
temperature: float = 0
model: str = "gpt-4"pip install workflow-graph- Clone the repository:
git clone https://github.com/dextersjab/workflow-graph.git
cd workflow-graph- Create and activate a virtual environment:
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate- Install the package in development mode with all dependencies:
# Install with development tools (black, isort, flake8, mypy, pytest)
pip install -e ".[dev]"
# Install with example dependencies (pydantic)
pip install -e ".[dev,examples]"- Run the tests:
pytestpip install workflow-graph --preNote: The alpha/beta versions may include breaking changes as the API stabilizes.
Add the following line to your requirements.txt:
git+https://github.com/dextersjab/workflow-graph.git@main
Then, run:
pip install -r requirements.txtOr install directly using pip:
pip install git+https://github.com/dextersjab/workflow-graph.git@mainHere's how to create a simple workflow with state management and conditional paths:
from workflow_graph import WorkflowGraph, START, END, State
# Define your state class
NumberProcessingState = State[int]
# Define basic nodes that work with state
def add_one(state: NumberProcessingState) -> NumberProcessingState:
return state.updated(value=state.value + 1)
def calculate_parity(state: NumberProcessingState) -> bool:
return state
def even_number(state: NumberProcessingState) -> NumberProcessingState:
return state.updated(value=f"{state.value} ==> Even")
def odd_number(state: NumberProcessingState) -> NumberProcessingState:
return state.updated(value=f"{state.value} ==> Odd")
# Create the WorkflowGraph
workflow = WorkflowGraph()
# Add nodes to the graph
workflow.add_node("add_one", add_one)
workflow.add_node("calculate_parity", calculate_parity)
workflow.add_node("even_number", even_number)
workflow.add_node("odd_number", odd_number)
# Define edges for the main workflow
workflow.add_edge(START, "add_one")
workflow.add_edge("add_one", "calculate_parity")
# Define conditional edges based on whether the number is even or odd
workflow.add_conditional_edges(
"calculate_parity",
lambda state: state.value % 2 == 0,
path_map={True: "even_number", False: "odd_number"}
)
# Set finish points
workflow.add_edge("even_number", END)
workflow.add_edge("odd_number", END)
# Execute the workflow
initial_state = NumberProcessingState(value=5)
result = workflow.execute(initial_state)
print(result.value) # Output: "6 ==> Odd"This example creates a workflow that:
- Takes a number as input
- Adds one to it
- Checks if the result is even or odd
- Branches to different handlers based on the result
flowchart TD
__start__([Start]) --> add_one[Add one]
add_one --> parity[Calculate parity]
parity -.->|Yes| even[Even number]
parity -.->|No| odd[Odd number]
even --> __end__([End])
odd --> __end__
import asyncio
from workflow_graph import WorkflowGraph, START, END, State
@dataclass
class NumberState(State[int]):
"""State for number processing workflow."""
pass
async def async_operation(state: NumberState) -> NumberState:
await asyncio.sleep(0.1)
return state.updated(value=state.value + 1)
graph = WorkflowGraph()
graph.add_node("async_op", async_operation)
graph.add_edge(START, "async_op")
graph.add_edge("async_op", END)
initial_state = NumberState(value=1)
result = await graph.execute_async(initial_state)
assert result.value == 2WorkflowGraph supports built-in error handling and retry capabilities:
def failing_operation(state: NumberState) -> NumberState:
raise ValueError("Operation failed")
def error_handler(error: Exception, state: NumberState) -> NumberState:
return state.updated(value=-1).add_error(error, "failing_op")
graph = WorkflowGraph()
graph.add_node(
"failing_op",
failing_operation,
retries=2,
backoff_factor=0.1,
error_handler=error_handler
)
graph.add_edge(START, "failing_op")
graph.add_edge("failing_op", END)
result = graph.execute(NumberState(value=1))
assert result.value == -1
assert len(result.errors) > 0def process_data(state: NumberState) -> NumberState:
return state.updated(value=state.value * 10)
def callback(result: NumberState):
print(f"Processed result: {result.value}")
graph = WorkflowGraph()
graph.add_node("process", process_data, callback=callback)
graph.add_edge(START, "process")
graph.add_edge("process", END)
graph.execute(NumberState(value=5)) # Prints: Processed result: 50WorkflowGraph includes built-in support for generating Mermaid diagrams to visualise your workflow:
# Generate Mermaid diagram code
mermaid_code = graph.to_mermaid()
print(mermaid_code)The generated diagram follows the convention of using dashed lines (-.->), rather than decision nodes, to represent conditional branches. This provides a cleaner and more accurate representation of how the workflow behaves.
Mermaid diagrams can be rendered in:
- GitHub Markdown (just paste the code)
- VS Code (with the Mermaid extension)
- Web browsers (using the Mermaid Live Editor)
- Many other tools that support Mermaid
The library is organised into the following modules:
- workflow_graph: Main package
- constants.py: Defines constants like START and END
- models.py: Defines data structures like NodeSpec and Branch
- builder.py: Contains the WorkflowGraph class for building graphs
- executor.py: Contains the CompiledGraph class for executing workflows
- exceptions.py: Contains custom exceptions for better error handling
Contributions are welcome! Please feel free to submit a Pull Request. For development guidelines, see CONTRIBUTING.md.
This project is licensed under the MIT License - see the LICENSE file for details.