Node Types
Reference for all 15 node types. Each section covers config options, input/output, and examples.
Overview
| Type | Category | Input | Output | Description |
|---|---|---|---|---|
| source_file | Source | -- | DataSet | Read local files |
| source_api | Source | -- | DataSet | HTTP request |
| source_db | Source | -- | DataSet | SQL query |
| transform | Transform | DataSet | DataSet | Apply transform rules |
| code | Transform | DataSet | DataSet | Run Python script |
| join | Transform | 2x DataSet | DataSet | Join two datasets |
| sql_generate | Transform | DataSet | SQL text | Generate SQL statements |
| quality_check | Logic | DataSet | DataSet | Assert data quality |
| condition | Logic | DataSet | DataSet | Branch on expression |
| sink_file | Sink | DataSet | -- | Write to file |
| sink_db | Sink | DataSet | -- | Insert into database |
| sink_api | Sink | DataSet | -- | POST to API |
| dbt | Operation | -- | DataSet | Run dbt CLI commands |
| notify | Notification | DataSet | DataSet | Send Slack/webhook notification |
| migrate | Operation | -- | -- | Run DB migrations |
source_file
Read data from CSV, JSON, Parquet, or Excel files.
Config:
| Key | Type | Required | Description |
|---|---|---|---|
| path | string | yes | File path (absolute or relative to data dirs) |
| format | string | no | csv, json, parquet, excel (auto-detected from extension) |
| delimiter | string | no | CSV delimiter (default: ,) |
| has_header | bool | no | Whether CSV has a header row (default: true) |
Example:

```json
{
  "path": "/data/sales.csv",
  "format": "csv",
  "delimiter": ","
}
```

File security: Set BROKOLI_DATA_DIRS to restrict which directories Brokoli can read. Default: unrestricted.
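As an illustration, the variable could be set before starting Brokoli. The colon-separated list format below is an assumption for this sketch, not something this reference confirms:

```shell
# Hypothetical: restrict file reads to two directories.
# The colon-separated format is an assumption, not confirmed here.
export BROKOLI_DATA_DIRS="/data:/srv/exports"
echo "$BROKOLI_DATA_DIRS"
```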
source_api
Fetch data from a REST API endpoint.
Config:
| Key | Type | Required | Description |
|---|---|---|---|
| url | string | yes | Full URL or ${conn.name} reference |
| method | string | no | GET (default) or POST |
| headers | object | no | HTTP headers |
| body | string | no | Request body (for POST) |
| conn_id | string | no | Connection ID for auth |
Example:
```json
{
  "url": "https://api.example.com/users",
  "method": "GET",
  "headers": {"Authorization": "Bearer ${var.api_token}"}
}
```

source_db
Run a SQL query against a database connection.
Config:
| Key | Type | Required | Description |
|---|---|---|---|
| conn_id | string | yes | Connection ID |
| query | string | yes | SQL query |
Example:
```json
{
  "conn_id": "prod-postgres",
  "query": "SELECT id, name, email FROM users WHERE active = true"
}
```

transform
Apply one or more transformation rules to the incoming dataset. Rules execute in order.
Config:
| Key | Type | Required | Description |
|---|---|---|---|
| rules | array | yes | List of transform rules |
Transform rule types
| Rule | Required fields | Description |
|---|---|---|
| rename_columns | mapping | Rename columns: {"old": "new"} |
| add_column | name, expression | Add computed column |
| filter_rows | condition | Keep rows matching condition |
| apply_function | column, function | Apply lower, upper, trim, title |
| replace_values | column, mapping | Map values: {"old": "new"} |
| drop_columns | columns | Remove columns |
| sort | columns, ascending | Sort by columns |
| deduplicate | columns | Remove duplicates by key columns |
| aggregate | group_by, agg_fields | Group and aggregate |
Short aliases: You can use short aliases instead of the full rule names: rename for rename_columns, filter for filter_rows, function for apply_function, replace for replace_values, drop for drop_columns, dedup for deduplicate, agg for aggregate.
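For instance, a rules list written with aliases (column names here are illustrative):

```json
{
  "rules": [
    {"type": "rename", "mapping": {"emp_name": "name"}},
    {"type": "drop", "columns": ["internal_id"]},
    {"type": "dedup", "columns": ["email"]}
  ]
}
```

This behaves identically to spelling out rename_columns, drop_columns, and deduplicate.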
Aggregate example:
```json
{
  "rules": [{
    "type": "aggregate",
    "group_by": ["department"],
    "agg_fields": [
      {"column": "salary", "function": "avg", "alias": "avg_salary"},
      {"column": "id", "function": "count", "alias": "headcount"}
    ]
  }]
}
```

Supported aggregate functions: sum, count, avg, min, max.
Filter example:
```json
{
  "rules": [
    {"type": "filter_rows", "condition": "status = active"},
    {"type": "filter_rows", "condition": "age != 0"},
    {"type": "filter_rows", "condition": "country in [US, UK, DE]"}
  ]
}
```

code
Run a custom Python script. The script receives columns (list), rows (list of dicts), config (dict), and params (dict). Set output_data to return results.
Config:
| Key | Type | Required | Description |
|---|---|---|---|
| script | string | yes | Python code |
| timeout | int | no | Timeout in seconds (default: 30) |
| python_path | string | no | Path to Python binary |
Example:
```json
{
  "script": "rows = [r for r in rows if r.get('score', 0) > 50]\noutput_data = {'columns': columns, 'rows': rows}",
  "timeout": 60
}
```

For datasets over 10,000 rows, Brokoli automatically switches to file-based transfer (NDJSON or CSV) for faster processing. If pyarrow or pandas are installed, they're used automatically.
join
Join two input datasets on a common key.
Config:
| Key | Type | Required | Description |
|---|---|---|---|
| left_key | string | yes | Column name to join on from the left dataset |
| right_key | string | no | Column name to join on from the right dataset (defaults to left_key if omitted) |
| join_type | string | no | inner (default), left, right, outer |
Connect two upstream nodes to a join node. The first input is the left table, the second is the right table.
Example:
```json
{
  "left_key": "user_id",
  "right_key": "user_id",
  "join_type": "left"
}
```

sql_generate
Generate SQL statements from the input dataset. Useful for creating bulk INSERT or UPSERT scripts.
Config:
| Key | Type | Required | Description |
|---|---|---|---|
| table | string | yes | Target table name |
| mode | string | no | insert (default) or upsert |
| conflict_key | string | no | Column for upsert conflict detection |
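Example (table and key names are illustrative):

```json
{
  "table": "dim_users",
  "mode": "upsert",
  "conflict_key": "user_id"
}
```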
quality_check
Assert data quality rules. See Quality Checks for full details.
Config:
| Key | Type | Required | Description |
|---|---|---|---|
| checks | array | yes | List of quality check rules |
Example:
```json
{
  "checks": [
    {"column": "email", "rule": "not_null", "on_failure": "block"},
    {"column": "id", "rule": "unique", "on_failure": "warn"}
  ]
}
```

condition
Branch pipeline execution based on data. See Conditions for details.
Config:
| Key | Type | Required | Description |
|---|---|---|---|
| expression | string | yes | Condition expression |
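A minimal sketch; the expression below is illustrative, see the Conditions page for the actual expression syntax:

```json
{
  "expression": "row_count > 0"
}
```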
sink_file
Write the input dataset to a file.
Config:
| Key | Type | Required | Description |
|---|---|---|---|
| path | string | yes | Output file path |
| format | string | no | csv (default), json, parquet |
| delimiter | string | no | CSV delimiter |
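Example (output path is illustrative):

```json
{
  "path": "/data/out/users.parquet",
  "format": "parquet"
}
```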
sink_db
Write rows to a database table.
Config:
| Key | Type | Required | Description |
|---|---|---|---|
| conn_id | string | yes | Connection ID |
| table | string | yes | Target table name |
| mode | string | no | insert (default), upsert, replace |
| conflict_key | string | no | Column for upsert conflict detection |
Example:
```json
{
  "conn_id": "warehouse-pg",
  "table": "dim_users",
  "mode": "upsert",
  "conflict_key": "user_id"
}
```

sink_api
POST the dataset to a REST API endpoint.
Config:
| Key | Type | Required | Description |
|---|---|---|---|
| url | string | yes | Target URL |
| method | string | no | POST (default) or PUT |
| headers | object | no | HTTP headers |
| batch_size | int | no | Rows per request (default: all) |
| conn_id | string | no | Connection ID for auth |
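Example (endpoint URL is illustrative):

```json
{
  "url": "https://api.example.com/ingest",
  "method": "POST",
  "headers": {"Content-Type": "application/json"},
  "batch_size": 500
}
```

With batch_size set, a 2,000-row dataset would be sent as four requests of 500 rows each.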
dbt
Run dbt CLI commands and capture structured results. Requires dbt-core installed and accessible on PATH.
Config:
| Key | Type | Required | Description |
|---|---|---|---|
| command | string | no | dbt command to run (default: run). Allowed: run, test, build, seed, snapshot, compile, ls |
| project_dir | string | no | Path to dbt project directory (default: .) |
| profiles_dir | string | no | Path to dbt profiles directory |
| target | string | no | dbt target to use |
| select | string | no | dbt model selection syntax (e.g., my_model+, tag:nightly) |
| vars | string | no | JSON string of dbt variables (e.g., {"start_date": "2024-01-01"}) |
Output: A dataset with columns model, status, execution_time, message parsed from dbt's JSON output.
Example:
```json
{
  "command": "run",
  "project_dir": "/opt/dbt/my_project",
  "target": "prod",
  "select": "tag:daily"
}
```

Runs dbt <command> --project-dir <project_dir> --output json. If JSON parsing fails, Brokoli falls back to reading target/run_results.json.
notify
Send a notification via Slack or a generic webhook. This is a pass-through node -- it returns the input data unchanged.
Config:
| Key | Type | Required | Description |
|---|---|---|---|
| notify_type | string | no | webhook (default) or slack |
| webhook_url | string | yes | URL to send the notification to |
| message | string | no | Notification message (default: Pipeline <name> completed) |
| channel | string | no | Slack channel override (only for slack type) |
Message templates: Use {{pipeline}}, {{run_id}}, and {{rows}} as placeholders in your message string. They are replaced at runtime with the pipeline name, current run ID, and input row count.
Slack mode (notify_type: "slack"): Sends {"text": "<message>", "channel": "<channel>"} to the webhook URL.
Webhook mode (notify_type: "webhook"): Sends {"pipeline": "<name>", "run_id": "<id>", "message": "<message>", "row_count": <n>, "timestamp": "<iso8601>"} to the webhook URL.
Example:
```json
{
  "notify_type": "slack",
  "webhook_url": "https://hooks.slack.com/services/T00/B00/xxx",
  "message": "Pipeline {{pipeline}} finished with {{rows}} rows",
  "channel": "#data-alerts"
}
```

migrate
Run database schema migrations.
Config:
| Key | Type | Required | Description |
|---|---|---|---|
| conn_id | string | yes | Connection ID |
| sql | string | yes | SQL migration script |
Example:
```json
{
  "conn_id": "prod-postgres",
  "sql": "ALTER TABLE users ADD COLUMN last_login TIMESTAMP;"
}
```

Common config keys
These keys work on any node type:
| Key | Type | Description |
|---|---|---|
| max_retries | int | Number of retry attempts (default: 0) |
| retry_delay | int | Base delay between retries in ms (default: 1000) |
| timeout | int | Node timeout in seconds (default: 1800) |
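For example, a flaky API source could be made more resilient by adding these keys alongside its node-specific config (values are illustrative):

```json
{
  "url": "https://api.example.com/users",
  "max_retries": 3,
  "retry_delay": 2000,
  "timeout": 120
}
```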