Brokoli
Python SDK

Decorators

8 decorators to define pipeline logic as Python functions — @task, @condition, @source, @sink, @filter, @map, @validate, @sensor.

Brokoli provides 8 decorators that turn plain Python functions into pipeline nodes. Each compiles to a code node that the engine executes — no platform changes needed.

Decorator  | Input            | Output                   | Use case
-----------|------------------|--------------------------|------------------------------
@task      | rows: list[dict] | list[dict] or TaskResult | General processing
@condition | rows: list[dict] | bool                     | True/false branching
@source    | none             | list[dict]               | Custom data fetcher
@sink      | rows: list[dict] | none (pass-through)      | Custom data writer
@filter    | row: dict        | bool                     | Keep/drop individual rows
@map       | row: dict        | dict                     | Transform each row
@validate  | rows: list[dict] | bool or (bool, str)      | Custom quality gate
@sensor    | none             | bool                     | Poll until a condition is met

All decorators must be used inside a with Pipeline(...): block.

@task

General-purpose data processing. Receives all rows, returns transformed rows.

@task(name="", retries=0, retry_backoff="exponential", timeout=0, on_failure=None)

from brokoli import Pipeline, task, source_db, sink_file

with Pipeline("user-enrichment") as p:
    @task
    def enrich_users(rows):
        for row in rows:
            row["full_name"] = f"{row.get('first_name', '')} {row.get('last_name', '')}"
        return rows

    src = source_db("Load Users", conn_id="prod-pg", query="SELECT * FROM users")
    sink = sink_file("Export", path="/data/enriched.csv")

    src >> enrich_users >> sink

With parameters:

@task("Enrich Data", retries=3, timeout=120)
def enrich(rows):
    for r in rows:
        r["domain"] = r["email"].split("@")[1]
    return rows

TaskResult

Return a TaskResult for structured warnings and metadata:

from brokoli import task
from brokoli.result import TaskResult

@task
def clean_orders(rows):
    clean = [r for r in rows if r.get("amount", 0) > 0]
    skipped = len(rows) - len(clean)
    return TaskResult(
        data=clean,
        warnings=[f"Skipped {skipped} negative-amount rows"] if skipped else [],
        metadata={"skipped": skipped},
    )

Field    | Type       | Description
---------|------------|--------------------------------------------
data     | list[dict] | Output rows
warnings | list[str]  | Non-fatal messages (logged to run stderr)
errors   | list[str]  | Error messages (logged to run stderr)
metadata | dict       | Arbitrary key-value metadata

Properties: has_warnings, has_errors, row_count. Method: to_rows().
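The fields and properties above form a small contract that downstream code can rely on. The sketch below is a plain-Python stand-in that mirrors that documented contract, purely for illustration; brokoli's actual `TaskResult` class already provides all of this.

```python
from dataclasses import dataclass, field

# Plain-Python stand-in mirroring the documented TaskResult contract.
# Illustration only -- not brokoli's actual class.
@dataclass
class TaskResultSketch:
    data: list = field(default_factory=list)
    warnings: list = field(default_factory=list)
    errors: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)

    @property
    def has_warnings(self) -> bool:
        return bool(self.warnings)

    @property
    def has_errors(self) -> bool:
        return bool(self.errors)

    @property
    def row_count(self) -> int:
        return len(self.data)

    def to_rows(self) -> list:
        # Downstream nodes see only the rows; warnings/metadata go to logs.
        return self.data

result = TaskResultSketch(
    data=[{"id": 1}, {"id": 2}],
    warnings=["Skipped 1 negative-amount row"],
    metadata={"skipped": 1},
)
print(result.row_count, result.has_warnings, result.has_errors)  # 2 True False
```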

Error handling

Raise an exception to fail the node:

@task
def validate_data(rows):
    if not rows:
        raise ValueError("No data received — source may be empty")
    return rows

Logging

Print to stderr for messages in the run logs:

import sys

@task
def process(rows):
    print(f"Processing {len(rows)} rows", file=sys.stderr)
    return rows

@source

Custom data fetcher. Takes no arguments, returns rows. Use this when the built-in source_db, source_api, and source_file nodes don't cover your data source.

@source(name="", retries=0, timeout=0)

from brokoli import Pipeline, source, sink_db

with Pipeline("stripe-ingest") as p:
    @source("Stripe Charges", timeout=60)
    def fetch_charges():
        import stripe
        stripe.api_key = "sk_live_..."
        charges = stripe.Charge.list(limit=100)
        return [c.to_dict() for c in charges.data]

    save = sink_db("Save", table="raw.stripe_charges", conn_id="warehouse")
    fetch_charges >> save

Works with any Python library — APIs, SDKs, scraping, file systems:

@source
def fetch_github_events():
    import requests
    resp = requests.get("https://api.github.com/events")
    return resp.json()

@sink

Custom data writer. Receives rows, writes them somewhere. Data passes through unchanged to downstream nodes.

@sink(name="", retries=0, timeout=0)

from brokoli import Pipeline, sink, source_db

with Pipeline("hubspot-sync") as p:
    leads = source_db("Load Leads", conn_id="crm-pg", query="SELECT * FROM leads")

    @sink("Push to HubSpot", retries=3)
    def push_to_hubspot(rows):
        import hubspot
        client = hubspot.Client.create(access_token="...")
        for row in rows:
            client.crm.contacts.basic_api.create(properties=row)

    leads >> push_to_hubspot

Any destination a Python library can write to works, for example S3:

@sink("S3 Upload")
def upload_to_s3(rows):
    import boto3, json
    s3 = boto3.client("s3")
    s3.put_object(Bucket="data-lake", Key="output.json", Body=json.dumps(rows))

@filter

Row-level predicate. The function receives a single row and returns True to keep it, False to drop it. Simpler than @task for the common filter case.

@filter(name="")

from brokoli import Pipeline, filter, source_api, sink_file

with Pipeline("active-users") as p:
    users = source_api("Fetch Users", url="https://jsonplaceholder.typicode.com/users")

    @filter
    def active_only(row):
        return row.get("status") == "active"

    export = sink_file("Export", path="/data/active_users.csv")
    users >> active_only >> export

Multiple filters chain naturally:

@filter
def has_email(row):
    return "@" in row.get("email", "")

@filter("High Value")
def high_value(row):
    return row.get("revenue", 0) > 1000

source >> has_email >> high_value >> sink

@map

Row-level transform. The function receives a single row and returns the modified row. Most data enrichment is just this.

@map(name="")

from brokoli import Pipeline, map, source_db, sink_db

with Pipeline("enrich-contacts") as p:
    contacts = source_db("Load", conn_id="pg", query="SELECT * FROM contacts")

    @map
    def enrich(row):
        row["full_name"] = f"{row['first']} {row['last']}"
        row["domain"] = row["email"].split("@")[1]
        return row

    save = sink_db("Save", table="enriched_contacts", conn_id="pg")
    contacts >> enrich >> save

Chain @map and @filter for clean ETL:

@map
def normalize(row):
    row["email"] = row["email"].strip().lower()
    return row

@filter
def valid_email(row):
    return "@" in row["email"] and "." in row["email"]

source >> normalize >> valid_email >> sink
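Setting the decorators aside, the row-level logic in a chain like this is plain Python, so it can be checked locally before wiring the pipeline. The sample rows below are made up for illustration.

```python
# Undecorated copies of the row functions from the chain above, exercised
# directly against hypothetical sample rows.
def normalize(row):
    row["email"] = row["email"].strip().lower()
    return row

def valid_email(row):
    return "@" in row["email"] and "." in row["email"]

rows = [{"email": "  Ada@Example.COM "}, {"email": "not-an-email"}]

# Apply the map step, then keep only rows passing the filter step.
cleaned = [r for r in (normalize(dict(r)) for r in rows) if valid_email(r)]
print(cleaned)  # [{'email': 'ada@example.com'}]
```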

@validate

Custom quality gate. The function receives all rows and returns either bool or (bool, message). When validation fails:

  • on_failure="block" (default) — raises an error, stops the pipeline
  • on_failure="warn" — logs a warning, continues execution

@validate(name="", on_failure="block")

from brokoli import Pipeline, validate, source_db, sink_db

with Pipeline("revenue-etl") as p:
    orders = source_db("Load Orders", conn_id="pg", query="SELECT * FROM orders")

    @validate("Revenue sanity check")
    def revenue_positive(rows):
        total = sum(r.get("amount", 0) for r in rows)
        return total > 0, f"Total revenue: {total}"

    @validate("Minimum row count", on_failure="warn")
    def enough_rows(rows):
        return len(rows) >= 100, f"Got {len(rows)} rows"

    save = sink_db("Save", table="fact_orders", conn_id="warehouse")
    orders >> revenue_positive >> enough_rows >> save

The message string is logged to the run's stderr — visible in the Brokoli UI run logs.
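Because a validator body is just a function returning (bool, message), it can be exercised directly with sample rows before the @validate decorator wires it into the DAG. The rows below are invented for the example.

```python
# The validator from above as a plain function, called with made-up rows.
def revenue_positive(rows):
    total = sum(r.get("amount", 0) for r in rows)
    return total > 0, f"Total revenue: {total}"

ok, msg = revenue_positive([{"amount": 120}, {"amount": -20}])
print(ok, msg)  # True Total revenue: 100
```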

@condition

Boolean branching. The function receives all rows and returns True or False, and the DAG splits into two branches. Call the condition node on its upstream inside a with statement to unpack the true and false branches.

@condition(name="")

from brokoli import Pipeline, condition, source_api, transform, notify

with Pipeline("conditional-etl") as p:
    data = source_api("Fetch", url="https://jsonplaceholder.typicode.com/posts")

    @condition("Has enough data?")
    def has_data(rows):
        return len(rows) >= 10

    process = transform("Process", rules=[
        {"type": "filter_rows", "column": "userId", "operator": "eq", "value": "1"}
    ])
    alert = notify("Alert", notify_type="webhook",
                   webhook_url="https://hooks.example.com/alert",
                   message="Not enough data for processing")

    with has_data(data) as (ok, fail):
        ok >> process
        fail >> alert

@sensor

Poll until a condition is met before proceeding. The function takes no arguments and returns True when ready. The node checks every poll_interval seconds and fails if the condition is not met within timeout seconds.

@sensor(name="", poll_interval=60, timeout=3600)

from brokoli import Pipeline, sensor, source_file, sink_db

with Pipeline("daily-ingest") as p:
    @sensor("Wait for export file", poll_interval=30, timeout=1800)
    def file_ready():
        import os
        return os.path.exists("/data/daily_export.csv")

    data = source_file("Read Export", path="/data/daily_export.csv")
    save = sink_db("Load to DWH", table="daily_data", conn_id="warehouse")

    file_ready >> data >> save

Wait for an API to be healthy:

@sensor("Wait for API", poll_interval=10, timeout=300)
def api_ready():
    import urllib.request
    try:
        urllib.request.urlopen("https://api.example.com/health", timeout=5)
        return True
    except Exception:
        return False
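The poll_interval/timeout semantics can be pictured as a simple loop. The sketch below is a rough pure-Python illustration of that behavior, not brokoli's engine code; the names run_sensor and ready are invented for the example.

```python
import time

# Rough sketch of sensor polling semantics: call check() every
# poll_interval seconds, succeed when it returns True, fail once
# timeout seconds have elapsed.
def run_sensor(check, poll_interval=1.0, timeout=10.0):
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if check():
            return True
        time.sleep(poll_interval)
    raise TimeoutError("sensor condition not met within timeout")

# Toy condition that becomes true on the third poll.
calls = {"n": 0}
def ready():
    calls["n"] += 1
    return calls["n"] >= 3

assert run_sensor(ready, poll_interval=0.01, timeout=1.0)
```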

Combining decorators

All decorators produce nodes that chain with >> and work with built-in nodes:

from brokoli import (
    Pipeline, source, filter, map, validate, sink,
    transform, quality_check, sink_db,
)

with Pipeline("full-pipeline") as p:
    @source
    def fetch():
        import requests
        return requests.get("https://api.example.com/data").json()

    @filter
    def active(row):
        return row.get("status") == "active"

    @map
    def enrich(row):
        row["domain"] = row["email"].split("@")[1]
        return row

    @validate("Has data")
    def check(rows):
        return len(rows) > 0, f"{len(rows)} rows"

    clean = transform("Deduplicate", rules=[{"type": "deduplicate", "columns": ["id"]}])

    @sink("Write to API")
    def push(rows):
        import requests
        requests.post("https://api.example.com/ingest", json=rows)

    fetch >> active >> enrich >> check >> clean >> push

Performance notes

  • For small datasets (under 10K rows), data is passed via JSON stdin/stdout
  • For large datasets (over 10K rows), Brokoli uses NDJSON file transfer automatically
  • If pyarrow or pandas are installed, they are used for faster I/O
  • Set timeout on @task, @source, @sink to handle long-running operations
  • @sensor timeout should account for the maximum expected wait time — the node timeout is automatically set to timeout + 60s
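The NDJSON transfer format mentioned above is simply one JSON object per line. A round-trip sketch using only the standard library (with an in-memory buffer standing in for the transfer file):

```python
import io
import json

# NDJSON: one JSON object per line, no enclosing array.
rows = [{"id": 1, "name": "ada"}, {"id": 2, "name": "grace"}]

# Write: serialize each row on its own line.
buf = io.StringIO()
for row in rows:
    buf.write(json.dumps(row) + "\n")

# Read: parse line by line, skipping blanks.
buf.seek(0)
decoded = [json.loads(line) for line in buf if line.strip()]
assert decoded == rows
```

Because each line is independent, NDJSON can be streamed row by row, which is why it suits large datasets better than a single JSON array.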