工作流
工作流是持久的、长时间运行的操作,能在崩溃和重启后存活。它们为支付、订单履行和多步骤审批等关键业务流程提供可靠性保证。
为什么使用工作流?
函数是临时性的——如果宿主崩溃,进行中的工作就会丢失。工作流持久化它们的状态:
| 方面 | 函数 | 工作流 |
|---|---|---|
| 状态 | 内存中 | 持久化 |
| 崩溃 | 工作丢失 | 恢复 |
| 持续时间 | 秒到分钟 | 小时到月 |
| 完成 | 尽力而为 | 保证 |
工作流如何运作
工作流代码看起来像普通的 Lua 代码:
local funcs = require("funcs")
local time = require("time")
local result = funcs.call("app.api:charge_card", payment)
time.sleep("24h")
local status = funcs.call("app.api:check_status", result.id)
if status == "failed" then
funcs.call("app.api:refund", result.id)
end
工作流引擎拦截调用并记录结果。如果进程崩溃,执行从历史记录重放——相同的代码,相同的结果。
funcs.call()、time.sleep()、uuid.v4() 和 time.now() 等操作被拦截并记录其结果。在重放时,返回记录的值而不是重新执行。
工作流模式
Saga 模式
失败时补偿:
local funcs = require("funcs")
local inventory = funcs.call("app.inventory:reserve", items)
if inventory.error then
return nil, inventory.error
end
local payment = funcs.call("app.payments:charge", amount)
if payment.error then
funcs.call("app.inventory:release", inventory.id)
return nil, payment.error
end
local shipping = funcs.call("app.shipping:create", order)
if shipping.error then
funcs.call("app.payments:refund", payment.id)
funcs.call("app.inventory:release", inventory.id)
return nil, shipping.error
end
return {inventory = inventory, payment = payment, shipping = shipping}
等待信号
等待外部事件(审批决定、webhook、用户操作):
local funcs = require("funcs")
funcs.call("app.approvals:submit", request)
local inbox = process.inbox()
local msg = inbox:receive() -- 阻塞直到信号到达
if msg.approved then
funcs.call("app.orders:fulfill", request.order_id)
else
funcs.call("app.notifications:send_rejection", request)
end
何时使用什么
| 用例 | 选择 |
|---|---|
| HTTP 请求处理 | 函数 |
| 数据转换 | 函数 |
| 后台作业 | 进程 |
| 用户会话状态 | 进程 |
| 实时消息 | 进程 |
| 支付处理 | 工作流 |
| 订单履行 | 工作流 |
| 多天审批 | 工作流 |
启动工作流
工作流的生成方式与进程相同——使用 process.spawn() 但使用不同的宿主:
-- 在 temporal worker 上生成工作流
local pid = process.spawn("app.workflows:order_processor", "app:temporal_worker", order_data)
-- 向工作流发送信号
process.send(pid, "update", {status = "approved"})
从调用者的角度来看,API 是相同的。区别在于宿主:工作流运行在 temporal.worker 而不是 process.host 上。
process.spawn() 生成子进程时,它们成为同一提供者上的子工作流,保持持久性保证。
失败和监管
进程可以使用 process.service 作为受监管的服务运行:
# 进程定义
- name: session_handler
kind: process.lua
source: file://session_handler.lua
method: main
# 包装进程的受监管服务
- name: session_manager
kind: process.service
process: app:session_handler
host: app:processes
lifecycle:
auto_start: true
restart:
max_attempts: 10
工作流不使用监管树——它们由工作流提供者(Temporal)自动管理。提供者处理持久化、重试和恢复。
配置
进程定义(动态生成):
- name: order_processor
kind: workflow.lua
source: file://order_processor.lua
method: main
modules:
- funcs
- time
工作流提供者:
- name: temporal_worker
kind: temporal.worker
client: app:temporal_client
task_queue: "orders"
lifecycle:
auto_start: true
参见 Temporal 了解生产级工作流基础设施。