Dataflow
El módulo wippy/dataflow proporciona un motor de orquestación de workflows basado en grafos acíclicos dirigidos (DAG). Los workflows se componen de nodos — funciones, agentes, ciclos y procesadores paralelos — conectados por rutas de datos tipadas. El orquestador gestiona la ejecución, la persistencia del estado y la recuperación.
Instalación
Añade el módulo a tu proyecto:
wippy add wippy/dataflow
wippy install
Declara la dependencia:
version: "1.0"
namespace: app
entries:
- name: dep.dataflow
kind: ns.dependency
component: wippy/dataflow
version: "*"
El módulo dataflow depende de wippy/agent, wippy/llm y wippy/session — estos se resuelven automáticamente cuando ejecutas wippy install. El módulo requiere un recurso de base de datos en app:db para la persistencia del workflow y ejecuta las migraciones automáticamente vía wippy/migration.
El módulo publica una entrada env.variable userspace.dataflow.env:web_host_origin (por defecto https://front.wippy.ai) que los flujos descendentes pueden leer para construir URLs públicas. Sobrescríbela a través del router de env o una requirement.
Flow Builder
El flow builder proporciona una interfaz fluida para componer workflows. Impórtalo a tu entrada:
imports:
flow: userspace.dataflow.flow:flow
local flow = require("flow")
API principal
flow.create()
:with_title(title)
:with_metadata(metadata)
:with_input(data)
:with_data(data)
:[operation](config)
:as(name)
:to(target, input_key, transform)
:error_to(target, input_key, transform)
:when(condition)
:run() -- synchronous
:start() -- asynchronous
flow.template()
:[operations]...
Pipeline lineal
Los nodos se encadenan automáticamente cuando no se define un enrutado explícito. La salida de cada nodo fluye al siguiente:
local result, err = flow.create()
:with_input({ text = "Hello world" })
:func("app:tokenize")
:func("app:translate", { args = { target_lang = "fr" } })
:func("app:format_output")
:run()
Enrutado con nombre
Usa :as() para nombrar nodos y :to() para enrutar datos entre ellos. Usa :as() solo cuando sea necesario referenciar el nodo:
local result, err = flow.create()
:with_input(task)
:to("router")
:func("app:router"):as("router")
:to("context", "routing")
:to("dev", "routing")
:agent("app:context_agent"):as("context")
:to("dev", "gathered_context")
:agent("app:dev_agent"):as("dev")
:to("@success")
:run()
El segundo parámetro de :to() es el discriminador — la clave de entrada en el nodo receptor. Cuando un nodo recibe varias entradas, se recopilan como una tabla indexada por discriminador.
Entrada del workflow y datos estáticos
:with_input() es la única entrada primaria del workflow. :with_data() crea fuentes independientes de datos estáticos:
flow.create()
:with_input(task)
:to("router")
:with_data(config):as("cfg")
:to("dev", "config")
:to("logger", "config")
:with_data(branch):as("branch_data")
:to("checker", "branch")
:func("app:router"):as("router")
:to("dev", "task")
:func("app:dev"):as("dev")
:to("@success")
:error_to("@fail")
:run()
Usa :with_input() para datos externos que entran al workflow. Usa :with_data() para configuración, constantes y datos de referencia compartidos entre múltiples nodos. Los datos estáticos usan optimización por referencia — la primera ruta crea los datos reales, las rutas subsiguientes crean referencias ligeras.
Enrutado condicional
Usa :when() después de :to() para añadir condiciones. Las condiciones se evalúan contra la salida del nodo usando la sintaxis expr:
flow.create()
:with_input(data)
:func("app:classify"):as("classify")
:to("handler_a"):when("output.category == 'a'")
:to("handler_b"):when("output.category == 'b'")
:to("fallback")
:func("app:handler_a"):as("handler_a"):to("@success")
:func("app:handler_b"):as("handler_b"):to("@success")
:func("app:fallback"):as("fallback"):to("@success")
:run()
Las condiciones pueden combinarse con transformaciones en línea para un enrutado más complejo:
:func("app:decompose"):as("decompose")
:to("@success", nil, "{passed: true, feedback: nil}"):when("len(output.items) == 0")
:to("processor", "items", "output.items")
Las expresiones condicionales soportan: comparaciones (output.score > 0.8), operadores lógicos (output.valid && output.count > 5), funciones de array (len(output.items) > 0, any(output.errors, {.critical})), operaciones de cadena (output.status contains 'success') y encadenamiento opcional (output.data?.nested?.value).
Terminales del workflow
Enruta a @success o @fail para terminar el workflow explícitamente. En contextos anidados (ciclos, paralelo), los terminales crean salidas de nodo en lugar de salidas del workflow:
:func("app:final_step"):to("@success")
:func("app:handler"):error_to("@fail")
Enrutado de errores
Usa :error_to() para enrutar errores de nodos a un manejador. Los errores pueden enrutarse como entradas normales a nodos de recuperación:
:agent("app:gpt_planner", { model = "gpt-5" }):as("gpt_planner")
:to("consolidator", "gpt_plan")
:error_to("consolidator", "gpt_plan")
:agent("app:claude_planner", { model = "claude-4-5-sonnet" }):as("claude_planner")
:to("consolidator", "claude_plan")
:error_to("consolidator", "claude_plan")
:agent("app:consolidator", {
inputs = { required = { "gpt_plan", "claude_plan" } }
}):as("consolidator")
Este patrón ejecuta ambos planificadores en paralelo — si uno falla, su error se convierte en la entrada para el consolidador, que procede con los resultados que estén disponibles.
Fusión de entradas
Cómo un nodo recibe entradas depende de los discriminadores y de si args está configurado.
Sin args — entrada por defecto única:
:func("source"):to("target")
-- target receives: raw content (unwrapped)
Sin args — entrada con nombre única:
:func("source"):to("target", "task")
-- target receives: { task = content }
Sin args — múltiples entradas:
:func("source1"):to("target", "data")
:func("source2"):to("target", "config")
-- target receives: { data = content1, config = content2 }
Con args — las entradas se fusionan con la base:
:func("app:api_client", {
args = { base_url = "https://api.com", timeout = 5000 }
})
-- with :to("api_client", "body") from upstream
-- api_client receives: { base_url = "https://api.com", timeout = 5000, body = content }
args no pueden recibir entradas con el discriminador "default". Usa discriminadores con nombre mediante :to(target, "input_key") en su lugar.
Transformaciones de entrada
Transforma los datos antes de que lleguen a un nodo:
-- String transform: single expression
:func("app:step", { input_transform = "input.nested.field" })
-- Table transform: named expressions
:func("app:step", {
input_transform = {
task = "inputs.task",
config = "inputs.settings",
priority = "output.score > 0.8 ? 'high' : 'normal'"
}
})
Variables de contexto disponibles en las transformaciones: input (entrada del workflow), inputs (todas las entradas entrantes del nodo), output (salida del nodo actual al enrutar).
Transformaciones de ruta en línea
El tercer parámetro de :to() es una expresión de transformación en línea:
:func("source"):as("source")
:to("target", nil, "output.data")
:to("other", nil, "{passed: true, value: output.x}")
:to("list", nil, "map(output.items, {.id})")
Tipos de nodos
Nodo función
Ejecuta una entrada function.lua registrada:
:func("app:my_function", {
args = { key = "value" },
inputs = { required = { "task", "config" } },
context = { session_id = "abc" },
input_transform = { task = "inputs.prompt" },
metadata = { title = "Process Data" }
})
| Option | Type | Description |
|---|---|---|
args |
table | Argumentos base fusionados con las entradas del nodo |
inputs |
table | Requisitos de entrada: { required = {...}, optional = {...} } |
context |
table | Contexto de ejecución pasado a la función |
input_transform |
string/table | Expresión para transformar entradas |
metadata |
table | Metadatos del nodo (p. ej., { title = "..." }) |
Si la función devuelve { _control = { commands = [...] } }, el orquestador genera un workflow hijo. Así es como funcionan los flujos anidados.
Nodo agente
Ejecuta un agente con llamada a herramientas y salida estructurada opcional:
:agent("app:content_writer", {
model = "gpt-5",
inputs = { required = { "context", "content_plan", "analysis" } },
arena = {
prompt = "Write content based on the provided context.",
max_iterations = 12,
tool_calling = "any",
exit_schema = {
type = "object",
properties = {
content = { type = "string" },
title = { type = "string" }
},
required = { "content", "title" }
}
},
show_tool_calls = true,
metadata = { title = "Content Writer" }
})
| Option | Type | Description |
|---|---|---|
model |
string | Sobrescribir el modelo |
arena.prompt |
string | Prompt del sistema |
arena.max_iterations |
number | Máx. bucles de razonamiento (por defecto: 64) |
arena.min_iterations |
number | Mín. iteraciones antes de salir (por defecto: 1) |
arena.tool_calling |
string | "auto", "any" (requiere exit_schema), "none" (rechaza exit_schema) |
arena.tools |
array | IDs del registro de herramientas |
arena.exit_schema |
table | JSON schema para salida estructurada |
arena.exit_func_id |
string | Función para validar la salida de exit |
arena.context |
table | Contexto adicional |
inputs |
table | Requisitos de entrada |
show_tool_calls |
boolean | Incluir llamadas a herramientas en la salida |
input_transform |
string/table | Transformar entradas |
metadata |
table | Metadatos del nodo |
Selección dinámica de agente: Pasa una cadena vacía como ID de agente y resuélvela vía input_transform:
:agent("", {
inputs = { required = { "spec", "task" } },
input_transform = {
agent_id = "inputs.spec.agent_id",
task = "inputs.task"
},
arena = {
prompt = "Process according to spec",
max_iterations = 25
}
})
Validación de salida: Cuando exit_func_id está establecido, la función valida la salida de exit del agente. En caso de fallo de validación, el agente recibe el error como observación y continúa (hasta max_iterations).
Nodo ciclo
Itera una función o plantilla repetidamente con estado persistente:
:cycle({
func_id = "app:content_cycle",
max_iterations = 3,
initial_state = {
entry_id = entry_id,
content_prompt = prompt,
min_score = 8.0,
feedback_history = {}
}
})
La función de ciclo recibe en cada iteración:
{
input = <workflow_input>,
state = <accumulated_state>,
last_result = <previous_iteration_output>,
iteration = <current_iteration_number>
}
La función controla la continuación:
function my_cycle(cycle_context)
-- stop if approved
if cycle_context.last_result and cycle_context.last_result.approved then
return {
state = cycle_context.state,
result = cycle_context.last_result,
continue = false
}
end
-- spawn child workflow for this iteration
return flow.create()
:with_input({ task = cycle_context.input.task })
:agent("app:worker")
:agent("app:qa")
:run()
end
| Option | Type | Description |
|---|---|---|
func_id |
string | Función de iteración (mutuamente exclusiva con template) |
template |
FlowBuilder | Plantilla para cada iteración (mutuamente exclusiva con func_id) |
max_iterations |
number | Iteraciones máximas |
initial_state |
table | Estado inicial |
continue_condition |
string | Expresión: continuar mientras sea verdadera |
Ciclo basado en plantilla:
:cycle({
template = flow.template()
:agent("app:worker")
:func("app:validator"),
max_iterations = 5
})
Nodo paralelo
Patrón map-reduce sobre arrays:
:parallel({
inputs = { required = { "specs", "task" } },
source_array_key = "specs",
iteration_input_key = "spec",
passthrough_keys = { "task" },
batch_size = 10,
on_error = "collect_errors",
filter = "successes",
unwrap = true,
template = flow.template()
:agent("app:processor", {
inputs = { required = { "spec", "task" } },
input_transform = {
agent_id = "inputs.spec.agent_id",
task = "inputs.task"
},
arena = {
prompt = "Process according to spec",
max_iterations = 25
}
})
:to("@success"),
metadata = { title = "Process Specs" }
})
| Option | Type | Description |
|---|---|---|
source_array_key |
string | Clave de entrada que contiene el array (requerido) |
template |
FlowBuilder | Plantilla para cada ítem (requerido, debe enrutar a @success) |
iteration_input_key |
string | Clave de entrada para el ítem actual (por defecto: "default") |
batch_size |
number | Ítems por lote paralelo (por defecto: 1 = secuencial) |
on_error |
string | "collect_errors" (por defecto) o "fail_fast" |
filter |
string | "all" (por defecto), "successes", "failures" |
unwrap |
boolean | Devolver resultados en bruto en lugar de metadatos envueltos (por defecto: false) |
passthrough_keys |
array | Claves de entrada reenviadas a cada iteración |
Las passthrough keys proporcionan contexto compartido (configuración, descripción de tarea) a cada iteración sin duplicar datos en el array origen:
:with_data(file_list):as("files"):to("processor", "files")
:with_data("secret"):as("api_key"):to("processor", "api_key")
:parallel({
inputs = { required = { "files", "api_key" } },
source_array_key = "files",
iteration_input_key = "filename",
passthrough_keys = { "api_key" },
template = flow.template()
:func("app:upload", {
inputs = { required = { "filename", "api_key" } }
})
:to("@success")
}):as("processor")
Nodo signal
Pausa la ejecución hasta que llega una señal externa. Úsalo para aprobaciones humanas, eventos externos o workflows por etapas:
:signal({
signal_id = "approval",
inputs = { required = { "draft" } },
metadata = { title = "Wait for approval" }
})
| Option | Type | Description |
|---|---|---|
signal_id |
string | Nombre de la señal comparado con client:signal(). Si está vacío u omitido, se genera un UUID v7 en tiempo de ejecución |
inputs |
table | Requisitos de entrada |
input_transform |
string/table | Transforma entradas antes de que el nodo las reciba |
metadata |
table | Metadatos del nodo |
Envía la señal desde fuera del workflow usando la API del cliente (ver client:signal() más abajo).
Comportamiento
El nodo hace yield con wait_for_signal = true y persiste ese yield en el estado del workflow. El orquestador reanuda el nodo cuando llega un commit NODE_SIGNAL coincidente.
- La señal se satisface con cualquier payload distinto de
nil.false,0,""y{}satisfacen el yield; solonillo mantiene pendiente. - Un yield de señal bloquea
COMPLETE_WORKFLOWpero no bloquea otros nodos pendientes — las ramas paralelas continúan ejecutándose mientras una rama espera. - Las señales pueden pre-encolarse antes de
:start(): si un commitNODE_SIGNALcoincidente llega antes de que el nodo signal alcance el yield, se entrega en el momento en que el yield es registrado. - Solo una señal satisface cada yield. Si una segunda señal con el mismo
signal_idllega antes de que el yield se satisfaga, sobrescribe la primera. - Cuando múltiples yields de señal comparten el mismo
signal_id, el primer yield coincidente recibe los datos. - Si el campo
signal_idestá ausente, la coincidencia recurre al discriminador del nodo. - Los datos de la señal entregada se pasan a la salida del nodo como payload de la señal.
Durabilidad y recuperación
El yield de la señal es parte del estado del workflow, persistido a través del mismo mecanismo de outbox que cualquier otro comando. Si el proceso del orquestador es matado mientras espera:
- El yield pendiente se restaura al reiniciar.
- Las señales entregadas durante la interrupción se encolan y aplican cuando el estado se recarga.
- Los pipelines compuestos (
func → signal → signal → func) se recuperan paso a paso — cada señal puede entregarse a través de un reinicio separado.
Los yields de señal huérfanos (yields cuyo proceso padre salió sin completar) son limpiados por el manejador de salida de proceso del estado del workflow.
Patrones de pipeline
Los nodos signal participan en cualquier topología:
-- Human-in-the-loop approval between two functions
flow.create()
:func("app:draft")
:signal({ signal_id = "approve_draft" })
:func("app:publish")
:run()
-- Two parallel approvals that must both arrive before release
flow.create()
:with_input({ doc = "release-notes" })
:as("trigger")
:to("legal", "doc")
:to("finance", "doc")
:signal({ signal_id = "legal_ok", inputs = { required = { "doc" } } })
:as("legal")
:to("gate", "legal")
:signal({ signal_id = "finance_ok", inputs = { required = { "doc" } } })
:as("finance")
:to("gate", "finance")
:join({ inputs = { required = { "legal", "finance" } } })
:as("gate")
:to("release")
:func("app:release"):as("release"):to("@success")
:run()
Los datos de la señal se exponen como la salida del nodo, por lo que los nodos descendentes reciben lo que se pasó a client:signal().
Nodo join
Recolecta múltiples entradas antes de proceder:
:join({
inputs = { required = { "source1", "source2" } },
output_mode = "object",
ignored_keys = { "triggered" }
})
| Option | Type | Description |
|---|---|---|
output_mode |
string | "object" (por defecto) o "array" (orden de llegada) |
ignored_keys |
array | Claves de entrada excluidas de la salida |
inputs |
table | Requisitos de entrada |
Plantillas
Las plantillas definen sub-workflows reutilizables. Usa flow.template() para crear, :use() para insertar:
local preprocessor = flow.template()
:func("app:clean")
:func("app:tokenize")
flow.create()
:with_input(data)
:use(preprocessor)
:func("app:process")
:run()
Las plantillas insertan sus operaciones en el flujo padre en tiempo de compilación.
Workflows anidados
Las funciones usadas en nodos de ciclo y paralelo pueden generar workflows hijos devolviendo flow.create():run():
function my_processor(input)
return flow.create()
:with_input(input)
:func("app:step_a")
:func("app:step_b")
:run()
end
Cuando :run() se ejecuta dentro de un contexto dataflow existente, devuelve { _control = { commands = [...] } } en lugar de ejecutarse directamente. El orquestador maneja el workflow hijo a través del mecanismo de yield.
flow.create():run(). Las funciones que devuelven cualquier otra cosa no pueden generar workflows hijos.
Síncrono vs Asíncrono
:run() bloquea hasta que el workflow se completa y devuelve la salida:
local result, err = flow.create()
:with_input({ text = "hello" })
:func("app:process")
:run()
:start() devuelve inmediatamente un ID de workflow:
local dataflow_id, err = flow.create()
:with_input({ text = "hello" })
:func("app:process")
:start()
:start() no se puede usar en contextos anidados.
API del cliente
Para la gestión programática de workflows:
imports:
client: userspace.dataflow:client
local client = require("client")
local c, err = client.new()
| Method | Description |
|---|---|
client.new() |
Crear cliente (requiere actor de seguridad) |
:create_workflow(commands, options?) |
Crear workflow, devuelve dataflow_id |
:execute(dataflow_id, options?) |
Ejecutar sincrónicamente, devuelve el resultado |
:start(dataflow_id, options?) |
Ejecutar asincrónicamente, devuelve dataflow_id |
:output(dataflow_id) |
Obtener salidas del workflow |
:get_status(dataflow_id) |
Obtener estado actual |
:cancel(dataflow_id, timeout?) |
Cancelar con gracia (por defecto: 30s) |
:terminate(dataflow_id) |
Terminación forzada |
:signal(dataflow_id, signal_id, data?) |
Entregar una señal externa a un nodo signal en espera |
Estado del workflow
| Status | Description |
|---|---|
template |
El nodo es una instancia de plantilla |
pending |
Esperando entradas |
ready |
Entradas recopiladas, listo para ejecutar |
running |
Ejecutándose activamente |
paused |
Yield realizado, esperando workflow hijo |
completed |
Terminado con éxito |
failed |
Falló |
cancelled |
Cancelado por el usuario |
skipped |
Rama condicional no tomada |
terminated |
Terminado forzadamente |
Metadatos
flow.create()
:with_title("Document Processing Pipeline")
:with_metadata({ source = "api", priority = "high" })
:func("app:process", { metadata = { title = "Process Document" } })
:run()
El título por defecto es "Flow Builder Workflow" si no se proporciona.
Reglas de validación
El compilador valida los workflows en tiempo de compilación:
- Todos los nombres de
:as(name)deben ser únicos - Todos los destinos
:to()y:error_to()deben referenciar nombres existentes (excepto@success,@fail) - El grafo debe ser acíclico
- Todos los nodos deben tener rutas entrantes (de otro nodo, entrada del workflow o datos estáticos)
:cycle()requierefunc_idotemplate(no ambos):parallel()requieresource_array_keyytemplate- Al menos una ruta debe llevar a
@successo tener auto-salida :when()solo sigue a:to()o:error_to()de nodos (no de datos estáticos)- Los nodos con
argsno pueden recibir entradas con el discriminador"default"
Referencia de expresiones
Las expresiones usan la sintaxis del módulo expr, disponible en las condiciones :when() y los valores de input_transform.
Operadores: +, -, *, /, %, **, ==, !=, <, <=, >, >=, &&, ||, !, contains, startsWith, endsWith
Funciones de array: all(), any(), none(), one(), filter(), map(), count(), len(), first(), last()
Funciones matemáticas: max(), min(), abs(), ceil(), floor(), round(), sqrt(), pow()
Funciones de cadena: len(), upper(), lower(), trim(), split(), join()
Funciones de tipo: type(), int(), float(), string()
Literales: números, cadenas, booleanos (true/false), null (nil), arrays ([1, 2, 3]), objetos ({key: value})
Ternario: output.age >= 18 ? output.verified : false
Encadenamiento opcional: output.data?.nested?.value
Manejo de errores
Tanto :run() como :start() siguen las convenciones estándar de errores de Lua:
- Éxito:
data, nil(run) odataflow_id, nil(start) - Fallo:
nil, error_message
Categorías de error: errores de compilación, errores del cliente, errores de creación de workflow, errores de ejecución y fallos de workflow.
Véase también
- Agents - Framework de agentes usado por los nodos agente
- LLM - Módulo LLM
- Framework Overview - Uso del módulo de framework