Brokoli
Python SDK

Examples

Complete Python SDK examples for common pipeline patterns

Complete pipeline examples from simple to advanced. All examples are copy-pasteable and use module-level node functions.

Preview the YAML output of any example:

brokoli compile pipeline.py

Deploy to a server:

brokoli deploy pipeline.py --server http://localhost:8080

1. API to File (Simple)

Fetch posts from a public REST API, rename columns, and save as CSV.

from brokoli import Pipeline, source_api, transform, sink_file

with Pipeline("api-to-csv", description="Export posts to CSV") as p:
    # Pull the raw posts from the public JSONPlaceholder API.
    raw_posts = source_api(
        "Fetch Posts",
        url="https://jsonplaceholder.typicode.com/posts",
        method="GET",
    )

    # Drop the userId column and give the remaining fields friendlier names.
    cleaned = transform(
        "Clean Posts",
        rules=[
            {"type": "drop_columns", "columns": ["userId"]},
            {"type": "rename_columns", "mapping": {"id": "post_id", "title": "post_title"}},
        ],
    )

    # Write the cleaned rows out as a CSV file.
    csv_sink = sink_file("Save CSV", path="/data/posts.csv", format="csv")

    raw_posts >> cleaned >> csv_sink

2. DB Source + API Source + Join + Quality Check + DB Sink (Medium)

Load users from a database and completed orders from a REST API, join them, run quality checks, and write an aggregated summary to a warehouse.

from brokoli import (
    Pipeline, source_db, source_api, join, quality_check, transform, sink_db,
)

with Pipeline(
    "user-orders",
    description="Join users with orders and load to warehouse",
    schedule="0 2 * * *",
    schedule_timezone="America/New_York",
    pipeline_id="user-orders-etl",
) as p:
    # Sources: active users from Postgres, completed orders from the REST API.
    user_rows = source_db(
        "Load Users",
        conn_id="prod-postgres",
        query="SELECT id AS user_id, name, email, created_at FROM users WHERE active = true",
    )

    order_rows = source_api(
        "Fetch Orders",
        url="https://api.example.com/orders?status=completed",
        headers={"Authorization": "Bearer ${var.orders_api_token}"},
        json_path="data",
    )

    # Inner-join the two streams on user_id (nodes referenced by name).
    matched = join(
        "Match Users to Orders",
        left="Load Users",
        right="Fetch Orders",
        on="user_id",
        how="inner",
    )

    # Quality gate: block on empty data, missing ids, or out-of-range
    # amounts; merely warn on malformed email addresses.
    gate = quality_check(
        "Validate",
        rules=[
            {"column": "", "rule": "row_count", "params": {"min": 1}, "on_failure": "block"},
            {"column": "user_id", "rule": "not_null", "on_failure": "block"},
            {"column": "email", "rule": "regex", "params": {"pattern": "^.+@.+$"}, "on_failure": "warn"},
            {"column": "amount", "rule": "range", "params": {"min": 0, "max": 100000}, "on_failure": "block"},
        ],
    )

    # Roll up per-user spend and order counts.
    rollup = transform("Summarize", rules=[{
        "type": "aggregate",
        "group_by": ["user_id", "name", "email"],
        "agg_fields": [
            {"column": "amount", "function": "sum", "alias": "total_spent"},
            {"column": "order_id", "function": "count", "alias": "order_count"},
        ],
    }])

    # Upsert one summary row per user into the warehouse.
    warehouse = sink_db(
        "Load Warehouse",
        conn_id="warehouse-pg",
        table="user_order_summary",
        mode="upsert",
        conflict_key="user_id",
    )

    # Wire the DAG: both sources feed the join, then gate -> rollup -> sink.
    user_rows >> matched
    order_rows >> matched
    matched >> gate >> rollup >> warehouse

3. Custom Python Logic with @task (Advanced)

Use the @task decorator to wrap arbitrary Python functions as pipeline nodes. Each decorated function receives rows, columns, config, and params, and must return {"columns": [...], "rows": [...]}.

from brokoli import Pipeline, source_db, quality_check, sink_db, task
import sys

@task
def calculate_metrics(rows, columns, config, params):
    """Attach an engagement score and a segment label to every user row."""
    print(f"Processing {len(rows)} users", file=sys.stderr)

    # Segment thresholds, checked from highest to lowest; anything below
    # the last threshold falls through to "dormant".
    segments = ((8, "power_user"), (4, "active"), (1, "casual"))

    new_columns = columns + ["engagement_score", "segment"]
    new_rows = []

    for row in rows:
        logins = float(row.get("login_count", 0))
        actions = float(row.get("action_count", 0))
        days_active = float(row.get("days_active", 1))

        # Weighted activity, normalised by days active (guarded against 0).
        score = (logins * 0.3 + actions * 0.5 + days_active * 0.2) / max(days_active, 1)
        row["engagement_score"] = round(score, 2)

        row["segment"] = "dormant"
        for threshold, label in segments:
            if score >= threshold:
                row["segment"] = label
                break

        new_rows.append(row)

    print(f"Segmented: {len(new_rows)} users", file=sys.stderr)
    return {"columns": new_columns, "rows": new_rows}


