チャネルとコルーチン

コルーチン間通信のためのGo形式チャネルを提供します。バッファ付きまたはアンバッファードチャネルを作成し、値を送受信し、select文を使用して並行プロセス間で調整できます。

channelグローバルは常に利用可能です。

チャネルの作成

アンバッファードチャネル(サイズ0)は送信者と受信者の両方が準備できてから転送が完了する必要があります。バッファ付きチャネルは空きがある限り即座に送信が完了します:

-- アンバッファード:送信者と受信者を同期
local sync_ch = channel.new()

-- バッファ付き:最大10メッセージをキュー
local work_queue = channel.new(10)
パラメータ 説明
size integer バッファ容量(デフォルト:0でアンバッファード)

戻り値: channel

値の送信

チャネルに値を送信します。受信者が準備できるまで(アンバッファード)またはバッファスペースが利用可能になるまで(バッファ付き)ブロックします:

-- ワーカープールに作業を送信
local jobs = channel.new(100)
for i, task in ipairs(tasks) do
    jobs:send(task)  -- バッファがいっぱいの場合ブロック
end
jobs:close()  -- これ以上作業がないことをシグナル
パラメータ 説明
value any 送信する値

戻り値: boolean

チャネルがクローズされている場合はエラーが発生します。

値の受信

チャネルから値を受信します。値が利用可能になるかチャネルがクローズされるまでブロックします:

-- ジョブキューからコンシュームするワーカー
while true do
    local job, ok = work:receive()
    if not ok then
        break  -- チャネルがクローズ、これ以上作業なし
    end
    process(job)
end

戻り値: any, boolean

  • value, true - 値を受信
  • nil, false - チャネルがクローズされ空

チャネルのクローズ

チャネルをクローズします。保留中の送信者はエラーを取得し、保留中の受信者はnil, falseを取得します。既にクローズされている場合はエラーが発生します:

local results = channel.new(10)

-- プロデューサーが結果を埋める
for _, item in ipairs(data) do
    results:send(process(item))
end
results:close()  -- 完了をシグナル

複数チャネルからのSelect

複数のチャネル操作を同時に待機します。複数のイベントソースの処理、タイムアウトの実装、レスポンシブなシステムの構築に不可欠です:

local result = channel.select(cases)
パラメータ 説明
cases table selectケースの配列
default boolean trueなら、ケースが準備できていない場合即座に戻る

戻り値: フィールド付きtablechannelvalueokdefault

タイムアウトパターン

time.after()を使用してタイムアウト付きで結果を待機します。

local time = require("time")

local result_ch = worker:response()
local timeout = time.after("5s")

local r = channel.select {
    result_ch:case_receive(),
    timeout:case_receive()
}

if r.channel == timeout then
    return nil, errors.new("TIMEOUT", "Operation timed out")
end
return r.value

ファンインパターン

複数のソースを1つのハンドラにマージします。

local events = process.events()
local inbox = process.inbox()
local shutdown = channel.new()

while true do
    local r = channel.select {
        events:case_receive(),
        inbox:case_receive(),
        shutdown:case_receive()
    }

    if r.channel == shutdown then
        break
    elseif r.channel == events then
        handle_event(r.value)
    else
        handle_message(r.value)
    end
end

ノンブロッキングチェック

ブロックせずにデータが利用可能かチェックします。

local r = channel.select {
    ch:case_receive(),
    default = true
}

if r.default then
    -- 利用可能なものがない、他のことを実行
else
    process(r.value)
end

Selectケースの作成

channel.selectで使用するためのケースを作成します:

-- 送信ケース - チャネルが値を受け付けられるときに完了
ch:case_send(value)

-- 受信ケース - 値が利用可能なときに完了
ch:case_receive()

ワーカープールパターン

local work = channel.new(100)
local results = channel.new(100)

-- ワーカーをスポーン
for i = 1, num_workers do
    process.spawn("app.workers:processor", "app:processes", work, results)
end

-- 作業を送る
for _, item in ipairs(items) do
    work:send(item)
end
work:close()

-- 結果を収集
local processed = {}
while #processed < #items do
    local result, ok = results:receive()
    if not ok then break end
    table.insert(processed, result)
end

エラー

条件 種別 再試行可能
クローズされたチャネルへの送信 runtime error no
クローズされたチャネルのクローズ runtime error no
selectで無効なケース runtime error no

関連項目