Your First Pipeline

Build a real pipeline that fetches data from an API, transforms it, and saves the result. This guide covers three ways to do it: the visual editor, the Python SDK, and raw JSON sent to the REST API.

The pipeline

We'll build an ETL pipeline that:

  1. Fetches posts from an API
  2. Filters the rows to keep only the first 10 posts (ids 1 through 10)
  3. Adds a fetched_at column (a fixed date in this example)
  4. Saves to a CSV file
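
To make the four steps concrete before building them in Brokoli, here is a minimal plain-Python sketch of the same logic using only the standard library. It is not Brokoli code: the function names (fetch_posts, keep_first_ten, add_fetched_at, write_csv) are hypothetical, and the fixed date simply mirrors the example rules used later in this guide.

```python
import csv
import io
import json
from urllib.request import urlopen

POSTS_URL = "https://jsonplaceholder.typicode.com/posts"


def fetch_posts(url=POSTS_URL):
    """Step 1: fetch posts from the API (performs a network request)."""
    with urlopen(url) as resp:
        return json.load(resp)


def keep_first_ten(posts):
    """Step 2: keep only posts whose id is between 1 and 10."""
    return [p for p in posts if p["id"] in range(1, 11)]


def add_fetched_at(posts, value="2024-01-01"):
    """Step 3: add a fetched_at column with the same value on every row."""
    return [{**p, "fetched_at": value} for p in posts]


def write_csv(posts, out):
    """Step 4: write the rows as CSV to an open, file-like object."""
    if not posts:
        return
    writer = csv.DictWriter(out, fieldnames=list(posts[0].keys()))
    writer.writeheader()
    writer.writerows(posts)
```

Chaining the calls as write_csv(add_fetched_at(keep_first_ten(fetch_posts())), f), with f opened on /tmp/posts.csv, would reproduce the whole pipeline in one script. The Brokoli versions below express the same steps as nodes, so they can be scheduled and monitored rather than run by hand.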

Using the visual editor

Step 1: Create the source

Drag a Source API node onto the canvas:

  • Name: Fetch Posts
  • URL: https://jsonplaceholder.typicode.com/posts
  • Method: GET

Step 2: Add transforms

Drag a Transform node and connect it to the source:

  • Name: Clean Data
  • Add two transform rules:
[
  {"type": "filter_rows", "condition": "id in [1,2,3,4,5,6,7,8,9,10]"},
  {"type": "add_column", "name": "fetched_at", "expression": "2024-01-01"}
]
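
If typing the id list by hand feels error-prone, the rules can be generated. The sketch below assumes only what the example above shows: a filter_rows condition of the form "id in [...]" and an add_column rule whose expression is a literal date string. The helper name build_rules is hypothetical.

```python
import json


def build_rules(first_n=10, fetched_at="2024-01-01"):
    """Return the two transform rules, generating the id list instead of typing it."""
    ids = ",".join(str(i) for i in range(1, first_n + 1))
    return [
        {"type": "filter_rows", "condition": f"id in [{ids}]"},
        {"type": "add_column", "name": "fetched_at", "expression": fetched_at},
    ]


# Print JSON that can be pasted into the Transform node's rules field.
print(json.dumps(build_rules(), indent=2))
```

Calling build_rules(first_n=25) would widen the filter to the first 25 ids without touching the rest of the rule set.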

Step 3: Add the sink

Drag a Sink File node and connect it to the transform:

  • Name: Save CSV
  • Path: /tmp/posts.csv
  • Format: csv

Step 4: Save and run

Click Save, then Run. Watch the nodes execute in sequence on the canvas.

Using the Python SDK

from brokoli import Pipeline, source_api, transform, sink_file

with Pipeline("api-to-csv", description="Fetch posts and save as CSV") as p:
    source = source_api(
        "Fetch Posts",
        url="https://jsonplaceholder.typicode.com/posts",
        method="GET",
    )

    xform = transform("Clean Data", rules=[
        {"type": "filter_rows", "condition": "id in [1,2,3,4,5,6,7,8,9,10]"},
        {"type": "add_column", "name": "fetched_at", "expression": "2024-01-01"},
    ])

    sink = sink_file("Save CSV", path="/tmp/posts.csv", format="csv")

    source >> xform >> sink

# Deploy to your Brokoli server
p.deploy("http://localhost:8080")

Using the REST API

curl -X POST http://localhost:8080/api/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "name": "API to CSV",
    "description": "Fetch posts and save as CSV",
    "enabled": true,
    "nodes": [
      {
        "id": "source1",
        "type": "source_api",
        "name": "Fetch Posts",
        "config": {
          "url": "https://jsonplaceholder.typicode.com/posts",
          "method": "GET"
        },
        "position": {"x": 100, "y": 200}
      },
      {
        "id": "transform1",
        "type": "transform",
        "name": "Clean Data",
        "config": {
          "rules": [
            {"type": "filter_rows", "condition": "id in [1,2,3,4,5,6,7,8,9,10]"},
            {"type": "add_column", "name": "fetched_at", "expression": "2024-01-01"}
          ]
        },
        "position": {"x": 400, "y": 200}
      },
      {
        "id": "sink1",
        "type": "sink_file",
        "name": "Save CSV",
        "config": {"path": "/tmp/posts.csv", "format": "csv"},
        "position": {"x": 700, "y": 200}
      }
    ],
    "edges": [
      {"from": "source1", "to": "transform1"},
      {"from": "transform1", "to": "sink1"}
    ]
  }'
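
The same request can be prepared from Python, which makes it easier to catch mistakes before anything is sent. This is a sketch using only the standard library: the payload shape and endpoint are copied from the curl command above, while the helper names (node, build_payload, check_edges, create_request) are hypothetical and the edge check is a local convenience, not something the source says the server requires.

```python
import json
from urllib.request import Request

RULES = [
    {"type": "filter_rows", "condition": "id in [1,2,3,4,5,6,7,8,9,10]"},
    {"type": "add_column", "name": "fetched_at", "expression": "2024-01-01"},
]


def node(node_id, node_type, name, config, x):
    """Build one node entry in the shape used by the curl payload."""
    return {
        "id": node_id,
        "type": node_type,
        "name": name,
        "config": config,
        "position": {"x": x, "y": 200},
    }


def build_payload():
    """Assemble the three-node pipeline definition."""
    return {
        "name": "API to CSV",
        "description": "Fetch posts and save as CSV",
        "enabled": True,
        "nodes": [
            node("source1", "source_api", "Fetch Posts",
                 {"url": "https://jsonplaceholder.typicode.com/posts", "method": "GET"}, 100),
            node("transform1", "transform", "Clean Data", {"rules": RULES}, 400),
            node("sink1", "sink_file", "Save CSV",
                 {"path": "/tmp/posts.csv", "format": "csv"}, 700),
        ],
        "edges": [
            {"from": "source1", "to": "transform1"},
            {"from": "transform1", "to": "sink1"},
        ],
    }


def check_edges(payload):
    """Raise ValueError if an edge references a node id that is not defined."""
    ids = {n["id"] for n in payload["nodes"]}
    for edge in payload["edges"]:
        for end in ("from", "to"):
            if edge[end] not in ids:
                raise ValueError(f"edge references unknown node: {edge[end]}")


def create_request(payload, base_url="http://localhost:8080"):
    """Validate the payload and build (but do not send) the POST request."""
    check_edges(payload)
    return Request(
        f"{base_url}/api/pipelines",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Passing the result to urllib.request.urlopen sends it. Keeping construction and sending separate means the payload can be inspected or tested without a running server.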

Trigger a run, replacing {pipeline_id} with the ID of the pipeline you created:

curl -X POST http://localhost:8080/api/pipelines/{pipeline_id}/run
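
A Python equivalent of the run trigger might look like the sketch below. The endpoint path is taken from the curl command; the helper name run_request is hypothetical, and the guide does not show where the pipeline ID comes from, so it is treated here as a value you already have.

```python
from urllib.parse import quote
from urllib.request import Request


def run_request(pipeline_id, base_url="http://localhost:8080"):
    """Build (but do not send) the POST request that triggers one pipeline run."""
    if not pipeline_id:
        raise ValueError("pipeline_id is required")
    # Percent-encode the id so unusual characters cannot alter the URL path.
    safe_id = quote(str(pipeline_id), safe="")
    return Request(f"{base_url}/api/pipelines/{safe_id}/run", method="POST")
```

Sending it is a single call to urllib.request.urlopen(run_request("your-id")), where "your-id" stands in for a real pipeline ID.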

Using variables

Reference stored variables and connections in node config using ${var.name} and ${conn.name} syntax:

{
  "url": "${var.api_base_url}/posts",
  "headers": {"Authorization": "Bearer ${var.api_token}"}
}
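
Brokoli resolves these references itself, so no extra code is needed. Purely to illustrate what the substitution means, the sketch below expands ${var.name} and ${conn.name} in a nested config. It is not Brokoli's implementation: the resolve function, the regular expression, and the assumption that names consist of letters, digits, and underscores are all hypothetical.

```python
import re

# Matches ${var.some_name} or ${conn.some_name}.
_REF = re.compile(r"\$\{(var|conn)\.([A-Za-z0-9_]+)\}")


def resolve(value, variables, connections):
    """Recursively replace references inside strings, dicts, and lists."""
    if isinstance(value, str):
        def lookup(match):
            store = variables if match.group(1) == "var" else connections
            return str(store[match.group(2)])  # KeyError if the name is undefined
        return _REF.sub(lookup, value)
    if isinstance(value, dict):
        return {k: resolve(v, variables, connections) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve(v, variables, connections) for v in value]
    return value


config = {
    "url": "${var.api_base_url}/posts",
    "headers": {"Authorization": "Bearer ${var.api_token}"},
}
variables = {
    "api_base_url": "https://jsonplaceholder.typicode.com",
    "api_token": "example-token",  # placeholder value for illustration
}
resolved = resolve(config, variables, connections={})
```

With these inputs, resolved["url"] becomes https://jsonplaceholder.typicode.com/posts. The practical benefit is that the same pipeline definition can point at different hosts or tokens by changing stored values rather than editing nodes.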

Set variables in the UI (Settings > Variables) or through the REST API:

curl -X POST http://localhost:8080/api/variables \
  -H "Content-Type: application/json" \
  -d '{"key": "api_base_url", "value": "https://jsonplaceholder.typicode.com", "type": "string"}'

Next steps