プロセススーパービジョン

プロセスをモニタリングおよびリンクして、フォールトトレラントなシステムを構築します。

モニタリングとリンク

モニタリングは一方向の監視を提供:

  • 親が子をモニタリング
  • 子が終了すると、親がEXITイベントを受信
  • 親は実行を継続

リンクは双方向の運命共有を作成:

  • 親と子がリンクされる
  • どちらかのプロセスが失敗すると、両方が終了
  • trap_links=trueが設定されていない限り
flowchart TB
    subgraph Monitoring["モニタリング(一方向)"]
        direction TB
        P1[親がモニタリング] -->|EXITイベント
親は継続| C1[子が終了] end subgraph Linking["リンク(双方向)"] direction TB P2[親がリンク] <-->|LINK_DOWN
両方終了| C2[子が終了] end

プロセスモニタリング

モニタリング付き生成

process.spawn_monitored()を使用して生成とモニタリングを1回の呼び出しで実行:

local function main()
    local events_ch = process.events()

    -- ワーカーを生成してモニタリング開始
    local worker_pid, err = process.spawn_monitored(
        "app.workers:task_worker",
        "app:processes"
    )
    if err then
        return nil, "spawn failed: " .. tostring(err)
    end

    -- ワーカーの完了を待機
    local event = events_ch:receive()

    if event.kind == process.event.EXIT then
        print("Worker exited:", event.from)
        if event.result then
            print("Result:", event.result.value)
        end
        if event.result and event.result.error then
            print("Error:", event.result.error)
        end
    end
end

既存プロセスのモニタリング

すでに実行中のプロセスのモニタリングを開始するにはprocess.monitor()を呼び出す:

local function main()
    local time = require("time")
    local events_ch = process.events()

    -- モニタリングなしで生成
    local worker_pid, err = process.spawn(
        "app.workers:long_worker",
        "app:processes"
    )
    if err then
        return nil, "spawn failed: " .. tostring(err)
    end

    -- 後でモニタリングを開始
    local ok, monitor_err = process.monitor(worker_pid)
    if monitor_err then
        return nil, "monitor failed: " .. tostring(monitor_err)
    end

    -- ワーカーをキャンセル
    time.sleep("5ms")
    process.cancel(worker_pid, "100ms")

    -- EXITイベントを受信
    local event = events_ch:receive()
    if event.kind == process.event.EXIT then
        print("Worker terminated:", event.from)
    end
end

モニタリングの停止

EXITイベントの受信を停止するにはprocess.unmonitor()を使用:

local function main()
    local time = require("time")
    local events_ch = process.events()

    -- 生成してモニタリング
    local worker_pid, err = process.spawn_monitored(
        "app.workers:long_worker",
        "app:processes"
    )

    time.sleep("5ms")

    -- モニタリングを停止
    local ok, unmon_err = process.unmonitor(worker_pid)
    if unmon_err then
        return nil, "unmonitor failed: " .. tostring(unmon_err)
    end

    -- ワーカーをキャンセル
    process.cancel(worker_pid, "100ms")

    -- EXITイベントは受信されない(モニタリング解除したため)
    local timeout = time.after("200ms")
    local result = channel.select {
        events_ch:case_receive(),
        timeout:case_receive(),
    }

    if result.channel == events_ch then
        return nil, "should not receive event after unmonitor"
    end
end

プロセスリンク

明示的なリンク

双方向リンクを作成するにはprocess.link()を使用:

