Потребители очередей
Потребители очередей обрабатывают сообщения с помощью пулов воркеров.
Обзор
flowchart LR
subgraph Consumer[Потребитель]
QD[Драйвер очереди] --> DC[Канал доставки
prefetch=10]
DC --> WP[Пул воркеров
concurrency]
WP --> FH[Обработчик функции]
FH --> AN[Ack/Nack]
end
Конфигурация
| Параметр | По умолчанию | Максимум | Описание |
|---|---|---|---|
queue |
Обязательно | - | ID очереди в реестре |
func |
Обязательно | - | ID функции-обработчика в реестре |
concurrency |
1 | 1000 | Количество воркеров |
prefetch |
10 | 10000 | Размер буфера сообщений |
Определение записи
- name: order_consumer
kind: queue.consumer
queue: app:orders
func: app:process_order
concurrency: 5
prefetch: 20
lifecycle:
auto_start: true
depends_on:
- app:orders
Функция-обработчик
Обработчик получает тело сообщения:
-- process_order.lua
local json = require("json")
local function handler(body)
local order = json.decode(body)
-- Обработка заказа
local result, err = process_order(order)
if err then
-- Возврат ошибки вызывает Nack (повторная постановка в очередь)
return nil, err
end
-- Успех вызывает Ack
return result
end
return handler
- name: process_order
kind: function.lua
source: file://process_order.lua
modules:
- json
Подтверждение
| Результат | Действие | Эффект |
|---|---|---|
| Успех | Ack | Сообщение удаляется из очереди |
| Ошибка | Nack | Сообщение возвращается в очередь (зависит от драйвера) |
Пул воркеров
- Воркеры работают как параллельные горутины
- Каждый воркер обрабатывает одно сообщение за раз
- Сообщения распределяются round-robin из канала доставки
- Буфер prefetch позволяет драйверу доставлять сообщения заранее
Пример
concurrency: 3
prefetch: 10
Поток:
1. Драйвер доставляет до 10 сообщений в буфер
2. 3 воркера параллельно забирают из буфера
3. По мере завершения воркеров буфер пополняется
4. Backpressure, когда все воркеры заняты и буфер полон
Корректное завершение
При остановке:
- Прекращение приёма новых сообщений
- Отмена контекстов воркеров
- Ожидание обрабатываемых сообщений (с таймаутом)
- Возврат ошибки таймаута, если воркеры не завершились
Объявление очереди
# Драйвер очереди (memory для разработки/тестов)
- name: queue_driver
kind: queue.driver.memory
lifecycle:
auto_start: true
# Определение очереди
- name: orders
kind: queue.queue
driver: app:queue_driver
options:
queue_name: orders # Переопределить имя (по умолчанию: имя записи)
max_length: 10000 # Максимальный размер очереди
durable: true # Переживает перезапуски
| Параметр | Описание |
|---|---|
queue_name |
Переопределить имя очереди (по умолчанию: имя записи) |
max_length |
Максимальный размер очереди |
durable |
Переживает перезапуски (зависит от драйвера) |
Memory-драйвер
Встроенная in-memory очередь для разработки и тестирования:
- Тип:
queue.driver.memory - Сообщения хранятся в памяти
- Nack возвращает сообщение в начало очереди
- Не сохраняется между перезапусками
См. также
- Очереди сообщений — справочник модуля Queue
- Конфигурация очередей — драйверы и определения записей
- Деревья супервизии — жизненный цикл потребителей
- Управление процессами — создание процессов и взаимодействие