
Node Types

Reference for all 15 node types. Each section covers config options, input/output, and examples.

Overview

| Type | Category | Input | Output | Description |
|---|---|---|---|---|
| source_file | Source | -- | DataSet | Read local files |
| source_api | Source | -- | DataSet | HTTP request |
| source_db | Source | -- | DataSet | SQL query |
| transform | Transform | DataSet | DataSet | Apply transform rules |
| code | Transform | DataSet | DataSet | Run Python script |
| join | Transform | 2x DataSet | DataSet | Join two datasets |
| sql_generate | Transform | DataSet | SQL text | Generate SQL statements |
| quality_check | Logic | DataSet | DataSet | Assert data quality |
| condition | Logic | DataSet | DataSet | Branch on expression |
| sink_file | Sink | DataSet | -- | Write to file |
| sink_db | Sink | DataSet | -- | Insert into database |
| sink_api | Sink | DataSet | -- | POST to API |
| dbt | Operation | -- | DataSet | Run dbt CLI commands |
| notify | Notification | DataSet | DataSet | Send Slack/webhook notification |
| migrate | Operation | -- | -- | Run DB migrations |

source_file

Read data from CSV, JSON, Parquet, or Excel files.

Config:

| Key | Type | Required | Description |
|---|---|---|---|
| path | string | yes | File path (absolute or relative to data dirs) |
| format | string | no | csv, json, parquet, excel (auto-detected from extension) |
| delimiter | string | no | CSV delimiter (default: ,) |
| has_header | bool | no | Whether CSV has a header row (default: true) |

Example:

{
  "path": "/data/sales.csv",
  "format": "csv",
  "delimiter": ","
}

File security: Set BROKOLI_DATA_DIRS to restrict which directories Brokoli can read. Default: unrestricted.


source_api

Fetch data from a REST API endpoint.

Config:

| Key | Type | Required | Description |
|---|---|---|---|
| url | string | yes | Full URL or ${conn.name} reference |
| method | string | no | GET (default) or POST |
| headers | object | no | HTTP headers |
| body | string | no | Request body (for POST) |
| conn_id | string | no | Connection ID for auth |

Example:

{
  "url": "https://api.example.com/users",
  "method": "GET",
  "headers": {"Authorization": "Bearer ${var.api_token}"}
}

source_db

Run a SQL query against a database connection.

Config:

| Key | Type | Required | Description |
|---|---|---|---|
| conn_id | string | yes | Connection ID |
| query | string | yes | SQL query |

Example:

{
  "conn_id": "prod-postgres",
  "query": "SELECT id, name, email FROM users WHERE active = true"
}

transform

Apply one or more transformation rules to the incoming dataset. Rules execute in order.

Config:

| Key | Type | Required | Description |
|---|---|---|---|
| rules | array | yes | List of transform rules |

Transform rule types

| Rule | Required fields | Description |
|---|---|---|
| rename_columns | mapping | Rename columns: {"old": "new"} |
| add_column | name, expression | Add computed column |
| filter_rows | condition | Keep rows matching condition |
| apply_function | column, function | Apply lower, upper, trim, title |
| replace_values | column, mapping | Map values: {"old": "new"} |
| drop_columns | columns | Remove columns |
| sort | columns, ascending | Sort by columns |
| deduplicate | columns | Remove duplicates by key columns |
| aggregate | group_by, agg_fields | Group and aggregate |

Short aliases: You can use short aliases instead of the full rule names: rename for rename_columns, filter for filter_rows, function for apply_function, replace for replace_values, drop for drop_columns, dedup for deduplicate, agg for aggregate.
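For instance, these two rules use the aliases rename and drop with the same required fields as their full-name counterparts (column names here are illustrative):

```json
{
  "rules": [
    {"type": "rename", "mapping": {"dept": "department"}},
    {"type": "drop", "columns": ["internal_id"]}
  ]
}
```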

Aggregate example:

{
  "rules": [{
    "type": "aggregate",
    "group_by": ["department"],
    "agg_fields": [
      {"column": "salary", "function": "avg", "alias": "avg_salary"},
      {"column": "id", "function": "count", "alias": "headcount"}
    ]
  }]
}

Supported aggregate functions: sum, count, avg, min, max.

Filter example:

{
  "rules": [
    {"type": "filter_rows", "condition": "status = active"},
    {"type": "filter_rows", "condition": "age != 0"},
    {"type": "filter_rows", "condition": "country in [US, UK, DE]"}
  ]
}

code

Run a custom Python script. The script receives columns (list), rows (list of dicts), config (dict), and params (dict). Set output_data to return results.

Config:

| Key | Type | Required | Description |
|---|---|---|---|
| script | string | yes | Python code |
| timeout | int | no | Timeout in seconds (default: 30) |
| python_path | string | no | Path to Python binary |

Example:

{
  "script": "rows = [r for r in rows if r.get('score', 0) > 50]\noutput_data = {'columns': columns, 'rows': rows}",
  "timeout": 60
}

For datasets over 10,000 rows, Brokoli automatically switches to file-based transfer (NDJSON or CSV) for faster processing. If pyarrow or pandas are installed, they're used automatically.


join

Join two input datasets on a common key.

Config:

| Key | Type | Required | Description |
|---|---|---|---|
| left_key | string | yes | Column name to join on from the left dataset |
| right_key | string | no | Column name to join on from the right dataset (defaults to left_key if omitted) |
| join_type | string | no | inner (default), left, right, outer |

Connect two upstream nodes to a join node. The first input is the left table, the second is the right table.

Example:

{
  "left_key": "user_id",
  "right_key": "user_id",
  "join_type": "left"
}

sql_generate

Generate SQL statements from the input dataset. Useful for creating bulk INSERT or UPSERT scripts.

Config:

| Key | Type | Required | Description |
|---|---|---|---|
| table | string | yes | Target table name |
| mode | string | no | insert (default) or upsert |
| conflict_key | string | no | Column for upsert conflict detection |
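A sketch of an upsert configuration, using hypothetical table and key names:

```json
{
  "table": "dim_users",
  "mode": "upsert",
  "conflict_key": "user_id"
}
```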

quality_check

Assert data quality rules. See Quality Checks for full details.

Config:

| Key | Type | Required | Description |
|---|---|---|---|
| checks | array | yes | List of quality check rules |

Example:

{
  "checks": [
    {"column": "email", "rule": "not_null", "on_failure": "block"},
    {"column": "id", "rule": "unique", "on_failure": "warn"}
  ]
}

condition

Branch pipeline execution based on data. See Conditions for details.

Config:

| Key | Type | Required | Description |
|---|---|---|---|
| expression | string | yes | Condition expression |
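A minimal sketch, assuming the expression syntax matches the comparison style shown in filter_rows conditions (the row_count variable here is an illustrative placeholder, not a documented built-in):

```json
{
  "expression": "row_count > 0"
}
```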

sink_file

Write the input dataset to a file.

Config:

| Key | Type | Required | Description |
|---|---|---|---|
| path | string | yes | Output file path |
| format | string | no | csv (default), json, parquet |
| delimiter | string | no | CSV delimiter |
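A sketch writing Parquet output, with a hypothetical path:

```json
{
  "path": "/exports/users.parquet",
  "format": "parquet"
}
```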

sink_db

Write rows to a database table.

Config:

| Key | Type | Required | Description |
|---|---|---|---|
| conn_id | string | yes | Connection ID |
| table | string | yes | Target table name |
| mode | string | no | insert (default), upsert, replace |
| conflict_key | string | no | Column for upsert conflict detection |

Example:

{
  "conn_id": "warehouse-pg",
  "table": "dim_users",
  "mode": "upsert",
  "conflict_key": "user_id"
}

sink_api

POST the dataset to a REST API endpoint.

Config:

| Key | Type | Required | Description |
|---|---|---|---|
| url | string | yes | Target URL |
| method | string | no | POST (default) or PUT |
| headers | object | no | HTTP headers |
| batch_size | int | no | Rows per request (default: all) |
| conn_id | string | no | Connection ID for auth |
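A sketch posting rows in batches of 500, with a hypothetical endpoint:

```json
{
  "url": "https://api.example.com/ingest",
  "method": "POST",
  "headers": {"Content-Type": "application/json"},
  "batch_size": 500
}
```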

dbt

Run dbt CLI commands and capture structured results. Requires dbt-core installed and accessible on PATH.

Config:

| Key | Type | Required | Description |
|---|---|---|---|
| command | string | no | dbt command to run (default: run). Allowed: run, test, build, seed, snapshot, compile, ls |
| project_dir | string | no | Path to dbt project directory (default: .) |
| profiles_dir | string | no | Path to dbt profiles directory |
| target | string | no | dbt target to use |
| select | string | no | dbt model selection syntax (e.g., my_model+, tag:nightly) |
| vars | string | no | JSON string of dbt variables (e.g., {"start_date": "2024-01-01"}) |

Output: A dataset with columns model, status, execution_time, message parsed from dbt's JSON output.

Example:

{
  "command": "run",
  "project_dir": "/opt/dbt/my_project",
  "target": "prod",
  "select": "tag:daily"
}

Runs dbt <command> --project-dir <project_dir> --output json. If JSON parsing fails, falls back to reading target/run_results.json.


notify

Send a notification via Slack or a generic webhook. This is a pass-through node -- it returns the input data unchanged.

Config:

| Key | Type | Required | Description |
|---|---|---|---|
| notify_type | string | no | webhook (default) or slack |
| webhook_url | string | yes | URL to send the notification to |
| message | string | no | Notification message (default: Pipeline <name> completed) |
| channel | string | no | Slack channel override (only for slack type) |

Message templates: Use {{pipeline}}, {{run_id}}, and {{rows}} as placeholders in your message string. They are replaced at runtime with the pipeline name, current run ID, and input row count.

Slack mode (notify_type: "slack"): Sends {"text": "<message>", "channel": "<channel>"} to the webhook URL.

Webhook mode (notify_type: "webhook"): Sends {"pipeline": "<name>", "run_id": "<id>", "message": "<message>", "row_count": <n>, "timestamp": "<iso8601>"} to the webhook URL.

Example:

{
  "notify_type": "slack",
  "webhook_url": "https://hooks.slack.com/services/T00/B00/xxx",
  "message": "Pipeline {{pipeline}} finished with {{rows}} rows",
  "channel": "#data-alerts"
}

migrate

Run database schema migrations.

Config:

| Key | Type | Required | Description |
|---|---|---|---|
| conn_id | string | yes | Connection ID |
| sql | string | yes | SQL migration script |

Example:

{
  "conn_id": "prod-postgres",
  "sql": "ALTER TABLE users ADD COLUMN last_login TIMESTAMP;"
}

Common config keys

These keys work on any node type:

| Key | Type | Description |
|---|---|---|
| max_retries | int | Number of retry attempts (default: 0) |
| retry_delay | int | Base delay between retries in ms (default: 1000) |
| timeout | int | Node timeout in seconds (default: 1800) |
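For example, a source_db node config with retries enabled (the query is illustrative):

```json
{
  "conn_id": "prod-postgres",
  "query": "SELECT id, name FROM users",
  "max_retries": 3,
  "retry_delay": 2000,
  "timeout": 300
}
```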