メッセージキュー

分散キューからメッセージをパブリッシュおよびコンシュームします。RabbitMQなどのAMQP互換ブローカーを含む複数のバックエンドをサポートしています。

キュー設定についてはキューを参照。

ロード

local queue = require("queue")

メッセージのパブリッシュ

IDでキューにメッセージを送信します:

local ok, err = queue.publish("app:tasks", {
    action = "send_email",
    user_id = 456,
    template = "welcome"
})
if err then
    return nil, err
end
パラメータ 説明
queue_id string キュー識別子(形式: "namespace:name")
data any メッセージデータ(テーブル、文字列、数値、ブール値)
headers table オプションのメッセージヘッダー

戻り値: boolean, error

メッセージヘッダー

ヘッダーはルーティング、優先度、トレースを有効化します:

queue.publish("app:notifications", {
    type = "order_shipped",
    order_id = order.id
}, {
    priority = "high",
    correlation_id = request_id
})

デリバリーコンテキストへのアクセス

キューコンシューマ内で、現在のメッセージにアクセスします:

local msg, err = queue.message()
if err then
    return nil, err
end

local msg_id = msg:id()
local priority = msg:header("priority")
local all_headers = msg:headers()

戻り値: Message, error

コンシューマコンテキストでキューメッセージを処理する場合のみ利用可能です。

メッセージメソッド

メソッド 戻り値 説明
id() string, error 一意のメッセージ識別子
header(key) any, error 単一のヘッダー値(存在しない場合nil)
headers() table, error すべてのメッセージヘッダー

コンシューマパターン

キューコンシューマはペイロードを直接受け取るエントリポイントとして定義します:

entries:
  - kind: queue.consumer
    id: email_worker
    queue: app:emails
    method: handle_email
function handle_email(payload)
    local msg = queue.message()

    logger:info("Processing", {
        message_id = msg:id(),
        to = payload.to
    })

    local ok, err = email.send(payload.to, payload.template, payload.data)
    if err then
        return nil, err  -- メッセージは再キューまたはデッドレター
    end
end

権限

キュー操作はセキュリティポリシー評価の対象です。

アクション リソース 説明
queue.publish - メッセージをパブリッシュする一般的な権限
queue.publish.queue Queue ID 特定のキューへのパブリッシュ

両方の権限がチェックされます。まず一般的な権限、次にキュー固有の権限の順です。

エラー

条件 種別 再試行可能
キューIDが空 errors.INVALID no
メッセージデータが空 errors.INVALID no
デリバリーコンテキストがない errors.INVALID no
権限拒否 errors.PERMISSION_DENIED no
パブリッシュ失敗 errors.INTERNAL yes

エラーの処理についてはエラー処理を参照。

関連項目