Server-Sent Events
The SSE middleware streams events from the server to HTTP clients using the Server-Sent Events protocol.
Two mechanisms are available: direct streaming from an HTTP handler, and process-backed relay via the sse_relay middleware.
Direct Streaming
Use res:write_event() to send SSE events directly from an HTTP handler. The response automatically switches to SSE mode on the first call, setting appropriate headers.
local http = require("http")
local function handler()
local res = http.response()
res:write_event({name = "status", data = {state = "started"}})
res:write_event({name = "progress", data = {percent = 50}})
res:write_event({name = "status", data = {state = "complete"}})
end
Each event requires a name and data field. The data value is JSON-encoded automatically.
SSE Relay
The SSE Relay middleware creates long-lived SSE streams backed by processes. It follows the same relay pattern as WebSocket Relay.
How It Works
- HTTP handler sets
X-SSE-Relayheader with a JSON relay configuration - Middleware intercepts the response and creates an SSE session
- Session registers as a process with its own PID
- Messages sent to the session PID are forwarded as SSE events to the client
Process Semantics
SSE streams are full processes with their own PID. They integrate with the process system:
- Addressable — Any process can send messages to a stream PID
- Monitorable — Processes can monitor SSE streams for exit events
- Linkable — SSE streams can be linked to other processes
- EXIT events — When a stream closes, monitors receive exit notifications
-- Send event to SSE client from any process
process.send(stream_pid, "sse.message", {event = "update", value = 42})
-- Monitor an SSE stream
process.monitor(stream_pid)
Configuration
Add as post-match middleware on a router:
- name: sse_router
kind: http.router
meta:
server: gateway
prefix: /sse
post_middleware:
- sse_relay
post_options:
sserelay.allowed.origins: "https://app.example.com"
| Option | Description |
|---|---|
sserelay.allowed.origins |
Comma-separated allowed origins (supports wildcards) |
Handler Setup
The HTTP handler spawns a process and configures the relay:
local http = require("http")
local json = require("json")
local function handler()
local res = http.response()
-- Spawn handler process
local pid = process.spawn("app.sse:handler", "app:processes")
-- Configure relay
res:set_header("X-SSE-Relay", json.encode({
target_pid = tostring(pid),
message_topic = "sse.message",
heartbeat_interval = "30s",
metadata = {
user_id = http.request():query("user_id")
}
}))
end
Relay Config Fields
| Field | Type | Default | Description |
|---|---|---|---|
target_pid |
string | — | Process PID to receive messages (omit for detached mode) |
message_topic |
string | sse.message |
Topic filter for forwarded events |
heartbeat_interval |
duration | 30s |
Heartbeat frequency (e.g. 30s, 1m) |
idle_timeout |
duration | — | Close stream after inactivity |
hard_timeout |
duration | — | Close stream after absolute duration |
metadata |
object | — | Attached to join/leave/heartbeat messages |
Managed vs Detached Mode
Managed Mode
When target_pid is set, the relay operates in managed mode:
- Monitors the target process
- Sends
sse.joinon connect andsse.leaveon disconnect - Closes the stream automatically if the target exits
Detached Mode
When target_pid is omitted, the relay starts in detached mode:
- Emits a
readyevent to the client withstream_pidandmessage_topic - No process is monitored initially
- A process can attach later by sending an
sse.controlmessage
-- Detached setup: no target_pid
res:set_header("X-SSE-Relay", json.encode({
heartbeat_interval = "30s"
}))
The client receives a ready event:
{"stream_pid": "sse@node/abc123", "message_topic": "sse.message"}
Message Topics
The relay uses these topics for communication between the stream and target process:
| Topic | Direction | When | Payload |
|---|---|---|---|
sse.join |
stream → target | Client connects | client_pid, metadata |
sse.message |
target → stream | Default event topic | Forwarded as SSE event |
sse.heartbeat |
stream → target | Periodic (if configured) | client_pid, uptime, message_count |
sse.leave |
stream → target | Client disconnects | client_pid, metadata |
sse.control |
any → stream | Control command | Relay config fields |
sse.close |
any → stream | Force close | Optional reason string |
Receiving in Target Process
local json = require("json")
local function handler()
local inbox = process.inbox()
while true do
local msg, ok = inbox:receive()
if not ok then break end
local topic = msg:topic()
local data = msg:payload():data()
if topic == "sse.join" then
local client_pid = data.client_pid
elseif topic == "sse.heartbeat" then
-- Periodic health check
elseif topic == "sse.leave" then
cleanup(data.client_pid)
end
end
end
Sending Events
Send events to the client by messaging the stream PID:
-- Send on the default message topic
process.send(stream_pid, "sse.message", {
event = "update",
value = 42
})
-- Force close the stream
process.send(stream_pid, "sse.close", "session expired")
Events sent on the configured message_topic are forwarded to the client as SSE events. The topic name becomes the SSE event name.
Connection Transfer
Send a control message to change the target process, topic filter, or timeouts dynamically:
process.send(stream_pid, "sse.control", {
target_pid = tostring(new_pid),
message_topic = "custom.topic",
idle_timeout = "5m"
})
When the target changes, the relay sends sse.leave to the old target and sse.join to the new one. Set target_pid to an empty string to detach without reattaching.
See Also
- Middleware — Middleware configuration
- WebSocket Relay — WebSocket equivalent
- Process — Process messaging