Composable data transformation pipeline with lazy evaluation.
pip install philiprehberger-data-pipeline
Composable data transformation pipeline with lazy evaluation.
pip install philiprehberger-data-pipeline
from philiprehberger_data_pipeline import Pipeline
data = [
{"name": " Alice ", "email": "alice@example.com", "status": "active", "age": 30},
{"name": "Bob", "email": "bob@example.com", "status": "inactive", "age": 25},
{"name": "Alice", "email": "alice@example.com", "status": "active", "age": 30},
]
result = (
Pipeline(data)
.filter(lambda r: r["status"] == "active")
.map(lambda r: {**r, "name": r["name"].strip()})
.unique_by("email")
.sort_by("name")
.collect()
)
from philiprehberger_data_pipeline import Pipeline
clean_users = (
Pipeline.define()
.filter(lambda r: r.get("email"))
.map(lambda r: {**r, "email": r["email"].lower()})
.unique_by("email")
)
active = clean_users.run(active_users)
archived = clean_users.run(archived_users)
from philiprehberger_data_pipeline import Pipeline
result = (
Pipeline([1, 2, 3])
.tap(lambda x: print(f"Processing: {x}"))
.map(lambda x: x * 2)
.collect()
)
# Prints each item without altering the data
from philiprehberger_data_pipeline import Pipeline
result = (
Pipeline([1, 2, 3])
.branch(
lambda p: p.map(lambda x: x * 2).collect(),
lambda p: p.filter(lambda x: x > 1).collect(),
)
.collect()
)
# [2, 4, 6, 2, 3]
from philiprehberger_data_pipeline import Pipeline, retry
def fetch_url(url):
# might fail transiently
return requests.get(url).text
result = Pipeline(urls).map(retry(fetch_url, attempts=3, delay=1.0)).collect()
from philiprehberger_data_pipeline import Pipeline
clean = Pipeline.define().filter(lambda x: x > 0).map(lambda x: x * 2)
limit = Pipeline.define().take(3)
combined = clean + limit
combined.run([-1, 5, 0, 3, 7, 2])
# [10, 6, 14]
from philiprehberger_data_pipeline import Pipeline
log = (
Pipeline([1, 2, 3, 4])
.filter(lambda x: x > 2)
.map(lambda x: x * 10)
.dry_run()
)
# [{"step": 0, "name": "filter", "input": [1,2,3,4], "output": [3,4]},
# {"step": 1, "name": "map", "input": [3,4], "output": [30,40]}]
from philiprehberger_data_pipeline import Pipeline
Pipeline([1, 2, 3, 4, 5]).window(3, 1).collect()
# [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
from philiprehberger_data_pipeline import Pipeline
p = Pipeline(sales_data)
total = p.sum("amount")
average = p.avg("amount")
grouped = p.group_by("category")
from philiprehberger_data_pipeline import Pipeline
Pipeline(data).filter(lambda x: x["active"]).to_csv("output.csv")
Pipeline(data).filter(lambda x: x["active"]).to_json("output.json")
| Function / Class | Description |
|---|---|
| `Pipeline(data)` | Composable, lazy data transformation pipeline |
| `.filter(fn)` | Keep items where fn returns True |
| `.map(fn)` | Transform each item |
| `.flat_map(fn)` | Transform and flatten |
| `.flatten()` | Flatten one level of nesting |
| `.sort_by(key)` | Sort by key (string or callable) |
| `.unique_by(key)` | Remove duplicates by key |
| `.take(n)` | Take first n items |
| `.skip(n)` | Skip first n items |
| `.chunk(size)` | Split into chunks |
| `.each(fn)` | Execute side effect for each item |
| `.tap(fn)` | Side effect without altering data, skipped in dry run |
| `.window(size, step)` | Sliding window grouping |
| `.deduplicate()` | Remove duplicate items preserving order |
| `.branch(*fns)` | Split into parallel branches and merge results |
| `.dry_run(data)` | Log each step's input/output without side effects |
| `pipeline_a + pipeline_b` | Compose two pipelines into one |
| `.collect()` | Execute and return list |
| `.first()` | Return first item |
| `.count()` | Count items |
| `.sum(key)` | Sum values |
| `.avg(key)` | Average values |
| `.min(key)` | Find minimum value |
| `.max(key)` | Find maximum value |
| `.reduce(fn, initial)` | Reduce to single value |
| `.group_by(key)` | Group into dict |
| `.to_csv(path)` | Export as CSV |
| `.to_json(path)` | Export as JSON |
| `.enumerate(start)` | Pair each item with its index |
| `.zip_with(other)` | Pair items with another iterable |
| `.take_while(fn)` | Take items while predicate is True |
| `.skip_while(fn)` | Skip items while predicate is True |
| `retry(fn, attempts, delay, on_error)` | Wrap a step function with configurable retry logic |
pip install -e .
python -m pytest tests/ -v
If you find this project useful: