队列消费者
队列消费者使用工作池处理队列中的消息。
概述
flowchart LR
subgraph Consumer[消费者]
QD[队列驱动] --> DC[投递通道
prefetch=10]
DC --> WP[工作池
concurrency]
WP --> FH[函数处理器]
FH --> AN[Ack/Nack]
end
配置
| 选项 | 默认值 | 最大值 | 说明 |
|---|---|---|---|
queue |
必填 | - | 队列注册表 ID |
func |
必填 | - | 处理函数注册表 ID |
concurrency |
1 | 1000 | 工作线程数 |
prefetch |
10 | 10000 | 消息缓冲区大小 |
入口定义
- name: order_consumer
kind: queue.consumer
queue: app:orders
func: app:process_order
concurrency: 5
prefetch: 20
lifecycle:
auto_start: true
depends_on:
- app:orders
处理函数
处理函数接收消息体:
-- process_order.lua
local json = require("json")
local function handler(body)
local order = json.decode(body)
-- 处理订单
local result, err = process_order(order)
if err then
-- 返回错误触发 Nack(重新入队)
return nil, err
end
-- 成功触发 Ack
return result
end
return handler
- name: process_order
kind: function.lua
source: file://process_order.lua
modules:
- json
确认机制
| 结果 | 动作 | 效果 |
|---|---|---|
| 成功 | Ack | 消息从队列移除 |
| 错误 | Nack | 消息重新入队(取决于驱动) |
工作池
- 工作线程作为并发 goroutine 运行
- 每个工作线程一次处理一条消息
- 消息从投递通道轮询分发
- 预取缓冲区允许驱动提前投递
- 当所有工作线程繁忙且缓冲区满时产生背压
示例
concurrency: 3
prefetch: 10
流程:
1. 驱动向缓冲区投递最多 10 条消息
2. 3 个工作线程并发从缓冲区拉取
3. 工作线程完成后,缓冲区重新填充
4. 所有工作线程繁忙且缓冲区满时产生背压
优雅关闭
停止时:
- 停止接收新投递
- 取消工作线程上下文
- 等待处理中的消息(带超时)
- 如果工作线程未完成则返回超时错误
队列声明
# 队列驱动(开发/测试用内存驱动)
- name: queue_driver
kind: queue.driver.memory
lifecycle:
auto_start: true
# 队列定义
- name: orders
kind: queue.queue
driver: app:queue_driver
options:
queue_name: orders # 覆盖名称(默认:入口名称)
max_length: 10000 # 最大队列大小
durable: true # 重启后持久化
| 选项 | 说明 |
|---|---|
queue_name |
覆盖队列名称(默认:入口 ID 名称) |
max_length |
最大队列大小 |
durable |
重启后持久化(取决于驱动) |
内存驱动
用于开发/测试的内置内存队列:
- 类型:
queue.driver.memory - 消息存储在内存中
- Nack 将消息重新放入队列前端
- 重启后不持久化