
Conditions & Branching

Use condition nodes to create if/else branches in your pipeline.

How it works

A condition node evaluates an expression against the incoming dataset. Based on the result, only the downstream nodes on the matching branch ("true" or "false") execute.

[Source] -> [Condition] -> true branch  -> [Sink A]
                        -> false branch -> [Sink B]

Configuration

Key         Type    Required  Description
expression  string  yes       Condition to evaluate

Expression syntax

Condition expressions operate on the dataset as a whole:

Expression              Description
row_count > 0           True if the dataset has any rows
row_count > 100         True if the dataset has more than 100 rows
row_count = 0           True if the dataset is empty
column_exists("email")  True if a column named "email" exists
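The expression grammar above is small enough to illustrate with a toy evaluator. The sketch below is not Brokoli's implementation — just an illustration of the semantics of the documented forms, assuming the dataset is a list of dict rows:

```python
import re

def evaluate(expression, rows):
    """Toy evaluator for dataset-level condition expressions.

    `rows` is a list of dicts, one per row. Supports the two
    documented forms: row_count comparisons and column_exists("name").
    """
    # column_exists("email") -> check the first row's keys
    m = re.fullmatch(r'column_exists\("([^"]+)"\)', expression.strip())
    if m:
        return bool(rows) and m.group(1) in rows[0]

    # row_count <op> <number>
    m = re.fullmatch(r"row_count\s*(>|<|>=|<=|=)\s*(\d+)", expression.strip())
    if m:
        op, n = m.group(1), int(m.group(2))
        count = len(rows)
        return {
            ">": count > n, "<": count < n,
            ">=": count >= n, "<=": count <= n,
            "=": count == n,
        }[op]

    raise ValueError(f"unsupported expression: {expression!r}")

orders = [{"id": 1, "email": "a@example.com"}]
evaluate("row_count > 0", orders)           # True
evaluate("row_count = 0", [])               # True
evaluate('column_exists("email")', orders)  # True
```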

Row-level conditions

For row-level filtering (keeping/removing rows), use a transform node with filter_rows instead. Condition nodes are for branching the pipeline execution path.
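For contrast, a row-level filter looks something like the following sketch. The filter_rows rule name comes from the paragraph above, but the rule's field names and predicate syntax here ("condition", status == 'active') are assumptions for illustration — check the transform node reference for the exact schema:

```json
{
  "id": "keep_active",
  "type": "transform",
  "name": "Keep Active Rows",
  "config": {
    "rules": [
      {"type": "filter_rows", "condition": "status == 'active'"}
    ]
  }
}
```

This node passes a smaller dataset down a single path; a condition node instead passes the whole dataset down one of two paths.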

Example: Skip empty results

{
  "id": "check_data",
  "type": "condition",
  "name": "Has Data?",
  "config": {
    "expression": "row_count > 0"
  }
}

Connect two downstream paths:

  • True branch: process and sink the data
  • False branch: send an alert or log a warning

Example: Full pipeline with branching

JSON

{
  "nodes": [
    {"id": "src", "type": "source_api", "name": "Fetch Orders", "config": {"url": "https://api.example.com/orders"}},
    {"id": "cond", "type": "condition", "name": "Any Orders?", "config": {"expression": "row_count > 0"}},
    {"id": "process", "type": "transform", "name": "Process Orders", "config": {"rules": [{"type": "drop_columns", "columns": ["internal_id"]}]}},
    {"id": "notify", "type": "sink_api", "name": "Notify Empty", "config": {"url": "https://hooks.slack.com/...", "method": "POST"}}
  ],
  "edges": [
    {"from": "src", "to": "cond"},
    {"from": "cond", "to": "process"},
    {"from": "cond", "to": "notify"}
  ]
}

Python SDK

from brokoli import Pipeline, source_api, condition_node, transform, sink_api

with Pipeline("order-processing") as p:
    src = source_api("Fetch Orders", url="https://api.example.com/orders")
    check = condition_node("Any Orders?", expression="row_count > 0")
    process = transform("Process", rules=[
        {"type": "drop_columns", "columns": ["internal_id"]}
    ])
    notify = sink_api("Notify Empty", url="https://hooks.slack.com/...")

    src >> check
    check >> process  # true branch
    check >> notify   # false branch
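The `>>` wiring is ordinary Python operator overloading. A minimal sketch of the idea — not the actual SDK internals, and the `Node` class and shared `edges` list are hypothetical — showing how a node object could record edges when chained:

```python
class Node:
    """Toy stand-in for an SDK node; records edges created via `>>`."""
    edges = []  # (from_name, to_name) pairs, shared list for this sketch

    def __init__(self, name):
        self.name = name

    def __rshift__(self, other):
        # `a >> b` records an edge a -> b and returns b,
        # so longer chains like `a >> b >> c` also work.
        Node.edges.append((self.name, other.name))
        return other

src, check, process = Node("src"), Node("cond"), Node("process")
src >> check >> process
Node.edges  # [("src", "cond"), ("cond", "process")]
```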

Combining with quality checks

A common pattern is to run a quality check and branch based on the result:

[Source] -> [Quality Check] -> [Condition: passed?] -> [Sink]
                                                    -> [Alert]

This is useful when quality checks run with on_failure: "warn": the pipeline continues past a failed check, and the condition node gives you fine-grained control over what happens next.
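As a hedged sketch of this pattern in pipeline JSON — the quality_check node type shown here, its config keys, and the way a check's pass/fail result would surface to a condition expression are assumptions not documented on this page (row_count > 0 is used as a stand-in expression):

```json
{
  "nodes": [
    {"id": "src", "type": "source_api", "name": "Fetch Orders", "config": {"url": "https://api.example.com/orders"}},
    {"id": "qc", "type": "quality_check", "name": "Validate", "config": {"on_failure": "warn"}},
    {"id": "cond", "type": "condition", "name": "Passed?", "config": {"expression": "row_count > 0"}},
    {"id": "sink", "type": "sink_api", "name": "Store", "config": {"url": "https://api.example.com/store"}},
    {"id": "alert", "type": "sink_api", "name": "Alert", "config": {"url": "https://hooks.slack.com/..."}}
  ],
  "edges": [
    {"from": "src", "to": "qc"},
    {"from": "qc", "to": "cond"},
    {"from": "cond", "to": "sink"},
    {"from": "cond", "to": "alert"}
  ]
}
```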