Очередь сообщений
Публикация и потребление сообщений из распределённых очередей. Поддержка нескольких бэкендов, включая RabbitMQ и другие AMQP-совместимые брокеры.
Настройку очередей см. в Queue.
Загрузка
local queue = require("queue")
Публикация сообщений
Отправка сообщений в очередь по ID:
local ok, err = queue.publish("app:tasks", {
action = "send_email",
user_id = 456,
template = "welcome"
})
if err then
return nil, err
end
| Параметр | Тип | Описание |
|---|---|---|
queue_id |
string | Идентификатор очереди (формат: "namespace:name") |
data |
any | Данные сообщения (таблицы, строки, числа, булевы) |
headers |
table | Опциональные заголовки сообщения |
Возвращает: boolean, error
Заголовки сообщений
Заголовки обеспечивают маршрутизацию, приоритезацию и трассировку:
queue.publish("app:notifications", {
type = "order_shipped",
order_id = order.id
}, {
priority = "high",
correlation_id = request_id
})
Доступ к контексту доставки
В консьюмере очереди доступ к текущему сообщению:
local msg, err = queue.message()
if err then
return nil, err
end
local msg_id = msg:id()
local priority = msg:header("priority")
local all_headers = msg:headers()
Возвращает: Message, error
Доступен только при обработке сообщений в контексте консьюмера.
Методы Message
| Метод | Возвращает | Описание |
|---|---|---|
id() |
string, error |
Уникальный идентификатор сообщения |
header(key) |
any, error |
Одно значение заголовка (nil если отсутствует) |
headers() |
table, error |
Все заголовки сообщения |
Паттерн консьюмера
Консьюмеры очередей определяются как точки входа, получающие payload напрямую:
entries:
- kind: queue.consumer
id: email_worker
queue: app:emails
method: handle_email
function handle_email(payload)
local msg = queue.message()
logger:info("Processing", {
message_id = msg:id(),
to = payload.to
})
local ok, err = email.send(payload.to, payload.template, payload.data)
if err then
return nil, err -- Сообщение будет возвращено в очередь или отправлено в dead-letter
end
end
Разрешения
Операции очереди подчиняются вычислению политики безопасности.
| Действие | Ресурс | Описание |
|---|---|---|
queue.publish |
- | Общее разрешение на публикацию сообщений |
queue.publish.queue |
ID очереди | Публикация в конкретную очередь |
Проверяются оба разрешения: сначала общее, затем для конкретной очереди.
Ошибки
| Условие | Kind | Повторяемо |
|---|---|---|
| Пустой ID очереди | errors.INVALID |
нет |
| Пустые данные сообщения | errors.INVALID |
нет |
| Нет контекста доставки | errors.INVALID |
нет |
| Доступ запрещён | errors.PERMISSION_DENIED |
нет |
| Ошибка публикации | errors.INTERNAL |
да |
См. Обработка ошибок для работы с ошибками.
См. также
- Настройка очередей — драйверы очередей и определения точек входа
- Руководство по консьюмерам — паттерны консьюмеров и пулы воркеров
- Управление процессами — создание процессов и коммуникация
- Каналы — паттерны межпроцессной коммуникации
- Функции — асинхронный вызов функций