Workflows
Los workflows son funciones durables que orquestan activities y mantienen estado a través de fallos y reinicios. Se definen usando el tipo de entrada workflow.lua.
Definición
- name: order_workflow
kind: workflow.lua
source: file://order_workflow.lua
method: main
modules:
- funcs
- time
- workflow
meta:
temporal:
workflow:
worker: app:worker
Campos de Metadatos
| Campo | Requerido | Descripción |
|---|---|---|
worker |
Sí | Referencia a la entrada temporal.worker |
name |
No | Nombre de tipo de workflow personalizado (por defecto el ID de entrada) |
Implementación Básica
local funcs = require("funcs")
local time = require("time")
local function main(order)
local payment, err = funcs.call("app:charge_payment", {
amount = order.total,
customer = order.customer_id
})
if err then
return {status = "failed", error = tostring(err)}
end
time.sleep("1h")
local shipment, err = funcs.call("app:ship_order", {
order_id = order.id,
address = order.shipping_address
})
if err then
funcs.call("app:refund_payment", payment.id)
return {status = "failed", error = tostring(err)}
end
return {
status = "completed",
payment_id = payment.id,
tracking = shipment.tracking_number
}
end
return { main = main }
Módulo workflow
El módulo workflow proporciona operaciones específicas de workflow.
workflow.info()
Obtener información de ejecución del workflow:
local workflow = require("workflow")
local info = workflow.info()
print(info.workflow_id) -- ID de ejecución del workflow
print(info.run_id) -- ID de ejecución actual
print(info.workflow_type) -- Nombre del tipo de workflow
print(info.task_queue) -- Nombre de la cola de tareas
print(info.namespace) -- Namespace de Temporal
print(info.attempt) -- Número de intento actual
print(info.history_length) -- Número de eventos en el historial
print(info.history_size) -- Tamaño del historial en bytes
workflow.exec()
Ejecutar un workflow hijo de forma síncrona y esperar su resultado:
local result, err = workflow.exec("app:child_workflow", input_data)
if err then
return nil, err
end
Esta es la forma más sencilla de ejecutar workflows hijos cuando se necesita esperar el resultado en línea.
workflow.version()
Manejar cambios de código con versionado determinista:
local version = workflow.version("payment-v2", 1, 2)
if version == 1 then
result = funcs.call("app:old_payment", input)
else
result = funcs.call("app:new_payment", input)
end
Parámetros:
change_id- Identificador único para este cambiomin_supported- Versión mínima soportadamax_supported- Versión máxima (actual)
El número de versión es determinista por ejecución de workflow. Los workflows en vuelo existentes continúan usando su versión registrada, mientras que los nuevos workflows usan max_supported.
workflow.attrs()
Actualizar atributos de búsqueda y memo:
workflow.attrs({
search = {
status = "processing",
customer_id = order.customer_id,
order_total = order.total
},
memo = {
notes = "Priority customer",
source = "web"
}
})
Los atributos de búsqueda están indexados y son consultables via las APIs de visibilidad de Temporal. El memo son datos arbitrarios no indexados adjuntos al workflow.
workflow.history_length() / workflow.history_size()
Monitorear el crecimiento del historial del workflow:
local length = workflow.history_length()
local size = workflow.history_size()
if length > 10000 then
-- Considerar continue-as-new para resetear el historial
end
Iniciar Workflows
Spawn Básico
Iniciar un workflow desde cualquier código usando process.spawn():
local pid, err = process.spawn(
"app:order_workflow", -- entrada del workflow
"app:worker", -- worker de temporal
{order_id = "123"} -- input
)
El parámetro host es el worker de temporal (no un host de proceso). El workflow se ejecuta de forma durable en la infraestructura de Temporal.
Spawn con Monitoreo
Monitorear workflows para recibir eventos EXIT cuando se completan:
local pid, err = process.spawn_monitored(
"app:order_workflow",
"app:worker",
{order_id = "123"}
)
local events = process.events()
local event = events:receive()
if event.kind == process.event.EXIT then
local result = event.result.value
local error = event.result.error
end
Spawn con Nombre
Asignar un nombre a un workflow para inicios idempotentes:
local spawner = process
.with_options({})
:with_name("order-" .. order.id)
local pid, err = spawner:spawn_monitored(
"app:order_workflow",
"app:worker",
{order_id = order.id}
)
Cuando se proporciona un nombre, Temporal lo usa para deduplicar inicios de workflow. Hacer spawn con el mismo nombre mientras un workflow está ejecutándose devuelve el PID del workflow existente por defecto.
Spawn con ID de Workflow Explícito
Establecer un ID de workflow de Temporal específico:
local spawner = process
.with_options({
["temporal.workflow.id"] = "order-" .. order.id,
})
local pid, err = spawner:spawn_monitored(
"app:order_workflow",
"app:worker",
order
)
Políticas de Conflicto de ID
Controlar el comportamiento al hacer spawn de un workflow con un ID que ya existe:
-- Fallar si el workflow ya existe
local spawner = process
.with_options({
["temporal.workflow.id"] = "order-123",
["temporal.workflow.id_conflict_policy"] = "fail",
})
local pid, err = spawner:spawn("app:order_workflow", "app:worker", order)
if err then
-- Workflow ya ejecutándose con este ID
end
-- Error cuando ya se inició (enfoque alternativo)
local spawner = process
.with_options({
["temporal.workflow.id"] = "order-123",
["temporal.workflow.execution_error_when_already_started"] = true,
})
local pid, err = spawner:spawn("app:order_workflow", "app:worker", order)
-- Reutilizar existente (comportamiento por defecto con ID explícito)
local spawner = process
.with_options({
["temporal.workflow.id"] = "order-123",
})
local pid, err = spawner:spawn("app:order_workflow", "app:worker", order)
-- Devuelve el PID del workflow existente si ya está ejecutándose
| Política | Comportamiento |
|---|---|
"use_existing" |
Devolver PID del workflow existente (por defecto con ID explícito) |
"fail" |
Devolver error si el workflow existe |
"terminate_existing" |
Terminar el existente e iniciar uno nuevo |
Opciones de Inicio del Workflow
Pasar opciones de workflow de Temporal via with_options():
local spawner = process.with_options({
["temporal.workflow.id"] = "order-123",
["temporal.workflow.execution_timeout"] = "24h",
["temporal.workflow.run_timeout"] = "1h",
["temporal.workflow.task_timeout"] = "30s",
["temporal.workflow.id_conflict_policy"] = "fail",
["temporal.workflow.retry_policy"] = {
initial_interval = 1000,
backoff_coefficient = 2.0,
maximum_interval = 300000,
maximum_attempts = 3,
},
["temporal.workflow.cron_schedule"] = "0 */6 * * *",
["temporal.workflow.search_attributes"] = {
customer_id = "cust-123"
},
["temporal.workflow.memo"] = {
source = "api"
},
["temporal.workflow.start_delay"] = "5m",
["temporal.workflow.parent_close_policy"] = "terminate",
})
Referencia Completa de Opciones
| Opción | Tipo | Descripción |
|---|---|---|
temporal.workflow.id |
string | ID de ejecución del workflow explícito |
temporal.workflow.task_queue |
string | Sobrescribir la cola de tareas |
temporal.workflow.execution_timeout |
duration | Tiempo de espera total de ejecución del workflow |
temporal.workflow.run_timeout |
duration | Tiempo de espera de una sola ejecución |
temporal.workflow.task_timeout |
duration | Tiempo de espera de procesamiento de tarea del workflow |
temporal.workflow.id_conflict_policy |
string | use_existing, fail, terminate_existing |
temporal.workflow.id_reuse_policy |
string | allow_duplicate, allow_duplicate_failed_only, reject_duplicate |
temporal.workflow.execution_error_when_already_started |
boolean | Error si el workflow ya está ejecutándose |
temporal.workflow.retry_policy |
table | Política de reintentos (ver abajo) |
temporal.workflow.cron_schedule |
string | Expresión cron para workflows recurrentes |
temporal.workflow.memo |
table | Metadatos del workflow no indexados |
temporal.workflow.search_attributes |
table | Atributos indexados consultables |
temporal.workflow.enable_eager_start |
boolean | Iniciar ejecución inmediatamente |
temporal.workflow.start_delay |
duration | Retraso antes de que el workflow inicie |
temporal.workflow.parent_close_policy |
string | Comportamiento del hijo al cerrar el padre |
temporal.workflow.wait_for_cancellation |
boolean | Esperar a que la cancelación finalice |
temporal.workflow.namespace |
string | Sobrescribir el namespace de Temporal |
Los valores de duración aceptan strings ("5s", "10m", "1h") o milisegundos como números.
Política de Cierre del Padre
Controla qué sucede con los workflows hijos cuando el padre se cierra:
| Política | Comportamiento |
|---|---|
"terminate" |
Terminar el workflow hijo |
"abandon" |
Dejar que el hijo continúe independientemente |
"request_cancel" |
Enviar solicitud de cancelación al hijo |
Mensajes de Inicio
Encolar señales para enviar a un workflow inmediatamente después de que inicie. Los mensajes se entregan antes de cualquier señal externa:
local spawner = process
.with_options({})
:with_name("counter-workflow")
:with_message("increment", {amount = 2})
:with_message("increment", {amount = 1})
:with_message("increment", {amount = 4})
local pid, err = spawner:spawn_monitored(
"app:counter_workflow",
"app:worker",
{initial = 0}
)
Los mensajes de inicio son especialmente útiles con la política de conflicto use_existing. Cuando un segundo spawn resuelve a un workflow existente, los mensajes de inicio aún se entregan:
-- El primer spawn inicia el workflow con los mensajes iniciales
local first = process
.with_options({})
:with_name("my-counter")
:with_message("increment", {amount = 3})
local pid, err = first:spawn("app:counter_workflow", "app:worker", {initial = 0})
-- El segundo spawn reutiliza el workflow existente y entrega nuevos mensajes
local second = process
.with_options({})
:with_name("my-counter")
:with_message("increment", {amount = 2})
local pid2, err = second:spawn("app:counter_workflow", "app:worker", {initial = 999})
-- pid2 == pid (mismo workflow), el input {initial = 999} se ignora
-- Pero el mensaje increment con amount=2 se entrega
Propagación de Contexto
Pasar valores de contexto accesibles dentro del workflow y sus activities:
local spawner = process.with_context({
user_id = "user-1",
tenant = "tenant-1",
request_id = "req-abc",
})
local pid, err = spawner:spawn_monitored(
"app:order_workflow",
"app:worker",
order
)
Dentro del workflow (o cualquier activity que llame), leer el contexto via el módulo ctx:
local ctx = require("ctx")
local user_id = ctx.get("user_id") -- "user-1"
local tenant = ctx.get("tenant") -- "tenant-1"
local all = ctx.all() -- {user_id="user-1", tenant="tenant-1", request_id="req-abc"}
Desde Handlers HTTP
local function handler()
local req = http.request()
local order = json.decode(req:body())
local spawner = process
.with_context({request_id = req:header("X-Request-ID")})
:with_options({
["temporal.workflow.id"] = "order-" .. order.id,
["temporal.workflow.id_conflict_policy"] = "fail",
})
local pid, err = spawner:spawn(
"app:order_workflow",
"app:worker",
order
)
local res = http.response()
if err then
res:set_status(409)
return res:write_json({error = tostring(err)})
end
res:set_status(202)
return res:write_json({
workflow_id = tostring(pid),
status = "started"
})
end
Señales
Los workflows reciben señales a través del sistema de mensajería de procesos. Las señales son durables — sobreviven a los replays del workflow.
Patrón de Buzón
Recibir todos los mensajes a través del buzón del proceso:
local function main(order)
local inbox = process.inbox()
while true do
local msg = inbox:receive()
local topic = msg:topic()
local data = msg:payload():data()
if topic == "approve" then
break
elseif topic == "cancel" then
return {status = "cancelled", reason = data.reason}
end
end
return process_order(order)
end
Suscripción por Tema
Suscribirse a temas específicos usando process.listen():
local function main(input)
local results = {}
local job_ch = process.listen("add_job")
local exit_ch = process.listen("exit")
while true do
local result = channel.select{
job_ch:case_receive(),
exit_ch:case_receive()
}
if result.channel == exit_ch then
break
elseif result.channel == job_ch then
local job_data = result.value
local activity_result, err = funcs.call(
"app:echo_activity",
{job_id = job_data.id, data = job_data}
)
table.insert(results, {
job_id = job_data.id,
result = activity_result
})
end
end
return {total_jobs = #results, results = results}
end
Por defecto, process.listen() devuelve datos de payload raw. Usar {message = true} para recibir objetos Message con información del remitente:
local ch = process.listen("request", {message = true})
local msg = ch:receive()
local sender = msg:from()
local data = msg:payload():data()
Múltiples Handlers de Señales
Usar coroutine.spawn() para manejar diferentes tipos de señales concurrentemente:
local function main(input)
local counter = input.initial or 0
local done = false
coroutine.spawn(function()
local ch = process.listen("increment", {message = true})
while not done do
local msg, ok = ch:receive()
if not ok then break end
local data = msg:payload():data()
local reply_to = msg:from()
if type(data) ~= "table" or type(data.amount) ~= "number" then
process.send(reply_to, "nak", "amount must be a number")
else
process.send(reply_to, "ack")
counter = counter + data.amount
process.send(reply_to, "ok", {value = counter})
end
end
end)
coroutine.spawn(function()
local ch = process.listen("decrement", {message = true})
while not done do
local msg, ok = ch:receive()
if not ok then break end
local data = msg:payload():data()
local reply_to = msg:from()
if counter - data.amount < 0 then
process.send(reply_to, "nak", "would result in negative value")
else
process.send(reply_to, "ack")
counter = counter - data.amount
process.send(reply_to, "ok", {value = counter})
end
end
end)
-- La coroutine principal espera la señal de finalización
local finish_ch = process.listen("finish", {message = true})
local msg = finish_ch:receive()
process.send(msg:from(), "ack")
process.send(msg:from(), "ok", {message = "finishing"})
done = true
return {final_counter = counter}
end
Reconocimiento de Señales
Implementar patrones de solicitud-respuesta enviando respuestas de vuelta al remitente:
-- Lado del workflow
local ch = process.listen("get_status", {message = true})
local msg = ch:receive()
process.send(msg:from(), "status_response", {status = "processing", progress = 75})
-- Lado del caller
local response_ch = process.listen("status_response")
process.send(workflow_pid, "get_status", {})
local timeout = time.after("5s")
local result = channel.select{
response_ch:case_receive(),
timeout:case_receive()
}
if result.channel == response_ch then
local status = result.value
end
Señalización Entre Workflows
Los workflows pueden enviar señales a otros workflows usando su PID:
-- Workflow remitente
local function main(input)
local target_pid = input.target
local ok, err = process.send(target_pid, "cross_host_ping", {data = "hello"})
if err then
return {ok = false, error = tostring(err)}
end
local response_ch = process.listen("cross_host_pong")
local response = response_ch:receive()
return {ok = true, received = response}
end
Workflows Hijos
Hijo Síncrono (workflow.exec)
Ejecutar un workflow hijo y esperar el resultado:
local result, err = workflow.exec("app:child_workflow", input_data)
if err then
return nil, err
end
Hijo Asíncrono (process.spawn)
Lanzar un workflow hijo sin bloquear, luego esperar su completación via eventos:
local events_ch = process.events()
local child_pid, err = process.spawn(
"app:child_workflow",
"app:worker",
{message = "hello from parent"}
)
if err then
return {status = "spawn_failed", error = tostring(err)}
end
-- Esperar el evento EXIT del hijo
local event = events_ch:receive()
if event.kind == process.event.EXIT then
local child_result = event.result.value
local child_error = event.result.error
end
Propagación de Errores desde Hijos
Cuando un workflow hijo devuelve un error, aparece en el evento EXIT:
local events_ch = process.events()
local child_pid, err = process.spawn(
"app:error_child_workflow",
"app:worker"
)
local event = events_ch:receive()
if event.result.error then
local child_err = event.result.error
-- Los objetos de error tienen métodos kind(), retryable(), message()
print(child_err:kind()) -- ej. "NOT_FOUND"
print(child_err:retryable()) -- false
print(child_err:message()) -- texto del mensaje de error
end
Ejecutar Workflows Síncronamente (process.exec)
Ejecutar un workflow y esperar su resultado en una sola llamada:
local result, err = process.exec(
"app:hello_workflow",
"app:worker",
{name = "world"}
)
if err then
return nil, err
end
-- result contiene el valor de retorno del workflow
Monitoreo y Enlace
Monitoreo Posterior al Inicio
Monitorear un workflow después de que ya ha iniciado:
local pid, err = process.spawn(
"app:long_workflow",
"app:worker",
{iterations = 100}
)
-- Monitorear más tarde
local ok, err = process.monitor(pid)
local events_ch = process.events()
local event = events_ch:receive() -- EXIT cuando el workflow completa
Enlace Posterior al Inicio
Enlazar a un workflow en ejecución para recibir LINK_DOWN en terminación anormal:
local ok, err = process.set_options({trap_links = true})
local pid, err = process.spawn(
"app:long_workflow",
"app:worker",
{iterations = 100}
)
-- Enlazar después de que el workflow ha iniciado
time.sleep("200ms")
local ok, err = process.link(pid)
-- Si el workflow es terminado, recibir LINK_DOWN
process.terminate(pid)
local events_ch = process.events()
local event = events_ch:receive()
-- event.kind == process.event.LINK_DOWN
Los eventos LINK_DOWN requieren trap_links = true en las opciones del proceso. Sin él, la terminación de un proceso enlazado propaga el fallo.
Desmonitorear / Desenlazar
Eliminar monitoreo o enlace:
process.unmonitor(pid) -- dejar de recibir eventos EXIT
process.unlink(pid) -- eliminar enlace bidireccional
Tras desmonitorear o desenlazar, los eventos para ese proceso ya no se entregan.
Terminación y Cancelación
Terminar
Terminar forzosamente un workflow en ejecución:
local ok, err = process.terminate(workflow_pid)
Los callers monitorizados reciben un evento EXIT con un error.
Cancelar
Solicitar cancelación controlada con un motivo opcional:
local ok, err = process.cancel(workflow_pid, "cancelled by operator")
Trabajo Concurrente
Usar coroutine.spawn() y canales para trabajo paralelo dentro de workflows:
local function main(input)
local worker_count = input.workers or 3
local job_count = input.jobs or 6
local work_queue = channel.new(10)
local results = channel.new(10)
for w = 1, worker_count do
coroutine.spawn(function()
while true do
local job, ok = work_queue:receive()
if not ok then break end
time.sleep(10 * time.MILLISECOND)
results:send({worker = w, job = job, result = job * 2})
end
end)
end
for j = 1, job_count do
work_queue:send(j)
end
work_queue:close()
local total = 0
local processed = {}
for _ = 1, job_count do
local r = results:receive()
total = total + r.result
table.insert(processed, r)
end
return {total = total, processed = processed}
end
Todas las operaciones de canal y sleeps dentro de coroutines son seguras para replay.
Temporizadores
Los temporizadores durables sobreviven a los reinicios:
local time = require("time")
time.sleep("24h")
time.sleep("5m")
time.sleep("30s")
time.sleep(100 * time.MILLISECOND)
Rastrear tiempo transcurrido:
local start = time.now()
time.sleep("1s")
local elapsed = time.now():sub(start):milliseconds()
Determinismo
El código del workflow debe ser determinista. Las mismas entradas deben producir la misma secuencia de comandos.
Operaciones Seguras para Replay
Estas operaciones se interceptan automáticamente y sus resultados se registran. En el replay, se devuelven los valores registrados:
-- Llamadas a activities
local data = funcs.call("app:fetch_data", id)
-- Sleep durable
time.sleep("1h")
-- Tiempo actual
local now = time.now()
-- Generación de UUID
local id = uuid.v4()
-- Operaciones criptográficas
local bytes = crypto.random_bytes(32)
-- Workflows hijos
local result = workflow.exec("app:child", input)
-- Versionado
local v = workflow.version("change-1", 1, 2)
No Deterministas (Evitar)
-- No usar tiempo de reloj de pared
local now = os.time() -- no determinista
-- No usar random directamente
local r = math.random() -- no determinista
-- No hacer I/O en código de workflow
local file = io.open("data.txt") -- no determinista
-- No usar estado mutable global
counter = counter + 1 -- no determinista entre replays
Manejo de Errores
Errores de Activities
Los errores de activities llevan metadatos estructurados:
local result, err = funcs.call("app:risky_activity", order)
if err then
print(err:kind()) -- clasificación del error (ej. "NOT_FOUND", "INTERNAL")
print(err:retryable()) -- si el error es reintentable
print(err:message()) -- mensaje de error legible por humanos
end
Modos de Fallo de Activities
Configurar el comportamiento de reintento para llamadas a activities:
local executor = funcs.new():with_options({
["activity.retry_policy"] = {
maximum_attempts = 1,
}
})
local result, err = executor:call("app:unreliable_activity", input)
if err then
local kind = err:kind() -- "INTERNAL" para errores del runtime
local retryable = err:retryable()
end
Errores de Workflows Hijos
Los errores de workflows hijos (via process.exec o eventos EXIT) llevan los mismos metadatos:
local result, err = process.exec("app:error_workflow", "app:worker")
if err then
print(err:kind()) -- ej. "NOT_FOUND"
print(err:retryable()) -- false
print(err:message()) -- detalles del error
end
Patrón de Compensación (Saga)
local function run_compensations(compensations)
for _, comp in ipairs(compensations) do
funcs.call(comp.action, comp.args)
end
end
local function main(order)
local compensations = {}
local reservation, err = funcs.call("app:reserve_inventory", order.items)
if err then
return {status = "failed", step = "inventory", error = tostring(err)}
end
table.insert(compensations, 1, {
action = "app:release_inventory",
args = reservation.id
})
local payment, err = funcs.call("app:charge_payment", order.payment)
if err then
run_compensations(compensations)
return {status = "failed", step = "payment", error = tostring(err)}
end
table.insert(compensations, 1, {
action = "app:refund_payment",
args = payment.id
})
local shipment, err = funcs.call("app:ship_order", order.shipping)
if err then
run_compensations(compensations)
return {status = "failed", step = "shipping", error = tostring(err)}
end
return {status = "completed", tracking = shipment.tracking}
end
Ver También
- Visión General - Configuración de cliente y worker
- Activities - Definiciones y opciones de activities
- Process - API de gestión de procesos
- Funciones - Invocación de funciones
- Canales - Operaciones de canales