Event Bus
O event bus é um sistema pub/sub usando uma única goroutine de dispatcher. Publishers enfileiram ações, o dispatcher as processa sequencialmente, e subscribers recebem eventos correspondentes em channels.
Estrutura de Evento
type Event struct {
System string // Componente/módulo (ex: "registry", "process")
Kind string // Tipo de evento (ex: "create", "update", "exit")
Path string // Identificador da entidade
Data any // Payload
}
Arquitetura do Bus
flowchart LR
subgraph Publishers
P1[Component]
P2[Component]
end
subgraph Bus
Q[actionQueue]
D[dispatcher goroutine]
S[subscribers map]
end
subgraph Subscribers
S1[chan Event]
S2[chan Event]
end
P1 & P2 -->|enqueue| Q
Q -->|signal| D
D -->|match & deliver| S1 & S2
D <-->|manage| S
O bus armazena estado em uma estrutura simples:
type Bus struct {
subscribers map[SubscriberID]sub
subscriberCounter uint64
actionQueue []action
spareQueue []action
actionMu sync.Mutex
actionReady chan struct{} // buffered=1
closed atomic.Bool
}
Todas as mutações passam pela goroutine do dispatcher, eliminando race conditions sem locking complexo.
Ações
Quatro tipos de ação fluem pela fila:
| Ação | Comportamento |
|---|---|
| Subscribe | Adiciona subscriber ao map, responde no done channel |
| Unsubscribe | Remove subscriber, responde no done channel |
| Send | Entrega evento para subscribers correspondentes |
| Stop | Limpa subscribers, drena fila, sai do loop |
Subscribe e Unsubscribe bloqueiam até o dispatcher confirmar. Send é fire-and-forget.
Troca de Fila
O dispatcher usa troca de slices para evitar alocações em estado estável:
func (b *Bus) processActions() bool {
b.actionMu.Lock()
actions := b.actionQueue
b.actionQueue = b.spareQueue[:0]
b.spareQueue = nil
b.actionMu.Unlock()
for i := range actions {
// processar ação
}
clear(actions)
b.actionMu.Lock()
b.spareQueue = actions[:0]
b.actionMu.Unlock()
return true
}
Dois slices alternam: um para processamento, um para novas chegadas. O channel actionReady tem buffer de 1, então sinalizar nunca bloqueia e múltiplos enqueues coalescem em um wakeup.
Pattern Matching
Inscrições compilam padrões uma vez no momento da inscrição:
type sub struct {
subID SubscriberID
ctx context.Context
system *wildcard.Wildcard
kind *wildcard.Wildcard
eventCh chan<- Event
}
O pacote wildcard suporta três tipos de padrão:
| Padrão | Corresponde |
|---|---|
registry |
Apenas match exato |
* |
Qualquer segmento único |
** |
Zero ou mais segmentos |
(a|b) |
Alternação dentro do segmento |
Padrões dividem em . então registry.* corresponde registry.create mas não registry.entry.create. O padrão registry.** corresponde todos os três de registry, registry.create, e registry.entry.create.
Entrega de Eventos
Durante processamento de Send, o dispatcher itera subscribers:
for id, s := range b.subscribers {
if s.system != nil && !s.system.Match(a.event.System) {
continue
}
if s.kind != nil && !s.kind.Match(a.event.Kind) {
continue
}
select {
case <-a.ctx.Done():
goto cleanup
case <-s.ctx.Done():
expiredSubs = append(expiredSubs, id)
case s.eventCh <- a.event:
}
}
Se o contexto de um subscriber for cancelado, ele é marcado para remoção durante aquela passagem de entrega. O contexto do evento também pode cancelar entrega no meio da iteração.
Ponte de Processo Lua
O dispatcher de eventos faz ponte de eventos Go para processos Lua. Ele se inscreve uma vez em todos os eventos ("**") e roteia internamente baseado em inscrições de processos:
type Dispatcher struct {
bus event.Bus
node relay.Node
subID SubscriberID
eventC chan event.Event
mu sync.RWMutex
subs map[string]*subscription // tópico -> inscrição
}
Quando um processo Lua se inscreve via events.subscribe(), o dispatcher armazena o padrão e PID alvo. Eventos correspondentes são empacotados e enviados via relay:
func (d *Dispatcher) routeEvent(evt event.Event) {
d.mu.RLock()
defer d.mu.RUnlock()
for _, sub := range d.subs {
if !matchPattern(sub.system, evt.System) {
continue
}
if sub.kind != "" && !matchPattern(sub.kind, evt.Kind) {
continue
}
data := map[string]any{
"system": evt.System,
"kind": evt.Kind,
"path": evt.Path,
}
if evt.Data != nil {
data["data"] = evt.Data
}
pkg := relay.NewPackage(pid.PID{}, sub.pid, sub.topic, payload.New(data))
d.node.Send(pkg)
}
}
Tipos Auxiliares
Subscriber
Encapsula inscrição de channel com callback:
handler, err := eventbus.NewSubscriber(ctx, bus, "registry", "*.created",
func(evt Event) {
// tratar
})
defer handler.Close()
Cria duas goroutines: uma lê eventos e chama o handler, outra aguarda cancelamento de contexto para desinscrição.
EventRouter
Gerencia múltiplos handlers com ciclo de vida centralizado:
router, err := eventbus.StartRouter(ctx, bus,
WithHandlers(handler1, handler2),
WithLogger(log))
defer router.Stop()
Cada handler implementa Pattern() e Handle(). O router cria um Subscriber para cada e fecha todos em Stop.
Awaiter
Espera síncrona por um evento específico:
awaiter := eventbus.NewAwaiter(bus, "registry", "accept")
waiter, _ := awaiter.Prepare(ctx, "service-id")
defer waiter.Close()
bus.Send(ctx, triggeringEvent)
result := waiter.Wait() // bloqueia até match ou timeout
O padrão Prepare-then-Wait evita race conditions: inscrição antes de acionar o evento que produz a resposta.
Shutdown
Stop()atomicamente define flag closed e enfileira ação Stop- Dispatcher limpa mapa de subscribers
- Ações restantes na fila são drenadas:
- Requisições Subscribe recebem erro "bus is closed"
- Requisições Unsubscribe completam imediatamente
- Eventos Send são descartados
- WaitGroup completa
Veja Também
- Registry - Principal produtor de eventos
- Command Dispatch - Roteamento processo-para-handler