プロセスグループ
プロセスを名前付きグループに参加させ、クラスタ全体のすべてのメンバーにブロードキャストします。Erlang/OTP pg をモデルにしています: グループは動的で、プロセスは複数のグループに所属でき、メンバーシップはゴシップを通じてクラスタ全体で追跡されます。
スコープエントリ種別とその設定についてはプロセスグループを参照。クラスタリングモデル全体についてはクラスタガイドを参照。
ロード
local pg = require("pg")
スコープを開く
プロセスグループはスコープ — pg.scope レジストリエントリ — の中に存在します。インスタンスを取得するにはそれを開きます:
local group, err = pg.open("app:pg")
if err then
return nil, err
end
| パラメータ | 型 | 説明 |
|---|---|---|
id |
string | スコープエントリID(形式: "namespace:name") |
戻り値: pg.Instance, error
権限: スコープ id に対する pg.open
インスタンスはプロセス終了時に自動的に解放されます。早期に解放するには release() を呼び出します。他の操作はすべてインスタンスのメソッドで、: で呼び出します。
参加と離脱
local ok, err = group:join("workers") -- 単一グループ
local ok, err = group:join({"workers", "all"}) -- バッチ
local ok, err = group:leave("workers")
| パラメータ | 型 | 説明 |
|---|---|---|
group |
string | string[] | グループ名、またはバッチ操作の名前リスト |
戻り値: boolean, error
プロセスは同じグループに複数回参加できます。完全に離脱するには同じ回数 leave する必要があります(マルチ参加セマンティクス)。leave はバッチ全体でベストエフォートで、指定されたいずれのグループにもメンバーでない場合のみエラーを返します。
権限: 各グループ名に対する pg.join / pg.leave
メンバーの一覧取得
local members, err = group:get_members("workers") -- 全ノード
local local_members, err = group:get_local_members("workers") -- このノードのみ
| パラメータ | 型 | 説明 |
|---|---|---|
group |
string | グループ名 |
戻り値: string[], error — PID文字列の配列(不明なグループは空)
権限: グループ名に対する pg.get_members / pg.get_local_members
グループの一覧取得
local groups, err = group:which_groups() -- クラスタ内の全グループ
local local_groups, err = group:which_local_groups() -- ローカルメンバーを持つグループ
戻り値: string[], error — 現在少なくとも1つのメンバーを持つグループ名
権限: pg.which_groups / pg.which_local_groups
ブロードキャスト
グループのすべてのメンバーにメッセージを送信します。各メンバーは呼び出しプロセスから topic 名でメッセージを受け取ります — process.listen(topic) で処理します。
local ok, err = group:broadcast("workers", "task", {id = 42}) -- 全ノード
local ok, err = group:broadcast_local("workers", "task", {id = 42}) -- このノードのみ
| パラメータ | 型 | 説明 |
|---|---|---|
group |
string | 対象グループ |
topic |
string | メッセージトピック |
... |
any | ゼロ個以上のペイロード値 |
戻り値: boolean, error
権限: グループ名に対する pg.broadcast / pg.broadcast_local
グループの監視
monitor は1つのグループの参加/離脱イベントをサブスクライブし、現在のメンバーをアトミックに返します — サブスクリプションとスナップショットの間でメンバーシップ変更が抜け落ちることはありません。
local sub, members, err = group:monitor("workers")
if err then
return nil, err
end
for _, pid in ipairs(members) do
-- サブスクリプション時の現在のメンバー
end
local ch = sub:channel()
local event = ch:receive() -- {kind = "member.joined" | "member.left", path = "workers", data = {...}}
sub:close() -- アンサブスクライブ。sub:close({flush = true}) でキューされたイベントを先にドレイン
| パラメータ | 型 | 説明 |
|---|---|---|
group |
string | 監視するグループ |
戻り値: pg.Subscription, string[], error — サブスクリプションと現在のメンバーのスナップショット
権限: グループ名に対する pg.monitor
全グループの監視
events はスコープ内のすべてのグループにまたがるメンバーシップ変更をサブスクライブし、すべてのグループとそのメンバーのスナップショットを返します。
local sub, snapshot, err = group:events()
-- snapshot: { ["workers"] = {pid, ...}, ["all"] = {pid, ...} }
local event = sub:channel():receive()
sub:close()
戻り値: pg.Subscription, table, error
権限: pg.events
イベントフィールド
サブスクリプションチャネルで配信されるイベントには以下が含まれます:
| フィールド | 型 | 説明 |
|---|---|---|
system |
string | 常に "pg" |
kind |
string | "member.joined" または "member.left" |
path |
string | グループ名 |
data |
table | {Group = string, PIDs = string[]} — 影響を受けるメンバー |
サブスクリプションチャネルはバッファ付き(容量64)。遅いコンシューマがバッファを満たすと、そのサブスクリプションへのイベントはドロップされます。
解放
group:release()
インスタンスを即座に解放します。冪等です。解放後はすべてのメソッドがエラーを返します。プロセス終了時にもクリーンアップは自動的に実行されます。
戻り値: boolean
権限
| 権限 | メソッド | リソース |
|---|---|---|
pg.open |
pg.open() |
scope id |
pg.join |
join() |
group name |
pg.leave |
leave() |
group name |
pg.get_members |
get_members() |
group name |
pg.get_local_members |
get_local_members() |
group name |
pg.which_groups |
which_groups() |
(scope) |
pg.which_local_groups |
which_local_groups() |
(scope) |
pg.broadcast |
broadcast() |
group name |
pg.broadcast_local |
broadcast_local() |
group name |
pg.monitor |
monitor() |
group name |
pg.events |
events() |
(scope) |
エラー
| 条件 | 種別 |
|---|---|
| 権限拒否 | errors.PERMISSION_DENIED |
| 引数が欠損または空 | errors.INVALID |
| スコープが見つからない | errors.NOT_FOUND |
| メンバーでないグループからの離脱 | errors.INVALID |
| インスタンスが解放済み | errors.INVALID |
エラーの処理についてはエラー処理を参照。