Process Groups
Join processes into named groups and broadcast to every member across the cluster. Modeled on Erlang/OTP pg: groups are dynamic, a process can belong to many groups, and membership is tracked cluster-wide through gossip.
For the scope entry kind and its configuration, see Process Groups. For the broader clustering model, see the Cluster Guide.
Loading
local pg = require("pg")
Opening a Scope
A process group lives inside a scope — a pg.scope registry entry. Open it to get an instance you operate on:
local group, err = pg.open("app:pg")
if err then
return nil, err
end
| Parameter | Type | Description |
|---|---|---|
id |
string | Scope entry ID (format: "namespace:name") |
Returns: pg.Instance, error
Permission: pg.open on the scope id
The instance is released automatically when the process exits; call release() to free it earlier. All other operations are methods on the instance, called with :.
Joining and Leaving
local ok, err = group:join("workers") -- single group
local ok, err = group:join({"workers", "all"}) -- batch
local ok, err = group:leave("workers")
| Parameter | Type | Description |
|---|---|---|
group |
string | string[] | Group name, or a list of names for a batch operation |
Returns: boolean, error
A process may join the same group more than once; it must leave the same number of times to fully depart (multi-join semantics). leave is best-effort across a batch and returns an error only when the process was a member of none of the named groups.
Permissions: pg.join / pg.leave on each group name
Listing Members
local members, err = group:get_members("workers") -- all nodes
local local_members, err = group:get_local_members("workers") -- this node only
| Parameter | Type | Description |
|---|---|---|
group |
string | Group name |
Returns: string[], error — an array of PID strings (empty for an unknown group)
Permissions: pg.get_members / pg.get_local_members on the group name
Listing Groups
local groups, err = group:which_groups() -- all groups in the cluster
local local_groups, err = group:which_local_groups() -- groups with a local member
Returns: string[], error — group names that currently have at least one member
Permissions: pg.which_groups / pg.which_local_groups
Broadcasting
Send a message to every member of a group. Each member receives it under topic from the calling process — handle it with process.listen(topic).
local ok, err = group:broadcast("workers", "task", {id = 42}) -- all nodes
local ok, err = group:broadcast_local("workers", "task", {id = 42}) -- this node only
| Parameter | Type | Description |
|---|---|---|
group |
string | Target group |
topic |
string | Message topic |
... |
any | Zero or more payload values |
Returns: boolean, error
Permissions: pg.broadcast / pg.broadcast_local on the group name
Monitoring a Group
monitor subscribes to join/leave events for one group and returns the current members atomically — no membership change can slip between the snapshot and the subscription.
local sub, members, err = group:monitor("workers")
if err then
return nil, err
end
for _, pid in ipairs(members) do
-- current members at subscription time
end
local ch = sub:channel()
local event = ch:receive() -- {kind = "member.joined" | "member.left", path = "workers", data = {...}}
sub:close() -- unsubscribe; sub:close({flush = true}) drains queued events first
| Parameter | Type | Description |
|---|---|---|
group |
string | Group to watch |
Returns: pg.Subscription, string[], error — the subscription and a snapshot of current members
Permission: pg.monitor on the group name
Watching All Groups
events subscribes to membership changes across every group in the scope and returns a snapshot of all groups to their members.
local sub, snapshot, err = group:events()
-- snapshot: { ["workers"] = {pid, ...}, ["all"] = {pid, ...} }
local event = sub:channel():receive()
sub:close()
Returns: pg.Subscription, table, error
Permission: pg.events
Event Fields
Events delivered on a subscription channel carry:
| Field | Type | Description |
|---|---|---|
system |
string | Always "pg" |
kind |
string | "member.joined" or "member.left" |
path |
string | The group name |
data |
table | {Group = string, PIDs = string[]} — the affected members |
Subscription channels are buffered (capacity 64); if a slow consumer fills the buffer, further events for that subscription are dropped.
Releasing
group:release()
Frees the instance immediately. Idempotent; after release, every method returns an error. Cleanup also runs automatically when the process exits.
Returns: boolean
Permissions
| Permission | Method | Resource |
|---|---|---|
pg.open |
pg.open() |
scope id |
pg.join |
join() |
group name |
pg.leave |
leave() |
group name |
pg.get_members |
get_members() |
group name |
pg.get_local_members |
get_local_members() |
group name |
pg.which_groups |
which_groups() |
(scope) |
pg.which_local_groups |
which_local_groups() |
(scope) |
pg.broadcast |
broadcast() |
group name |
pg.broadcast_local |
broadcast_local() |
group name |
pg.monitor |
monitor() |
group name |
pg.events |
events() |
(scope) |
Errors
| Condition | Kind |
|---|---|
| Permission denied | errors.PERMISSION_DENIED |
| Missing or empty argument | errors.INVALID |
| Scope not found | errors.NOT_FOUND |
| Leave a group with no membership | errors.INVALID |
| Instance released | errors.INVALID |
See Error Handling for working with errors.
See Also
- Process Groups - Scope entry kind and configuration
- Cluster - Membership, naming, and the clustering model
- Process Management - Spawning and messaging individual processes