Examples
Complete Python SDK examples for common pipeline patterns
Complete pipeline examples from simple to advanced. All examples are copy-pasteable and use module-level node functions.
Preview the YAML output of any example:
brokoli compile pipeline.py

Deploy to a server:

brokoli deploy pipeline.py --server http://localhost:8080

1. API to File (Simple)
Fetch posts from a public REST API, rename columns, and save as CSV.
from brokoli import Pipeline, source_api, transform, sink_file

# Example 1: fetch posts from a public REST API, reshape the columns,
# and write the result to a local CSV file.
with Pipeline("api-to-csv", description="Export posts to CSV") as p:
    # Source: GET all posts from the JSONPlaceholder demo API.
    posts = source_api(
        "Fetch Posts",
        url="https://jsonplaceholder.typicode.com/posts",
        method="GET",
    )
    # Transform: drop the userId column, then rename id/title for clarity.
    clean = transform("Clean Posts", rules=[
        {"type": "drop_columns", "columns": ["userId"]},
        {"type": "rename_columns", "mapping": {"id": "post_id", "title": "post_title"}},
    ])
    # Sink: write the cleaned rows as CSV.
    output = sink_file("Save CSV", path="/data/posts.csv", format="csv")
    posts >> clean >> output

2. Two DB Sources + Join + Quality Check + DB Sink (Medium)
Load users and orders from two database connections, join them, run quality checks, and write an aggregated summary to a warehouse.
from brokoli import (
    Pipeline, source_db, source_api, join, quality_check, transform, sink_db,
)

# Example 2: join a database source with an API source, gate the result on
# data-quality rules, aggregate per user, and upsert into a warehouse table.
with Pipeline(
    "user-orders",
    description="Join users with orders and load to warehouse",
    schedule="0 2 * * *",  # daily at 02:00
    schedule_timezone="America/New_York",
    pipeline_id="user-orders-etl",
) as p:
    # --- Sources ---
    users = source_db(
        "Load Users",
        conn_id="prod-postgres",
        query="SELECT id AS user_id, name, email, created_at FROM users WHERE active = true",
    )
    orders = source_api(
        "Fetch Orders",
        url="https://api.example.com/orders?status=completed",
        # Token is resolved from a pipeline variable, not hard-coded.
        headers={"Authorization": "Bearer ${var.orders_api_token}"},
        json_path="data",
    )

    # --- Join on user_id (inner join) ---
    # left/right reference the node display names declared above.
    joined = join(
        "Match Users to Orders",
        left="Load Users",
        right="Fetch Orders",
        on="user_id",
        how="inner",
    )

    # --- Quality gate ---
    # "block" rules stop the pipeline on failure; "warn" rules only log.
    quality = quality_check("Validate", rules=[
        {"column": "", "rule": "row_count", "params": {"min": 1}, "on_failure": "block"},
        {"column": "user_id", "rule": "not_null", "on_failure": "block"},
        {"column": "email", "rule": "regex", "params": {"pattern": "^.+@.+$"}, "on_failure": "warn"},
        {"column": "amount", "rule": "range", "params": {"min": 0, "max": 100000}, "on_failure": "block"},
    ])

    # --- Aggregate: one summary row per user ---
    summary = transform("Summarize", rules=[{
        "type": "aggregate",
        "group_by": ["user_id", "name", "email"],
        "agg_fields": [
            {"column": "amount", "function": "sum", "alias": "total_spent"},
            {"column": "order_id", "function": "count", "alias": "order_count"},
        ],
    }])

    # --- Sink: upsert keyed on user_id ---
    warehouse = sink_db(
        "Load Warehouse",
        conn_id="warehouse-pg",
        table="user_order_summary",
        mode="upsert",
        conflict_key="user_id",
    )

    # --- DAG: both sources feed the join ---
    users >> joined
    orders >> joined
    joined >> quality >> summary >> warehouse

3. Custom Python Logic with @task (Advanced)
Use the @task decorator to wrap arbitrary Python functions as pipeline nodes. Each decorated function receives rows, columns, config, and params, and must return {"columns": [...], "rows": [...]}.
from brokoli import Pipeline, source_db, quality_check, sink_db, task
import sys
@task
def calculate_metrics(rows, columns, config, params):
    """Calculate engagement score and segment users.

    Standard @task signature: receives rows, columns, config, params and
    returns {"columns": [...], "rows": [...]}. Adds two columns:
    ``engagement_score`` (rounded float) and ``segment`` (str label).
    """
    print(f"Processing {len(rows)} users", file=sys.stderr)
    new_columns = columns + ["engagement_score", "segment"]
    new_rows = []
    for row in rows:
        # Missing counters default to 0; days_active defaults to 1.
        logins = float(row.get("login_count", 0))
        actions = float(row.get("action_count", 0))
        days_active = float(row.get("days_active", 1))
        # Weighted activity normalized per active day (max(...) guards /0).
        score = (logins * 0.3 + actions * 0.5 + days_active * 0.2) / max(days_active, 1)
        row["engagement_score"] = round(score, 2)
        # Bucket by descending score thresholds.
        if score >= 8:
            row["segment"] = "power_user"
        elif score >= 4:
            row["segment"] = "active"
        elif score >= 1:
            row["segment"] = "casual"
        else:
            row["segment"] = "dormant"
        new_rows.append(row)
    print(f"Segmented: {len(new_rows)} users", file=sys.stderr)
    return {"columns": new_columns, "rows": new_rows}
