Workflows
Workflows são funções duráveis que orquestram atividades e mantêm estado através de falhas e reinicializações. São definidos usando o tipo de entrada workflow.lua.
Definição
- name: order_workflow
kind: workflow.lua
source: file://order_workflow.lua
method: main
modules:
- funcs
- time
- workflow
meta:
temporal:
workflow:
worker: app:worker
Campos de Metadados
| Campo | Obrigatório | Descrição |
|---|---|---|
worker |
Sim | Referência à entrada temporal.worker |
name |
Não | Nome personalizado do tipo de workflow (padrão: ID da entrada) |
Implementação Básica
local funcs = require("funcs")
local time = require("time")
local function main(order)
local payment, err = funcs.call("app:charge_payment", {
amount = order.total,
customer = order.customer_id
})
if err then
return {status = "failed", error = tostring(err)}
end
time.sleep("1h")
local shipment, err = funcs.call("app:ship_order", {
order_id = order.id,
address = order.shipping_address
})
if err then
funcs.call("app:refund_payment", payment.id)
return {status = "failed", error = tostring(err)}
end
return {
status = "completed",
payment_id = payment.id,
tracking = shipment.tracking_number
}
end
return { main = main }
Módulo Workflow
O módulo workflow fornece operações específicas de workflow.
workflow.info()
Obtém informações de execução do workflow:
local workflow = require("workflow")
local info = workflow.info()
print(info.workflow_id) -- ID de execução do workflow
print(info.run_id) -- ID da execução atual
print(info.workflow_type) -- Nome do tipo de workflow
print(info.task_queue) -- Nome da task queue
print(info.namespace) -- Namespace Temporal
print(info.attempt) -- Número da tentativa atual
print(info.history_length) -- Número de eventos no histórico
print(info.history_size) -- Tamanho do histórico em bytes
workflow.exec()
Executa um workflow filho de forma síncrona e aguarda seu resultado:
local result, err = workflow.exec("app:child_workflow", input_data)
if err then
return nil, err
end
Esta é a forma mais simples de executar workflows filhos quando você precisa aguardar o resultado de forma síncrona.
workflow.version()
Trata mudanças de código com versionamento determinístico:
local version = workflow.version("payment-v2", 1, 2)
if version == 1 then
result = funcs.call("app:old_payment", input)
else
result = funcs.call("app:new_payment", input)
end
Parâmetros:
change_id- Identificador único para esta mudançamin_supported- Versão mínima suportadamax_supported- Versão máxima (atual)
O número da versão é determinístico por execução de workflow. Workflows em andamento continuam usando sua versão registrada, enquanto novos workflows usam max_supported.
workflow.attrs()
Atualiza atributos de busca e memo:
workflow.attrs({
search = {
status = "processing",
customer_id = order.customer_id,
order_total = order.total
},
memo = {
notes = "Priority customer",
source = "web"
}
})
Atributos de busca são indexados e consultáveis via APIs de visibilidade do Temporal. Memo são dados arbitrários não indexados anexados ao workflow.
workflow.history_length() / workflow.history_size()
Monitora o crescimento do histórico do workflow:
local length = workflow.history_length()
local size = workflow.history_size()
if length > 10000 then
-- Considere continue-as-new para resetar o histórico
end
Iniciando Workflows
Spawn Básico
Inicie um workflow de qualquer código usando process.spawn():
local pid, err = process.spawn(
"app:order_workflow", -- workflow entry
"app:worker", -- temporal worker
{order_id = "123"} -- input
)
O segundo parâmetro é o worker temporal (não um host de processo). O workflow executa de forma durável na infraestrutura Temporal.
Spawn com Monitoramento
Monitore workflows para receber eventos EXIT quando eles completarem:
local pid, err = process.spawn_monitored(
"app:order_workflow",
"app:worker",
{order_id = "123"}
)
local events = process.events()
local event = events:receive()
if event.kind == process.event.EXIT then
local result = event.result.value
local error = event.result.error
end
Spawn com Nome
Atribua um nome a um workflow para inicializações idempotentes:
local spawner = process
.with_options({})
:with_name("order-" .. order.id)
local pid, err = spawner:spawn_monitored(
"app:order_workflow",
"app:worker",
{order_id = order.id}
)
Quando um nome é fornecido, o Temporal o utiliza para deduplicar inicializações de workflow. Iniciar com o mesmo nome enquanto um workflow está em execução retorna o PID do workflow existente por padrão.
Spawn com Workflow ID Explícito
Define um ID de workflow Temporal específico:
local spawner = process
.with_options({
["temporal.workflow.id"] = "order-" .. order.id,
})
local pid, err = spawner:spawn_monitored(
"app:order_workflow",
"app:worker",
order
)
Políticas de Conflito de ID
Controla o comportamento ao iniciar um workflow com um ID que já existe:
-- Falha se o workflow já existe
local spawner = process
.with_options({
["temporal.workflow.id"] = "order-123",
["temporal.workflow.id_conflict_policy"] = "fail",
})
local pid, err = spawner:spawn("app:order_workflow", "app:worker", order)
if err then
-- Workflow já em execução com este ID
end
-- Retorna erro se já iniciado (abordagem alternativa)
local spawner = process
.with_options({
["temporal.workflow.id"] = "order-123",
["temporal.workflow.execution_error_when_already_started"] = true,
})
local pid, err = spawner:spawn("app:order_workflow", "app:worker", order)
-- Reutiliza existente (comportamento padrão com ID explícito)
local spawner = process
.with_options({
["temporal.workflow.id"] = "order-123",
})
local pid, err = spawner:spawn("app:order_workflow", "app:worker", order)
-- Retorna PID do workflow existente se já estiver em execução
| Política | Comportamento |
|---|---|
"use_existing" |
Retorna PID do workflow existente (padrão com ID explícito) |
"fail" |
Retorna erro se o workflow existe |
"terminate_existing" |
Encerra o existente e inicia um novo |
Opções de Inicialização de Workflow
Passa opções de workflow Temporal via with_options():
local spawner = process.with_options({
["temporal.workflow.id"] = "order-123",
["temporal.workflow.execution_timeout"] = "24h",
["temporal.workflow.run_timeout"] = "1h",
["temporal.workflow.task_timeout"] = "30s",
["temporal.workflow.id_conflict_policy"] = "fail",
["temporal.workflow.retry_policy"] = {
initial_interval = 1000,
backoff_coefficient = 2.0,
maximum_interval = 300000,
maximum_attempts = 3,
},
["temporal.workflow.cron_schedule"] = "0 */6 * * *",
["temporal.workflow.search_attributes"] = {
customer_id = "cust-123"
},
["temporal.workflow.memo"] = {
source = "api"
},
["temporal.workflow.start_delay"] = "5m",
["temporal.workflow.parent_close_policy"] = "terminate",
})
Referência Completa de Opções
| Opção | Tipo | Descrição |
|---|---|---|
temporal.workflow.id |
string | ID explícito de execução do workflow |
temporal.workflow.task_queue |
string | Sobrescreve a task queue |
temporal.workflow.execution_timeout |
duration | Timeout total de execução do workflow |
temporal.workflow.run_timeout |
duration | Timeout de execução única |
temporal.workflow.task_timeout |
duration | Timeout de processamento da tarefa de workflow |
temporal.workflow.id_conflict_policy |
string | use_existing, fail, terminate_existing |
temporal.workflow.id_reuse_policy |
string | allow_duplicate, allow_duplicate_failed_only, reject_duplicate |
temporal.workflow.execution_error_when_already_started |
boolean | Erro se o workflow já está em execução |
temporal.workflow.retry_policy |
table | Política de retry (veja abaixo) |
temporal.workflow.cron_schedule |
string | Expressão cron para workflows recorrentes |
temporal.workflow.memo |
table | Metadados não indexados do workflow |
temporal.workflow.search_attributes |
table | Atributos indexados consultáveis |
temporal.workflow.enable_eager_start |
boolean | Inicia execução imediatamente |
temporal.workflow.start_delay |
duration | Atraso antes do workflow iniciar |
temporal.workflow.parent_close_policy |
string | Comportamento do filho quando o pai é encerrado |
temporal.workflow.wait_for_cancellation |
boolean | Aguarda conclusão do cancelamento |
temporal.workflow.namespace |
string | Sobrescrita de namespace Temporal |
Valores de duração aceitam strings ("5s", "10m", "1h") ou milissegundos como números.
Política de Fechamento do Pai
Controla o que acontece com workflows filhos quando o pai é encerrado:
| Política | Comportamento |
|---|---|
"terminate" |
Encerra o workflow filho |
"abandon" |
Permite que o filho continue independentemente |
"request_cancel" |
Envia solicitação de cancelamento ao filho |
Mensagens de Inicialização
Enfileira sinais para serem enviados a um workflow imediatamente após ele iniciar. As mensagens são entregues antes de quaisquer sinais externos:
local spawner = process
.with_options({})
:with_name("counter-workflow")
:with_message("increment", {amount = 2})
:with_message("increment", {amount = 1})
:with_message("increment", {amount = 4})
local pid, err = spawner:spawn_monitored(
"app:counter_workflow",
"app:worker",
{initial = 0}
)
Mensagens de inicialização são especialmente úteis com a política de conflito use_existing. Quando um segundo spawn resolve para um workflow existente, as mensagens de inicialização ainda são entregues:
-- Primeiro spawn inicia o workflow com mensagens iniciais
local first = process
.with_options({})
:with_name("my-counter")
:with_message("increment", {amount = 3})
local pid, err = first:spawn("app:counter_workflow", "app:worker", {initial = 0})
-- Segundo spawn reutiliza workflow existente e entrega novas mensagens
local second = process
.with_options({})
:with_name("my-counter")
:with_message("increment", {amount = 2})
local pid2, err = second:spawn("app:counter_workflow", "app:worker", {initial = 999})
-- pid2 == pid (mesmo workflow), input {initial = 999} é ignorado
-- Mas a mensagem increment com amount=2 é entregue
Propagação de Contexto
Passa valores de contexto acessíveis dentro do workflow e suas atividades:
local spawner = process.with_context({
user_id = "user-1",
tenant = "tenant-1",
request_id = "req-abc",
})
local pid, err = spawner:spawn_monitored(
"app:order_workflow",
"app:worker",
order
)
Dentro do workflow (ou qualquer activity que ele chame), leia o contexto via módulo ctx:
local ctx = require("ctx")
local user_id = ctx.get("user_id") -- "user-1"
local tenant = ctx.get("tenant") -- "tenant-1"
local all = ctx.all() -- {user_id="user-1", tenant="tenant-1", request_id="req-abc"}
A Partir de Handlers HTTP
local function handler()
local req = http.request()
local order = json.decode(req:body())
local spawner = process
.with_context({request_id = req:header("X-Request-ID")})
:with_options({
["temporal.workflow.id"] = "order-" .. order.id,
["temporal.workflow.id_conflict_policy"] = "fail",
})
local pid, err = spawner:spawn(
"app:order_workflow",
"app:worker",
order
)
if err then
return http.response():status(409):json({error = tostring(err)})
end
return http.response():status(202):json({
workflow_id = tostring(pid),
status = "started"
})
end
Sinais
Workflows recebem sinais via o sistema de mensagens de processo. Sinais são duráveis - sobrevivem a replays de workflow.
Padrão Inbox
Recebe todas as mensagens através do inbox do processo:
local function main(order)
local inbox = process.inbox()
while true do
local msg = inbox:receive()
local topic = msg:topic()
local data = msg:payload():data()
if topic == "approve" then
break
elseif topic == "cancel" then
return {status = "cancelled", reason = data.reason}
end
end
return process_order(order)
end
Assinatura Baseada em Tópicos
Assine tópicos específicos usando process.listen():
local function main(input)
local results = {}
local job_ch = process.listen("add_job")
local exit_ch = process.listen("exit")
while true do
local result = channel.select{
job_ch:case_receive(),
exit_ch:case_receive()
}
if result.channel == exit_ch then
break
elseif result.channel == job_ch then
local job_data = result.value
local activity_result, err = funcs.call(
"app:echo_activity",
{job_id = job_data.id, data = job_data}
)
table.insert(results, {
job_id = job_data.id,
result = activity_result
})
end
end
return {total_jobs = #results, results = results}
end
Por padrão, process.listen() retorna dados brutos do payload. Use {message = true} para receber objetos Message com informações do remetente:
local ch = process.listen("request", {message = true})
local msg = ch:receive()
local sender = msg:from()
local data = msg:payload():data()
Múltiplos Handlers de Sinal
Use coroutine.spawn() para tratar diferentes tipos de sinal concorrentemente:
local function main(input)
local counter = input.initial or 0
local done = false
coroutine.spawn(function()
local ch = process.listen("increment", {message = true})
while not done do
local msg, ok = ch:receive()
if not ok then break end
local data = msg:payload():data()
local reply_to = msg:from()
if type(data) ~= "table" or type(data.amount) ~= "number" then
process.send(reply_to, "nak", "amount must be a number")
else
process.send(reply_to, "ack")
counter = counter + data.amount
process.send(reply_to, "ok", {value = counter})
end
end
end)
coroutine.spawn(function()
local ch = process.listen("decrement", {message = true})
while not done do
local msg, ok = ch:receive()
if not ok then break end
local data = msg:payload():data()
local reply_to = msg:from()
if counter - data.amount < 0 then
process.send(reply_to, "nak", "would result in negative value")
else
process.send(reply_to, "ack")
counter = counter - data.amount
process.send(reply_to, "ok", {value = counter})
end
end
end)
-- Coroutine principal aguarda sinal de finalização
local finish_ch = process.listen("finish", {message = true})
local msg = finish_ch:receive()
process.send(msg:from(), "ack")
process.send(msg:from(), "ok", {message = "finishing"})
done = true
return {final_counter = counter}
end
Confirmação de Sinal
Implemente padrões de requisição-resposta enviando respostas de volta ao remetente:
-- Lado do workflow
local ch = process.listen("get_status", {message = true})
local msg = ch:receive()
process.send(msg:from(), "status_response", {status = "processing", progress = 75})
-- Lado do chamador
local response_ch = process.listen("status_response")
process.send(workflow_pid, "get_status", {})
local timeout = time.after("5s")
local result = channel.select{
response_ch:case_receive(),
timeout:case_receive()
}
if result.channel == response_ch then
local status = result.value
end
Sinalização Entre Workflows
Workflows podem enviar sinais a outros workflows usando seu PID:
-- Workflow remetente
local function main(input)
local target_pid = input.target
local ok, err = process.send(target_pid, "cross_host_ping", {data = "hello"})
if err then
return {ok = false, error = tostring(err)}
end
local response_ch = process.listen("cross_host_pong")
local response = response_ch:receive()
return {ok = true, received = response}
end
Workflows Filhos
Filho Síncrono (workflow.exec)
Executa um workflow filho e aguarda o resultado:
local result, err = workflow.exec("app:child_workflow", input_data)
if err then
return nil, err
end
Filho Assíncrono (process.spawn)
Inicia um workflow filho sem bloquear, depois aguarda sua conclusão via eventos:
local events_ch = process.events()
local child_pid, err = process.spawn(
"app:child_workflow",
"app:worker",
{message = "hello from parent"}
)
if err then
return {status = "spawn_failed", error = tostring(err)}
end
-- Aguarda evento EXIT do filho
local event = events_ch:receive()
if event.kind == process.event.EXIT then
local child_result = event.result.value
local child_error = event.result.error
end
Propagação de Erros de Filhos
Quando um workflow filho retorna um erro, ele aparece no evento EXIT:
local events_ch = process.events()
local child_pid, err = process.spawn(
"app:error_child_workflow",
"app:worker"
)
local event = events_ch:receive()
if event.result.error then
local child_err = event.result.error
-- Objetos de erro possuem métodos kind(), retryable(), message()
print(child_err:kind()) -- ex: "NOT_FOUND"
print(child_err:retryable()) -- false
print(child_err:message()) -- texto da mensagem de erro
end
Executando Workflows de Forma Síncrona (process.exec)
Executa um workflow e aguarda seu resultado em uma única chamada:
local result, err = process.exec(
"app:hello_workflow",
"app:worker",
{name = "world"}
)
if err then
return nil, err
end
-- result contém o valor de retorno do workflow
Monitoramento e Vinculação
Monitoramento Pós-Início
Monitora um workflow após ele já ter iniciado:
local pid, err = process.spawn(
"app:long_workflow",
"app:worker",
{iterations = 100}
)
-- Monitora depois
local ok, err = process.monitor(pid)
local events_ch = process.events()
local event = events_ch:receive() -- EXIT quando o workflow completar
Vinculação Pós-Início
Vincula-se a um workflow em execução para receber LINK_DOWN em terminação anormal:
local ok, err = process.set_options({trap_links = true})
local pid, err = process.spawn(
"app:long_workflow",
"app:worker",
{iterations = 100}
)
-- Vincula após o workflow ter iniciado
time.sleep("200ms")
local ok, err = process.link(pid)
-- Se o workflow for terminado, recebe LINK_DOWN
process.terminate(pid)
local events_ch = process.events()
local event = events_ch:receive()
-- event.kind == process.event.LINK_DOWN
Eventos LINK_DOWN requerem trap_links = true nas opções do processo. Sem isso, a terminação de um processo vinculado propaga a falha.
Desmonitorar / Desvincular
Remove monitoramento ou vinculação:
process.unmonitor(pid) -- para de receber eventos EXIT
process.unlink(pid) -- remove vínculo bidirecional
Após desmonitorar ou desvincular, eventos para aquele processo não são mais entregues.
Terminação e Cancelamento
Terminar
Força a terminação de um workflow em execução:
local ok, err = process.terminate(workflow_pid)
Chamadores monitorados recebem um evento EXIT com erro.
Cancelar
Solicita cancelamento gracioso com um deadline opcional:
local ok, err = process.cancel(workflow_pid, "5s")
Trabalho Concorrente
Use coroutine.spawn() e channels para trabalho paralelo dentro de workflows:
local function main(input)
local worker_count = input.workers or 3
local job_count = input.jobs or 6
local work_queue = channel.new(10)
local results = channel.new(10)
for w = 1, worker_count do
coroutine.spawn(function()
while true do
local job, ok = work_queue:receive()
if not ok then break end
time.sleep(10 * time.MILLISECOND)
results:send({worker = w, job = job, result = job * 2})
end
end)
end
for j = 1, job_count do
work_queue:send(j)
end
work_queue:close()
local total = 0
local processed = {}
for _ = 1, job_count do
local r = results:receive()
total = total + r.result
table.insert(processed, r)
end
return {total = total, processed = processed}
end
Todas as operações de channel e sleeps dentro de coroutines são seguras para replay.
Timers
Timers duráveis sobrevivem a reinicializações:
local time = require("time")
time.sleep("24h")
time.sleep("5m")
time.sleep("30s")
time.sleep(100 * time.MILLISECOND)
Rastreie o tempo decorrido:
local start = time.now()
time.sleep("1s")
local elapsed = time.now():sub(start):milliseconds()
Determinismo
Código de workflow deve ser determinístico. As mesmas entradas devem produzir a mesma sequência de comandos.
Operações Seguras para Replay
Estas operações são automaticamente interceptadas e seus resultados registrados. No replay, os valores registrados são retornados:
-- Activity calls
local data = funcs.call("app:fetch_data", id)
-- Durable sleep
time.sleep("1h")
-- Current time
local now = time.now()
-- UUID generation
local id = uuid.v4()
-- Crypto operations
local bytes = crypto.random_bytes(32)
-- Child workflows
local result = workflow.exec("app:child", input)
-- Versioning
local v = workflow.version("change-1", 1, 2)
Não-Determinístico (Evitar)
-- Não use tempo de relógio de parede
local now = os.time() -- não-determinístico
-- Não use random diretamente
local r = math.random() -- não-determinístico
-- Não faça I/O no código do workflow
local file = io.open("data.txt") -- não-determinístico
-- Não use estado global mutável
counter = counter + 1 -- não-determinístico entre replays
Tratamento de Erros
Erros de Activity
Erros de activity carregam metadados estruturados:
local result, err = funcs.call("app:risky_activity", order)
if err then
print(err:kind()) -- classificação do erro (ex: "NOT_FOUND", "INTERNAL")
print(err:retryable()) -- se o erro permite nova tentativa
print(err:message()) -- mensagem de erro legível
end
Modos de Falha de Activity
Configure o comportamento de retry para chamadas de activity:
local executor = funcs.new():with_options({
["activity.retry_policy"] = {
maximum_attempts = 1,
}
})
local result, err = executor:call("app:unreliable_activity", input)
if err then
local kind = err:kind() -- "INTERNAL" para erros de runtime
local retryable = err:retryable()
end
Erros de Workflow Filho
Erros de workflows filhos (via process.exec ou eventos EXIT) carregam os mesmos metadados:
local result, err = process.exec("app:error_workflow", "app:worker")
if err then
print(err:kind()) -- ex: "NOT_FOUND"
print(err:retryable()) -- false
print(err:message()) -- detalhes do erro
end
Padrão de Compensação (Saga)
local function main(order)
local compensations = {}
local reservation, err = funcs.call("app:reserve_inventory", order.items)
if err then
return {status = "failed", step = "inventory", error = tostring(err)}
end
table.insert(compensations, 1, {
action = "app:release_inventory",
args = reservation.id
})
local payment, err = funcs.call("app:charge_payment", order.payment)
if err then
run_compensations(compensations)
return {status = "failed", step = "payment", error = tostring(err)}
end
table.insert(compensations, 1, {
action = "app:refund_payment",
args = payment.id
})
local shipment, err = funcs.call("app:ship_order", order.shipping)
if err then
run_compensations(compensations)
return {status = "failed", step = "shipping", error = tostring(err)}
end
return {status = "completed", tracking = shipment.tracking}
end
local function run_compensations(compensations)
for _, comp in ipairs(compensations) do
funcs.call(comp.action, comp.args)
end
end
Veja Também
- Visão Geral - Configuração de cliente e worker
- Atividades - Definições e opções de atividades
- Processo - API de gerenciamento de processos
- Funções - Invocação de funções
- Channels - Operações de channel