Dataflow
The wippy/dataflow module provides a workflow orchestration engine based on directed acyclic graphs (DAGs). Workflows are composed of nodes (functions, agents, cycles, parallel processors, signals, and joins) connected by typed data routes. The orchestrator manages execution, state persistence, and recovery.
Setup
Add the module to your project:
wippy add wippy/dataflow
wippy install
Declare the dependency:
version: "1.0"
namespace: app
entries:
- name: dep.dataflow
kind: ns.dependency
component: wippy/dataflow
version: "*"
The dataflow module depends on wippy/agent, wippy/llm, and wippy/session — these are resolved automatically when you run wippy install. The module requires a database resource at app:db for workflow persistence and runs migrations automatically via wippy/migration.
The module publishes an env.variable entry userspace.dataflow.env:web_host_origin (default https://front.wippy.ai) that downstream flows can read for building public URLs. Override it through the env router or a requirement.
Flow Builder
The flow builder provides a fluent interface for composing workflows. Import it into your entry:
imports:
flow: userspace.dataflow.flow:flow
local flow = require("flow")
Core API
flow.create()
:with_title(title)
:with_metadata(metadata)
:with_input(data)
:with_data(data)
:[operation](config)
:as(name)
:to(target, input_key, transform)
:error_to(target, input_key, transform)
:when(condition)
:run() -- synchronous
:start() -- asynchronous
flow.template()
:[operations]...
Linear Pipeline
Nodes chain automatically when no explicit routing is defined. Output of each node flows to the next:
local result, err = flow.create()
:with_input({ text = "Hello world" })
:func("app:tokenize")
:func("app:translate", { args = { target_lang = "fr" } })
:func("app:format_output")
:run()
Named Routing
Use :as() to name nodes and :to() to route data between them. Only use :as() when the node needs to be referenced:
local result, err = flow.create()
:with_input(task)
:to("router")
:func("app:router"):as("router")
:to("context", "routing")
:to("dev", "routing")
:agent("app:context_agent"):as("context")
:to("dev", "gathered_context")
:agent("app:dev_agent"):as("dev")
:to("@success")
:run()
The second parameter to :to() is the discriminator — the input key at the receiving node. When a node receives multiple inputs, they are collected as a table keyed by discriminator.
Workflow Input and Static Data
:with_input() is the single primary input to the workflow. :with_data() creates independent static data sources:
flow.create()
:with_input(task)
:to("router")
:with_data(config):as("cfg")
:to("dev", "config")
:to("logger", "config")
:with_data(branch):as("branch_data")
:to("checker", "branch")
:func("app:router"):as("router")
:to("dev", "task")
:func("app:dev"):as("dev")
:to("@success")
:error_to("@fail")
:run()
Use :with_input() for external data entering the workflow. Use :with_data() for config, constants, and reference data shared across multiple nodes. Static data uses reference optimization — the first route creates actual data, subsequent routes create lightweight references.
Conditional Routing
Use :when() after :to() to add conditions. Conditions evaluate against the node's output using expr syntax:
flow.create()
:with_input(data)
:func("app:classify"):as("classify")
:to("handler_a"):when("output.category == 'a'")
:to("handler_b"):when("output.category == 'b'")
:to("fallback")
:func("app:handler_a"):as("handler_a"):to("@success")
:func("app:handler_b"):as("handler_b"):to("@success")
:func("app:fallback"):as("fallback"):to("@success")
:run()
Conditions can combine with inline transforms for more complex routing:
:func("app:decompose"):as("decompose")
:to("@success", nil, "{passed: true, feedback: nil}"):when("len(output.items) == 0")
:to("processor", "items", "output.items")
Conditional expressions support: comparisons (output.score > 0.8), logical operators (output.valid && output.count > 5), array functions (len(output.items) > 0, any(output.errors, {.critical})), string operations (output.status contains 'success'), and optional chaining (output.data?.nested?.value).
Workflow Terminals
Route to @success or @fail to terminate the workflow explicitly. In nested contexts (cycles, parallel), terminals create node outputs instead of workflow outputs:
:func("app:final_step"):to("@success")
:func("app:handler"):error_to("@fail")
Error Routing
Use :error_to() to route node errors to a handler. Errors can be routed as normal inputs to recovery nodes:
:agent("app:gpt_planner", { model = "gpt-5" }):as("gpt_planner")
:to("consolidator", "gpt_plan")
:error_to("consolidator", "gpt_plan")
:agent("app:claude_planner", { model = "claude-4-5-sonnet" }):as("claude_planner")
:to("consolidator", "claude_plan")
:error_to("consolidator", "claude_plan")
:agent("app:consolidator", {
inputs = { required = { "gpt_plan", "claude_plan" } }
}):as("consolidator")
This pattern runs both planners in parallel — if one fails, its error becomes the input for the consolidator, which proceeds with whatever results are available.
Input Merging
How nodes receive inputs depends on discriminators and whether args is configured.
Without args — single default input:
:func("source"):to("target")
-- target receives: raw content (unwrapped)
Without args — single named input:
:func("source"):to("target", "task")
-- target receives: { task = content }
Without args — multiple inputs:
:func("source1"):to("target", "data")
:func("source2"):to("target", "config")
-- target receives: { data = content1, config = content2 }
With args — inputs merge into base:
:func("app:api_client", {
args = { base_url = "https://api.com", timeout = 5000 }
})
-- with :to("api_client", "body") from upstream
-- api_client receives: { base_url = "https://api.com", timeout = 5000, body = content }
args cannot receive inputs with the "default" discriminator. Use named discriminators with :to(target, "input_key") instead.
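The merging rules above can be modeled in plain Lua. This is only a sketch of the documented behavior, not the module's implementation: merge_inputs and the { key, content } route records are hypothetical names introduced for illustration.

```lua
-- Plain-Lua model of the input merging rules (illustrative only).
-- `incoming` is a list of { key = discriminator, content = value };
-- "default" stands for a route created without an input key.
local function merge_inputs(args, incoming)
  if args == nil then
    -- Single default input: the raw content is passed through unwrapped.
    if #incoming == 1 and incoming[1].key == "default" then
      return incoming[1].content
    end
    -- Named or multiple inputs are collected into a table by discriminator.
    local merged = {}
    for _, item in ipairs(incoming) do
      merged[item.key] = item.content
    end
    return merged
  end

  -- With args: copy the base arguments, then add the named inputs.
  local merged = {}
  for k, v in pairs(args) do
    merged[k] = v
  end
  for _, item in ipairs(incoming) do
    if item.key == "default" then
      return nil, "args cannot receive inputs with the 'default' discriminator"
    end
    merged[item.key] = item.content
  end
  return merged
end

local single = merge_inputs(nil, { { key = "default", content = "raw" } })
local named = merge_inputs(nil, { { key = "task", content = "t" } })
local with_args = merge_inputs(
  { base_url = "https://api.com", timeout = 5000 },
  { { key = "body", content = "payload" } }
)
```

Here single is the bare string, named is { task = "t" }, and with_args carries base_url, timeout, and body together. In the real module the default-discriminator conflict is rejected by the compiler, as listed under Validation Rules; the model returns it as an error value only to keep the sketch self-contained.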
Input Transforms
Transform data before it reaches a node:
-- String transform: single expression
:func("app:step", { input_transform = "input.nested.field" })
-- Table transform: named expressions
:func("app:step", {
input_transform = {
task = "inputs.task",
config = "inputs.settings",
priority = "output.score > 0.8 ? 'high' : 'normal'"
}
})
Context variables available in transforms: input (workflow input), inputs (all incoming node inputs), output (current node's output when routing).
Inline Route Transforms
The third parameter to :to() is an inline transform expression:
:func("source"):as("source")
:to("target", nil, "output.data")
:to("other", nil, "{passed: true, value: output.x}")
:to("list", nil, "map(output.items, {.id})")
Node Types
Function Node
Executes a registered function.lua entry:
:func("app:my_function", {
args = { key = "value" },
inputs = { required = { "task", "config" } },
context = { session_id = "abc" },
input_transform = { task = "inputs.prompt" },
metadata = { title = "Process Data" }
})
| Option | Type | Description |
|---|---|---|
| args | table | Base arguments merged with node inputs |
| inputs | table | Input requirements: { required = {...}, optional = {...} } |
| context | table | Execution context passed to function |
| input_transform | string/table | Expression to transform inputs |
| metadata | table | Node metadata (e.g., { title = "..." }) |
If the function returns { _control = { commands = [...] } }, the orchestrator spawns a child workflow. This is how nested flows work.
Agent Node
Executes an agent with tool calling and optional structured exit:
:agent("app:content_writer", {
model = "gpt-5",
inputs = { required = { "context", "content_plan", "analysis" } },
arena = {
prompt = "Write content based on the provided context.",
max_iterations = 12,
tool_calling = "any",
exit_schema = {
type = "object",
properties = {
content = { type = "string" },
title = { type = "string" }
},
required = { "content", "title" }
}
},
show_tool_calls = true,
metadata = { title = "Content Writer" }
})
| Option | Type | Description |
|---|---|---|
| model | string | Override model |
| arena.prompt | string | System prompt |
| arena.max_iterations | number | Max reasoning loops (default: 64) |
| arena.min_iterations | number | Min iterations before exit (default: 1) |
| arena.tool_calling | string | "auto", "any" (requires exit_schema), "none" (rejects exit_schema) |
| arena.tools | array | Tool registry IDs |
| arena.exit_schema | table | JSON schema for structured exit |
| arena.exit_func_id | string | Function to validate exit output |
| arena.context | table | Additional context |
| inputs | table | Input requirements |
| show_tool_calls | boolean | Include tool calls in output |
| input_transform | string/table | Transform inputs |
| metadata | table | Node metadata |
Dynamic agent selection: Pass an empty string as agent ID and resolve it via input_transform:
:agent("", {
inputs = { required = { "spec", "task" } },
input_transform = {
agent_id = "inputs.spec.agent_id",
task = "inputs.task"
},
arena = {
prompt = "Process according to spec",
max_iterations = 25
}
})
Exit validation: When exit_func_id is set, the function validates the agent's exit output. On validation failure, the agent receives the error as observation and continues (up to max_iterations).
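A validator for the exit_schema shown earlier might look like the sketch below. The contract is an assumption, not taken from the module: it supposes that the function receives the exit output as a table and signals failure with the nil, error_message convention used elsewhere in this document. The function name and the 20-character rule are made up for illustration.

```lua
-- Hypothetical exit validator for an agent that exits with { content, title }.
-- ASSUMPTION: input is the exit output table; failure is reported as
-- `nil, message`. Check the module source for the actual contract.
local function validate_article_exit(output)
  if type(output) ~= "table" then
    return nil, "exit output must be a table"
  end
  if type(output.title) ~= "string" or output.title == "" then
    return nil, "title is required"
  end
  if type(output.content) ~= "string" or #output.content < 20 then
    return nil, "content must be at least 20 characters"
  end
  return output, nil
end
```

Keeping the error message specific matters here, because the agent receives it as an observation and uses it to decide what to fix on the next iteration.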
Cycle Node
Iterates a function or template repeatedly with persistent state:
:cycle({
func_id = "app:content_cycle",
max_iterations = 3,
initial_state = {
entry_id = entry_id,
content_prompt = prompt,
min_score = 8.0,
feedback_history = {}
}
})
The cycle function receives on each iteration:
{
input = <workflow_input>,
state = <accumulated_state>,
last_result = <previous_iteration_output>,
iteration = <current_iteration_number>
}
The function controls continuation:
function my_cycle(cycle_context)
-- stop if approved
if cycle_context.last_result and cycle_context.last_result.approved then
return {
state = cycle_context.state,
result = cycle_context.last_result,
continue = false
}
end
-- spawn child workflow for this iteration
return flow.create()
:with_input({ task = cycle_context.input.task })
:agent("app:worker")
:agent("app:qa")
:run()
end
| Option | Type | Description |
|---|---|---|
| func_id | string | Iteration function (mutually exclusive with template) |
| template | FlowBuilder | Template for each iteration (mutually exclusive with func_id) |
| max_iterations | number | Maximum iterations |
| initial_state | table | Starting state |
| continue_condition | string | Expression: continue while true |
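To see how state, last_result, and continue interact, the following sketch runs a cycle function against a small local driver. The driver run_cycle and the scorer score_draft are hypothetical stand-ins, and the sketch assumes that iteration starts at 1 and that returning continue = true requests another pass; the real orchestrator may differ.

```lua
-- Hypothetical scorer standing in for the real work of one iteration.
local function score_draft(iteration)
  return 6.0 + iteration -- pretend quality improves with every pass
end

-- Cycle function: records the score in state and stops once it is good enough.
local function content_cycle(cycle_context)
  local state = cycle_context.state
  local score = score_draft(cycle_context.iteration)
  state.feedback_history[#state.feedback_history + 1] = score
  return {
    state = state,
    result = { score = score, approved = score >= state.min_score },
    continue = score < state.min_score,
  }
end

-- Local driver that mimics the orchestrator loop, for illustration only.
local function run_cycle(fn, input, initial_state, max_iterations)
  local state, last_result = initial_state, nil
  for iteration = 1, max_iterations do
    local out = fn({
      input = input,
      state = state,
      last_result = last_result,
      iteration = iteration,
    })
    state, last_result = out.state, out.result
    if not out.continue then
      return last_result, state, iteration
    end
  end
  return last_result, state, max_iterations
end

local result, state, iterations = run_cycle(
  content_cycle,
  { task = "write" },
  { min_score = 8.0, feedback_history = {} },
  3
)
```

With these inputs the loop stops on the second pass, once the score reaches min_score, and feedback_history holds one entry per iteration.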
Template-based cycle:
:cycle({
template = flow.template()
:agent("app:worker")
:func("app:validator"),
max_iterations = 5
})
Parallel Node
Map-reduce pattern over arrays:
:parallel({
inputs = { required = { "specs", "task" } },
source_array_key = "specs",
iteration_input_key = "spec",
passthrough_keys = { "task" },
batch_size = 10,
on_error = "collect_errors",
filter = "successes",
unwrap = true,
template = flow.template()
:agent("app:processor", {
inputs = { required = { "spec", "task" } },
input_transform = {
agent_id = "inputs.spec.agent_id",
task = "inputs.task"
},
arena = {
prompt = "Process according to spec",
max_iterations = 25
}
})
:to("@success"),
metadata = { title = "Process Specs" }
})
| Option | Type | Description |
|---|---|---|
| source_array_key | string | Input key containing the array (required) |
| template | FlowBuilder | Template for each item (required, must route to @success) |
| iteration_input_key | string | Input key for current item (default: "default") |
| batch_size | number | Items per parallel batch (default: 1 = sequential) |
| on_error | string | "collect_errors" (default) or "fail_fast" |
| filter | string | "all" (default), "successes", "failures" |
| unwrap | boolean | Return raw results instead of wrapped metadata (default: false) |
| passthrough_keys | array | Input keys forwarded to every iteration |
Passthrough keys provide shared context (config, task description) to every iteration without duplicating data in the source array:
:with_data(file_list):as("files"):to("processor", "files")
:with_data("secret"):as("api_key"):to("processor", "api_key")
:parallel({
inputs = { required = { "files", "api_key" } },
source_array_key = "files",
iteration_input_key = "filename",
passthrough_keys = { "api_key" },
template = flow.template()
:func("app:upload", {
inputs = { required = { "filename", "api_key" } }
})
:to("@success")
}):as("processor")
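As a rough mental model, each iteration of the template sees the current array item under iteration_input_key plus a copy of every passthrough key. The helper below is a plain-Lua sketch of that fan-out; iteration_inputs is a hypothetical name, and the real node may build its inputs differently.

```lua
-- Plain-Lua model of how per-iteration inputs could be assembled.
local function iteration_inputs(inputs, config)
  local item_key = config.iteration_input_key or "default"
  local per_item = {}
  for index, item in ipairs(inputs[config.source_array_key]) do
    -- The current item goes under the iteration input key.
    local entry = { [item_key] = item }
    -- Passthrough keys are forwarded unchanged to every iteration.
    for _, key in ipairs(config.passthrough_keys or {}) do
      entry[key] = inputs[key]
    end
    per_item[index] = entry
  end
  return per_item
end

local built = iteration_inputs(
  { files = { "a.txt", "b.txt" }, api_key = "secret" },
  {
    source_array_key = "files",
    iteration_input_key = "filename",
    passthrough_keys = { "api_key" },
  }
)
```

For the upload example above, built[1] would be { filename = "a.txt", api_key = "secret" }, which matches the inputs the app:upload node declares as required.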
Signal Node
Pauses execution until an external signal arrives. Used for human approvals, external events, or multi-step workflows:
:signal({
signal_id = "approval",
inputs = { required = { "draft" } },
metadata = { title = "Wait for approval" }
})
| Option | Type | Description |
|---|---|---|
| signal_id | string | Signal name matched against client:signal(). If empty or omitted, a UUID v7 is generated at runtime |
| inputs | table | Input requirements |
| input_transform | string/table | Transforms inputs before the node receives them |
| metadata | table | Node metadata |
Send the signal from outside the workflow through the client API (see client:signal() below).
Behavior
The node yields with wait_for_signal = true and persists that yield in the workflow state. The orchestrator resumes the node when a matching NODE_SIGNAL commit arrives.
- The signal is fulfilled by any non-nil payload. false, 0, "", and {} all fulfill the yield; only nil leaves it pending.
- A signal yield blocks COMPLETE_WORKFLOW but not other pending nodes: parallel branches keep executing while one branch waits.
- Signals can be queued ahead of :start(): if a matching NODE_SIGNAL commit arrives before the signal node reaches its yield, it is delivered the moment the yield is recorded.
- Only one signal fulfills each yield. If a second signal with the same signal_id arrives before the yield is fulfilled, it overwrites the first.
- When several signal yields share the same signal_id, the first matching yield receives the data.
- If the signal_id field is missing, matching falls back to the node's discriminator.
- The delivered signal data is passed to the node's output as the signal payload.
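The delivery rules listed above can be approximated with a small in-memory model. SignalBox is a hypothetical class written for this illustration; it ignores persistence and the discriminator fallback, and it reads the overwrite rule as applying to signals queued before the yield exists.

```lua
-- In-memory model of signal delivery (illustrative, not the orchestrator).
local SignalBox = {}
SignalBox.__index = SignalBox

function SignalBox.new()
  return setmetatable({ queued = {}, waiting = {}, outputs = {} }, SignalBox)
end

-- Deliver a signal. Returns true when a waiting yield was fulfilled.
function SignalBox:signal(signal_id, payload)
  if payload == nil then
    return false -- nil never fulfills a yield
  end
  if self.waiting[signal_id] then
    self.waiting[signal_id] = nil
    self.outputs[signal_id] = payload
    return true
  end
  -- No yield yet: keep only the latest payload for this id.
  self.queued[signal_id] = payload
  return false
end

-- A signal node reaches its yield. Returns true if a queued signal was waiting.
function SignalBox:yield(signal_id)
  local payload = self.queued[signal_id]
  if payload ~= nil then
    self.queued[signal_id] = nil
    self.outputs[signal_id] = payload
    return true
  end
  self.waiting[signal_id] = true
  return false
end

local box = SignalBox.new()
box:signal("approval", "first")  -- queued before the yield exists
box:signal("approval", "second") -- overwrites the queued payload
local delivered_early = box:yield("approval")

box:yield("legal_ok")                               -- node is now waiting
local ignored = box:signal("legal_ok", nil)         -- nil leaves it pending
local delivered_false = box:signal("legal_ok", false) -- false still counts
```

The last two calls show the distinction the first bullet draws: a nil payload leaves the yield pending, while false fulfills it and becomes the node output.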
Durability and Recovery
The signal yield is part of the workflow state and is persisted through the same outbox mechanism as every other command. If the orchestrator process exits while waiting:
- The pending yield is restored on restart.
- Signals delivered during the outage are queued and applied when the state is reloaded.
- Compound pipelines (func → signal → signal → func) recover step by step: each signal can be delivered across a separate restart.
Orphaned signal yields (yields whose parent process exited without completing) are cleaned up by the workflow state's process exit handler.
Pipeline Patterns
Signal nodes participate in any topology:
-- Human-in-the-loop approval between two functions
flow.create()
:func("app:draft")
:signal({ signal_id = "approve_draft" })
:func("app:publish")
:run()
-- Two parallel approvals that must both arrive before publishing
flow.create()
:with_input({ doc = "release-notes" })
:as("trigger")
:to("legal", "doc")
:to("finance", "doc")
:signal({ signal_id = "legal_ok", inputs = { required = { "doc" } } })
:as("legal")
:to("gate", "legal")
:signal({ signal_id = "finance_ok", inputs = { required = { "doc" } } })
:as("finance")
:to("gate", "finance")
:join({ inputs = { required = { "legal", "finance" } } })
:as("gate")
:to("release")
:func("app:release"):as("release"):to("@success")
:run()
Signal data is exposed as node output, so downstream nodes receive whatever was passed to client:signal().
Join Node
Collects multiple inputs before proceeding:
:join({
inputs = { required = { "source1", "source2" } },
output_mode = "object",
ignored_keys = { "triggered" }
})
| Option | Type | Description |
|---|---|---|
| output_mode | string | "object" (default) or "array" (arrival order) |
| ignored_keys | array | Input keys excluded from output |
| inputs | table | Input requirements |
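The two output modes can be pictured with a short plain-Lua model. join_output and the arrival records are hypothetical names used only for this sketch; it assumes "object" keys the result by input key and "array" keeps values in arrival order, as the table describes.

```lua
-- Plain-Lua model of join output shaping (illustrative only).
-- `arrivals` is a list of { key = ..., value = ... } in arrival order.
local function join_output(arrivals, config)
  local ignored = {}
  for _, key in ipairs(config.ignored_keys or {}) do
    ignored[key] = true
  end
  local mode = config.output_mode or "object"
  local out = {}
  for _, arrival in ipairs(arrivals) do
    if not ignored[arrival.key] then
      if mode == "array" then
        out[#out + 1] = arrival.value -- positional, arrival order
      else
        out[arrival.key] = arrival.value -- keyed by input key
      end
    end
  end
  return out
end

local arrivals = {
  { key = "source2", value = "B" },
  { key = "triggered", value = true },
  { key = "source1", value = "A" },
}
local as_object = join_output(arrivals, { ignored_keys = { "triggered" } })
local as_array = join_output(arrivals, {
  output_mode = "array",
  ignored_keys = { "triggered" },
})
```

In this run as_object is { source1 = "A", source2 = "B" } and as_array is { "B", "A" }, because source2 arrived first and the triggered key is dropped in both modes.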
Templates
Templates define reusable sub-workflows. Use flow.template() to create, :use() to inline:
local preprocessor = flow.template()
:func("app:clean")
:func("app:tokenize")
flow.create()
:with_input(data)
:use(preprocessor)
:func("app:process")
:run()
Templates inline their operations into the parent flow at compile time.
Nested Workflows
Functions used in cycles and parallel nodes can spawn child workflows by returning flow.create():run():
function my_processor(input)
return flow.create()
:with_input(input)
:func("app:step_a")
:func("app:step_b")
:run()
end
When :run() executes inside an existing dataflow context, it returns { _control = { commands = [...] } } instead of executing directly. The orchestrator handles the child workflow through the yield mechanism.
The function must return the result of flow.create():run() directly. Functions returning anything else cannot spawn child workflows.
Synchronous vs Asynchronous
:run() blocks until the workflow completes and returns output:
local result, err = flow.create()
:with_input({ text = "hello" })
:func("app:process")
:run()
:start() returns immediately with a workflow ID:
local dataflow_id, err = flow.create()
:with_input({ text = "hello" })
:func("app:process")
:start()
:start() cannot be used in nested contexts.
Client API
For programmatic workflow management:
imports:
client: userspace.dataflow:client
local client = require("client")
local c, err = client.new()
| Method | Description |
|---|---|
| client.new() | Create client (requires security actor) |
| :create_workflow(commands, options?) | Create workflow, returns dataflow_id |
| :execute(dataflow_id, options?) | Run synchronously, returns result |
| :start(dataflow_id, options?) | Run asynchronously, returns dataflow_id |
| :output(dataflow_id) | Fetch workflow outputs |
| :get_status(dataflow_id) | Get current status |
| :cancel(dataflow_id, timeout?) | Gracefully cancel (default: 30s) |
| :terminate(dataflow_id) | Force terminate |
| :signal(dataflow_id, signal_id, data?) | Deliver an external signal to a waiting signal node |
Workflow Status
| Status | Description |
|---|---|
| template | Node is a template instance |
| pending | Waiting for inputs |
| ready | Inputs collected, ready to execute |
| running | Actively executing |
| paused | Yielded, waiting for child workflow |
| completed | Finished successfully |
| failed | Failed |
| cancelled | User cancelled |
| skipped | Conditional branch not taken |
| terminated | Force terminated |
Metadata
flow.create()
:with_title("Document Processing Pipeline")
:with_metadata({ source = "api", priority = "high" })
:func("app:process", { metadata = { title = "Process Document" } })
:run()
Title defaults to "Flow Builder Workflow" if not provided.
Validation Rules
The compiler validates workflows at compile time:
- All :as(name) names must be unique
- All :to() and :error_to() targets must reference existing names (except @success, @fail)
- Graph must be acyclic
- All nodes must have incoming routes (from another node, workflow input, or static data)
- :cycle() requires func_id or template (not both)
- :parallel() requires source_array_key and template
- At least one path must lead to @success or have auto-output
- :when() only follows :to() or :error_to() from nodes (not static data)
- Nodes with args cannot receive inputs with "default" discriminator
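Three of these checks (unique names, known targets, no cycles) can be sketched against a plain table description of a graph. This is not the compiler's code: validate and the { name, to } node records are hypothetical, and cycle detection here is an ordinary depth-first search.

```lua
-- Illustrative validator for a graph described as { name = ..., to = {...} }.
local TERMINALS = { ["@success"] = true, ["@fail"] = true }

local function validate(nodes)
  -- Rule: all names must be unique.
  local by_name = {}
  for _, node in ipairs(nodes) do
    if by_name[node.name] then
      return nil, "duplicate name: " .. node.name
    end
    by_name[node.name] = node
  end

  -- Rule: every target must exist, except the terminals.
  for _, node in ipairs(nodes) do
    for _, target in ipairs(node.to or {}) do
      if not TERMINALS[target] and not by_name[target] then
        return nil, "unknown target: " .. target
      end
    end
  end

  -- Rule: the graph must be acyclic (depth-first search).
  local state = {} -- nil = unvisited, 1 = in progress, 2 = done
  local function visit(name)
    if state[name] == 1 then return false end -- back edge: cycle found
    if state[name] == 2 then return true end
    state[name] = 1
    for _, target in ipairs(by_name[name].to or {}) do
      if not TERMINALS[target] and not visit(target) then
        return false
      end
    end
    state[name] = 2
    return true
  end
  for _, node in ipairs(nodes) do
    if not visit(node.name) then
      return nil, "graph contains a cycle"
    end
  end
  return true, nil
end

local ok = validate({
  { name = "classify", to = { "handler_a", "fallback" } },
  { name = "handler_a", to = { "@success" } },
  { name = "fallback", to = { "@success" } },
})
local cyclic, cyclic_err = validate({
  { name = "a", to = { "b" } },
  { name = "b", to = { "a" } },
})
```

The first graph passes, while the second is rejected because a and b route to each other.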
Expression Reference
Expressions use the expr module syntax, available in :when() conditions and input_transform values.
Operators: +, -, *, /, %, **, ==, !=, <, <=, >, >=, &&, ||, !, contains, startsWith, endsWith
Array functions: all(), any(), none(), one(), filter(), map(), count(), len(), first(), last()
Math functions: max(), min(), abs(), ceil(), floor(), round(), sqrt(), pow()
String functions: len(), upper(), lower(), trim(), split(), join()
Type functions: type(), int(), float(), string()
Literals: numbers, strings, booleans (true/false), null (nil), arrays ([1, 2, 3]), objects ({key: value})
Ternary: output.age >= 18 ? output.verified : false
Optional chaining: output.data?.nested?.value
Error Handling
Both :run() and :start() follow standard Lua error conventions:
- Success: data, nil (run) or dataflow_id, nil (start)
- Failure: nil, error_message
Error categories: compilation errors, client errors, workflow creation errors, execution errors, and workflow failures.
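In calling code, the convention above usually reduces to checking the second return value before touching the first. The sketch below uses fake_run, a stub that only mimics the return shape of :run(), so that it can be executed without the module; the error text is invented for the example.

```lua
-- Stub standing in for flow.create():...:run(). It only mimics the
-- `data, nil` / `nil, error_message` return shape; the message is made up.
local function fake_run(should_fail)
  if should_fail then
    return nil, "execution error: node app:process failed"
  end
  return { text = "HELLO" }, nil
end

-- Check err first; only read the result when err is nil.
local function process(should_fail)
  local result, err = fake_run(should_fail)
  if err then
    return "failed: " .. err
  end
  return "ok: " .. result.text
end

local ok_message = process(false)
local fail_message = process(true)
```

The same pattern applies to :start(), where the first value is a dataflow_id rather than the workflow output.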
See Also
- Agents - Agent framework used by agent nodes
- LLM - LLM module
- Framework Overview - Framework module usage