@task
def flag_anomalies(rows, columns, config, params):
    """Flag users with suspiciously high activity using simple z-score detection."""
    # Add the output column once; columns list is extended in place.
    if "anomaly" not in columns:
        columns.append("anomaly")
    scores = [float(r.get("engagement_score", 0)) for r in rows]
    if not scores:
        # No rows: nothing to score, pass data through unchanged.
        return {"columns": columns, "rows": rows}
    mean = sum(scores) / len(scores)
    # Population standard deviation.
    std = (sum((s - mean) ** 2 for s in scores) / len(scores)) ** 0.5
    for row in rows:
        score = float(row.get("engagement_score", 0))
        # Anomalous when more than 2 standard deviations from the mean
        # (always False when std == 0, i.e. all scores identical).
        row["anomaly"] = std > 0 and abs(score - mean) > 2 * std
    return {"columns": columns, "rows": rows}
# Example 3 pipeline: wire the @task nodes between a DB source, a quality
# gate, and a warehouse sink.
with Pipeline(
    "user-segmentation",
    description="Segment users by engagement and flag anomalies",
    schedule="0 4 * * *",  # daily at 04:00
    pipeline_id="user-segmentation",
    params={"min_days": "7"},  # referenced below as ${param.min_days}
) as p:
    source = source_db(
        "Load Activity",
        conn_id="analytics-pg",
        query="""
            SELECT user_id, login_count, action_count, days_active
            FROM user_activity
            WHERE days_active >= ${param.min_days}
        """,
    )
    # Instantiate the decorated tasks as named pipeline nodes.
    metrics = calculate_metrics("Calculate Metrics")
    anomalies = flag_anomalies("Flag Anomalies")
    quality = quality_check("Validate", rules=[
        {"column": "engagement_score", "rule": "not_null", "on_failure": "block"},
        {"column": "segment", "rule": "not_null", "on_failure": "block"},
        {"column": "engagement_score", "rule": "min", "params": {"min": 0}, "on_failure": "warn"},
    ])
    sink = sink_db(
        "Save Segments",
        conn_id="warehouse-pg",
        table="user_segments",
        mode="upsert",
        conflict_key="user_id",
    )
    source >> metrics >> anomalies >> quality >> sink

4. dbt Integration with Notifications
Load source data, run a dbt model, and send a Slack notification on completion.
from brokoli import Pipeline, source_db, dbt, notify

# Example 4: confirm the source DB is reachable, run dbt, then notify Slack.
with Pipeline(
    "dbt-daily-build",
    description="Run dbt models after source refresh and notify team",
    schedule="0 6 * * *",  # daily at 06:00
    pipeline_id="dbt-daily-build",
) as p:
    source = source_db(
        "Refresh Staging",
        conn_id="prod-postgres",
        query="SELECT 1",  # lightweight check that the source DB is reachable
    )
    build = dbt(
        "Run dbt Models",
        project_dir="/opt/dbt/my_project",
        command="run",
        select="tag:daily",  # only models tagged "daily"
        profiles_dir="/opt/dbt/profiles",
    )
    alert = notify(
        "Slack Notification",
        channel="data-team",
        message="dbt daily build completed for pipeline ${pipeline.name}",
        on_failure_message="dbt daily build FAILED -- check logs",
    )
    source >> build >> alert

5. Conditional Branching with @condition
Use the @condition decorator to route data down different paths. The true branch is the first >> from the condition node; the false branch is the second.
from brokoli import Pipeline, source_api, transform, sink_file, sink_db, notify, condition
@condition
def has_data(rows, columns, config, params):
    """Returns True when the dataset is non-empty."""
    return len(rows) > 0
@condition
def is_large_batch(rows, columns, config, params):
    """Returns True when there are more than 1000 rows."""
    return len(rows) > 1000
# Example 5: route data down different paths. For a condition node, the first
# >> edge is the true branch and the second >> edge is the false branch.
with Pipeline(
    "conditional-routing",
    description="Route events by volume to the right destination",
    pipeline_id="conditional-routing",
) as p:
    source = source_api(
        "Fetch Events",
        url="https://jsonplaceholder.typicode.com/comments",
        method="GET",
    )
    check = has_data("Has Events?")
    size_check = is_large_batch("Large Batch?")

    # Small batch path: save to a local JSON file
    small_sink = sink_file("Save Small Batch", path="/data/events_small.json", format="json")

    # Large batch path: load into the warehouse
    large_sink = sink_db(
        "Save Large Batch",
        conn_id="warehouse-pg",
        table="events",
        mode="append",
    )

    # Empty path: alert the team
    alert = notify(
        "Alert Empty",
        channel="data-alerts",
        message="No events returned from the events API",
    )

    # --- DAG ---
    source >> check
    check >> size_check       # true branch: has data -> check size
    check >> alert            # false branch: no data -> alert
    size_check >> large_sink  # true branch: large batch -> DB
    size_check >> small_sink  # false branch: small batch -> file