@task
def flag_anomalies(rows, columns, config, params):
    """Flag users whose engagement score sits more than 2 std-devs from the mean."""
    if "anomaly" not in columns:
        columns.append("anomaly")

    values = [float(r.get("engagement_score", 0)) for r in rows]
    if not values:
        # Nothing to analyse on an empty batch.
        return {"columns": columns, "rows": rows}

    mu = sum(values) / len(values)
    variance = sum((v - mu) ** 2 for v in values) / len(values)
    sigma = variance ** 0.5
    cutoff = 2 * sigma

    for row, value in zip(rows, values):
        # A zero spread means every score is identical -> nothing is anomalous.
        row["anomaly"] = sigma > 0 and abs(value - mu) > cutoff

    return {"columns": columns, "rows": rows}


with Pipeline(
    "user-segmentation",
    description="Segment users by engagement and flag anomalies",
    schedule="0 4 * * *",
    pipeline_id="user-segmentation",
    params={"min_days": "7"},
) as p:
    # Pull recent activity; ${param.min_days} is substituted at run time.
    activity = source_db(
        "Load Activity",
        conn_id="analytics-pg",
        query="""
            SELECT user_id, login_count, action_count, days_active
            FROM user_activity
            WHERE days_active >= ${param.min_days}
        """,
    )

    # Custom Python steps built from the @task functions defined above.
    scored = calculate_metrics("Calculate Metrics")
    flagged = flag_anomalies("Flag Anomalies")

    # Block on missing scores/segments; only warn on negative scores.
    checks = quality_check("Validate", rules=[
        {"column": "engagement_score", "rule": "not_null", "on_failure": "block"},
        {"column": "segment", "rule": "not_null", "on_failure": "block"},
        {"column": "engagement_score", "rule": "min", "params": {"min": 0}, "on_failure": "warn"},
    ])

    # Upsert one row per user into the warehouse segments table.
    segments_sink = sink_db(
        "Save Segments",
        conn_id="warehouse-pg",
        table="user_segments",
        mode="upsert",
        conflict_key="user_id",
    )

    activity >> scored >> flagged >> checks >> segments_sink

4. dbt Integration with Notifications

Verify the source database is reachable, run the tagged dbt models, and send a Slack notification on completion.

from brokoli import Pipeline, source_db, dbt, notify

with Pipeline(
    "dbt-daily-build",
    description="Run dbt models after source refresh and notify team",
    schedule="0 6 * * *",
    pipeline_id="dbt-daily-build",
) as p:
    # Cheap "SELECT 1" probe: fail fast when the source DB is unreachable.
    probe = source_db(
        "Refresh Staging",
        conn_id="prod-postgres",
        query="SELECT 1",
    )

    # Build every model carrying the "daily" tag.
    dbt_run = dbt(
        "Run dbt Models",
        project_dir="/opt/dbt/my_project",
        command="run",
        select="tag:daily",
        profiles_dir="/opt/dbt/profiles",
    )

    # Tell the team how the build went (separate message on failure).
    slack = notify(
        "Slack Notification",
        channel="data-team",
        message="dbt daily build completed for pipeline ${pipeline.name}",
        on_failure_message="dbt daily build FAILED -- check logs",
    )

    probe >> dbt_run >> slack

5. Conditional Branching with @condition

Use the @condition decorator to route data down different paths. The true branch is the first >> from the condition node; the false branch is the second.

from brokoli import Pipeline, source_api, transform, sink_file, sink_db, notify, condition

@condition
def has_data(rows, columns, config, params):
    """Take the true branch when at least one row arrived from upstream."""
    return bool(rows)

@condition
def is_large_batch(rows, columns, config, params):
    """Take the true branch when the batch exceeds 1000 rows."""
    row_count = len(rows)
    return row_count > 1000

with Pipeline(
    "conditional-routing",
    description="Route events by volume to the right destination",
    pipeline_id="conditional-routing",
) as p:
    events = source_api(
        "Fetch Events",
        url="https://jsonplaceholder.typicode.com/comments",
        method="GET",
    )

    # Condition nodes built from the decorated predicates above.
    any_events = has_data("Has Events?")
    big_batch = is_large_batch("Large Batch?")

    # Destinations: JSON file for small batches, warehouse for large
    # batches, and an alert when nothing came back at all.
    file_sink = sink_file("Save Small Batch", path="/data/events_small.json", format="json")
    db_sink = sink_db(
        "Save Large Batch",
        conn_id="warehouse-pg",
        table="events",
        mode="append",
    )
    empty_alert = notify(
        "Alert Empty",
        channel="data-alerts",
        message="No events returned from the events API",
    )

    # Wiring. For a condition node the FIRST >> is the true branch and the
    # SECOND is the false branch, so declaration order matters below.
    events >> any_events

    any_events >> big_batch      # true: we have events -> check the volume
    any_events >> empty_alert    # false: nothing arrived -> alert the team

    big_batch >> db_sink         # true: >1000 rows -> warehouse
    big_batch >> file_sink       # false: small batch -> JSON file