WebSocket Relay
WebSocket relay 中间件将 HTTP 连接升级为 WebSocket 并将消息中继到目标进程。
工作原理
- HTTP 处理器设置
X-WS-Relay头,包含目标进程 PID - 中间件将连接升级为 WebSocket
- Relay 附加到目标进程并监控它
- 消息在客户端和进程之间双向流动
进程语义
WebSocket 连接是完整的进程,拥有自己的 PID。它们与进程系统集成:
- 可寻址 - 任何进程都可以向 WebSocket PID 发送消息
- 可监控 - 进程可以监控 WebSocket 连接的退出事件
- 可链接 - WebSocket 连接可以链接到其他进程
- EXIT 事件 - 连接关闭时,监控者收到退出通知
-- 从另一个进程监控 WebSocket 连接
process.monitor(websocket_pid)
-- 从任何进程向 WebSocket 客户端发送消息。
-- Relay 将其包装为 {topic, data} JSON; topic 名称是任意的。
process.send(websocket_pid, "update", "hello")
连接转移
可以通过发送控制消息将连接转移到另一个进程:
process.send(websocket_pid, "ws.control", {
target_pid = new_process_pid,
message_topic = "ws.message"
})
配置
在路由器上添加为匹配后中间件:
- name: ws_router
kind: http.router
meta:
server: gateway
prefix: /ws
post_middleware:
- websocket_relay
post_options:
wsrelay.allowed.origins: "https://app.example.com"
| 选项 | 说明 |
|---|---|
wsrelay.allowed.origins |
允许的来源,逗号分隔 |
处理器设置
HTTP 处理器生成进程并配置 relay:
local http = require("http")
local json = require("json")
local function handler()
local req = http.request()
local res = http.response()
-- 生成处理进程
local pid = process.spawn("app.ws:handler", "app:processes")
-- 配置 relay
res:header("X-WS-Relay", json.encode({
target_pid = tostring(pid),
message_topic = "ws.message",
heartbeat_interval = "30s",
metadata = {
user_id = req:query("user_id")
}
}))
end
Relay 配置字段
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
target_pid |
string | 必填 | 接收消息的进程 PID |
message_topic |
string | ws.message |
客户端消息的主题 |
heartbeat_interval |
duration | - | 心跳频率 (如 30s) |
metadata |
object | - | 附加到所有消息 |
消息主题
Relay 向目标进程发送以下消息:
| 主题 | 时机 | 负载 |
|---|---|---|
ws.join |
客户端连接 | JSON {client_pid, metadata} |
ws.message (或您的 message_topic) |
客户端发送消息 | 原始客户端负载 (text 帧 -> string, binary 帧 -> bytes); relay 包的源 PID 即客户端 PID |
ws.heartbeat |
定期 (如果配置) | JSON {client_pid, uptime, message_count, metadata} |
ws.leave |
客户端断开 | JSON {client_pid, metadata} |
接收消息
local json = require("json")
local function handler()
local inbox = process.inbox()
while true do
local msg, ok = inbox:receive()
if not ok then break end
local topic = msg:topic()
local from = msg:from() -- 客户端连接 PID
if topic == "ws.join" then
-- 客户端已连接 -- 负载是 {client_pid, metadata}
local data = msg:payload():data()
local client_pid = data.client_pid
elseif topic == "ws.message" then
-- 原始客户端消息; from() 是客户端 PID
local body = msg:payload():data() -- string 或 bytes
handle_message(from, json.decode(body))
elseif topic == "ws.leave" then
-- 客户端已断开 -- 负载是 {client_pid, metadata}
cleanup(from)
end
end
end
发送到客户端
使用客户端 PID 发送回复消息。您选择的任何 topic 都会被包装为 {topic, data} JSON 并转发到 WebSocket。帧类型由负载格式决定: 字符串成为 text 帧, bytes 成为 binary 帧 (在 JSON 包装内 base64 编码)。
-- 发送结构化消息 (任意 topic 名称)
process.send(client_pid, "update", json.encode({event = "update", value = 42}))
-- 发送二进制
process.send(client_pid, "data", binary_content)
-- 关闭连接 (负载为关闭原因字符串)
process.send(client_pid, "ws.close", "Session ended")
服务器到客户端的保留 topic 是 ws.control (relay 重新配置) 和 ws.close (关闭连接)。
广播
跟踪客户端 PID 以广播到多个客户端:
local clients = {}
-- 加入时
clients[client_pid] = true
-- 离开时
clients[client_pid] = nil
-- 广播
local function broadcast(message)
local data = json.encode(message)
for pid, _ in pairs(clients) do
process.send(pid, "broadcast", data)
end
end
参见
- 中间件 - 中间件配置
- 进程 - 进程消息传递
- WebSocket 客户端 - 出站 WebSocket 连接