Pipeline Class
Python SDK Pipeline class and node functions reference
The Pipeline class is the core building block for defining pipelines as code. Nodes are module-level functions imported from the brokoli package -- they are not methods on the Pipeline object.
Basic usage
from brokoli import Pipeline, source_db, transform, sink_file
with Pipeline("my-pipeline", description="Load and transform user data") as p:
src = source_db("Load Users", conn_id="prod-pg", query="SELECT * FROM users")
clean = transform("Clean", input=src, rules=[{"type": "drop_columns", "columns": ["password_hash"]}])
out = sink_file("Export", input=clean, path="/data/users.csv", format="csv")
src >> clean >> outDeploy with the CLI:
brokoli deployThere is no p.deploy() method. Deployment is always done through the brokoli deploy CLI command.
Constructor
Pipeline(
    name: str,
    pipeline_id: str | None = None,
    description: str = "",
    schedule: str = "",
    catch_up: bool = False,
    sla: str = "",
    depends_on: list[str] | None = None,
    tags: list[str] | None = None,
    webhook: bool = False,
    max_retries: int = 0,
    concurrency: int = 4,
    on_start: Callable | None = None,
    on_success: Callable | None = None,
    on_failure: Callable | None = None,
)
| Parameter | Description |
|---|---|
| name | Pipeline display name (required) |
| pipeline_id | Stable slug for matching across deployments. If omitted, one is derived from name |
| description | Human-readable description |
| schedule | Cron expression (e.g., "0 6 * * *") |
| catch_up | Run missed intervals when the pipeline is re-enabled |
| sla | Duration string for SLA alerting (e.g., "30m", "2h") |
| depends_on | List of pipeline IDs that must complete before this one starts |
| tags | List of tags for filtering and grouping |
| webhook | Enable webhook-triggered runs |
| max_retries | Number of times to retry the entire pipeline on failure |
| concurrency | Maximum number of nodes that can execute in parallel (default 4) |
| on_start | Callback invoked when the pipeline run begins |
| on_success | Callback invoked when the pipeline run succeeds |
| on_failure | Callback invoked when the pipeline run fails |
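The table notes that pipeline_id is derived from name when omitted, but the exact derivation rule is not documented. A plausible slugify sketch (lowercase, collapse non-alphanumeric runs into hyphens) illustrates the idea; the SDK's actual rule may differ:

```python
import re

def derive_pipeline_id(name: str) -> str:
    """Illustrative slug derivation only, not the SDK's internal rule:
    lowercase, replace non-alphanumeric runs with hyphens, trim edges."""
    slug = re.sub(r"[^a-z0-9]+", "-", name.lower())
    return slug.strip("-")

print(derive_pipeline_id("Daily User Export"))  # daily-user-export
```

Because the derived slug depends on the display name, renaming a pipeline without a pinned pipeline_id can change its identity; setting pipeline_id explicitly avoids this (see the Git Sync section below).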
Context manager
Always use Pipeline as a context manager. The with block collects all node definitions and edges:
with Pipeline("example") as p:
# define nodes here
...
# After the block, p.to_yaml() returns readable YAML, p.to_json() returns a dictAll node function calls must happen inside the with block so they are registered to the active pipeline.
The >> operator
Node functions return NodeRef objects that support the >> operator for chaining:
from brokoli import Pipeline, source_db, transform, sink_file, sink_db
with Pipeline("fan-out") as p:
src = source_db("Load", conn_id="pg", query="SELECT * FROM events")
clean = transform("Clean", input=src, rules=[{"type": "rename", "old": "ts", "new": "timestamp"}])
csv = sink_file("To CSV", input=clean, path="/data/events.csv")
db = sink_db("To Warehouse", input=clean, table="events_clean", conn_id="warehouse")
# Linear chain
src >> clean
# Fan-out from one node to multiple sinks
clean >> csv
clean >> dbYou can also write linear chains in a single expression:
src >> clean >> csvNode functions reference
Every node function is a module-level import from brokoli. They all take name as the first positional argument.
Sources
source_db
Read data from a SQL database.
from brokoli import source_db
src = source_db(
    "Load Users",
    query="SELECT * FROM users WHERE active = true",
    conn_id="prod-pg",  # reference a saved connection
    # uri="postgres://...",  # or pass a URI directly
    retries=3,
    retry_backoff="exponential",
    timeout=60,
)
| Parameter | Default | Description |
|---|---|---|
| name | (required) | Node display name |
| query | "" | SQL query to execute |
| conn_id | "" | Saved connection ID |
| uri | "" | Direct database URI (alternative to conn_id) |
| retries | 0 | Number of retry attempts |
| retry_backoff | "exponential" | Backoff strategy ("exponential") |
| timeout | 0 | Query timeout in seconds (0 = no timeout) |
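With retry_backoff="exponential", the wait between attempts typically doubles each time. The SDK's base delay and any cap are not documented here, so this sketch assumes a hypothetical 1-second base purely for illustration:

```python
def backoff_delays(retries: int, base: float = 1.0) -> list[float]:
    # Exponential backoff: base, 2*base, 4*base, ...
    # The base delay and any maximum cap are assumptions, not documented SDK values.
    return [base * (2 ** attempt) for attempt in range(retries)]

print(backoff_delays(3))  # [1.0, 2.0, 4.0]
```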
source_api
Fetch data from an HTTP API.
from brokoli import source_api
api = source_api(
    "Fetch Orders",
    url="https://api.example.com/orders",
    method="GET",
    headers={"Authorization": "Bearer ${var.api_token}"},
    timeout=30,
)
| Parameter | Default | Description |
|---|---|---|
| name | (required) | Node display name |
| url | "" | Request URL |
| method | "GET" | HTTP method |
| headers | None | Dictionary of HTTP headers |
| body | "" | Request body (for POST/PUT) |
| conn_id | "" | Saved connection ID |
| retries | 0 | Number of retry attempts |
| retry_backoff | "exponential" | Backoff strategy |
| timeout | 30 | Request timeout in seconds |
source_file
Read data from a local file.
from brokoli import source_file
f = source_file("Read CSV", path="/data/input.csv", format="csv")| Parameter | Default | Description |
|---|---|---|
name | (required) | Node display name |
path | "" | File path |
format | "csv" | File format ("csv", "json", "parquet") |
Transforms
transform
Apply declarative transform rules to data.
from brokoli import transform
clean = transform(
    "Clean Data",
    input=src,
    rules=[
        {"type": "drop_columns", "columns": ["password_hash", "ssn"]},
        {"type": "rename", "old": "ts", "new": "timestamp"},
        {"type": "filter", "expression": "age >= 18"},
    ],
)
| Parameter | Default | Description |
|---|---|---|
| name | (required) | Node display name |
| input | None | Upstream node reference |
| rules | None | List of transform rule objects |
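The rules are declarative, so it helps to see their semantics made concrete. This pure-Python sketch shows how drop_columns, rename, and filter could apply to rows of dicts; it is an illustration of the rule meanings, not the engine's actual implementation:

```python
def apply_rules(rows, rules):
    """Illustrative-only interpreter for the three rule types shown above."""
    for rule in rules:
        if rule["type"] == "drop_columns":
            rows = [{k: v for k, v in r.items() if k not in rule["columns"]} for r in rows]
        elif rule["type"] == "rename":
            rows = [{(rule["new"] if k == rule["old"] else k): v for k, v in r.items()} for r in rows]
        elif rule["type"] == "filter":
            # Sketch only: evaluate the expression against each row's fields.
            rows = [r for r in rows if eval(rule["expression"], {}, dict(r))]
    return rows

rows = [{"ts": 1, "age": 20, "ssn": "x"}, {"ts": 2, "age": 15, "ssn": "y"}]
out = apply_rules(rows, [
    {"type": "drop_columns", "columns": ["ssn"]},
    {"type": "rename", "old": "ts", "new": "timestamp"},
    {"type": "filter", "expression": "age >= 18"},
])
print(out)  # [{'timestamp': 1, 'age': 20}]
```

Note that rules apply in order: the filter above runs after the rename, so expressions should reference the renamed columns.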
join
Join two data streams.
from brokoli import join
merged = join(
    "Merge Users + Orders",
    left=users,
    right=orders,
    on="user_id",
    how="left",
)
| Parameter | Default | Description |
|---|---|---|
| name | (required) | Node display name |
| left | None | Left input node reference |
| right | None | Right input node reference |
| on | "" | Join key column name |
| how | "inner" | Join type ("inner", "left", "right", "outer") |
code
Execute a custom script.
from brokoli import code
custom = code(
    "Custom Transform",
    input=src,
    language="python",
    script="df['full_name'] = df['first'] + ' ' + df['last']",
    python_path="/usr/bin/python3",
)
| Parameter | Default | Description |
|---|---|---|
| name | (required) | Node display name |
| input | None | Upstream node reference |
| language | "python" | Script language |
| script | "" | Inline script to execute |
| python_path | "" | Path to Python interpreter |
Quality and logic
quality_check
Validate data against rules. Fails the pipeline if checks do not pass.
from brokoli import quality_check
qc = quality_check(
    "Validate",
    input=clean,
    rules=[
        {"type": "not_null", "columns": ["id", "email"]},
        {"type": "unique", "columns": ["id"]},
        {"type": "row_count", "min": 1},
    ],
)
| Parameter | Default | Description |
|---|---|---|
| name | (required) | Node display name |
| input | None | Upstream node reference |
| rules | None | List of quality check rule objects |
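To make the three rule types concrete, here is a pure-Python sketch of how such checks could be evaluated against rows of dicts. It illustrates the semantics only; how the engine actually evaluates and reports failures is not documented here:

```python
def run_checks(rows, rules):
    """Illustrative-only evaluator; returns names of failed rules (empty = all passed)."""
    failures = []
    for rule in rules:
        if rule["type"] == "not_null":
            if any(r.get(c) is None for r in rows for c in rule["columns"]):
                failures.append("not_null")
        elif rule["type"] == "unique":
            for c in rule["columns"]:
                vals = [r[c] for r in rows]
                if len(vals) != len(set(vals)):
                    failures.append("unique")
        elif rule["type"] == "row_count":
            if len(rows) < rule.get("min", 0):
                failures.append("row_count")
    return failures

rows = [{"id": 1, "email": "a@x"}, {"id": 1, "email": None}]
print(run_checks(rows, [
    {"type": "not_null", "columns": ["id", "email"]},
    {"type": "unique", "columns": ["id"]},
    {"type": "row_count", "min": 1},
]))  # ['not_null', 'unique']
```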
condition_node
Branch pipeline execution based on an expression.
from brokoli import condition_node
gate = condition_node(
    "Check Row Count",
    expression="row_count > 0",
    input=src,
)
| Parameter | Default | Description |
|---|---|---|
| name | (required) | Node display name |
| expression | "" | Boolean expression to evaluate |
| input | None | Upstream node reference |
Sinks
sink_db
Write data to a SQL database.
from brokoli import sink_db
out = sink_db(
    "Write to Warehouse",
    input=clean,
    table="analytics.users",
    mode="append",
    conn_id="warehouse",
    retries=2,
)
| Parameter | Default | Description |
|---|---|---|
| name | (required) | Node display name |
| input | None | Upstream node reference |
| table | "" | Target table name |
| mode | "append" | Write mode ("append", "replace", "upsert") |
| conn_id | "" | Saved connection ID |
| uri | "" | Direct database URI |
| retries | 0 | Number of retry attempts |
sink_file
Write data to a local file.
from brokoli import sink_file
out = sink_file(
    "Export CSV",
    input=clean,
    path="/data/output.csv",
    format="csv",
    compress="gzip",
)
| Parameter | Default | Description |
|---|---|---|
| name | (required) | Node display name |
| input | None | Upstream node reference |
| path | "" | Output file path |
| format | "csv" | File format ("csv", "json", "parquet") |
| compress | "" | Compression ("", "gzip", "snappy") |
sink_api
Send data to an HTTP API.
from brokoli import sink_api
out = sink_api(
    "Post Results",
    input=clean,
    url="https://api.example.com/ingest",
    method="POST",
    headers={"Content-Type": "application/json"},
)
| Parameter | Default | Description |
|---|---|---|
| name | (required) | Node display name |
| input | None | Upstream node reference |
| url | "" | Request URL |
| method | "POST" | HTTP method |
| body | "" | Request body template |
| headers | None | Dictionary of HTTP headers |
Operations
migrate
Move data directly between two databases in a single node.
from brokoli import migrate
m = migrate(
    "Replicate Users",
    source_conn_id="prod-pg",
    target_conn_id="warehouse",
    query="SELECT * FROM users",
    table="raw.users",
    mode="append",
)
| Parameter | Default | Description |
|---|---|---|
| name | (required) | Node display name |
| source_uri | "" | Source database URI |
| target_uri | "" | Target database URI |
| query | "" | SQL query to run on the source |
| table | "" | Target table name |
| mode | "append" | Write mode |
| source_conn_id | "" | Source saved connection ID |
| target_conn_id | "" | Target saved connection ID |
dbt
Run a dbt command as a pipeline node.
from brokoli import dbt
d = dbt(
    "dbt Run",
    command="run",
    project_dir="/opt/dbt/my_project",
    target="prod",
    select="tag:daily",
)
| Parameter | Default | Description |
|---|---|---|
| name | (required) | Node display name |
| command | "run" | dbt command ("run", "test", "build", "seed") |
| project_dir | "" | Path to dbt project directory |
| target | "" | dbt target/profile name |
| select | "" | dbt node selection syntax |
| profiles_dir | "" | Path to dbt profiles directory |
| vars | "" | JSON string of dbt vars |
| input | None | Upstream node reference (for ordering) |
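These parameters map naturally onto dbt CLI flags (--project-dir, --profiles-dir, --target, --select, --vars are all real dbt flags). How the node actually shells out is not documented here; this sketch shows one hypothetical assembly of the invocation:

```python
def build_dbt_args(command="run", project_dir="", target="",
                   select="", profiles_dir="", vars=""):
    # Hypothetical mapping of node parameters onto dbt CLI flags;
    # empty parameters are simply omitted.
    args = ["dbt", command]
    if project_dir:
        args += ["--project-dir", project_dir]
    if profiles_dir:
        args += ["--profiles-dir", profiles_dir]
    if target:
        args += ["--target", target]
    if select:
        args += ["--select", select]
    if vars:
        args += ["--vars", vars]
    return args

print(build_dbt_args(command="run", project_dir="/opt/dbt/my_project",
                     target="prod", select="tag:daily"))
# ['dbt', 'run', '--project-dir', '/opt/dbt/my_project', '--target', 'prod', '--select', 'tag:daily']
```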
notify
Send a notification (webhook, Slack, etc.).
from brokoli import notify
n = notify(
    "Alert on Complete",
    input=out,
    notify_type="webhook",
    webhook_url="https://hooks.slack.com/services/T.../B.../xxx",
    message="Pipeline finished successfully",
)
| Parameter | Default | Description |
|---|---|---|
| name | (required) | Node display name |
| input | None | Upstream node reference |
| notify_type | "webhook" | Notification type ("webhook") |
| webhook_url | "" | Webhook URL |
| message | "" | Notification message |
| channel | "" | Channel identifier |
Parallel execution
parallel
Combine multiple node references so they can feed into a downstream node together.
from brokoli import Pipeline, parallel, source_db, transform, sink_db
with Pipeline("parallel-sources") as p:
    users = source_db("Users", conn_id="pg", query="SELECT * FROM users")
    orders = source_db("Orders", conn_id="pg", query="SELECT * FROM orders")
    merge = transform("Merge", rules=[{"type": "concat"}])
    out = sink_db("Write", input=merge, table="combined", conn_id="warehouse")
    parallel(users, orders) >> merge >> out
pipeline_id for Git Sync
When using Git Sync, set pipeline_id to a stable slug. This lets Brokoli match pipelines across deployments instead of creating duplicates:
with Pipeline("Daily User Export", pipeline_id="daily-user-export") as p:
...Without pipeline_id, updates create new pipelines instead of updating existing ones.
to_yaml()
Export the pipeline as clean, readable YAML. Multi-line strings (scripts, SQL queries) are rendered as block scalars — no \n escapes:
from brokoli import Pipeline, source_db, sink_db
with Pipeline("daily-etl", schedule="0 5 * * *") as p:
src = source_db("Load", query="SELECT *\nFROM users\nWHERE active = true", conn_id="pg")
snk = sink_db("Save", table="output", conn_id="warehouse")
src >> snk
print(p.to_yaml())Output:
name: daily-etl
schedule: 0 5 * * *
enabled: true
nodes:
  - id: load_a1b2c3
    type: source_db
    name: Load
    config:
      query: |
        SELECT *
        FROM users
        WHERE active = true
      conn_id: pg
  - id: save_d4e5f6
    type: sink_db
    name: Save
    config:
      table: output
      mode: append
      conn_id: warehouse
edges:
  - from: load_a1b2c3
    to: save_d4e5f6
to_json()
Returns a Python dict for API payloads or when you need JSON:
import json
print(json.dumps(p.to_json(), indent=2))
Use to_yaml() for human-readable output and to_json() when sending to the API programmatically.
Lifecycle callbacks
Use on_start, on_success, and on_failure to run custom logic at pipeline lifecycle events:
def alert_on_failure(context):
    print(f"Pipeline {context['pipeline_name']} failed: {context['error']}")

with Pipeline(
    "critical-etl",
    schedule="0 6 * * *",
    on_failure=alert_on_failure,
    max_retries=2,
) as p:
    ...
Complete example
A realistic pipeline that extracts from two sources, validates, transforms, and loads:
from brokoli import (
    Pipeline,
    source_db,
    source_api,
    join,
    quality_check,
    transform,
    sink_db,
    notify,
)

with Pipeline(
    "user-enrichment",
    pipeline_id="user-enrichment",
    description="Enrich user profiles with latest activity data",
    schedule="0 */4 * * *",
    tags=["etl", "users"],
    max_retries=1,
    concurrency=4,
) as p:
    users = source_db(
        "Load Users",
        conn_id="prod-pg",
        query="SELECT id, email, name FROM users WHERE active = true",
        retries=2,
    )
    activity = source_api(
        "Fetch Activity",
        url="https://analytics.internal/api/v1/activity",
        headers={"Authorization": "Bearer ${var.analytics_token}"},
        timeout=60,
    )
    merged = join(
        "Join Users + Activity",
        left=users,
        right=activity,
        on="user_id",
        how="left",
    )
    qc = quality_check(
        "Validate",
        input=merged,
        rules=[
            {"type": "not_null", "columns": ["id", "email"]},
            {"type": "row_count", "min": 100},
        ],
    )
    clean = transform(
        "Normalize",
        input=qc,
        rules=[
            {"type": "rename", "old": "email", "new": "email_address"},
            {"type": "drop_columns", "columns": ["raw_json"]},
        ],
    )
    out = sink_db(
        "Write to Warehouse",
        input=clean,
        table="analytics.enriched_users",
        mode="append",
        conn_id="warehouse",
    )
    done = notify(
        "Slack Notification",
        input=out,
        notify_type="webhook",
        webhook_url="https://hooks.slack.com/services/T.../B.../xxx",
        message="User enrichment pipeline completed",
    )
    users >> merged
    activity >> merged
    merged >> qc >> clean >> out >> done
Deploy with the CLI:
brokoli deploy
Helper types
| Type | Description |
|---|---|
| NodeRef | Returned by every node function. Supports >> for chaining |
| _MultiRef | Returned by parallel(). Supports >> to connect multiple upstream nodes at once |
| TaskResult | Result object available in lifecycle callbacks |
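NodeRef's chaining works because Python lets classes overload the >> operator via __rshift__. A minimal sketch of the pattern (not the SDK's actual NodeRef class): returning the right-hand operand is what makes src >> a >> b record two edges.

```python
class SketchRef:
    """Minimal illustration of >>-based edge chaining."""
    edges = []  # shared edge list, for the sketch only

    def __init__(self, name):
        self.name = name

    def __rshift__(self, other):
        # Record an edge, then return the right-hand node so the
        # expression a >> b >> c chains left to right.
        SketchRef.edges.append((self.name, other.name))
        return other

a, b, c = SketchRef("a"), SketchRef("b"), SketchRef("c")
a >> b >> c
print(SketchRef.edges)  # [('a', 'b'), ('b', 'c')]
```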