-- ターゲットプロセスにリンクするワーカー
local function worker_main()
    local time = require("time")
    local events_ch = process.events()
    local inbox_ch = process.inbox()

    -- LINK_DOWNイベントを受信するためにtrap_linksを有効化
    process.set_options({ trap_links = true })

    -- 送信者からターゲットPIDを受信
    local msg = inbox_ch:receive()
    local target_pid = msg:payload():data()
    local sender = msg:from()

    -- 双方向リンクを作成
    local ok, err = process.link(target_pid)
    if err then
        return nil, "link failed: " .. tostring(err)
    end

    -- 送信者にリンク完了を通知
    process.send(sender, "linked", process.pid())

    -- ターゲット終了時のLINK_DOWNを待機
    local timeout = time.after("3s")
    local result = channel.select {
        events_ch:case_receive(),
        timeout:case_receive(),
    }

    if result.channel == events_ch then
        local event = result.value
        if event.kind == process.event.LINK_DOWN then
            return "LINK_DOWN_RECEIVED"
        end
    end

    return nil, "no LINK_DOWN received"
end

リンク付き生成

process.spawn_linked()を使用して生成とリンクを1回の呼び出しで実行:

local function parent_main()
    -- 子の死亡を処理するためにtrap_linksを有効化
    process.set_options({ trap_links = true })

    local events_ch = process.events()

    -- 子を生成してリンク
    local child_pid, err = process.spawn_linked(
        "app.workers:child_worker",
        "app:processes"
    )
    if err then
        return nil, "spawn_linked failed: " .. tostring(err)
    end

    -- 子が死亡するとLINK_DOWNを受信
    local event = events_ch:receive()
    if event.kind == process.event.LINK_DOWN then
        print("Child died:", event.from)
    end
end

トラップリンク

デフォルトでは、リンクされたプロセスが失敗すると、現在のプロセスも失敗します。代わりにLINK_DOWNイベントを受信するにはtrap_links=trueを設定します。

デフォルト動作(trap_links=false)

trap_linksなしでは、リンクされたプロセスの失敗は現在のプロセスを終了させます:

local function worker_main()
    local events_ch = process.events()

    -- trap_linksはデフォルトでfalse
    local opts = process.get_options()
    print("trap_links:", opts.trap_links)  -- false

    -- 失敗するリンクされたワーカーを生成
    local child_pid, err = process.spawn_linked(
        "app.workers:error_worker",
        "app:processes"
    )

    -- 子がエラーを起こすと、このプロセスは終了
    -- この地点には到達しない
    local event = events_ch:receive()
end

trap_links=trueの場合

LINK_DOWNイベントを受信して生存するにはtrap_linksを有効化:

local function worker_main()
    -- trap_linksを有効化
    process.set_options({ trap_links = true })

    local events_ch = process.events()

    -- 失敗するリンクされたワーカーを生成
    local child_pid, err = process.spawn_linked(
        "app.workers:error_worker",
        "app:processes"
    )

    -- LINK_DOWNイベントを待機
    local event = events_ch:receive()

    if event.kind == process.event.LINK_DOWN then
        print("Child failed, handling gracefully")
        return "LINK_DOWN_RECEIVED"
    end
end

キャンセル

キャンセルシグナルの送信

プロセスを適切に終了させるにはprocess.cancel()を使用:

local function main()
    local time = require("time")
    local events_ch = process.events()

    -- ワーカーを生成してモニタリング
    local worker_pid, err = process.spawn_monitored(
        "app.workers:long_worker",
        "app:processes"
    )

    time.sleep("5ms")

    -- クリーンアップ用に100msのタイムアウトでキャンセル
    local ok, cancel_err = process.cancel(worker_pid, "100ms")
    if cancel_err then
        return nil, "cancel failed: " .. tostring(cancel_err)
    end

    -- EXITイベントを待機
    local event = events_ch:receive()
    if event.kind == process.event.EXIT then
        print("Worker cancelled:", event.from)
    end
end

キャンセルの処理

ワーカーはprocess.events()経由でCANCELイベントを受信:

local function worker_main()
    local events_ch = process.events()
    local inbox_ch = process.inbox()

    while true do
        local result = channel.select {
            inbox_ch:case_receive(),
            events_ch:case_receive(),
        }

        if result.channel == events_ch then
            local event = result.value
            if event.kind == process.event.CANCEL then
                -- リソースをクリーンアップ
                cleanup()
                return "cancelled gracefully"
            end
        else
            -- inboxメッセージを処理
            handle_message(result.value)
        end
    end
