Event Bus
El event bus es un sistema pub/sub usando una sola goroutine dispatcher. Los publishers encolan acciones, el dispatcher las procesa secuencialmente, y los subscribers reciben eventos matcheados en canales.
Estructura de Evento
type Event struct {
System string // Componente/módulo (ej. "registry", "process")
Kind string // Tipo de evento (ej. "create", "update", "exit")
Path string // Identificador de entidad
Data any // Payload
}
Arquitectura del Bus
flowchart LR
subgraph Publishers
P1[Componente]
P2[Componente]
end
subgraph Bus
Q[actionQueue]
D[dispatcher goroutine]
S[subscribers map]
end
subgraph Subscribers
S1[chan Event]
S2[chan Event]
end
P1 & P2 -->|encolar| Q
Q -->|signal| D
D -->|match & entregar| S1 & S2
D <-->|gestionar| S
El bus almacena estado en una estructura simple:
type Bus struct {
subscribers map[SubscriberID]sub
subscriberCounter uint64
actionQueue []action
spareQueue []action
actionMu sync.Mutex
actionReady chan struct{} // buffered=1
closed atomic.Bool
}
Todas las mutaciones pasan por la goroutine dispatcher, eliminando condiciones de carrera sin locking complejo.
Acciones
Cuatro tipos de acciones fluyen a través de la cola:
| Acción | Comportamiento |
|---|---|
| Subscribe | Agrega subscriber al mapa, responde en canal done |
| Unsubscribe | Remueve subscriber, responde en canal done |
| Send | Entrega evento a subscribers matcheados |
| Stop | Limpia subscribers, drena cola, sale del loop |
Subscribe y Unsubscribe bloquean hasta que el dispatcher confirma. Send es fire-and-forget.
Intercambio de Cola
El dispatcher usa intercambio de slices para evitar asignaciones en estado estable:
func (b *Bus) processActions() bool {
b.actionMu.Lock()
actions := b.actionQueue
b.actionQueue = b.spareQueue[:0]
b.spareQueue = nil
b.actionMu.Unlock()
for i := range actions {
// procesar acción
}
clear(actions)
b.actionMu.Lock()
b.spareQueue = actions[:0]
b.actionMu.Unlock()
return true
}
Dos slices alternan: uno para procesamiento, uno para nuevas llegadas. El canal actionReady tiene buffer de 1, así que la señalización nunca bloquea y múltiples encolas colapsan en un solo wakeup.
Pattern Matching
Las suscripciones compilan patrones una vez en tiempo de subscribe:
type sub struct {
subID SubscriberID
ctx context.Context
system *wildcard.Wildcard
kind *wildcard.Wildcard
eventCh chan<- Event
}
El paquete wildcard soporta tres tipos de patrón:
| Patrón | Matchea |
|---|---|
registry |
Solo match exacto |
* |
Cualquier segmento único |
** |
Cero o más segmentos |
(a|b) |
Alternación dentro de segmento |
Los patrones se dividen en . así que registry.* matchea registry.create pero no registry.entry.create. El patrón registry.** matchea los tres: registry, registry.create, y registry.entry.create.
Entrega de Eventos
Durante procesamiento de Send, el dispatcher itera subscribers:
for id, s := range b.subscribers {
if s.system != nil && !s.system.Match(a.event.System) {
continue
}
if s.kind != nil && !s.kind.Match(a.event.Kind) {
continue
}
select {
case <-a.ctx.Done():
goto cleanup
case <-s.ctx.Done():
expiredSubs = append(expiredSubs, id)
case s.eventCh <- a.event:
}
}
Si el contexto de un subscriber es cancelado, se marca para remoción durante ese pase de entrega. El contexto del evento también puede cancelar entrega a mitad de iteración.
Bridge de Proceso Lua
El dispatcher de eventos conecta eventos Go a procesos Lua. Se suscribe una vez a todos los eventos ("**") y enruta internamente basado en suscripciones de procesos:
type Dispatcher struct {
bus event.Bus
node relay.Node
subID SubscriberID
eventC chan event.Event
mu sync.RWMutex
subs map[string]*subscription // topic -> subscription
}
Cuando un proceso Lua se suscribe vía events.subscribe(), el dispatcher almacena el patrón y PID destino. Eventos matcheados son empaquetados y enviados vía relay:
func (d *Dispatcher) routeEvent(evt event.Event) {
d.mu.RLock()
defer d.mu.RUnlock()
for _, sub := range d.subs {
if !matchPattern(sub.system, evt.System) {
continue
}
if sub.kind != "" && !matchPattern(sub.kind, evt.Kind) {
continue
}
data := map[string]any{
"system": evt.System,
"kind": evt.Kind,
"path": evt.Path,
}
if evt.Data != nil {
data["data"] = evt.Data
}
pkg := relay.NewPackage(pid.PID{}, sub.pid, sub.topic, payload.New(data))
d.node.Send(pkg)
}
}
Tipos Helper
Subscriber
Envuelve suscripción de canal con callback:
handler, err := eventbus.NewSubscriber(ctx, bus, "registry", "*.created",
func(evt Event) {
// manejar
})
defer handler.Close()
Genera dos goroutines: una lee eventos y llama al handler, otra espera cancelación de contexto para desuscribir.
EventRouter
Gestiona múltiples handlers con ciclo de vida centralizado:
router, err := eventbus.StartRouter(ctx, bus,
WithHandlers(handler1, handler2),
WithLogger(log))
defer router.Stop()
Cada handler implementa Pattern() y Handle(). El router crea un Subscriber para cada uno y cierra todos en Stop.
Awaiter
Espera sincrónica por un evento específico:
awaiter := eventbus.NewAwaiter(bus, "registry", "accept")
waiter, _ := awaiter.Prepare(ctx, "service-id")
defer waiter.Close()
bus.Send(ctx, triggeringEvent)
result := waiter.Wait() // bloquea hasta match o timeout
El patrón Prepare-then-Wait evita condiciones de carrera: suscribirse antes de disparar el evento que produce la respuesta.
Shutdown
Stop()atómicamente establece flag closed y encola acción Stop- Dispatcher limpia mapa de subscribers
- Acciones restantes en cola son drenadas:
- Solicitudes Subscribe obtienen error "bus is closed"
- Solicitudes Unsubscribe completan inmediatamente
- Eventos Send son descartados
- WaitGroup completa
Ver También
- Registry - Productor principal de eventos
- Command Dispatch - Routing proceso-a-handler