# Dataflow
_Path: en/framework/dataflow_
The `wippy/dataflow` module provides a workflow orchestration engine based on directed acyclic graphs (DAGs). Workflows are composed of nodes — functions, agents, cycles, and parallel processors — connected by typed data routes. The orchestrator manages execution, state persistence, and recovery.
## Setup
Add the module to your project:
```bash
wippy add wippy/dataflow
wippy install
```
Declare the dependency:
```yaml
version: "1.0"
namespace: app
entries:
  - name: dep.dataflow
    kind: ns.dependency
    component: wippy/dataflow
    version: "*"
```
The dataflow module depends on `wippy/agent`, `wippy/llm`, `wippy/session`, and `wippy/views` — these are resolved automatically when you run `wippy install`. The module requires a database resource at `app:db` for workflow persistence and runs migrations automatically via `wippy/migration`.
The module exposes one configuration parameter:

| Parameter | Required | Default | Description |
|-----------|----------|---------|-------------|
| `web_host_origin_env` | no | internal | Environment variable for the public web host origin URL |
## Flow Builder
The flow builder provides a fluent interface for composing workflows. Import it into your entry:
```yaml
imports:
  flow: userspace.dataflow.flow:flow
```
```lua
local flow = require("flow")
```
### Core API
```lua
flow.create()
  :with_title(title)
  :with_metadata(metadata)
  :with_input(data)
  :with_data(data)
  :[operation](config)
  :as(name)
  :to(target, input_key, transform)
  :error_to(target, input_key, transform)
  :when(condition)
  :run()   -- synchronous
  :start() -- asynchronous

flow.template()
  :[operations]...
### Linear Pipeline
Nodes chain automatically when no explicit routing is defined. Output of each node flows to the next:
```lua
local result, err = flow.create()
  :with_input({ text = "Hello world" })
  :func("app:tokenize")
  :func("app:translate", { args = { target_lang = "fr" } })
  :func("app:format_output")
  :run()
```
### Named Routing
Use `:as()` to name nodes and `:to()` to route data between them. Only use `:as()` when the node needs to be referenced:
```lua
local result, err = flow.create()
  :with_input(task)
    :to("router")
  :func("app:router"):as("router")
    :to("context", "routing")
    :to("dev", "routing")
  :agent("app:context_agent"):as("context")
    :to("dev", "gathered_context")
  :agent("app:dev_agent"):as("dev")
    :to("@success")
  :run()
```
The second parameter to `:to()` is the **discriminator** — the input key at the receiving node. When a node receives multiple inputs, they are collected as a table keyed by discriminator.
### Workflow Input and Static Data
`:with_input()` sets the workflow's single primary input. `:with_data()` creates independent static data sources:
```lua
flow.create()
  :with_input(task)
    :to("router")
  :with_data(config):as("cfg")
    :to("dev", "config")
    :to("logger", "config")
  :with_data(branch):as("branch_data")
    :to("checker", "branch")
  :func("app:router"):as("router")
    :to("dev", "task")
  :func("app:dev"):as("dev")
    :to("@success")
    :error_to("@fail")
  :run()
```
Use `:with_input()` for external data entering the workflow. Use `:with_data()` for config, constants, and reference data shared across multiple nodes. Static data uses reference optimization — the first route creates actual data, subsequent routes create lightweight references.
### Conditional Routing
Use `:when()` after `:to()` to add conditions. Conditions evaluate against the node's output using `expr` syntax:
```lua
flow.create()
  :with_input(data)
  :func("app:classify"):as("classify")
    :to("handler_a"):when("output.category == 'a'")
    :to("handler_b"):when("output.category == 'b'")
    :to("fallback")
  :func("app:handler_a"):as("handler_a"):to("@success")
  :func("app:handler_b"):as("handler_b"):to("@success")
  :func("app:fallback"):as("fallback"):to("@success")
  :run()
```
Conditions can combine with inline transforms for more complex routing:
```lua
:func("app:decompose"):as("decompose")
  :to("@success", nil, "{passed: true, feedback: nil}"):when("len(output.items) == 0")
  :to("processor", "items", "output.items")
```
Conditional expressions support: comparisons (`output.score > 0.8`), logical operators (`output.valid && output.count > 5`), array functions (`len(output.items) > 0`, `any(output.errors, {.critical})`), string operations (`output.status contains 'success'`), and optional chaining (`output.data?.nested?.value`).
### Workflow Terminals
Route to `@success` or `@fail` to terminate the workflow explicitly. In nested contexts (cycles, parallel), terminals create node outputs instead of workflow outputs:
```lua
:func("app:final_step"):to("@success")
:func("app:handler"):error_to("@fail")
```
### Error Routing
Use `:error_to()` to route node errors to a handler. Errors can be routed as normal inputs to recovery nodes:
```lua
:agent("app:gpt_planner", { model = "gpt-5" }):as("gpt_planner")
  :to("consolidator", "gpt_plan")
  :error_to("consolidator", "gpt_plan")
:agent("app:claude_planner", { model = "claude-4-5-sonnet" }):as("claude_planner")
  :to("consolidator", "claude_plan")
  :error_to("consolidator", "claude_plan")
:agent("app:consolidator", {
  inputs = { required = { "gpt_plan", "claude_plan" } }
}):as("consolidator")
```
This pattern runs both planners in parallel — if one fails, its error becomes the input for the consolidator, which proceeds with whatever results are available.
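A function-based consolidator could apply the same pattern. This is a sketch: the assumption that a failed planner's payload carries an `error` field is illustrative, not the module's documented error shape.

```lua
local function consolidate(inputs)
  local plans = {}
  for _, key in ipairs({ "gpt_plan", "claude_plan" }) do
    local value = inputs[key]
    -- keep only inputs that carry a plan rather than an error payload
    if type(value) == "table" and value.error == nil then
      plans[#plans + 1] = value
    end
  end
  if #plans == 0 then
    return nil, "all planners failed"
  end
  return { plans = plans }
end
```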
## Input Merging
How nodes receive inputs depends on discriminators and whether `args` is configured.
**Without args — single default input:**
```lua
:func("source"):to("target")
-- target receives: raw content (unwrapped)
```
**Without args — single named input:**
```lua
:func("source"):to("target", "task")
-- target receives: { task = content }
```
**Without args — multiple inputs:**
```lua
:func("source1"):to("target", "data")
:func("source2"):to("target", "config")
-- target receives: { data = content1, config = content2 }
```
**With args — inputs merge into base:**
```lua
:func("app:api_client", {
  args = { base_url = "https://api.com", timeout = 5000 }
})
-- with :to("api_client", "body") from upstream
-- api_client receives: { base_url = "https://api.com", timeout = 5000, body = content }
```
Nodes with `args` cannot receive inputs with the `"default"` discriminator. Use named discriminators with `:to(target, "input_key")` instead.
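For example, in a flow containing a node with `args` (sketch; `app:fetch` and the node names are illustrative):

```lua
-- Invalid: routes with the default discriminator into a node that has args
-- :func("app:fetch"):to("api_client")

-- Valid: a named discriminator merges the input into the args table
:func("app:fetch"):as("fetch")
  :to("api_client", "body")
:func("app:api_client", { args = { timeout = 5000 } }):as("api_client")
```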
## Input Transforms
Transform data before it reaches a node:
```lua
-- String transform: single expression
:func("app:step", { input_transform = "input.nested.field" })

-- Table transform: named expressions
:func("app:step", {
  input_transform = {
    task = "inputs.task",
    config = "inputs.settings",
    priority = "output.score > 0.8 ? 'high' : 'normal'"
  }
})
```
Context variables available in transforms: `input` (workflow input), `inputs` (all incoming node inputs), `output` (current node's output when routing).
### Inline Route Transforms
The third parameter to `:to()` is an inline transform expression:
```lua
:func("source"):as("source")
  :to("target", nil, "output.data")
  :to("other", nil, "{passed: true, value: output.x}")
  :to("list", nil, "map(output.items, {.id})")
```
### Function Node
Executes a registered `function.lua` entry:
```lua
:func("app:my_function", {
  args = { key = "value" },
  inputs = { required = { "task", "config" } },
  context = { session_id = "abc" },
  input_transform = { task = "inputs.prompt" },
  metadata = { title = "Process Data" }
})
```
| Option | Type | Description |
|--------|------|-------------|
| `args` | table | Base arguments merged with node inputs |
| `inputs` | table | Input requirements: `{ required = {...}, optional = {...} }` |
| `context` | table | Execution context passed to function |
| `input_transform` | string/table | Expression to transform inputs |
| `metadata` | table | Node metadata (e.g., `{ title = "..." }`) |
If the function returns `{ _control = { commands = [...] } }`, the orchestrator spawns a child workflow. This is how nested flows work.
### Agent Node
Executes an agent with tool calling and optional structured exit:
```lua
:agent("app:content_writer", {
  model = "gpt-5",
  inputs = { required = { "context", "content_plan", "analysis" } },
  arena = {
    prompt = "Write content based on the provided context.",
    max_iterations = 12,
    tool_calling = "any",
    exit_schema = {
      type = "object",
      properties = {
        content = { type = "string" },
        title = { type = "string" }
      },
      required = { "content", "title" }
    }
  },
  show_tool_calls = true,
  metadata = { title = "Content Writer" }
})
```
| Option | Type | Description |
|--------|------|-------------|
| `model` | string | Override model |
| `arena.prompt` | string | System prompt |
| `arena.max_iterations` | number | Max reasoning loops (default: 32) |
| `arena.min_iterations` | number | Min iterations before exit (default: 1) |
| `arena.tool_calling` | string | `"auto"`, `"any"`, `"none"` |
| `arena.tools` | array | Tool registry IDs |
| `arena.exit_schema` | table | JSON schema for structured exit |
| `arena.exit_func_id` | string | Function to validate exit output |
| `arena.context` | table | Additional context |
| `inputs` | table | Input requirements |
| `show_tool_calls` | boolean | Include tool calls in output |
| `input_transform` | string/table | Transform inputs |
| `metadata` | table | Node metadata |
**Dynamic agent selection:** Pass an empty string as agent ID and resolve it via `input_transform`:
```lua
:agent("", {
  inputs = { required = { "spec", "task" } },
  input_transform = {
    agent_id = "inputs.spec.agent_id",
    task = "inputs.task"
  },
  arena = {
    prompt = "Process according to spec",
    max_iterations = 25
  }
})
```
**Exit validation:** When `exit_func_id` is set, the function validates the agent's exit output. On validation failure, the agent receives the error as observation and continues (up to `max_iterations`).
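An exit validator might look like this sketch. The exact signature and return contract of an `exit_func_id` function are assumptions here: the validator is assumed to receive the agent's structured exit output and return `true`, or `false` plus a message that becomes the agent's observation.

```lua
-- Hypothetical validator for the content_writer agent's exit output.
local function validate_exit(output)
  if not output.title or #output.title == 0 then
    return false, "title must not be empty"
  end
  if not output.content or #output.content < 100 then
    return false, "content too short; write at least 100 characters"
  end
  return true
end
```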
### Cycle Node
Iterates a function or template repeatedly with persistent state:
```lua
:cycle({
  func_id = "app:content_cycle",
  max_iterations = 3,
  initial_state = {
    entry_id = entry_id,
    content_prompt = prompt,
    min_score = 8.0,
    feedback_history = {}
  }
})
```
On each iteration, the cycle function receives a context table:
```lua
{
  input = ...,        -- the cycle node's input
  state = ...,        -- persistent state (initial_state, then whatever the function returns)
  last_result = ...,  -- previous iteration's result (nil on the first iteration)
  iteration = ...     -- current iteration number
}
```
The function controls continuation:
```lua
function my_cycle(cycle_context)
  -- stop if approved
  if cycle_context.last_result and cycle_context.last_result.approved then
    return {
      state = cycle_context.state,
      result = cycle_context.last_result,
      continue = false
    }
  end

  -- spawn child workflow for this iteration
  return flow.create()
    :with_input({ task = cycle_context.input.task })
    :agent("app:worker")
    :agent("app:qa")
    :run()
end
```
| Option | Type | Description |
|--------|------|-------------|
| `func_id` | string | Iteration function (mutually exclusive with `template`) |
| `template` | FlowBuilder | Template for each iteration (mutually exclusive with `func_id`) |
| `max_iterations` | number | Maximum iterations |
| `initial_state` | table | Starting state |
| `continue_condition` | string | Expression: continue while true |
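With `continue_condition`, the loop can be driven by an expression instead of (or in addition to) the function's `continue` flag. This is a sketch; the variable names available to the expression (`state`, `last_result`) are assumptions based on the cycle context above, not a documented contract:

```lua
:cycle({
  func_id = "app:refine",
  max_iterations = 10,
  initial_state = { min_score = 8.0 },
  -- assumed: the cycle continues while the expression evaluates to true
  continue_condition = "last_result.score < state.min_score"
})
```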
**Template-based cycle:**
```lua
:cycle({
  template = flow.template()
    :agent("app:worker")
    :func("app:validator"),
  max_iterations = 5
})
```
### Parallel Node
Map-reduce pattern over arrays:
```lua
:parallel({
  inputs = { required = { "specs", "task" } },
  source_array_key = "specs",
  iteration_input_key = "spec",
  passthrough_keys = { "task" },
  batch_size = 10,
  on_error = "continue",
  filter = "successes",
  unwrap = true,
  template = flow.template()
    :agent("app:processor", {
      inputs = { required = { "spec", "task" } },
      input_transform = {
        agent_id = "inputs.spec.agent_id",
        task = "inputs.task"
      },
      arena = {
        prompt = "Process according to spec",
        max_iterations = 25
      }
    })
    :to("@success"),
  metadata = { title = "Process Specs" }
})
```
| Option | Type | Description |
|--------|------|-------------|
| `source_array_key` | string | Input key containing the array (required) |
| `template` | FlowBuilder | Template for each item (required, must route to `@success`) |
| `iteration_input_key` | string | Input key for current item (default: `"default"`) |
| `batch_size` | number | Items per parallel batch (default: 1 = sequential) |
| `on_error` | string | `"continue"` (default) or `"fail_fast"` |
| `filter` | string | `"all"` (default), `"successes"`, `"failures"` |
| `unwrap` | boolean | Return raw results instead of wrapped metadata (default: false) |
| `passthrough_keys` | array | Input keys forwarded to every iteration |
**Passthrough keys** provide shared context (config, task description) to every iteration without duplicating data in the source array:
```lua
:with_data(file_list):as("files"):to("processor", "files")
:with_data("secret"):as("api_key"):to("processor", "api_key")
:parallel({
  inputs = { required = { "files", "api_key" } },
  source_array_key = "files",
  iteration_input_key = "filename",
  passthrough_keys = { "api_key" },
  template = flow.template()
    :func("app:upload", {
      inputs = { required = { "filename", "api_key" } }
    })
    :to("@success")
}):as("processor")
```
### Join Node
Collects multiple inputs before proceeding:
```lua
:join({
  inputs = { required = { "source1", "source2" } },
  output_mode = "object",
  ignored_keys = { "triggered" }
})
```
| Option | Type | Description |
|--------|------|-------------|
| `output_mode` | string | `"object"` (default) or `"array"` (arrival order) |
| `ignored_keys` | array | Input keys excluded from output |
| `inputs` | table | Input requirements |
## Templates
Templates define reusable sub-workflows. Use `flow.template()` to create, `:use()` to inline:
```lua
local preprocessor = flow.template()
  :func("app:clean")
  :func("app:tokenize")

flow.create()
  :with_input(data)
  :use(preprocessor)
  :func("app:process")
  :run()
```
Templates inline their operations into the parent flow at compile time.
## Nested Workflows
Functions used in cycles and parallel nodes can spawn child workflows by returning `flow.create():run()`:
```lua
function my_processor(input)
  return flow.create()
    :with_input(input)
    :func("app:step_a")
    :func("app:step_b")
    :run()
end
```
When `:run()` executes inside an existing dataflow context, it returns `{ _control = { commands = [...] } }` instead of executing directly. The orchestrator handles the child workflow through the yield mechanism.
Functions that participate in dataflow composition must return `flow.create():run()`. Functions returning anything else cannot spawn child workflows.
## Synchronous vs Asynchronous
`:run()` blocks until the workflow completes and returns output:
```lua
local result, err = flow.create()
  :with_input({ text = "hello" })
  :func("app:process")
  :run()
```
`:start()` returns immediately with a workflow ID:
```lua
local dataflow_id, err = flow.create()
  :with_input({ text = "hello" })
  :func("app:process")
  :start()
```
`:start()` cannot be used in nested contexts.
## Client API
For programmatic workflow management:
```yaml
imports:
  client: userspace.dataflow:client
```
```lua
local client = require("client")
local c, err = client.new()
```
| Method | Description |
|--------|-------------|
| `client.new()` | Create client (requires security actor) |
| `:create_workflow(commands, options?)` | Create workflow, returns `dataflow_id` |
| `:execute(dataflow_id, options?)` | Run synchronously, returns result |
| `:start(dataflow_id, options?)` | Run asynchronously, returns `dataflow_id` |
| `:output(dataflow_id)` | Fetch workflow outputs |
| `:get_status(dataflow_id)` | Get current status |
| `:cancel(dataflow_id, timeout?)` | Gracefully cancel (default: 30s) |
| `:terminate(dataflow_id)` | Force terminate |
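A common pattern is to start a workflow asynchronously and poll it to completion. This sketch assumes `get_status()` returns one of the status strings listed below, and that the host runtime provides a sleep/yield primitive (shown only as a comment):

```lua
local client = require("client")
local flow = require("flow")

local c, cerr = client.new()
if not c then return nil, cerr end

local dataflow_id, serr = flow.create()
  :with_input({ text = "hello" })
  :func("app:process")
  :start()
if not dataflow_id then return nil, serr end

while true do
  local status = c:get_status(dataflow_id)
  if status == "completed" then
    return c:output(dataflow_id)
  elseif status == "failed" or status == "cancelled" or status == "terminated" then
    return nil, "workflow ended with status: " .. status
  end
  -- sleep/yield here before polling again
end
```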
## Workflow Status
| Status | Description |
|--------|-------------|
| `template` | Node is a template instance |
| `pending` | Waiting for inputs |
| `ready` | Inputs collected, ready to execute |
| `running` | Actively executing |
| `paused` | Yielded, waiting for child workflow |
| `completed` | Finished successfully |
| `failed` | Failed |
| `cancelled` | User cancelled |
| `skipped` | Conditional branch not taken |
| `terminated` | Force terminated |
## Metadata
```lua
flow.create()
  :with_title("Document Processing Pipeline")
  :with_metadata({ source = "api", priority = "high" })
  :func("app:process", { metadata = { title = "Process Document" } })
  :run()
```
Title defaults to "Flow Builder Workflow" if not provided.
## Validation Rules
The compiler validates workflows at compile time:
- All `:as(name)` names must be unique
- All `:to()` and `:error_to()` targets must reference existing names (except `@success`, `@fail`)
- Graph must be acyclic
- All nodes must have incoming routes (from another node, workflow input, or static data)
- `:cycle()` requires `func_id` or `template` (not both)
- `:parallel()` requires `source_array_key` and `template`
- At least one path must lead to `@success` or have auto-output
- `:when()` only follows `:to()` or `:error_to()` from nodes (not static data)
- Nodes with `args` cannot receive inputs with `"default"` discriminator
## Expression Reference
Expressions use the `expr` module syntax, available in `:when()` conditions and `input_transform` values.
**Operators:** `+`, `-`, `*`, `/`, `%`, `**`, `==`, `!=`, `<`, `<=`, `>`, `>=`, `&&`, `||`, `!`, `contains`, `startsWith`, `endsWith`
**Array functions:** `all()`, `any()`, `none()`, `one()`, `filter()`, `map()`, `count()`, `len()`, `first()`, `last()`
**Math functions:** `max()`, `min()`, `abs()`, `ceil()`, `floor()`, `round()`, `sqrt()`, `pow()`
**String functions:** `len()`, `upper()`, `lower()`, `trim()`, `split()`, `join()`
**Type functions:** `type()`, `int()`, `float()`, `string()`
**Literals:** numbers, strings, booleans (`true`/`false`), null (`nil`), arrays (`[1, 2, 3]`), objects (`{key: value}`)
**Ternary:** `output.age >= 18 ? output.verified : false`
**Optional chaining:** `output.data?.nested?.value`
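Several of these features can combine in a single condition. A routing sketch (node and key names are illustrative):

```lua
:func("app:review"):as("review")
  :to("publisher"):when("output.score >= 0.8 && none(output.issues, {.critical})")
  :to("editor", "draft"):when("output.summary?.status contains 'revise'")
  :to("@fail")
```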
## Error Handling
Both `:run()` and `:start()` follow standard Lua error conventions:
- Success: `data, nil` (run) or `dataflow_id, nil` (start)
- Failure: `nil, error_message`
Error categories: compilation errors, client errors, workflow creation errors, execution errors, and workflow failures.
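A typical call site checks the error before using the result (sketch; `app:process` is a placeholder function ID):

```lua
local result, err = flow.create()
  :with_input({ text = "hello" })
  :func("app:process")
  :run()

if err then
  -- err is a message string covering any of the categories above
  return nil, "workflow failed: " .. err
end
-- result holds the output routed to @success
```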
## See Also
- [Agents](agents.md) - Agent framework used by agent nodes
- [LLM](llm.md) - LLM module
- [Framework Overview](overview.md) - Framework module usage
## Navigation
Previous: Testing (framework/testing)
Next: Relay (framework/relay)