end

スーパービジョントポロジー

スタートポロジー

親に戻ってリンクする複数の子を持つ親:

-- 親ワーカーは親にリンクする子を生成
local function star_parent_main()
    local time = require("time")
    local events_ch = process.events()
    local child_count = 10

    -- 子の死亡を確認するためにtrap_linksを有効化
    process.set_options({ trap_links = true })

    local children = {}

    -- 子を生成
    for i = 1, child_count do
        local child_pid, err = process.spawn(
            "app.workers:linker_child",
            "app:processes"
        )
        if err then
            error("spawn child failed: " .. tostring(err))
        end

        -- 親のPIDを子に送信
        process.send(child_pid, "inbox", process.pid())
        children[child_pid] = true
    end

    -- すべての子がリンク確認を待機
    for i = 1, child_count do
        local msg = process.inbox():receive()
        if msg:topic() ~= "linked" then
            error("expected linked confirmation")
        end
    end

    -- 失敗をトリガー - すべての子がLINK_DOWNを受信
    error("PARENT_STAR_FAILURE")
end

親にリンクする子ワーカー:

local function linker_child_main()
    local events_ch = process.events()
    local inbox_ch = process.inbox()

    -- 親のPIDを受信
    local msg = inbox_ch:receive()
    local parent_pid = msg:payload():data()

    -- 親にリンク
    process.link(parent_pid)

    -- リンク確認
    process.send(parent_pid, "linked", process.pid())

    -- 親が死亡したときのLINK_DOWNを待機
    local event = events_ch:receive()
    if event.kind == process.event.LINK_DOWN then
        return "parent_died"
    end
end

チェーントポロジー

各ノードが親にリンクする線形チェーン:

-- チェーンルート: A -> B -> C -> D -> E
local function chain_root_main()
    local time = require("time")

    -- 最初の子を生成
    local child_pid, err = process.spawn_linked(
        "app.workers:chain_node",
        "app:processes",
        4  -- 残りの深さ
    )
    if err then
        error("spawn failed: " .. tostring(err))
    end

    -- チェーン構築を待機
    time.sleep("100ms")

    -- カスケードをトリガー - すべてのリンクされたプロセスが終了
    error("CHAIN_ROOT_FAILURE")
end

チェーンノードは次のノードを生成してリンク:

local function chain_node_main(depth)
    local time = require("time")

    if depth > 0 then
        -- チェーンの次を生成
        local child_pid, err = process.spawn_linked(
            "app.workers:chain_node",
            "app:processes",
            depth - 1
        )
        if err then
            error("spawn failed: " .. tostring(err))
        end
    end

    -- 親の死亡を待機(LINK_DOWNで自身の終了をトリガー)
    time.sleep("5s")
end

スーパービジョン付きワーカープール

設定

# src/_index.yaml
version: "1.0"
namespace: app

entries:
  - name: processes
    kind: process.host
    host:
      workers: 16
    lifecycle:
      auto_start: true
# src/supervisor/_index.yaml
version: "1.0"
namespace: app.supervisor

entries:
  - name: pool
    kind: process.lua
    source: file://pool.lua
    method: main
    modules:
      - time
    lifecycle:
      auto_start: true

スーパーバイザー実装

