Pipeline Overview
How pipelines work in Brokoli: creation, validation, execution, and lifecycle.
What is a pipeline
A pipeline is a DAG (directed acyclic graph) that defines a data processing workflow. Data flows from source nodes through transforms to sink nodes. The engine guarantees that each node runs only after all its upstream dependencies complete.
Creating pipelines
Visual Editor
The drag-and-drop editor at /pipelines/new lets you:
- Drag node types from the palette onto the canvas
- Connect nodes by dragging between ports
- Configure each node in the side panel
- Validate before saving (cycles, missing config)
- Dry-run to preview data at each step
Python SDK
```python
from brokoli import Pipeline, source_db, transform, sink_file

with Pipeline("my-pipeline") as p:
    src = source_db("Load Users", conn_id="prod-pg", query="SELECT * FROM users")
    clean = transform("Deduplicate", rules=[
        {"type": "deduplicate", "columns": ["email"]}
    ])
    sink = sink_file("Export", path="/data/users.csv", format="csv")
    src >> clean >> sink
    p.deploy("http://localhost:8080")
```
REST API
```shell
curl -X POST http://localhost:8080/api/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "name": "My Pipeline",
    "enabled": true,
    "nodes": [...],
    "edges": [...]
  }'
```
Execution model
Brokoli uses Kahn's algorithm with wave-based parallel execution:
```
Wave 1: [source_a] [source_b]   <- both run in parallel
Wave 2: [join]                  <- waits for both sources
Wave 3: [transform] [quality]   <- parallel again
Wave 4: [sink]                  <- final
```
Within each wave, nodes execute concurrently up to the parallelism limit (default: 4 nodes).
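The wave grouping can be sketched in a few lines of Kahn's algorithm. The encoding below (node IDs plus `(upstream, downstream)` edge pairs) is an assumption for illustration, not the engine's actual data model:

```python
from collections import defaultdict

def execution_waves(nodes, edges):
    """Group DAG nodes into waves: a node joins a wave once all of its
    upstream dependencies have been placed in earlier waves."""
    indegree = {n: 0 for n in nodes}
    downstream = defaultdict(list)
    for up, down in edges:
        indegree[down] += 1
        downstream[up].append(down)
    waves = []
    ready = [n for n in nodes if indegree[n] == 0]  # sources run first
    while ready:
        waves.append(ready)
        next_ready = []
        for n in ready:
            for d in downstream[n]:
                indegree[d] -= 1
                if indegree[d] == 0:
                    next_ready.append(d)
        ready = next_ready
    return waves

waves = execution_waves(
    ["source_a", "source_b", "join", "transform", "quality", "sink"],
    [("source_a", "join"), ("source_b", "join"),
     ("join", "transform"), ("join", "quality"),
     ("transform", "sink"), ("quality", "sink")],
)
# waves == [["source_a", "source_b"], ["join"], ["transform", "quality"], ["sink"]]
```

Within each wave the engine then dispatches nodes to workers, at most 4 at a time by default.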
Data passing
Each node receives a DataSet (columns + rows) from its upstream node(s) and produces a DataSet as output. The engine stores outputs in memory during a run and passes them to downstream nodes.
For multi-input nodes like join, all upstream DataSets are passed as a list.
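As a rough sketch of these semantics (the `DataSet` shape and the node signatures below are assumptions based on the columns-plus-rows description, not Brokoli's actual types):

```python
from dataclasses import dataclass

@dataclass
class DataSet:
    columns: list  # column names
    rows: list     # row tuples, aligned with columns

def dedupe(ds: DataSet, column: str) -> DataSet:
    """Single-input transform: one DataSet in, one DataSet out."""
    idx = ds.columns.index(column)
    seen, kept = set(), []
    for row in ds.rows:
        if row[idx] not in seen:
            seen.add(row[idx])
            kept.append(row)
    return DataSet(ds.columns, kept)

def inner_join(inputs: list, on: str) -> DataSet:
    """Multi-input node: upstream DataSets arrive as a list."""
    left, right = inputs
    li, ri = left.columns.index(on), right.columns.index(on)
    cols = left.columns + [c for c in right.columns if c != on]
    rows = []
    for lrow in left.rows:
        for rrow in right.rows:
            if lrow[li] == rrow[ri]:
                rows.append(lrow + tuple(v for j, v in enumerate(rrow) if j != ri))
    return DataSet(cols, rows)

users = DataSet(["email", "name"], [("a@x.com", "Ann"), ("a@x.com", "Ann dup")])
dedupe(users, "email").rows  # keeps the first occurrence per email
```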
Retries
Configure per-node retry behavior:
```json
{
  "max_retries": 3,
  "retry_delay": 1000
}
```
Retries use exponential backoff: delay * 2^(attempt-1), capped at 60 seconds.
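The schedule is easy to compute. Assuming `retry_delay` is in milliseconds, a sketch:

```python
def retry_delay_ms(retry_delay: int, attempt: int) -> int:
    """Backoff before retry attempt `attempt` (1-based):
    delay * 2^(attempt-1), capped at 60 seconds (60_000 ms)."""
    return min(retry_delay * 2 ** (attempt - 1), 60_000)

# With retry_delay=1000 and max_retries=3, as in the config above:
delays = [retry_delay_ms(1000, a) for a in (1, 2, 3)]  # [1000, 2000, 4000]
```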
Timeouts
Each node has a 30-minute default timeout. Override per node:
```json
{
  "timeout": 300
}
```
The timeout value is in seconds.
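Conceptually, the engine stops waiting on a node once its timeout elapses. A minimal thread-based sketch of that behavior (a real engine would likely run the node in a separate process so the work itself can be cancelled; this sketch only abandons the wait):

```python
import concurrent.futures

DEFAULT_TIMEOUT = 30 * 60  # 30-minute default, in seconds

def run_with_timeout(fn, config):
    """Run a node function, honoring a per-node "timeout" override."""
    timeout = config.get("timeout", DEFAULT_TIMEOUT)
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn)
    try:
        return future.result(timeout=timeout)  # raises TimeoutError on expiry
    finally:
        pool.shutdown(wait=False)

result = run_with_timeout(lambda: "ok", {"timeout": 300})
```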
Validation
Before saving, the API validates pipelines for:
- Non-empty name (max 255 characters)
- No duplicate node IDs
- All edge endpoints reference existing nodes
- No cycles in the graph
- Valid cron expression (if schedule is set)
- Valid timezone (if schedule_timezone is set)
- Max 500 nodes per pipeline
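A sketch of the structural checks above; the `{"id": ...}` node shape and `from`/`to` edge keys are assumptions for illustration, not the actual API schema:

```python
def validate(pipeline: dict) -> list:
    """Return a list of validation errors (empty means valid)."""
    errors = []
    name = pipeline.get("name", "")
    if not name or len(name) > 255:
        errors.append("name must be non-empty and at most 255 characters")
    nodes = pipeline.get("nodes", [])
    if len(nodes) > 500:
        errors.append("max 500 nodes per pipeline")
    ids = [n["id"] for n in nodes]
    if len(ids) != len(set(ids)):
        errors.append("duplicate node IDs")
    known = set(ids)
    adj = {i: [] for i in known}
    for e in pipeline.get("edges", []):
        if e["from"] not in known or e["to"] not in known:
            errors.append("edge references a nonexistent node")
        else:
            adj[e["from"]].append(e["to"])
    # Cycle detection by DFS: 0 = unvisited, 1 = on current path, 2 = done
    color = {i: 0 for i in known}
    def dfs(n):
        color[n] = 1
        for m in adj[n]:
            if color[m] == 1 or (color[m] == 0 and dfs(m)):
                return True
        color[n] = 2
        return False
    if any(color[n] == 0 and dfs(n) for n in known):
        errors.append("pipeline graph contains a cycle")
    return errors
```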
Pipeline lifecycle
| Operation | Method | Endpoint |
|---|---|---|
| Create | POST | /api/pipelines |
| List | GET | /api/pipelines |
| Get | GET | /api/pipelines/{id} |
| Update | PUT | /api/pipelines/{id} |
| Delete | DELETE | /api/pipelines/{id} |
| Clone | POST | /api/pipelines/{id}/clone |
| Export (JSON) | GET | /api/pipelines/{id}/export |
| Import | POST | /api/pipelines/import |
| Validate | GET | /api/pipelines/{id}/validate |
Versioning
Every update creates a version snapshot. View history and rollback:
```shell
# List versions
curl http://localhost:8080/api/pipelines/{id}/versions

# Rollback to a previous version
curl -X POST http://localhost:8080/api/pipelines/{id}/rollback \
  -H "Content-Type: application/json" \
  -d '{"version_id": "abc123"}'
```
Dry run
Preview pipeline output without persisting a real run:
```shell
curl -X POST http://localhost:8080/api/pipelines/{id}/dry-run
```
Returns the first 10 rows from each node -- useful for testing transforms in the editor.
Parameters
Pipelines can define default parameters that are available in node configs via ${param.key}:
```json
{
  "params": {
    "date": "2024-01-01",
    "limit": "1000"
  }
}
```
Override at runtime:
```shell
curl -X POST http://localhost:8080/api/pipelines/{id}/run \
  -H "Content-Type: application/json" \
  -d '{"params": {"date": "2024-06-15"}}'
```
Tags
Organize pipelines with tags for filtering:
```json
{
  "tags": ["production", "finance", "daily"]
}
```
Next steps
- Node Types -- detailed reference for all 15 node types
- Scheduling -- cron, timezones, dependencies
- API Reference -- full API documentation