Process Groups

Join processes into named groups and broadcast to every member across the cluster. Modeled on Erlang/OTP pg: groups are dynamic, a process can belong to many groups, and membership is tracked cluster-wide through gossip.

For the scope entry kind and its configuration, see Process Groups. For the broader clustering model, see the Cluster Guide.

Loading

local pg = require("pg")

Opening a Scope

A process group lives inside a scope — a pg.scope registry entry. Open it to get an instance you operate on:

local group, err = pg.open("app:pg")
if err then
    return nil, err
end
Parameter Type Description
id string Scope entry ID (format: "namespace:name")

Returns: pg.Instance, error

Permission: pg.open on the scope id

The instance is released automatically when the process exits; call release() to free it earlier. All other operations are methods on the instance, called with :.

Joining and Leaving

local ok, err = group:join("workers")           -- single group
local ok, err = group:join({"workers", "all"})  -- batch
local ok, err = group:leave("workers")
Parameter Type Description
group string | string[] Group name, or a list of names for a batch operation

Returns: boolean, error

A process may join the same group more than once; it must leave the same number of times to fully depart (multi-join semantics). leave is best-effort across a batch and returns an error only when the process was a member of none of the named groups.

Permissions: pg.join / pg.leave on each group name

Listing Members

local members, err = group:get_members("workers")        -- all nodes
local local_members, err = group:get_local_members("workers")  -- this node only
Parameter Type Description
group string Group name

Returns: string[], error — an array of PID strings (empty for an unknown group)

Permissions: pg.get_members / pg.get_local_members on the group name

Listing Groups

local groups, err = group:which_groups()         -- all groups in the cluster
local local_groups, err = group:which_local_groups()  -- groups with a local member

Returns: string[], error — group names that currently have at least one member

Permissions: pg.which_groups / pg.which_local_groups

Broadcasting

Send a message to every member of a group. Each member receives it under topic from the calling process — handle it with process.listen(topic).

local ok, err = group:broadcast("workers", "task", {id = 42})   -- all nodes
local ok, err = group:broadcast_local("workers", "task", {id = 42})  -- this node only
Parameter Type Description
group string Target group
topic string Message topic
... any Zero or more payload values

Returns: boolean, error

Permissions: pg.broadcast / pg.broadcast_local on the group name

Monitoring a Group

monitor subscribes to join/leave events for one group and returns the current members atomically — no membership change can slip between the snapshot and the subscription.

local sub, members, err = group:monitor("workers")
if err then
    return nil, err
end

for _, pid in ipairs(members) do
    -- current members at subscription time
end

local ch = sub:channel()
local event = ch:receive()  -- {kind = "member.joined" | "member.left", path = "workers", data = {...}}

sub:close()  -- unsubscribe; sub:close({flush = true}) drains queued events first
Parameter Type Description
group string Group to watch

Returns: pg.Subscription, string[], error — the subscription and a snapshot of current members

Permission: pg.monitor on the group name

Watching All Groups

events subscribes to membership changes across every group in the scope and returns a snapshot of all groups to their members.

local sub, snapshot, err = group:events()
-- snapshot: { ["workers"] = {pid, ...}, ["all"] = {pid, ...} }

local event = sub:channel():receive()
sub:close()

Returns: pg.Subscription, table, error

Permission: pg.events

Event Fields

Events delivered on a subscription channel carry:

Field Type Description
system string Always "pg"
kind string "member.joined" or "member.left"
path string The group name
data table {Group = string, PIDs = string[]} — the affected members

Subscription channels are buffered (capacity 64); if a slow consumer fills the buffer, further events for that subscription are dropped.

Releasing

group:release()

Frees the instance immediately. Idempotent; after release, every method returns an error. Cleanup also runs automatically when the process exits.

Returns: boolean

Permissions

Permission Method Resource
pg.open pg.open() scope id
pg.join join() group name
pg.leave leave() group name
pg.get_members get_members() group name
pg.get_local_members get_local_members() group name
pg.which_groups which_groups() (scope)
pg.which_local_groups which_local_groups() (scope)
pg.broadcast broadcast() group name
pg.broadcast_local broadcast_local() group name
pg.monitor monitor() group name
pg.events events() (scope)

Errors

Condition Kind
Permission denied errors.PERMISSION_DENIED
Missing or empty argument errors.INVALID
Scope not found errors.NOT_FOUND
Leave a group with no membership errors.INVALID
Instance released errors.INVALID

See Error Handling for working with errors.

See Also