Conditions & Branching
Use condition nodes to create if/else branches in your pipeline.
How it works
A condition node evaluates an expression against the incoming dataset as a whole. If the expression is true, the nodes on the "true" branch execute; otherwise, the nodes on the "false" branch execute.
[Source] -> [Condition] -> true branch  -> [Sink A]
                        -> false branch -> [Sink B]
Configuration
| Key | Type | Required | Description |
|---|---|---|---|
| expression | string | yes | Dataset-level condition to evaluate (see Expression syntax) |
Expression syntax
Condition expressions operate on the dataset as a whole:
| Expression | Description |
|---|---|
| row_count > 0 | True if the dataset has any rows |
| row_count > 100 | True if the dataset has more than 100 rows |
| row_count = 0 | True if the dataset is empty |
| column_exists("email") | True if the dataset has a column named email |
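To make the table's semantics concrete, here is a minimal plain-Python sketch of how these expressions could be evaluated against an in-memory dataset (a list of row dicts plus a list of column names). It handles only the forms listed in the table (row_count with > or =, and column_exists) and is an illustration under those assumptions, not brokoli's actual expression engine; the evaluate function is hypothetical.

```python
import operator
import re

# Only the comparison operators that appear in the table above.
_OPS = {">": operator.gt, "=": operator.eq}

# row_count <op> <integer>, e.g. "row_count > 100"
_ROW_COUNT = re.compile(r"^\s*row_count\s*(>|=)\s*(\d+)\s*$")
# column_exists("<name>"), e.g. 'column_exists("email")'
_COLUMN_EXISTS = re.compile(r'^\s*column_exists\(\s*"([^"]+)"\s*\)\s*$')


def evaluate(expression, rows, columns):
    """Evaluate a dataset-level expression (hypothetical helper, not brokoli's API).

    rows is a list of dicts; columns is a list of column names.
    """
    match = _ROW_COUNT.match(expression)
    if match:
        op, value = match.groups()
        # The whole dataset is reduced to a single number: its row count.
        return _OPS[op](len(rows), int(value))
    match = _COLUMN_EXISTS.match(expression)
    if match:
        return match.group(1) in columns
    raise ValueError(f"unsupported expression: {expression!r}")


rows = [{"email": "a@example.com"}, {"email": "b@example.com"}]
columns = ["email"]
print(evaluate("row_count > 0", rows, columns))            # True
print(evaluate("row_count > 100", rows, columns))          # False
print(evaluate("row_count = 0", [], columns))              # True
print(evaluate('column_exists("email")', rows, columns))   # True
```

Each expression yields one boolean for the entire dataset, which is what lets a condition node pick a single branch.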
Row-level conditions
For row-level filtering (keeping or removing individual rows), use a transform node with a filter_rows rule instead. Condition nodes do not change the data; they only decide which branch of the pipeline executes next.
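The difference can be sketched in plain Python. The keep_rows and choose_branch helpers below are hypothetical stand-ins, not brokoli functions: a row-level filter returns a smaller dataset, while a dataset-level condition returns only a branch decision and leaves the rows untouched.

```python
rows = [
    {"order_id": 1, "status": "paid"},
    {"order_id": 2, "status": "cancelled"},
    {"order_id": 3, "status": "paid"},
]


def keep_rows(rows, predicate):
    """Row-level (hypothetical): keep only the rows for which predicate(row) is true."""
    return [row for row in rows if predicate(row)]


def choose_branch(rows):
    """Dataset-level (hypothetical): decide a branch; the rows themselves are not modified."""
    return "true" if len(rows) > 0 else "false"


paid = keep_rows(rows, lambda row: row["status"] == "paid")
print(len(paid))            # 2
print(choose_branch(rows))  # true
print(len(rows))            # 3  (the condition left the dataset unchanged)
```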
Example: Skip empty results
{
  "id": "check_data",
  "type": "condition",
  "name": "Has Data?",
  "config": {
    "expression": "row_count > 0"
  }
}
Connect two downstream paths:
- True branch: process and sink the data
- False branch: send an alert or log a warning
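The branching behavior of this example can be modeled as follows. This is a minimal sketch, assuming a condition node evaluates its expression once on the whole dataset and passes the same rows to exactly one branch; run_condition, process_and_sink, and send_alert are hypothetical names, not part of the brokoli SDK.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]
Branch = Callable[[List[Row]], str]


def run_condition(rows: List[Row],
                  predicate: Callable[[List[Row]], bool],
                  on_true: Branch,
                  on_false: Branch) -> str:
    """Evaluate predicate once on the whole dataset and run exactly one branch."""
    branch = on_true if predicate(rows) else on_false
    return branch(rows)  # the dataset is handed to the chosen branch unchanged


def process_and_sink(rows: List[Row]) -> str:
    # True branch: stands in for processing and writing the data.
    return f"sank {len(rows)} rows"


def send_alert(rows: List[Row]) -> str:
    # False branch: stands in for an alert or warning.
    return "alert: no data"


def has_data(rows: List[Row]) -> bool:
    # Mirrors the expression "row_count > 0".
    return len(rows) > 0


print(run_condition([{"id": 1}], has_data, process_and_sink, send_alert))  # sank 1 rows
print(run_condition([], has_data, process_and_sink, send_alert))           # alert: no data
```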
Example: Full pipeline with branching
JSON
{
  "nodes": [
    {"id": "src", "type": "source_api", "name": "Fetch Orders", "config": {"url": "https://api.example.com/orders"}},
    {"id": "cond", "type": "condition", "name": "Any Orders?", "config": {"expression": "row_count > 0"}},
    {"id": "process", "type": "transform", "name": "Process Orders", "config": {"rules": [{"type": "drop_columns", "columns": ["internal_id"]}]}},
    {"id": "notify", "type": "sink_api", "name": "Notify Empty", "config": {"url": "https://hooks.slack.com/...", "method": "POST"}}
  ],
  "edges": [
    {"from": "src", "to": "cond"},
    {"from": "cond", "to": "process"},
    {"from": "cond", "to": "notify"}
  ]
}
Python SDK
from brokoli import Pipeline, source_api, condition_node, transform, sink_api

with Pipeline("order-processing") as p:
    # Fetch orders from the API
    src = source_api("Fetch Orders", url="https://api.example.com/orders")
    # Branch on whether the fetch returned any rows
    check = condition_node("Any Orders?", expression="row_count > 0")
    # True branch: drop internal columns before further processing
    process = transform("Process Orders", rules=[
        {"type": "drop_columns", "columns": ["internal_id"]}
    ])
    # False branch: post a notification when there are no orders
    notify = sink_api("Notify Empty", url="https://hooks.slack.com/...")

    src >> check
    check >> process  # true branch
    check >> notify   # false branch
Combining with quality checks
A common pattern is to run a quality check and branch based on the result:
[Source] -> [Quality Check] -> [Condition: passed?] -> [Sink]
                                                    -> [Alert]
When the quality check uses on_failure: "warn" mode, this pattern gives you fine-grained control over what happens after a data quality assertion fails.
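A minimal plain-Python sketch of this pattern follows. It assumes that in "warn" mode a failed assertion is recorded rather than stopping the run, so the condition step can still route the data; quality_check, route, the assertion names, and the result's passed and failures fields are all hypothetical and not brokoli's API.

```python
def quality_check(rows, assertions, on_failure="warn"):
    """Run named assertions on the dataset (hypothetical helper).

    In "warn" mode failures are recorded and returned (assumed behavior);
    any other mode raises, standing in for a check that stops the pipeline.
    """
    failures = [name for name, check in assertions if not check(rows)]
    if failures and on_failure != "warn":
        raise RuntimeError(f"quality check failed: {failures}")
    return {"passed": not failures, "failures": failures}


def route(rows, result):
    """Condition step: send passing data to the sink, otherwise raise an alert."""
    if result["passed"]:
        return f"sink: wrote {len(rows)} rows"
    return f"alert: failed {', '.join(result['failures'])}"


# Hypothetical assertions; each takes the whole dataset and returns a bool.
assertions = [
    ("not_empty", lambda rows: len(rows) > 0),
    ("has_email", lambda rows: all("email" in row for row in rows)),
]

good = [{"email": "a@example.com"}]
bad = [{"name": "no email"}]
print(route(good, quality_check(good, assertions)))  # sink: wrote 1 rows
print(route(bad, quality_check(bad, assertions)))    # alert: failed has_email
```

Keeping the check in a non-blocking mode and moving the decision into a condition node means the failure handling (alerting, quarantining, skipping) is an explicit branch of the pipeline rather than an aborted run.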