# Your First Pipeline
Build a real pipeline that fetches data from an API, transforms it, and saves the result. Three methods: visual editor, Python SDK, and raw JSON.
## The pipeline
We'll build an ETL pipeline that:

- Fetches posts from an API
- Filters to keep only the first 10 posts
- Adds a `fetched_at` timestamp column
- Saves to a CSV file
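Before wiring this up in Brokoli, it helps to see how little logic is involved. The four steps above can be sketched in plain Python; this is an illustrative stand-in, not Brokoli code, and the hard-coded sample rows stand in for the live API response:

```python
import csv
import io

# Sample rows standing in for the API response
# (jsonplaceholder serves 100 posts with these fields).
posts = [
    {"userId": 1, "id": i, "title": f"post {i}", "body": "..."}
    for i in range(1, 101)
]

# Filter: keep only the first 10 posts.
kept = [p for p in posts if p["id"] in range(1, 11)]

# Transform: add a fetched_at column to every row.
for p in kept:
    p["fetched_at"] = "2024-01-01"

# Load: write the result as CSV (in-memory here;
# the real sink writes to /tmp/posts.csv).
buf = io.StringIO()
writer = csv.DictWriter(
    buf, fieldnames=["userId", "id", "title", "body", "fetched_at"]
)
writer.writeheader()
writer.writerows(kept)
```

Everything that follows builds exactly this behavior, first visually, then in code.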
## Using the visual editor
### Step 1: Create the source
Drag a Source API node onto the canvas:

- Name: `Fetch Posts`
- URL: `https://jsonplaceholder.typicode.com/posts`
- Method: `GET`
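This endpoint returns a JSON array of post objects. Each record looks roughly like this (a trimmed sample for orientation, not live data):

```json
{
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident...",
  "body": "quia et suscipit..."
}
```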
### Step 2: Add transforms
Drag a Transform node and connect it to the source:

- Name: `Clean Data`
- Add two transform rules:

```json
[
  {"type": "filter_rows", "condition": "id in [1,2,3,4,5,6,7,8,9,10]"},
  {"type": "add_column", "name": "fetched_at", "expression": "2024-01-01"}
]
```

### Step 3: Add the sink
Drag a Sink File node and connect it to the transform:

- Name: `Save CSV`
- Path: `/tmp/posts.csv`
- Format: `csv`
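After a successful run, `/tmp/posts.csv` will contain a header row plus ten data rows shaped like this (one sample row shown, values abbreviated; column names assume the jsonplaceholder post fields plus the column added in Step 2):

```csv
userId,id,title,body,fetched_at
1,1,"sunt aut facere repellat...","quia et suscipit...",2024-01-01
```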
### Step 4: Save and run
Click **Save**, then **Run**. Watch the nodes execute in sequence on the canvas.
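Conceptually, a transform node applies its rules to the rows one after another. A hypothetical sketch of an interpreter for the two rule types used above (Brokoli's actual engine may evaluate conditions and expressions differently; `eval` here is purely illustrative):

```python
def apply_rules(rows, rules):
    """Apply a list of transform rules to rows (a list of dicts)."""
    for rule in rules:
        if rule["type"] == "filter_rows":
            # Evaluate the condition against each row's fields.
            # NOTE: eval() is for illustration only; a real engine
            # would use a safe expression parser.
            rows = [r for r in rows if eval(rule["condition"], {}, dict(r))]
        elif rule["type"] == "add_column":
            # Attach a new column to every surviving row.
            for r in rows:
                r[rule["name"]] = rule["expression"]
        else:
            raise ValueError(f"unknown rule type: {rule['type']}")
    return rows

rules = [
    {"type": "filter_rows", "condition": "id in [1,2,3,4,5,6,7,8,9,10]"},
    {"type": "add_column", "name": "fetched_at", "expression": "2024-01-01"},
]
rows = [{"id": i, "title": f"post {i}"} for i in range(1, 21)]
out = apply_rules(rows, rules)
```

Rule order matters: the filter runs first, so the `fetched_at` column is only added to the ten rows that survive it.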
## Using the Python SDK
```python
from brokoli import Pipeline, source_api, transform, sink_file

with Pipeline("api-to-csv", description="Fetch posts and save as CSV") as p:
    source = source_api(
        "Fetch Posts",
        url="https://jsonplaceholder.typicode.com/posts",
        method="GET",
    )
    xform = transform("Clean Data", rules=[
        {"type": "filter_rows", "condition": "id in [1,2,3,4,5,6,7,8,9,10]"},
        {"type": "add_column", "name": "fetched_at", "expression": "2024-01-01"},
    ])
    sink = sink_file("Save CSV", path="/tmp/posts.csv", format="csv")
    source >> xform >> sink

# Deploy to your Brokoli server
p.deploy("http://localhost:8080")
```

## Using the REST API
```bash
curl -X POST http://localhost:8080/api/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "name": "API to CSV",
    "description": "Fetch posts and save as CSV",
    "enabled": true,
    "nodes": [
      {
        "id": "source1",
        "type": "source_api",
        "name": "Fetch Posts",
        "config": {
          "url": "https://jsonplaceholder.typicode.com/posts",
          "method": "GET"
        },
        "position": {"x": 100, "y": 200}
      },
      {
        "id": "transform1",
        "type": "transform",
        "name": "Clean Data",
        "config": {
          "rules": [
            {"type": "filter_rows", "condition": "id in [1,2,3,4,5,6,7,8,9,10]"},
            {"type": "add_column", "name": "fetched_at", "expression": "2024-01-01"}
          ]
        },
        "position": {"x": 400, "y": 200}
      },
      {
        "id": "sink1",
        "type": "sink_file",
        "name": "Save CSV",
        "config": {"path": "/tmp/posts.csv", "format": "csv"},
        "position": {"x": 700, "y": 200}
      }
    ],
    "edges": [
      {"from": "source1", "to": "transform1"},
      {"from": "transform1", "to": "sink1"}
    ]
  }'
```

Trigger a run:

```bash
curl -X POST http://localhost:8080/api/pipelines/{pipeline_id}/run
```

## Using variables
Reference stored variables and connections in node config using `${var.name}` and `${conn.name}` syntax:

```json
{
  "url": "${var.api_base_url}/posts",
  "headers": {"Authorization": "Bearer ${var.api_token}"}
}
```

Set variables via the UI (**Settings > Variables**) or the API:
```bash
curl -X POST http://localhost:8080/api/variables \
  -H "Content-Type: application/json" \
  -d '{"key": "api_base_url", "value": "https://jsonplaceholder.typicode.com", "type": "string"}'
```

## Next steps
- Core Concepts -- understand pipelines, nodes, and edges
- Scheduling -- run pipelines on a cron schedule
- Quality Checks -- add data assertions