Server-Sent Events
Die SSE-Middleware streamt Ereignisse vom Server an HTTP-Clients über das Server-Sent-Events-Protokoll.
Zwei Mechanismen stehen zur Verfügung: direktes Streaming aus einem HTTP-Handler und prozessgestützter Relay über die sse_relay-Middleware.
Direktes Streaming
Verwende res:write_event(), um SSE-Ereignisse direkt aus einem HTTP-Handler zu senden. Die Antwort wechselt beim ersten Aufruf automatisch in den SSE-Modus und setzt die passenden Header.
local http = require("http")
local function handler()
local res = http.response()
res:write_event({name = "status", data = {state = "started"}})
res:write_event({name = "progress", data = {percent = 50}})
res:write_event({name = "status", data = {state = "complete"}})
end
Jedes Ereignis benötigt ein name- und ein data-Feld. Der data-Wert wird automatisch als JSON kodiert.
SSE-Relay
Die SSE-Relay-Middleware erstellt langlebige SSE-Streams, die durch Prozesse gestützt werden. Sie folgt demselben Relay-Muster wie WebSocket-Relay.
Funktionsweise
- Der HTTP-Handler setzt den
X-SSE-Relay-Header mit einer JSON-Relay-Konfiguration - Die Middleware fängt die Antwort ab und erstellt eine SSE-Sitzung
- Die Sitzung registriert sich als Prozess mit eigener PID
- An die Sitzungs-PID gesendete Nachrichten werden als SSE-Ereignisse an den Client weitergeleitet
Prozesssemantik
SSE-Streams sind vollwertige Prozesse mit eigener PID. Sie integrieren sich in das Prozesssystem:
- Adressierbar — Jeder Prozess kann Nachrichten an eine Stream-PID senden
- Überwachbar — Prozesse können SSE-Streams auf Exit-Ereignisse überwachen
- Verlinkbar — SSE-Streams können mit anderen Prozessen verlinkt werden
- EXIT-Ereignisse — Wenn ein Stream geschlossen wird, erhalten Monitore Exit-Benachrichtigungen
-- Ereignis aus jedem Prozess an SSE-Client senden
process.send(stream_pid, "sse.message", {event = "update", value = 42})
-- Einen SSE-Stream überwachen
process.monitor(stream_pid)
done-Ereignis.
Konfiguration
Als Post-Match-Middleware auf einem Router hinzufügen:
- name: sse_router
kind: http.router
meta:
server: gateway
prefix: /sse
post_middleware:
- sse_relay
post_options:
sserelay.allowed.origins: "https://app.example.com"
| Option | Beschreibung |
|---|---|
sserelay.allowed.origins |
Kommagetrennte erlaubte Origins (unterstützt Wildcards) |
Handler-Setup
Der HTTP-Handler erzeugt einen Prozess und konfiguriert den Relay:
local http = require("http")
local json = require("json")
local function handler()
local res = http.response()
-- Handler-Prozess erzeugen
local pid = process.spawn("app.sse:handler", "app:processes")
-- Relay konfigurieren
res:set_header("X-SSE-Relay", json.encode({
target_pid = tostring(pid),
message_topic = "sse.message",
heartbeat_interval = "30s",
metadata = {
user_id = http.request():query("user_id")
}
}))
end
Felder der Relay-Konfiguration
| Feld | Typ | Standard | Beschreibung |
|---|---|---|---|
target_pid |
string | — | Prozess-PID, die Nachrichten empfangen soll (für Detached-Modus weglassen) |
message_topic |
string | sse.message |
Topic-Filter für weitergeleitete Ereignisse |
heartbeat_interval |
duration | 30s |
Heartbeat-Frequenz (z. B. 30s, 1m) |
idle_timeout |
duration | — | Stream nach Inaktivität schließen |
hard_timeout |
duration | — | Stream nach absoluter Dauer schließen |
metadata |
object | — | An Join/Leave/Heartbeat-Nachrichten angehängt |
Managed- vs. Detached-Modus
Managed-Modus
Wenn target_pid gesetzt ist, läuft der Relay im Managed-Modus:
- Überwacht den Zielprozess
- Sendet
sse.joinbeim Verbinden undsse.leavebeim Trennen - Schließt den Stream automatisch, wenn das Ziel beendet wird
Detached-Modus
Wenn target_pid weggelassen wird, startet der Relay im Detached-Modus:
- Sendet ein
ready-Ereignis an den Client mitstream_pidundmessage_topic - Es wird zunächst kein Prozess überwacht
- Ein Prozess kann sich später durch Senden einer
sse.control-Nachricht anhängen
-- Detached-Setup: kein target_pid
res:set_header("X-SSE-Relay", json.encode({
heartbeat_interval = "30s"
}))
Der Client erhält ein ready-Ereignis:
{"stream_pid": "sse@node/abc123", "message_topic": "sse.message"}
Nachrichten-Topics
Der Relay verwendet diese Topics für die Kommunikation zwischen Stream und Zielprozess:
| Topic | Richtung | Wann | Payload |
|---|---|---|---|
sse.join |
Stream → Ziel | Client verbindet sich | client_pid, metadata |
sse.message |
Ziel → Stream | Standard-Ereignis-Topic | Wird als SSE-Ereignis weitergeleitet |
sse.heartbeat |
Stream → Ziel | Periodisch (falls konfiguriert) | client_pid, uptime, message_count |
sse.leave |
Stream → Ziel | Client trennt Verbindung | client_pid, metadata |
sse.control |
beliebig → Stream | Steuerbefehl | Felder der Relay-Konfiguration |
sse.close |
beliebig → Stream | Erzwungenes Schließen | Optionaler Grund-String |
Empfang im Zielprozess
local json = require("json")
local function handler()
local inbox = process.inbox()
while true do
local msg, ok = inbox:receive()
if not ok then break end
local topic = msg:topic()
local data = msg:payload():data()
if topic == "sse.join" then
local client_pid = data.client_pid
elseif topic == "sse.heartbeat" then
-- Periodische Statusprüfung
elseif topic == "sse.leave" then
cleanup(data.client_pid)
end
end
end
Ereignisse senden
Sende Ereignisse an den Client, indem du Nachrichten an die Stream-PID schickst:
-- Auf dem Standard-Nachrichten-Topic senden
process.send(stream_pid, "sse.message", {
event = "update",
value = 42
})
-- Stream erzwungen schließen
process.send(stream_pid, "sse.close", "session expired")
Ereignisse, die auf dem konfigurierten message_topic gesendet werden, werden als SSE-Ereignisse an den Client weitergeleitet. Der Topic-Name wird zum SSE-Ereignisnamen.
Verbindungsübergabe
Sende eine Steuernachricht, um den Zielprozess, den Topic-Filter oder die Timeouts dynamisch zu ändern:
process.send(stream_pid, "sse.control", {
target_pid = tostring(new_pid),
message_topic = "custom.topic",
idle_timeout = "5m"
})
Wenn sich das Ziel ändert, sendet der Relay sse.leave an das alte Ziel und sse.join an das neue. Setze target_pid auf einen leeren String, um abzukoppeln, ohne erneut anzukoppeln.
Siehe auch
- Middleware — Middleware-Konfiguration
- WebSocket-Relay — WebSocket-Äquivalent
- Process — Prozess-Messaging