Очереди

Wippy предоставляет систему очередей для асинхронной обработки сообщений с настраиваемыми драйверами и консьюмерами.

Архитектура

flowchart LR
    P[Publisher] --> D[Driver]
    D --> Q[Queue]
    Q --> C[Consumer]
    C --> W[Worker Pool]
    W --> F[Function]
  • Драйвер — реализация бэкенда (память, AMQP, Redis)
  • Очередь — логическая очередь, привязанная к драйверу
  • Консьюмер — связывает очередь с обработчиком, настраивает параллелизм
  • Пул воркеров — параллельная обработка сообщений

Несколько очередей могут использовать один драйвер. Несколько консьюмеров могут обрабатывать одну очередь.

Типы записей

Тип Описание
queue.driver.memory Драйвер очереди в памяти
queue.queue Объявление очереди с привязкой к драйверу
queue.consumer Консьюмер для обработки сообщений

Настройка драйвера

Драйвер в памяти

Драйвер в памяти для разработки и тестирования:

- name: memory_driver
  kind: queue.driver.memory
  lifecycle:
    auto_start: true
Дополнительные драйверы (AMQP, Redis, SQS) планируются. Интерфейс драйвера позволяет менять бэкенд без изменения конфигурации очередей и консьюмеров.

Настройка очереди

- name: tasks
  kind: queue.queue
  driver: app.queue:memory_driver
Поле Тип Обязательно Описание
driver Registry ID Да Ссылка на драйвер очереди
options Map Нет Опции, специфичные для драйвера
Драйвер в памяти не имеет опций. Внешние драйверы (AMQP, Redis, SQS) определяют свои опции для настройки поведения: durability, max length, TTL и т.д.

Настройка консьюмера

- name: task_consumer
  kind: queue.consumer
  queue: app.queue:tasks
  func: app.queue:task_handler
  concurrency: 4
  prefetch: 20
  lifecycle:
    auto_start: true
    depends_on:
      - app.queue:tasks
Поле По умолчанию Максимум Описание
queue Обязательно - ID очереди в реестре
func Обязательно - ID функции-обработчика
concurrency 1 1000 Количество параллельных воркеров
prefetch 10 10000 Размер буфера сообщений
Консьюмеры учитывают контекст вызова и могут подчиняться политикам безопасности. Настройте актёра и политики на уровне lifecycle. См. Безопасность.

Пул воркеров

Воркеры работают как параллельные горутины:

concurrency: 3, prefetch: 10

1. Драйвер доставляет до 10 сообщений в буфер
2. 3 воркера параллельно забирают из буфера
3. По мере завершения воркеров буфер пополняется
4. Обратное давление при занятых воркерах и полном буфере

Функция-обработчик

Функции консьюмера получают данные сообщения и возвращают успех или ошибку:

local json = require("json")
local logger = require("logger")

local function handler(body)
    local data = json.decode(body)

    logger.info("Обработка", {task_id = data.id})

    local result, err = process_task(data)
    if err then
        return nil, err  -- Nack: вернуть сообщение в очередь
    end

    return result  -- Ack: удалить из очереди
end

return handler
- name: task_handler
  kind: function.lua
  source: file://task_handler.lua
  modules:
    - json
    - logger

Подтверждение

Результат обработчика Действие Эффект
Возврат значения Ack Сообщение удалено из очереди
Возврат ошибки Nack Сообщение возвращено в очередь (зависит от драйвера)

Публикация сообщений

Из Lua-кода:

local queue = require("queue")

queue.publish("app.queue:tasks", {
    id = "task-123",
    action = "process",
    data = payload
})

См. Модуль Queue для полного API.

Корректное завершение

При остановке консьюмера:

  1. Прекращение приёма новых сообщений
  2. Отмена контекстов воркеров
  3. Ожидание завершения обрабатываемых сообщений (с тайм-аутом)
  4. Ошибка, если воркеры не успели завершиться

См. также