Lightweight task dependency engine with topological execution
pip install philiprehberger-task-graphLightweight task dependency engine with topological execution.
pip install philiprehberger-task-graph
from philiprehberger_task_graph import TaskGraph
graph = TaskGraph()
@graph.task()
def fetch_data():
return download()
@graph.task(depends=["fetch_data"])
def process_data():
return transform()
@graph.task(depends=["process_data"])
def save_results():
return store()
# Run tasks in dependency order
results = graph.run()
# Or run with parallelism
results = graph.run_parallel(max_workers=4)
graph = TaskGraph()
graph.add_task("fetch", fetch_fn)
graph.add_task("process", process_fn, depends=["fetch"])
graph.add_task("save", save_fn, depends=["process"])
# Preview execution order
order = graph.dry_run()
# ["fetch", "process", "save"]
Set a maximum execution time for a task. Raises TimeoutError if the task exceeds the limit.
@graph.task(timeout=30.0)
def slow_task():
return long_running_operation()
# Or with add_task
graph.add_task("fetch", fetch_fn, timeout=10.0)
Automatically retry a task on failure. The task is retried up to N times before the exception propagates.
@graph.task(retries=3)
def flaky_task():
return call_unreliable_api()
# Or with add_task
graph.add_task("fetch", fetch_fn, retries=2)
@graph.task(timeout=5.0, retries=2)
def resilient_task():
return fetch_with_deadline()
Pass each dependency's return value as a keyword argument to the task function (named after the dependency).
graph = TaskGraph()
graph.add_task("source", lambda: 7)
graph.add_task("double", lambda source: source * 2, depends=["source"])
results = graph.run(pass_results=True)
# {"source": 7, "double": 14}
from philiprehberger_task_graph import CycleError
# Raises CycleError if dependencies form a cycle
graph.run()
| Function / Class | Description |
|---|---|
TaskGraph() | Create a new task graph |
@graph.task(name=None, depends=None, timeout=None, retries=0) | Decorator to register a task with optional timeout and retries |
graph.add_task(name, fn, depends=None, timeout=None, retries=0) | Add a task programmatically |
graph.run(pass_results=False) | Execute tasks in topological order; optionally pass dep results as kwargs |
graph.run_parallel(max_workers=None, pass_results=False) | Execute with thread parallelism |
graph.dry_run() | Return execution order without running |
CycleError | Raised when a dependency cycle is detected |
pip install -e .
python -m pytest tests/ -v
If you find this project useful: