
Pipeline Class

Python SDK Pipeline class and node functions reference

The Pipeline class is the core building block for defining pipelines as code. Nodes are created by module-level functions imported from the brokoli package; they are not methods on the Pipeline object.

Basic usage

from brokoli import Pipeline, source_db, transform, sink_file

with Pipeline("my-pipeline", description="Load and transform user data") as p:
    src = source_db("Load Users", conn_id="prod-pg", query="SELECT * FROM users")
    clean = transform("Clean", input=src, rules=[{"type": "drop_columns", "columns": ["password_hash"]}])
    out = sink_file("Export", input=clean, path="/data/users.csv", format="csv")

    src >> clean >> out

Deploy with the CLI:

brokoli deploy

There is no p.deploy() method. Deployment is always done through the brokoli deploy CLI command.

Constructor

Pipeline(
    name: str,
    pipeline_id: str | None = None,
    description: str = "",
    schedule: str = "",
    catch_up: bool = False,
    sla: str = "",
    depends_on: list[str] | None = None,
    tags: list[str] | None = None,
    webhook: bool = False,
    max_retries: int = 0,
    concurrency: int = 4,
    on_start: Callable | None = None,
    on_success: Callable | None = None,
    on_failure: Callable | None = None,
)
Parameter | Description
--- | ---
name | Pipeline display name (required)
pipeline_id | Stable slug for matching across deployments. If omitted, one is derived from name
description | Human-readable description
schedule | Cron expression (e.g., "0 6 * * *")
catch_up | Run missed intervals when the pipeline is re-enabled
sla | Duration string for SLA alerting (e.g., "30m", "2h")
depends_on | List of pipeline IDs that must complete before this one starts
tags | List of tags for filtering and grouping
webhook | Enable webhook-triggered runs
max_retries | Number of times to retry the entire pipeline on failure
concurrency | Maximum number of nodes that can execute in parallel (default 4)
on_start | Callback invoked when the pipeline run begins
on_success | Callback invoked when the pipeline run succeeds
on_failure | Callback invoked when the pipeline run fails

Context manager

Always use Pipeline as a context manager. The with block collects all node definitions and edges:

with Pipeline("example") as p:
    # define nodes here
    ...

# After the block, p.to_yaml() returns readable YAML, p.to_json() returns a dict

All node function calls must happen inside the with block so they are registered to the active pipeline.

The >> operator

Node functions return NodeRef objects that support the >> operator for chaining:

from brokoli import Pipeline, source_db, transform, sink_file, sink_db

with Pipeline("fan-out") as p:
    src = source_db("Load", conn_id="pg", query="SELECT * FROM events")
    clean = transform("Clean", input=src, rules=[{"type": "rename", "old": "ts", "new": "timestamp"}])
    csv = sink_file("To CSV", input=clean, path="/data/events.csv")
    db = sink_db("To Warehouse", input=clean, table="events_clean", conn_id="warehouse")

    # Linear chain
    src >> clean

    # Fan-out from one node to multiple sinks
    clean >> csv
    clean >> db

You can also write linear chains in a single expression:

src >> clean >> csv
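
Under the hood, this chaining is ordinary Python operator overloading. The sketch below is illustrative only (it is not Brokoli's actual NodeRef implementation); it shows why a single expression registers every edge: each >> records one edge and returns its right operand, so src >> clean >> csv evaluates left to right.

```python
# Illustrative sketch of NodeRef-style chaining (not Brokoli's real NodeRef).
EDGES = []

class Node:
    def __init__(self, name):
        self.name = name

    def __rshift__(self, other):
        # Record an edge, then return the right operand so that
        # a >> b >> c evaluates as ((a >> b) >> c) and records both edges.
        EDGES.append((self.name, other.name))
        return other

src, clean, csv = Node("src"), Node("clean"), Node("csv")
src >> clean >> csv
print(EDGES)  # [('src', 'clean'), ('clean', 'csv')]
```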

Node functions reference

Every node function is a module-level import from brokoli. They all take name as the first positional argument.

Sources

source_db

Read data from a SQL database.

from brokoli import source_db

src = source_db(
    "Load Users",
    query="SELECT * FROM users WHERE active = true",
    conn_id="prod-pg",       # reference a saved connection
    # uri="postgres://...",   # or pass a URI directly
    retries=3,
    retry_backoff="exponential",
    timeout=60,
)
Parameter | Default | Description
--- | --- | ---
name | (required) | Node display name
query | "" | SQL query to execute
conn_id | "" | Saved connection ID
uri | "" | Direct database URI (alternative to conn_id)
retries | 0 | Number of retry attempts
retry_backoff | "exponential" | Backoff strategy ("exponential")
timeout | 0 | Query timeout in seconds (0 = no timeout)

source_api

Fetch data from an HTTP API.

from brokoli import source_api

api = source_api(
    "Fetch Orders",
    url="https://api.example.com/orders",
    method="GET",
    headers={"Authorization": "Bearer ${var.api_token}"},
    timeout=30,
)
Parameter | Default | Description
--- | --- | ---
name | (required) | Node display name
url | "" | Request URL
method | "GET" | HTTP method
headers | None | Dictionary of HTTP headers
body | "" | Request body (for POST/PUT)
conn_id | "" | Saved connection ID
retries | 0 | Number of retry attempts
retry_backoff | "exponential" | Backoff strategy
timeout | 30 | Request timeout in seconds

source_file

Read data from a local file.

from brokoli import source_file

f = source_file("Read CSV", path="/data/input.csv", format="csv")
Parameter | Default | Description
--- | --- | ---
name | (required) | Node display name
path | "" | File path
format | "csv" | File format ("csv", "json", "parquet")

Transforms

transform

Apply declarative transform rules to data.

from brokoli import transform

clean = transform(
    "Clean Data",
    input=src,
    rules=[
        {"type": "drop_columns", "columns": ["password_hash", "ssn"]},
        {"type": "rename", "old": "ts", "new": "timestamp"},
        {"type": "filter", "expression": "age >= 18"},
    ],
)
Parameter | Default | Description
--- | --- | ---
name | (required) | Node display name
input | None | Upstream node reference
rules | None | List of transform rule objects
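
To make the rule objects concrete, here is a pure-Python sketch of how the three rule types above behave on a list of dict records. This is illustrative semantics only, not Brokoli's execution engine, and the real filter expression language may differ from Python's eval.

```python
# Illustrative semantics of drop_columns, rename, and filter rules
# (not Brokoli's actual engine).
records = [
    {"ts": 1, "age": 17, "password_hash": "x"},
    {"ts": 2, "age": 30, "password_hash": "y"},
]

def apply_rules(rows, rules):
    for rule in rules:
        if rule["type"] == "drop_columns":
            rows = [{k: v for k, v in r.items() if k not in rule["columns"]} for r in rows]
        elif rule["type"] == "rename":
            rows = [{(rule["new"] if k == rule["old"] else k): v for k, v in r.items()} for r in rows]
        elif rule["type"] == "filter":
            # Simplified: evaluate the expression with the row's columns in scope.
            rows = [r for r in rows if eval(rule["expression"], {}, dict(r))]
    return rows

out = apply_rules(records, [
    {"type": "drop_columns", "columns": ["password_hash"]},
    {"type": "rename", "old": "ts", "new": "timestamp"},
    {"type": "filter", "expression": "age >= 18"},
])
print(out)  # [{'timestamp': 2, 'age': 30}]
```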

join

Join two data streams.

from brokoli import join

merged = join(
    "Merge Users + Orders",
    left=users,
    right=orders,
    on="user_id",
    how="left",
)
Parameter | Default | Description
--- | --- | ---
name | (required) | Node display name
left | None | Left input node reference
right | None | Right input node reference
on | "" | Join key column name
how | "inner" | Join type ("inner", "left", "right", "outer")
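
For reference, how="left" keeps every left row and merges in matching right-side columns. A simplified pure-Python sketch of that behavior (it assumes at most one right match per key, and is not Brokoli's engine):

```python
# Simplified left-join semantics: every left row survives; matched
# right-side columns are merged in.
users = [{"user_id": 1, "name": "Ada"}, {"user_id": 2, "name": "Bo"}]
orders = [{"user_id": 1, "total": 9.5}]

def left_join(left, right, on):
    index = {r[on]: r for r in right}          # key -> right row
    return [{**l, **index.get(l[on], {})} for l in left]

merged = left_join(users, orders, on="user_id")
print(merged)
# [{'user_id': 1, 'name': 'Ada', 'total': 9.5}, {'user_id': 2, 'name': 'Bo'}]
```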

code

Execute a custom script.

from brokoli import code

custom = code(
    "Custom Transform",
    input=src,
    language="python",
    script="df['full_name'] = df['first'] + ' ' + df['last']",
    python_path="/usr/bin/python3",
)
Parameter | Default | Description
--- | --- | ---
name | (required) | Node display name
input | None | Upstream node reference
language | "python" | Script language
script | "" | Inline script to execute
python_path | "" | Path to Python interpreter

Quality and logic

quality_check

Validate data against rules. Fails the pipeline if checks do not pass.

from brokoli import quality_check

qc = quality_check(
    "Validate",
    input=clean,
    rules=[
        {"type": "not_null", "columns": ["id", "email"]},
        {"type": "unique", "columns": ["id"]},
        {"type": "row_count", "min": 1},
    ],
)
Parameter | Default | Description
--- | --- | ---
name | (required) | Node display name
input | None | Upstream node reference
rules | None | List of quality check rule objects
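
Illustrative semantics for the three rule types shown above, sketched in pure Python (this is not Brokoli's actual checker):

```python
# Each rule evaluates to True (pass) or False (fail) against a list of rows.
def passes(rows, rule):
    if rule["type"] == "not_null":
        return all(r.get(c) is not None for r in rows for c in rule["columns"])
    if rule["type"] == "unique":
        return all(len({r[c] for r in rows}) == len(rows) for c in rule["columns"])
    if rule["type"] == "row_count":
        return len(rows) >= rule["min"]
    raise ValueError(f"unknown rule type: {rule['type']}")

rows = [{"id": 1, "email": "a@x.com"}, {"id": 2, "email": "b@x.com"}]
assert passes(rows, {"type": "not_null", "columns": ["id", "email"]})
assert passes(rows, {"type": "unique", "columns": ["id"]})
assert passes(rows, {"type": "row_count", "min": 1})
```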

condition_node

Branch pipeline execution based on an expression.

from brokoli import condition_node

gate = condition_node(
    "Check Row Count",
    expression="row_count > 0",
    input=src,
)
Parameter | Default | Description
--- | --- | ---
name | (required) | Node display name
expression | "" | Boolean expression to evaluate
input | None | Upstream node reference

Sinks

sink_db

Write data to a SQL database.

from brokoli import sink_db

out = sink_db(
    "Write to Warehouse",
    input=clean,
    table="analytics.users",
    mode="append",
    conn_id="warehouse",
    retries=2,
)
Parameter | Default | Description
--- | --- | ---
name | (required) | Node display name
input | None | Upstream node reference
table | "" | Target table name
mode | "append" | Write mode ("append", "replace", "upsert")
conn_id | "" | Saved connection ID
uri | "" | Direct database URI
retries | 0 | Number of retry attempts

sink_file

Write data to a local file.

from brokoli import sink_file

out = sink_file(
    "Export CSV",
    input=clean,
    path="/data/output.csv",
    format="csv",
    compress="gzip",
)
Parameter | Default | Description
--- | --- | ---
name | (required) | Node display name
input | None | Upstream node reference
path | "" | Output file path
format | "csv" | File format ("csv", "json", "parquet")
compress | "" | Compression ("", "gzip", "snappy")

sink_api

Send data to an HTTP API.

from brokoli import sink_api

out = sink_api(
    "Post Results",
    input=clean,
    url="https://api.example.com/ingest",
    method="POST",
    headers={"Content-Type": "application/json"},
)
Parameter | Default | Description
--- | --- | ---
name | (required) | Node display name
input | None | Upstream node reference
url | "" | Request URL
method | "POST" | HTTP method
body | "" | Request body template
headers | None | Dictionary of HTTP headers

Operations

migrate

Move data directly between two databases in a single node.

from brokoli import migrate

m = migrate(
    "Replicate Users",
    source_conn_id="prod-pg",
    target_conn_id="warehouse",
    query="SELECT * FROM users",
    table="raw.users",
    mode="append",
)
Parameter | Default | Description
--- | --- | ---
name | (required) | Node display name
source_uri | "" | Source database URI
target_uri | "" | Target database URI
query | "" | SQL query to run on the source
table | "" | Target table name
mode | "append" | Write mode
source_conn_id | "" | Source saved connection ID
target_conn_id | "" | Target saved connection ID

dbt

Run a dbt command as a pipeline node.

from brokoli import dbt

d = dbt(
    "dbt Run",
    command="run",
    project_dir="/opt/dbt/my_project",
    target="prod",
    select="tag:daily",
)
Parameter | Default | Description
--- | --- | ---
name | (required) | Node display name
command | "run" | dbt command ("run", "test", "build", "seed")
project_dir | "" | Path to dbt project directory
target | "" | dbt target/profile name
select | "" | dbt node selection syntax
profiles_dir | "" | Path to dbt profiles directory
vars | "" | JSON string of dbt vars
input | None | Upstream node reference (for ordering)

notify

Send a notification (webhook, Slack, etc.).

from brokoli import notify

n = notify(
    "Alert on Complete",
    input=out,
    notify_type="webhook",
    webhook_url="https://hooks.slack.com/services/T.../B.../xxx",
    message="Pipeline finished successfully",
)
Parameter | Default | Description
--- | --- | ---
name | (required) | Node display name
input | None | Upstream node reference
notify_type | "webhook" | Notification type ("webhook")
webhook_url | "" | Webhook URL
message | "" | Notification message
channel | "" | Channel identifier

Parallel execution

parallel

Combine multiple node references so they can feed into a downstream node together.

from brokoli import Pipeline, parallel, source_db, transform, sink_db

with Pipeline("parallel-sources") as p:
    users = source_db("Users", conn_id="pg", query="SELECT * FROM users")
    orders = source_db("Orders", conn_id="pg", query="SELECT * FROM orders")
    merge = transform("Merge", rules=[{"type": "concat"}])
    out = sink_db("Write", input=merge, table="combined", conn_id="warehouse")

    parallel(users, orders) >> merge >> out
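
The mechanics can be sketched in plain Python: a multi-ref holds several upstream nodes and, on >>, adds one edge per upstream to the same downstream node. This is illustrative only, not Brokoli's actual _MultiRef implementation.

```python
# Illustrative sketch of parallel()-style fan-in (not Brokoli's _MultiRef).
EDGES = []

class Node:
    def __init__(self, name):
        self.name = name

    def __rshift__(self, other):
        EDGES.append((self.name, other.name))
        return other

class MultiRef:
    def __init__(self, *nodes):
        self.nodes = nodes

    def __rshift__(self, downstream):
        # One edge per upstream node, all pointing at the same downstream.
        for n in self.nodes:
            EDGES.append((n.name, downstream.name))
        return downstream

users, orders, merge = Node("users"), Node("orders"), Node("merge")
MultiRef(users, orders) >> merge
print(EDGES)  # [('users', 'merge'), ('orders', 'merge')]
```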

pipeline_id for Git Sync

When using Git Sync, set pipeline_id to a stable slug. This lets Brokoli match pipelines across deployments instead of creating duplicates:

with Pipeline("Daily User Export", pipeline_id="daily-user-export") as p:
    ...

Without pipeline_id, updates create new pipelines instead of updating existing ones.
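
For intuition, a typical slug derivation lowercases the name and collapses non-alphanumeric runs into hyphens. The helper below is hypothetical; Brokoli's actual derivation rule is not documented here, which is exactly why pinning pipeline_id explicitly is the safer choice.

```python
import re

def slugify(name: str) -> str:
    # Hypothetical illustration of name-to-slug derivation;
    # the SDK's real rule is unspecified on this page.
    return re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")

print(slugify("Daily User Export"))  # daily-user-export
```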

to_yaml()

Export the pipeline as clean, readable YAML. Multi-line strings (scripts, SQL queries) are rendered as block scalars — no \n escapes:

from brokoli import Pipeline, source_db, sink_db

with Pipeline("daily-etl", schedule="0 5 * * *") as p:
    src = source_db("Load", query="SELECT *\nFROM users\nWHERE active = true", conn_id="pg")
    snk = sink_db("Save", table="output", conn_id="warehouse")
    src >> snk

print(p.to_yaml())

Output:

name: daily-etl
schedule: 0 5 * * *
enabled: true
nodes:
- id: load_a1b2c3
  type: source_db
  name: Load
  config:
    query: |
      SELECT *
      FROM users
      WHERE active = true
    conn_id: pg
- id: save_d4e5f6
  type: sink_db
  name: Save
  config:
    table: output
    mode: append
    conn_id: warehouse
edges:
- from: load_a1b2c3
  to: save_d4e5f6

to_json()

Returns a Python dict for API payloads or when you need JSON:

import json
print(json.dumps(p.to_json(), indent=2))

Use to_yaml() for human-readable output and to_json() when sending to the API programmatically.

Lifecycle callbacks

Use on_start, on_success, and on_failure to run custom logic at pipeline lifecycle events:

def alert_on_failure(context):
    print(f"Pipeline {context['pipeline_name']} failed: {context['error']}")

with Pipeline(
    "critical-etl",
    schedule="0 6 * * *",
    on_failure=alert_on_failure,
    max_retries=2,
) as p:
    ...

Complete example

A realistic pipeline that extracts from two sources, validates, transforms, and loads:

from brokoli import (
    Pipeline,
    source_db,
    source_api,
    join,
    quality_check,
    transform,
    sink_db,
    notify,
)

with Pipeline(
    "user-enrichment",
    pipeline_id="user-enrichment",
    description="Enrich user profiles with latest activity data",
    schedule="0 */4 * * *",
    tags=["etl", "users"],
    max_retries=1,
    concurrency=4,
) as p:
    users = source_db(
        "Load Users",
        conn_id="prod-pg",
        query="SELECT id, id AS user_id, email, name FROM users WHERE active = true",
        retries=2,
    )

    activity = source_api(
        "Fetch Activity",
        url="https://analytics.internal/api/v1/activity",
        headers={"Authorization": "Bearer ${var.analytics_token}"},
        timeout=60,
    )

    merged = join(
        "Join Users + Activity",
        left=users,
        right=activity,
        on="user_id",
        how="left",
    )

    qc = quality_check(
        "Validate",
        input=merged,
        rules=[
            {"type": "not_null", "columns": ["id", "email"]},
            {"type": "row_count", "min": 100},
        ],
    )

    clean = transform(
        "Normalize",
        input=qc,
        rules=[
            {"type": "rename", "old": "email", "new": "email_address"},
            {"type": "drop_columns", "columns": ["raw_json"]},
        ],
    )

    out = sink_db(
        "Write to Warehouse",
        input=clean,
        table="analytics.enriched_users",
        mode="append",
        conn_id="warehouse",
    )

    done = notify(
        "Slack Notification",
        input=out,
        notify_type="webhook",
        webhook_url="https://hooks.slack.com/services/T.../B.../xxx",
        message="User enrichment pipeline completed",
    )

    users >> merged
    activity >> merged
    merged >> qc >> clean >> out >> done

Deploy with the CLI:

brokoli deploy

Helper types

Type | Description
--- | ---
NodeRef | Returned by every node function. Supports >> for chaining
_MultiRef | Returned by parallel(). Supports >> to connect multiple upstream nodes at once
TaskResult | Result object available in lifecycle callbacks