
Pipeline Overview

How pipelines work in Brokoli -- creation, validation, execution, and lifecycle.


What is a pipeline

A pipeline is a DAG (directed acyclic graph) that defines a data processing workflow. Data flows from source nodes through transforms to sink nodes. The engine guarantees that each node runs only after all its upstream dependencies complete.

Creating pipelines

Visual Editor

The drag-and-drop editor at /pipelines/new lets you:

  • Drag node types from the palette onto the canvas
  • Connect nodes by dragging between ports
  • Configure each node in the side panel
  • Validate before saving (cycles, missing config)
  • Dry-run to preview data at each step

Python SDK

from brokoli import Pipeline, source_db, transform, sink_file

with Pipeline("my-pipeline") as p:
    src = source_db("Load Users", conn_id="prod-pg", query="SELECT * FROM users")
    clean = transform("Deduplicate", rules=[
        {"type": "deduplicate", "columns": ["email"]}
    ])
    sink = sink_file("Export", path="/data/users.csv", format="csv")
    src >> clean >> sink

p.deploy("http://localhost:8080")

REST API

curl -X POST http://localhost:8080/api/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "name": "My Pipeline",
    "enabled": true,
    "nodes": [...],
    "edges": [...]
  }'

Execution model

Brokoli uses Kahn's algorithm with wave-based parallel execution:

Wave 1:  [source_a]  [source_b]    <- both run in parallel
Wave 2:  [join]                    <- waits for both sources
Wave 3:  [transform]  [quality]    <- parallel again
Wave 4:  [sink]                    <- final

Within each wave, nodes execute concurrently up to the parallelism limit (default: 4 nodes).
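The wave scheduling above can be sketched directly with Kahn's algorithm: peel off every node whose remaining in-degree is zero, run those as one wave, then repeat. A minimal sketch (node and edge shapes are simplified; this is an illustration, not the engine's implementation):

```python
from collections import defaultdict

def compute_waves(nodes, edges):
    """Group DAG nodes into waves: each wave contains nodes whose
    upstream dependencies all sit in earlier waves (Kahn's algorithm)."""
    indegree = {n: 0 for n in nodes}
    downstream = defaultdict(list)
    for src, dst in edges:
        indegree[dst] += 1
        downstream[src].append(dst)

    waves = []
    ready = [n for n in nodes if indegree[n] == 0]
    while ready:
        waves.append(ready)
        next_ready = []
        for n in ready:
            for d in downstream[n]:
                indegree[d] -= 1
                if indegree[d] == 0:
                    next_ready.append(d)
        ready = next_ready

    if sum(len(w) for w in waves) != len(nodes):
        raise ValueError("pipeline contains a cycle")
    return waves

nodes = ["source_a", "source_b", "join", "transform", "quality", "sink"]
edges = [("source_a", "join"), ("source_b", "join"),
         ("join", "transform"), ("join", "quality"),
         ("transform", "sink"), ("quality", "sink")]
print(compute_waves(nodes, edges))
# → [['source_a', 'source_b'], ['join'], ['transform', 'quality'], ['sink']]
```

Within a wave, the engine would then run up to the parallelism limit (default 4) concurrently.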

Data passing

Each node receives a DataSet (columns + rows) from its upstream node(s) and produces a DataSet as output. The engine stores outputs in memory during a run and passes them to downstream nodes.

For multi-input nodes like join, all upstream DataSets are passed as a list.
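As a rough mental model of a multi-input node (the DataSet name comes from this page, but the field names and join logic here are illustrative), a join node receives its upstream outputs as a list and produces a single DataSet:

```python
from dataclasses import dataclass

@dataclass
class DataSet:
    # Illustrative shape: column names plus row tuples.
    columns: list
    rows: list

def run_join_node(inputs: list, key: str) -> DataSet:
    """Sketch of a two-input inner join on a shared key column."""
    left, right = inputs
    li, ri = left.columns.index(key), right.columns.index(key)
    right_by_key = {r[ri]: r for r in right.rows}
    out_cols = left.columns + [c for c in right.columns if c != key]
    out_rows = []
    for row in left.rows:
        match = right_by_key.get(row[li])
        if match is not None:
            out_rows.append(tuple(row) + tuple(v for j, v in enumerate(match) if j != ri))
    return DataSet(out_cols, out_rows)

users = DataSet(["email", "name"], [("a@x.com", "Ada"), ("b@x.com", "Bob")])
orders = DataSet(["email", "total"], [("a@x.com", 42)])
joined = run_join_node([users, orders], key="email")
print(joined.columns)  # → ['email', 'name', 'total']
print(joined.rows)     # → [('a@x.com', 'Ada', 42)]
```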

Retries

Configure per-node retry behavior:

{
  "max_retries": 3,
  "retry_delay": 1000
}

Retries use exponential backoff: delay * 2^(attempt-1), capped at 60 seconds.
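The resulting backoff schedule is easy to compute. A small sketch of the formula above, with delays in milliseconds to match `retry_delay` and the cap at 60 seconds:

```python
def retry_delays(max_retries: int, retry_delay: int, cap_ms: int = 60_000):
    """Delay before each retry attempt: retry_delay * 2^(attempt-1), capped."""
    return [min(retry_delay * 2 ** (attempt - 1), cap_ms)
            for attempt in range(1, max_retries + 1)]

print(retry_delays(3, 1000))  # → [1000, 2000, 4000]
print(retry_delays(8, 1000))  # → [1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000]
```

With the example config (`max_retries: 3`, `retry_delay: 1000`), a failing node waits 1s, 2s, then 4s before giving up.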

Timeouts

Each node has a 30-minute default timeout. Override per node:

{
  "timeout": 300
}

The timeout value is in seconds.

Validation

Before saving, the API validates pipelines for:

  • Non-empty name (max 255 characters)
  • No duplicate node IDs
  • All edge endpoints reference existing nodes
  • No cycles in the graph
  • Valid cron expression (if schedule is set)
  • Valid timezone (if schedule_timezone is set)
  • Max 500 nodes per pipeline
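Several of these checks are simple enough to run client-side before POSTing. A sketch covering a subset of them (the `"from"`/`"to"` edge field names are assumptions about the payload shape, and this is not the server's implementation):

```python
def validate_pipeline(pipeline: dict) -> list:
    """Return a list of validation errors (empty means valid) for a
    subset of the documented checks: name, duplicate IDs, edges, size."""
    errors = []
    name = pipeline.get("name", "")
    if not name or len(name) > 255:
        errors.append("name must be non-empty and at most 255 characters")
    ids = [n["id"] for n in pipeline.get("nodes", [])]
    if len(ids) != len(set(ids)):
        errors.append("duplicate node IDs")
    if len(ids) > 500:
        errors.append("max 500 nodes per pipeline")
    known = set(ids)
    for e in pipeline.get("edges", []):
        if e["from"] not in known or e["to"] not in known:
            errors.append(f"edge {e['from']} -> {e['to']} references unknown node")
    return errors

p = {"name": "demo",
     "nodes": [{"id": "a"}, {"id": "b"}],
     "edges": [{"from": "a", "to": "c"}]}
print(validate_pipeline(p))  # → ['edge a -> c references unknown node']
```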

Pipeline lifecycle

Operation        Method   Endpoint
Create           POST     /api/pipelines
List             GET      /api/pipelines
Get              GET      /api/pipelines/{id}
Update           PUT      /api/pipelines/{id}
Delete           DELETE   /api/pipelines/{id}
Clone            POST     /api/pipelines/{id}/clone
Export (JSON)    GET      /api/pipelines/{id}/export
Import           POST     /api/pipelines/import
Validate         GET      /api/pipelines/{id}/validate

Versioning

Every update creates a version snapshot. List the history and roll back:

# List versions
curl http://localhost:8080/api/pipelines/{id}/versions

# Rollback to a previous version
curl -X POST http://localhost:8080/api/pipelines/{id}/rollback \
  -H "Content-Type: application/json" \
  -d '{"version_id": "abc123"}'

Dry run

Preview pipeline output without persisting a real run:

curl -X POST http://localhost:8080/api/pipelines/{id}/dry-run

Returns the first 10 rows from each node -- useful for testing transforms in the editor.

Parameters

Pipelines can define default parameters that are available in node configs via ${param.key}:

{
  "params": {
    "date": "2024-01-01",
    "limit": "1000"
  }
}

Override at runtime:

curl -X POST http://localhost:8080/api/pipelines/{id}/run \
  -H "Content-Type: application/json" \
  -d '{"params": {"date": "2024-06-15"}}'

Tags

Organize pipelines with tags for filtering:

{
  "tags": ["production", "finance", "daily"]
}
