Prozessgruppen

Prozesse in benannte Gruppen aufnehmen und an jedes Mitglied im Cluster senden. Modelliert nach Erlang/OTP pg: Gruppen sind dynamisch, ein Prozess kann vielen Gruppen angehören, und die Mitgliedschaft wird clusterweit über Gossip verfolgt.

Für den Scope-Eintragstyp und seine Konfiguration siehe Prozessgruppen. Für das umfassendere Clustering-Modell siehe den Cluster-Leitfaden.

Laden

local pg = require("pg")

Einen Scope öffnen

Eine Prozessgruppe lebt innerhalb eines Scopes — einem pg.scope-Registry-Eintrag. Öffnen Sie ihn, um eine Instanz zu erhalten, auf der Sie operieren:

local group, err = pg.open("app:pg")
if err then
    return nil, err
end
Parameter Typ Beschreibung
id string Scope-Eintrag-ID (Format: "namespace:name")

Gibt zurück: pg.Instance, error

Berechtigung: pg.open auf der Scope-id

Die Instanz wird automatisch freigegeben, wenn der Prozess endet; rufen Sie release() auf, um sie früher freizugeben. Alle anderen Operationen sind Methoden der Instanz, aufgerufen mit :.

Beitreten und Verlassen

local ok, err = group:join("workers")           -- einzelne Gruppe
local ok, err = group:join({"workers", "all"})  -- Batch
local ok, err = group:leave("workers")
Parameter Typ Beschreibung
group string | string[] Gruppenname oder eine Liste von Namen für eine Batch-Operation

Gibt zurück: boolean, error

Ein Prozess kann derselben Gruppe mehr als einmal beitreten; er muss genauso oft austreten, um vollständig auszuscheiden (Multi-Join-Semantik). leave ist Best-Effort über einen Batch und gibt nur dann einen Fehler zurück, wenn der Prozess in keiner der genannten Gruppen Mitglied war.

Berechtigungen: pg.join / pg.leave auf jedem Gruppennamen

Mitglieder auflisten

local members, err = group:get_members("workers")        -- alle Knoten
local local_members, err = group:get_local_members("workers")  -- nur dieser Knoten
Parameter Typ Beschreibung
group string Gruppenname

Gibt zurück: string[], error — ein Array von PID-Strings (leer für eine unbekannte Gruppe)

Berechtigungen: pg.get_members / pg.get_local_members auf dem Gruppennamen

Gruppen auflisten

local groups, err = group:which_groups()         -- alle Gruppen im Cluster
local local_groups, err = group:which_local_groups()  -- Gruppen mit einem lokalen Mitglied

Gibt zurück: string[], error — Gruppennamen, die aktuell mindestens ein Mitglied haben

Berechtigungen: pg.which_groups / pg.which_local_groups

Broadcast

Eine Nachricht an jedes Mitglied einer Gruppe senden. Jedes Mitglied empfängt sie unter topic vom aufrufenden Prozess — mit process.listen(topic) verarbeiten.

local ok, err = group:broadcast("workers", "task", {id = 42})   -- alle Knoten
local ok, err = group:broadcast_local("workers", "task", {id = 42})  -- nur dieser Knoten
Parameter Typ Beschreibung
group string Zielgruppe
topic string Nachrichten-Topic
... any Null oder mehr Payload-Werte

Gibt zurück: boolean, error

Berechtigungen: pg.broadcast / pg.broadcast_local auf dem Gruppennamen

Eine Gruppe überwachen

monitor abonniert Beitritts-/Verlassens-Ereignisse für eine Gruppe und gibt die aktuellen Mitglieder atomar zurück — keine Mitgliedschaftsänderung kann zwischen dem Snapshot und dem Abonnement verlorengehen.

local sub, members, err = group:monitor("workers")
if err then
    return nil, err
end

for _, pid in ipairs(members) do
    -- aktuelle Mitglieder zum Abonnementzeitpunkt
end

local ch = sub:channel()
local event = ch:receive()  -- {kind = "member.joined" | "member.left", path = "workers", data = {...}}

sub:close()  -- abbestellen; sub:close({flush = true}) leert zuerst wartende Ereignisse
Parameter Typ Beschreibung
group string Zu überwachende Gruppe

Gibt zurück: pg.Subscription, string[], error — das Abonnement und ein Snapshot der aktuellen Mitglieder

Berechtigung: pg.monitor auf dem Gruppennamen

Alle Gruppen beobachten

events abonniert Mitgliedschaftsänderungen in jeder Gruppe im Scope und gibt einen Snapshot aller Gruppen zu ihren Mitgliedern zurück.

local sub, snapshot, err = group:events()
-- snapshot: { ["workers"] = {pid, ...}, ["all"] = {pid, ...} }

local event = sub:channel():receive()
sub:close()

Gibt zurück: pg.Subscription, table, error

Berechtigung: pg.events

Ereignis-Felder

Ereignisse, die auf einem Abonnement-Channel geliefert werden, enthalten:

Feld Typ Beschreibung
system string Immer "pg"
kind string "member.joined" oder "member.left"
path string Der Gruppenname
data table {Group = string, PIDs = string[]} — die betroffenen Mitglieder

Abonnement-Channels sind gepuffert (Kapazität 64); wenn ein langsamer Konsument den Puffer füllt, werden weitere Ereignisse für dieses Abonnement verworfen.

Freigeben

group:release()

Gibt die Instanz sofort frei. Idempotent; nach der Freigabe gibt jede Methode einen Fehler zurück. Die Bereinigung läuft auch automatisch, wenn der Prozess endet.

Gibt zurück: boolean

Berechtigungen

Berechtigung Methode Ressource
pg.open pg.open() Scope-ID
pg.join join() Gruppenname
pg.leave leave() Gruppenname
pg.get_members get_members() Gruppenname
pg.get_local_members get_local_members() Gruppenname
pg.which_groups which_groups() (Scope)
pg.which_local_groups which_local_groups() (Scope)
pg.broadcast broadcast() Gruppenname
pg.broadcast_local broadcast_local() Gruppenname
pg.monitor monitor() Gruppenname
pg.events events() (Scope)

Fehler

Bedingung Art
Berechtigung verweigert errors.PERMISSION_DENIED
Fehlendes oder leeres Argument errors.INVALID
Scope nicht gefunden errors.NOT_FOUND
Gruppe verlassen ohne Mitgliedschaft errors.INVALID
Instanz freigegeben errors.INVALID

Siehe Fehlerbehandlung für die Arbeit mit Fehlern.

Siehe auch