-- src/supervisor/pool.lua
local function main(worker_count)
    local time = require("time")
    worker_count = worker_count or 4

    -- ワーカーの死亡を処理するためにtrap_linksを有効化
    process.set_options({ trap_links = true })

    local events_ch = process.events()
    local workers = {}

    local function start_worker(id)
        local pid, err = process.spawn_linked(
            "app.workers:task_worker",
            "app:processes",
            id
        )
        if err then
            print("Failed to start worker " .. id .. ": " .. tostring(err))
            return nil
        end

        workers[pid] = {id = id, started_at = os.time()}
        print("Worker " .. id .. " started: " .. pid)
        return pid
    end

    -- 初期プールを起動
    for i = 1, worker_count do
        start_worker(i)
    end

    print("Supervisor started with " .. worker_count .. " workers")

    -- スーパービジョンループ
    while true do
        local timeout = time.after("60s")
        local result = channel.select {
            events_ch:case_receive(),
            timeout:case_receive(),
        }

        if result.channel == timeout then
            -- 定期的なヘルスチェック
            local count = 0
            for _ in pairs(workers) do count = count + 1 end
            print("Health check: " .. count .. " active workers")

        elseif result.channel == events_ch then
            local event = result.value

            if event.kind == process.event.LINK_DOWN then
                local dead_worker = workers[event.from]
                if dead_worker then
                    workers[event.from] = nil
                    local uptime = os.time() - dead_worker.started_at
                    print("Worker " .. dead_worker.id .. " died after " .. uptime .. "s, restarting")

                    -- 再起動前の短い遅延
                    time.sleep("100ms")
                    start_worker(dead_worker.id)
                end
            end
        end
    end
end

return { main = main }

プロセス設定

ワーカー定義

# src/workers/_index.yaml
version: "1.0"
namespace: app.workers

entries:
  - name: task_worker
    kind: process.lua
    source: file://task_worker.lua
    method: main
    modules:
      - time

ワーカー実装

-- src/workers/task_worker.lua
local function main(worker_id)
    local time = require("time")
    local events_ch = process.events()
    local inbox_ch = process.inbox()

    print("Task worker " .. worker_id .. " started")

    while true do
        local timeout = time.after("5s")
        local result = channel.select {
            inbox_ch:case_receive(),
            events_ch:case_receive(),
            timeout:case_receive(),
        }

        if result.channel == events_ch then
            local event = result.value
            if event.kind == process.event.CANCEL then
                print("Worker " .. worker_id .. " cancelled")
                return "cancelled"
            elseif event.kind == process.event.LINK_DOWN then
                print("Worker " .. worker_id .. " linked process died")
                return nil, "linked_process_died"
            end

        elseif result.channel == inbox_ch then
            local msg = result.value
            local topic = msg:topic()
            local payload = msg:payload():data()

            if topic == "work" then
                print("Worker " .. worker_id .. " processing: " .. payload)
                time.sleep("100ms")
                process.send(msg:from(), "result", "completed: " .. payload)
            end

        elseif result.channel == timeout then
            -- アイドルタイムアウト
            print("Worker " .. worker_id .. " idle")
        end
    end
end

return { main = main }

プロセスホスト設定

プロセスホストは、プロセスを実行するOSスレッドの数を制御します:

# src/_index.yaml
version: "1.0"
namespace: app

entries:
  - name: processes
    kind: process.host
    host:
      workers: 16  # OSスレッドの数
    lifecycle:
      auto_start: true

ワーカー設定:

  • CPUバウンドな作業の並列性を制御
  • 通常はCPUコア数に設定
  • すべてのプロセスがこのスレッドプールを共有

主要コンセプト

モニタリング(一方向の監視):

  • process.spawn_monitored()またはprocess.monitor()を使用
  • モニタリング対象のプロセスが終了するとEXITイベントを受信
  • 子が終了しても親は実行を継続

リンク(双方向の運命共有):

  • process.spawn_linked()またはprocess.link()を使用
  • デフォルト:どちらかのプロセスが失敗すると両方が終了
  • trap_links=trueの場合:代わりにLINK_DOWNイベントを受信

キャンセル

  • 適切なシャットダウンにはprocess.cancel(pid, timeout)を使用
  • ワーカーはprocess.events()経由でCANCELイベントを受信
  • 強制終了前のクリーンアップ用タイムアウト期間

イベントタイプ

イベント トリガー 必要な設定
EXIT モニタリング対象のプロセスが終了 spawn_monitored()またはmonitor()
LINK_DOWN リンクされたプロセスが失敗 spawn_linked()またはlink()trap_links=true
CANCEL process.cancel()が呼ばれた なし(常に配信)

次のステップ