# Decorators

Eight decorators define pipeline logic as Python functions: @task, @condition, @source, @sink, @filter, @map, @validate, @sensor.

Brokoli provides eight decorators that turn plain Python functions into pipeline nodes. Each compiles to a code node that the engine executes; no platform changes are needed.
| Decorator | Input | Output | Use case |
|---|---|---|---|
| `@task` | `rows: list[dict]` | `list[dict]` or `TaskResult` | General processing |
| `@condition` | `rows: list[dict]` | `bool` | True/false branching |
| `@source` | none | `list[dict]` | Custom data fetcher |
| `@sink` | `rows: list[dict]` | none (pass-through) | Custom data writer |
| `@filter` | `row: dict` | `bool` | Keep/drop individual rows |
| `@map` | `row: dict` | `dict` | Transform each row |
| `@validate` | `rows: list[dict]` | `bool` or `(bool, str)` | Custom quality gate |
| `@sensor` | none | `bool` | Poll until a condition is met |
All decorators must be used inside a `with Pipeline(...):` block.
## @task

General-purpose data processing. Receives all rows and returns transformed rows.
```python
@task(name="", retries=0, retry_backoff="exponential", timeout=0, on_failure=None)
```

```python
from brokoli import Pipeline, task, source_db, sink_file

with Pipeline("user-enrichment") as p:
    @task
    def enrich_users(rows):
        for row in rows:
            row["full_name"] = f"{row.get('first_name', '')} {row.get('last_name', '')}"
        return rows

    src = source_db("Load Users", conn_id="prod-pg", query="SELECT * FROM users")
    sink = sink_file("Export", path="/data/enriched.csv")

    src >> enrich_users >> sink
```

With parameters:
@task("Enrich Data", retries=3, timeout=120)
def enrich(rows):
for r in rows:
r["domain"] = r["email"].split("@")[1]
return rowsTaskResult
Return a TaskResult for structured warnings and metadata:
```python
from brokoli import task
from brokoli.result import TaskResult

@task
def clean_orders(rows):
    clean = [r for r in rows if r.get("amount", 0) > 0]
    skipped = len(rows) - len(clean)
    return TaskResult(
        data=clean,
        warnings=[f"Skipped {skipped} negative-amount rows"] if skipped else [],
        metadata={"skipped": skipped},
    )
```

| Field | Type | Description |
|---|---|---|
| `data` | `list[dict]` | Output rows |
| `warnings` | `list[str]` | Non-fatal messages (logged to run stderr) |
| `errors` | `list[str]` | Error messages (logged to run stderr) |
| `metadata` | `dict` | Arbitrary key-value metadata |

Properties: `has_warnings`, `has_errors`, `row_count`. Method: `to_rows()`.
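To illustrate how the documented fields and properties relate, here is a minimal stand-in class (illustrative only, not the real brokoli.result.TaskResult):

```python
from dataclasses import dataclass, field

# Minimal stand-in mirroring the documented TaskResult fields.
# Illustrative only -- not the actual brokoli.result.TaskResult.
@dataclass
class MiniTaskResult:
    data: list = field(default_factory=list)
    warnings: list = field(default_factory=list)
    errors: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)

    @property
    def has_warnings(self) -> bool:
        return bool(self.warnings)

    @property
    def has_errors(self) -> bool:
        return bool(self.errors)

    @property
    def row_count(self) -> int:
        return len(self.data)

    def to_rows(self) -> list:
        # Downstream nodes only ever see the data rows.
        return self.data

result = MiniTaskResult(data=[{"id": 1}], warnings=["1 row skipped"])
print(result.row_count, result.has_warnings)  # 1 True
```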
### Error handling

Raise an exception to fail the node:

```python
@task
def validate_data(rows):
    if not rows:
        raise ValueError("No data received — source may be empty")
    return rows
```

### Logging
Print to stderr for messages in the run logs:

```python
import sys

@task
def process(rows):
    print(f"Processing {len(rows)} rows", file=sys.stderr)
    return rows
```

## @source
Custom data fetcher. Takes no arguments and returns rows. Use this when the built-in source_db, source_api, and source_file nodes don't cover your data source.
```python
@source(name="", retries=0, timeout=0)
```

```python
from brokoli import Pipeline, source, sink_db

with Pipeline("stripe-ingest") as p:
    @source("Stripe Charges", timeout=60)
    def fetch_charges():
        import stripe
        stripe.api_key = "sk_live_..."
        charges = stripe.Charge.list(limit=100)
        return [c.to_dict() for c in charges.data]

    save = sink_db("Save", table="raw.stripe_charges", conn_id="warehouse")

    fetch_charges >> save
```

Works with any Python library: APIs, SDKs, scraping, file systems:
```python
@source
def fetch_github_events():
    import requests
    resp = requests.get("https://api.github.com/events")
    return resp.json()
```

## @sink
Custom data writer. Receives rows, writes them somewhere. Data passes through unchanged to downstream nodes.
```python
@sink(name="", retries=0, timeout=0)
```

```python
from brokoli import Pipeline, sink, source_db

with Pipeline("hubspot-sync") as p:
    leads = source_db("Load Leads", conn_id="crm-pg", query="SELECT * FROM leads")

    @sink("Push to HubSpot", retries=3)
    def push_to_hubspot(rows):
        import hubspot
        client = hubspot.Client.create(access_token="...")
        for row in rows:
            client.crm.contacts.basic_api.create(properties=row)

    leads >> push_to_hubspot
```

```python
@sink("S3 Upload")
def upload_to_s3(rows):
    import boto3, json
    s3 = boto3.client("s3")
    s3.put_object(Bucket="data-lake", Key="output.json", Body=json.dumps(rows))
```

## @filter
Row-level predicate. The function receives a single row and returns True to keep it, False to drop it. Simpler than @task for the common filter case.
```python
@filter(name="")
```

```python
from brokoli import Pipeline, filter, source_api, sink_file

with Pipeline("active-users") as p:
    users = source_api("Fetch Users", url="https://jsonplaceholder.typicode.com/users")

    @filter
    def active_only(row):
        return row.get("status") == "active"

    export = sink_file("Export", path="/data/active_users.csv")

    users >> active_only >> export
```

Multiple filters chain naturally:
```python
@filter
def has_email(row):
    return "@" in row.get("email", "")

@filter("High Value")
def high_value(row):
    return row.get("revenue", 0) > 1000

source >> has_email >> high_value >> sink
```

## @map
Row-level transform. The function receives a single row and returns the modified row. Most data enrichment is just this.
```python
@map(name="")
```

```python
from brokoli import Pipeline, map, source_db, sink_db

with Pipeline("enrich-contacts") as p:
    contacts = source_db("Load", conn_id="pg", query="SELECT * FROM contacts")

    @map
    def enrich(row):
        row["full_name"] = f"{row['first']} {row['last']}"
        row["domain"] = row["email"].split("@")[1]
        return row

    save = sink_db("Save", table="enriched_contacts", conn_id="pg")

    contacts >> enrich >> save
```

Chain @map and @filter for clean ETL:
```python
@map
def normalize(row):
    row["email"] = row["email"].strip().lower()
    return row

@filter
def valid_email(row):
    return "@" in row["email"] and "." in row["email"]

source >> normalize >> valid_email >> sink
```

## @validate
Custom quality gate. The function receives all rows and returns either bool or (bool, message). When validation fails:

- on_failure="block" (default): raises an error and stops the pipeline
- on_failure="warn": logs a warning and continues execution
```python
@validate(name="", on_failure="block")
```

```python
from brokoli import Pipeline, validate, source_db, sink_db

with Pipeline("revenue-etl") as p:
    orders = source_db("Load Orders", conn_id="pg", query="SELECT * FROM orders")

    @validate("Revenue sanity check")
    def revenue_positive(rows):
        total = sum(r.get("amount", 0) for r in rows)
        return total > 0, f"Total revenue: {total}"

    @validate("Minimum row count", on_failure="warn")
    def enough_rows(rows):
        return len(rows) >= 100, f"Got {len(rows)} rows"

    save = sink_db("Save", table="fact_orders", conn_id="warehouse")

    orders >> revenue_positive >> enough_rows >> save
```

The message string is logged to the run's stderr and is visible in the Brokoli UI run logs.
## @condition

Boolean branching. The function returns True or False, and the DAG splits into two branches. Use the condition node in a with statement to capture the true and false branches.
```python
@condition(name="")
```

```python
from brokoli import Pipeline, condition, source_api, transform, notify

with Pipeline("conditional-etl") as p:
    data = source_api("Fetch", url="https://jsonplaceholder.typicode.com/posts")

    @condition("Has enough data?")
    def has_data(rows):
        return len(rows) >= 10

    process = transform("Process", rules=[
        {"type": "filter_rows", "column": "userId", "operator": "eq", "value": "1"}
    ])
    alert = notify("Alert", notify_type="webhook",
                   webhook_url="https://hooks.example.com/alert",
                   message="Not enough data for processing")

    with has_data(data) as (ok, fail):
        ok >> process
        fail >> alert
```

## @sensor
Poll until a condition is met before proceeding. The function takes no arguments and returns True when ready. The node polls every poll_interval seconds and fails after timeout seconds.
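The interaction between poll_interval and timeout can be sketched as a plain polling loop (illustrative only, not the engine's implementation; the clock and sleep parameters exist just to make the sketch testable):

```python
import time

# Illustrative polling loop showing how poll_interval and timeout
# interact. Not Brokoli engine internals.
def run_sensor(check, poll_interval=60, timeout=3600,
               clock=time.monotonic, sleep=time.sleep):
    deadline = clock() + timeout
    while clock() < deadline:
        if check():
            return True              # condition met: downstream nodes run
        sleep(poll_interval)         # wait, then poll again
    raise TimeoutError("sensor timed out")  # node fails after timeout
```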
```python
@sensor(name="", poll_interval=60, timeout=3600)
```

```python
from brokoli import Pipeline, sensor, source_file, sink_db

with Pipeline("daily-ingest") as p:
    @sensor("Wait for export file", poll_interval=30, timeout=1800)
    def file_ready():
        import os
        return os.path.exists("/data/daily_export.csv")

    data = source_file("Read Export", path="/data/daily_export.csv")
    save = sink_db("Load to DWH", table="daily_data", conn_id="warehouse")

    file_ready >> data >> save
```

Wait for an API to be healthy:
@sensor("Wait for API", poll_interval=10, timeout=300)
def api_ready():
import urllib.request
try:
urllib.request.urlopen("https://api.example.com/health", timeout=5)
return True
except Exception:
return FalseCombining decorators
All decorators produce nodes that chain with >> and work with built-in nodes:
```python
from brokoli import (
    Pipeline, source, filter, map, validate, sink,
    transform, quality_check, sink_db,
)

with Pipeline("full-pipeline") as p:
    @source
    def fetch():
        import requests
        return requests.get("https://api.example.com/data").json()

    @filter
    def active(row):
        return row.get("status") == "active"

    @map
    def enrich(row):
        row["domain"] = row["email"].split("@")[1]
        return row

    @validate("Has data")
    def check(rows):
        return len(rows) > 0, f"{len(rows)} rows"

    clean = transform("Deduplicate", rules=[{"type": "deduplicate", "columns": ["id"]}])

    @sink("Write to API")
    def push(rows):
        import requests
        requests.post("https://api.example.com/ingest", json=rows)

    fetch >> active >> enrich >> check >> clean >> push
```

## Performance notes
- For small datasets (under 10K rows), data is passed via JSON over stdin/stdout
- For large datasets (over 10K rows), Brokoli uses NDJSON file transfer automatically
- If pyarrow or pandas are installed, they are used for faster I/O
- Set timeout on @task, @source, and @sink to handle long-running operations
- A @sensor timeout should account for the maximum expected wait time; the node timeout is automatically set to timeout + 60s
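The NDJSON hand-off used for large datasets (one JSON object per line, so rows can be streamed without holding the whole payload in memory) can be sketched in plain Python; the helper names and file path here are illustrative, not Brokoli internals:

```python
import json

# Illustrative NDJSON (newline-delimited JSON) transfer:
# one JSON object per line. Not Brokoli's actual implementation.
def write_ndjson(rows, path):
    with open(path, "w") as f:
        for row in rows:
            f.write(json.dumps(row) + "\n")

def read_ndjson(path):
    with open(path) as f:
        return [json.loads(line) for line in f if line.strip()]

write_ndjson([{"id": 1}, {"id": 2}], "/tmp/rows.ndjson")
print(read_ndjson("/tmp/rows.ndjson"))  # [{'id': 1}, {'id': 2}]
```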