Очереди
Wippy предоставляет систему очередей для асинхронной обработки сообщений с настраиваемыми драйверами и консьюмерами.
Архитектура
flowchart LR
P[Publisher] --> D[Driver]
D --> Q[Queue]
Q --> C[Consumer]
C --> W[Worker Pool]
W --> F[Function]
- Драйвер — реализация бэкенда (память, AMQP, Redis)
- Очередь — логическая очередь, привязанная к драйверу
- Консьюмер — связывает очередь с обработчиком, настраивает параллелизм
- Пул воркеров — параллельная обработка сообщений
Несколько очередей могут использовать один драйвер. Несколько консьюмеров могут обрабатывать одну очередь.
Типы записей
| Тип | Описание |
|---|---|
queue.driver.memory |
Драйвер очереди в памяти |
queue.queue |
Объявление очереди с привязкой к драйверу |
queue.consumer |
Консьюмер для обработки сообщений |
Настройка драйвера
Драйвер в памяти
Драйвер в памяти для разработки и тестирования:
- name: memory_driver
kind: queue.driver.memory
lifecycle:
auto_start: true
Настройка очереди
- name: tasks
kind: queue.queue
driver: app.queue:memory_driver
| Поле | Тип | Обязательно | Описание |
|---|---|---|---|
driver |
Registry ID | Да | Ссылка на драйвер очереди |
options |
Map | Нет | Опции, специфичные для драйвера |
Настройка консьюмера
- name: task_consumer
kind: queue.consumer
queue: app.queue:tasks
func: app.queue:task_handler
concurrency: 4
prefetch: 20
lifecycle:
auto_start: true
depends_on:
- app.queue:tasks
| Поле | По умолчанию | Максимум | Описание |
|---|---|---|---|
queue |
Обязательно | - | ID очереди в реестре |
func |
Обязательно | - | ID функции-обработчика |
concurrency |
1 | 1000 | Количество параллельных воркеров |
prefetch |
10 | 10000 | Размер буфера сообщений |
Пул воркеров
Воркеры работают как параллельные горутины:
concurrency: 3, prefetch: 10
1. Драйвер доставляет до 10 сообщений в буфер
2. 3 воркера параллельно забирают из буфера
3. По мере завершения воркеров буфер пополняется
4. Обратное давление при занятых воркерах и полном буфере
Функция-обработчик
Функции консьюмера получают данные сообщения и возвращают успех или ошибку:
local json = require("json")
local logger = require("logger")
local function handler(body)
local data = json.decode(body)
logger.info("Обработка", {task_id = data.id})
local result, err = process_task(data)
if err then
return nil, err -- Nack: вернуть сообщение в очередь
end
return result -- Ack: удалить из очереди
end
return handler
- name: task_handler
kind: function.lua
source: file://task_handler.lua
modules:
- json
- logger
Подтверждение
| Результат обработчика | Действие | Эффект |
|---|---|---|
| Возврат значения | Ack | Сообщение удалено из очереди |
| Возврат ошибки | Nack | Сообщение возвращено в очередь (зависит от драйвера) |
Публикация сообщений
Из Lua-кода:
local queue = require("queue")
queue.publish("app.queue:tasks", {
id = "task-123",
action = "process",
data = payload
})
См. Модуль Queue для полного API.
Корректное завершение
При остановке консьюмера:
- Прекращение приёма новых сообщений
- Отмена контекстов воркеров
- Ожидание завершения обрабатываемых сообщений (с тайм-аутом)
- Ошибка, если воркеры не успели завершиться
См. также
- Модуль Queue — справочник Lua API
- Руководство по консьюмерам — паттерны консьюмеров и пулы воркеров
- Супервизия — управление жизненным циклом консьюмеров