Grupos de Proceso

Unir procesos en grupos con nombre y difundir a todos los miembros en el cluster. Modelado sobre pg de Erlang/OTP: los grupos son dinámicos, un proceso puede pertenecer a muchos grupos, y la membresía se rastrea en todo el cluster y es eventualmente consistente.

Para el tipo de entrada de ámbito y su configuración, ver Grupos de Proceso. Para el modelo de clustering más amplio, ver la Guía de Cluster.

Carga

local pg = require("pg")

Abrir un Ámbito

Un grupo de proceso reside dentro de un ámbito — una entrada de registro pg.scope. Abrirlo para obtener una instancia sobre la que operar:

local group, err = pg.open("app:pg")
if err then
    return nil, err
end
Parámetro Tipo Descripción
id string ID de entrada del ámbito (formato: "namespace:name")

Devuelve: pg.Instance, error

Permiso: pg.open sobre el id del ámbito

La instancia se libera automáticamente cuando el proceso sale; llamar release() para liberarla antes. Todas las demás operaciones son métodos de la instancia, llamados con :.

Unirse y Salir

local ok, err = group:join("workers")           -- un solo grupo
local ok, err = group:join({"workers", "all"})  -- lote
local ok, err = group:leave("workers")
Parámetro Tipo Descripción
group string | string[] Nombre de grupo, o una lista de nombres para una operación por lote

Devuelve: boolean, error

Un proceso puede unirse al mismo grupo más de una vez; debe salir el mismo número de veces para abandonarlo completamente (semántica multi-join). leave es de mejor esfuerzo en un lote y devuelve error solo cuando el proceso no era miembro de ninguno de los grupos nombrados.

Permisos: pg.join / pg.leave sobre cada nombre de grupo

Listar Miembros

local members, err = group:get_members("workers")        -- todos los nodos
local local_members, err = group:get_local_members("workers")  -- solo este nodo
Parámetro Tipo Descripción
group string Nombre de grupo

Devuelve: string[], error — un array de strings de PID (vacío para un grupo desconocido)

Permisos: pg.get_members / pg.get_local_members sobre el nombre de grupo

Listar Grupos

local groups, err = group:which_groups()         -- todos los grupos en el cluster
local local_groups, err = group:which_local_groups()  -- grupos con un miembro local

Devuelve: string[], error — nombres de grupos que actualmente tienen al menos un miembro

Permisos: pg.which_groups / pg.which_local_groups

Difusión

Enviar un mensaje a todos los miembros de un grupo. Cada miembro lo recibe bajo topic del proceso que llama — manejarlo con process.listen(topic).

local ok, err = group:broadcast("workers", "task", {id = 42})   -- todos los nodos
local ok, err = group:broadcast_local("workers", "task", {id = 42})  -- solo este nodo
Parámetro Tipo Descripción
group string Grupo destino
topic string Tema del mensaje
... any Cero o más valores de payload

Devuelve: boolean, error

Permisos: pg.broadcast / pg.broadcast_local sobre el nombre de grupo

Monitorear un Grupo

monitor se suscribe a eventos de unión/salida para un grupo y devuelve los miembros actuales atómicamente — ningún cambio de membresía puede deslizarse entre la instantánea y la suscripción.

local sub, members, err = group:monitor("workers")
if err then
    return nil, err
end

for _, pid in ipairs(members) do
    -- miembros actuales en el momento de la suscripción
end

local ch = sub:channel()
local event = ch:receive()  -- {kind = "member.joined" | "member.left", path = "workers", data = {...}}

sub:close()  -- desuscribirse; sub:close({flush = true}) vacía los eventos encolados primero
Parámetro Tipo Descripción
group string Grupo a observar

Devuelve: pg.Subscription, string[], error — la suscripción y una instantánea de los miembros actuales

Permiso: pg.monitor sobre el nombre de grupo

Observar Todos los Grupos

events se suscribe a cambios de membresía en todos los grupos del ámbito y devuelve una instantánea de todos los grupos con sus miembros.

local sub, snapshot, err = group:events()
-- snapshot: { ["workers"] = {pid, ...}, ["all"] = {pid, ...} }

local event = sub:channel():receive()
sub:close()

Devuelve: pg.Subscription, table, error

Permiso: pg.events

Campos del Evento

Los eventos entregados en un canal de suscripción contienen:

Campo Tipo Descripción
system string Siempre "pg"
kind string "member.joined" o "member.left"
path string El nombre del grupo
data table {Group = string, PIDs = string[]} — los miembros afectados

Los canales de suscripción tienen buffer (capacidad 64); si un consumidor lento llena el buffer, los eventos posteriores para esa suscripción se descartan.

Liberar

group:release()

Libera la instancia inmediatamente. Idempotente; tras la liberación, cada método devuelve un error. La limpieza también se ejecuta automáticamente cuando el proceso sale.

Devuelve: boolean

Permisos

Permiso Método Recurso
pg.open pg.open() id del ámbito
pg.join join() nombre de grupo
pg.leave leave() nombre de grupo
pg.get_members get_members() nombre de grupo
pg.get_local_members get_local_members() nombre de grupo
pg.which_groups which_groups() (ámbito)
pg.which_local_groups which_local_groups() (ámbito)
pg.broadcast broadcast() nombre de grupo
pg.broadcast_local broadcast_local() nombre de grupo
pg.monitor monitor() nombre de grupo
pg.events events() (ámbito)

Errores

Condición Tipo
Permiso denegado errors.PERMISSION_DENIED
Argumento faltante o vacío errors.INVALID
Ámbito no encontrado errors.NOT_FOUND
Salir de un grupo sin membresía errors.INVALID
Instancia liberada errors.INVALID

Consulte Manejo de Errores para trabajar con errores.

Ver También