Event-Bus
Der Event-Bus ist ein Pub/Sub-System mit einer einzelnen Dispatcher-Goroutine. Publisher reihen Actions ein, der Dispatcher verarbeitet sie sequentiell, und Subscriber empfangen passende Events auf Channels.
Event-Struktur
type Event struct {
System string // Komponente/Modul (z.B. "registry", "process")
Kind string // Event-Typ (z.B. "create", "update", "exit")
Path string // Entitäts-Identifier
Data any // Payload
}
Bus-Architektur
flowchart LR
subgraph Publishers
P1[Komponente]
P2[Komponente]
end
subgraph Bus
Q[actionQueue]
D[dispatcher goroutine]
S[subscribers map]
end
subgraph Subscribers
S1[chan Event]
S2[chan Event]
end
P1 & P2 -->|enqueue| Q
Q -->|signal| D
D -->|match & deliver| S1 & S2
D <-->|manage| S
Der Bus speichert Zustand in einer einfachen Struktur:
type Bus struct {
subscribers map[SubscriberID]sub
subscriberCounter uint64
actionQueue []action
spareQueue []action
actionMu sync.Mutex
actionReady chan struct{} // buffered=1
closed atomic.Bool
}
Alle Änderungen laufen durch die Dispatcher-Goroutine, wodurch Race-Conditions ohne komplexe Sperrmechanismen vermieden werden.
Actions
Vier Action-Typen fließen durch die Queue:
| Action | Verhalten |
|---|---|
| Subscribe | Fügt Subscriber zur Map hinzu, antwortet auf done-Channel |
| Unsubscribe | Entfernt Subscriber, antwortet auf done-Channel |
| Send | Liefert Event an passende Subscriber |
| Stop | Leert Subscriber, draint Queue, beendet Loop |
Subscribe und Unsubscribe blockieren bis der Dispatcher bestätigt. Send ist Fire-and-Forget.
Queue-Swapping
Der Dispatcher verwendet Slice-Swapping um Allokationen im Steady-State zu vermeiden:
func (b *Bus) processActions() bool {
b.actionMu.Lock()
actions := b.actionQueue
b.actionQueue = b.spareQueue[:0]
b.spareQueue = nil
b.actionMu.Unlock()
for i := range actions {
// action verarbeiten
}
clear(actions)
b.actionMu.Lock()
b.spareQueue = actions[:0]
b.actionMu.Unlock()
return true
}
Zwei Slices alternieren: eines für Verarbeitung, eines für neue Ankünfte. Der actionReady-Channel ist auf 1 gepuffert, sodass Signaling nie blockiert und mehrere Enqueues in einem Wakeup verschmelzen.
Pattern-Matching
Subscriptions kompilieren Patterns einmal bei Subscribe-Zeit:
type sub struct {
subID SubscriberID
ctx context.Context
system *wildcard.Wildcard
kind *wildcard.Wildcard
eventCh chan<- Event
}
Das Wildcard-Paket unterstützt drei Pattern-Typen:
| Pattern | Matched |
|---|---|
registry |
Nur exakter Match |
* |
Einzelnes Segment |
** |
Null oder mehr Segmente |
(a|b) |
Alternation innerhalb Segment |
Patterns splitten auf ., also matched registry.* registry.create aber nicht registry.entry.create. Das Pattern registry.** matched alle drei: registry, registry.create und registry.entry.create.
Event-Zustellung
Während Send-Verarbeitung iteriert der Dispatcher Subscriber:
for id, s := range b.subscribers {
if s.system != nil && !s.system.Match(a.event.System) {
continue
}
if s.kind != nil && !s.kind.Match(a.event.Kind) {
continue
}
select {
case <-a.ctx.Done():
goto cleanup
case <-s.ctx.Done():
expiredSubs = append(expiredSubs, id)
case s.eventCh <- a.event:
}
}
Wenn ein Subscriber-Kontext gecancelt ist, wird er während dieses Zustellungsdurchlaufs zur Entfernung markiert. Der Event-Kontext kann auch Zustellung mitten in der Iteration canceln.
Lua-Prozess-Bridge
Der Events-Dispatcher verbindet Go-Events mit Lua-Prozessen. Er subscribt einmal auf alle Events ("**") und routet intern basierend auf Prozess-Subscriptions:
type Dispatcher struct {
bus event.Bus
node relay.Node
subID SubscriberID
eventC chan event.Event
mu sync.RWMutex
subs map[string]*subscription // topic -> subscription
}
Wenn ein Lua-Prozess via events.subscribe() subscribt, speichert der Dispatcher Pattern und Ziel-PID. Passende Events werden verpackt und via Relay gesendet:
func (d *Dispatcher) routeEvent(evt event.Event) {
d.mu.RLock()
defer d.mu.RUnlock()
for _, sub := range d.subs {
if !matchPattern(sub.system, evt.System) {
continue
}
if sub.kind != "" && !matchPattern(sub.kind, evt.Kind) {
continue
}
data := map[string]any{
"system": evt.System,
"kind": evt.Kind,
"path": evt.Path,
}
if evt.Data != nil {
data["data"] = evt.Data
}
pkg := relay.NewPackage(pid.PID{}, sub.pid, sub.topic, payload.New(data))
d.node.Send(pkg)
}
}
Hilfstypen
Subscriber
Wrappt Channel-Subscription mit einem Callback:
handler, err := eventbus.NewSubscriber(ctx, bus, "registry", "*.created",
func(evt Event) {
// handle
})
defer handler.Close()
Startet zwei Goroutines: eine liest Events und ruft Handler auf, eine andere wartet auf Kontext-Cancellation zum Unsubscriben.
EventRouter
Verwaltet mehrere Handler mit zentralisiertem Lebenszyklus:
router, err := eventbus.StartRouter(ctx, bus,
WithHandlers(handler1, handler2),
WithLogger(log))
defer router.Stop()
Jeder Handler implementiert Pattern() und Handle(). Der Router erstellt einen Subscriber für jeden und schließt alle bei Stop.
Awaiter
Synchrones Warten auf ein spezifisches Event:
awaiter := eventbus.NewAwaiter(bus, "registry", "accept")
waiter, _ := awaiter.Prepare(ctx, "service-id")
defer waiter.Close()
bus.Send(ctx, triggeringEvent)
result := waiter.Wait() // blockiert bis Match oder Timeout
Das Vorbereiten-dann-Warten-Muster vermeidet Race-Conditions: Erst abonnieren, bevor das Ereignis ausgelöst wird, das die Antwort erzeugt.
Shutdown
Stop()setzt atomar closed-Flag und reiht Stop-Action ein- Dispatcher leert Subscriber-Map
- Verbleibende queued Actions werden drainiert:
- Subscribe-Requests erhalten "bus is closed" Fehler
- Unsubscribe-Requests schließen sofort ab
- Send-Events werden verworfen
- WaitGroup wird abgeschlossen
Siehe auch
- Registry - Primärer Event-Producer
- Command-Dispatch - Prozess-zu-Handler-Routing