WebSocket 릴레이
WebSocket 릴레이 미들웨어는 HTTP 연결을 WebSocket으로 업그레이드하고 대상 프로세스로 메시지를 릴레이합니다.
작동 방식
- HTTP 핸들러가 대상 프로세스 PID와 함께
X-WS-Relay헤더 설정 - 미들웨어가 연결을 WebSocket으로 업그레이드
- 릴레이가 대상 프로세스에 연결하고 모니터링
- 메시지가 클라이언트와 프로세스 간에 양방향으로 흐름
프로세스 시맨틱스
WebSocket 연결은 자체 PID를 가진 완전한 프로세스입니다. 프로세스 시스템과 통합됩니다:
- 주소 지정 가능 → 모든 프로세스가 WebSocket PID로 메시지 전송 가능
- 모니터링 가능 → 프로세스가 종료 이벤트를 위해 WebSocket 연결 모니터링 가능
- 연결 가능 → WebSocket 연결을 다른 프로세스에 연결 가능
- EXIT 이벤트 → 연결이 닫히면 모니터가 종료 알림 수신
-- 다른 프로세스에서 WebSocket 연결 모니터링
process.monitor(websocket_pid)
-- 모든 프로세스에서 WebSocket 클라이언트로 메시지 전송.
-- 릴레이는 이를 {topic, data} JSON으로 래핑합니다; 토픽 이름은 임의입니다.
process.send(websocket_pid, "update", "hello")
연결 전송
제어 메시지를 보내 연결을 다른 프로세스로 전송할 수 있습니다:
process.send(websocket_pid, "ws.control", {
target_pid = new_process_pid,
message_topic = "ws.message"
})
설정
라우터에 매칭 후 미들웨어로 추가:
- name: ws_router
kind: http.router
meta:
server: gateway
prefix: /ws
post_middleware:
- websocket_relay
post_options:
wsrelay.allowed.origins: "https://app.example.com"
| 옵션 | 설명 |
|---|---|
wsrelay.allowed.origins |
쉼표로 구분된 허용 오리진 |
핸들러 설정
HTTP 핸들러가 프로세스를 스폰하고 릴레이를 설정합니다:
local http = require("http")
local json = require("json")
local function handler()
local req = http.request()
local res = http.response()
-- 핸들러 프로세스 스폰
local pid = process.spawn("app.ws:handler", "app:processes")
-- 릴레이 설정
res:header("X-WS-Relay", json.encode({
target_pid = tostring(pid),
message_topic = "ws.message",
heartbeat_interval = "30s",
metadata = {
user_id = req:query("user_id")
}
}))
end
릴레이 설정 필드
| 필드 | 타입 | 기본값 | 설명 |
|---|---|---|---|
target_pid |
string | 필수 | 메시지를 받을 프로세스 PID |
message_topic |
string | ws.message |
클라이언트 메시지의 토픽 |
heartbeat_interval |
duration | - | 하트비트 빈도 (예: 30s) |
metadata |
object | - | 모든 메시지에 첨부 |
메시지 토픽
릴레이가 대상 프로세스에 보내는 메시지:
| 토픽 | 시점 | 페이로드 |
|---|---|---|
ws.join |
클라이언트 연결 시 | JSON {client_pid, metadata} |
ws.message (또는 사용자의 message_topic) |
클라이언트 메시지 전송 시 | 원시 클라이언트 페이로드 (텍스트 프레임 -> string, 바이너리 프레임 -> bytes); 릴레이 패키지의 소스 PID가 클라이언트 PID |
ws.heartbeat |
주기적 (설정된 경우) | JSON {client_pid, uptime, message_count, metadata} |
ws.leave |
클라이언트 연결 해제 시 | JSON {client_pid, metadata} |
메시지 수신
local json = require("json")
local function handler()
local inbox = process.inbox()
while true do
local msg, ok = inbox:receive()
if not ok then break end
local topic = msg:topic()
local from = msg:from() -- 클라이언트 연결 PID
if topic == "ws.join" then
-- 클라이언트 연결됨 -- 페이로드는 {client_pid, metadata}
local data = msg:payload():data()
local client_pid = data.client_pid
elseif topic == "ws.message" then
-- 원시 클라이언트 메시지; from() 은 클라이언트 PID
local body = msg:payload():data() -- string 또는 bytes
handle_message(from, json.decode(body))
elseif topic == "ws.leave" then
-- 클라이언트 연결 해제됨 -- 페이로드는 {client_pid, metadata}
cleanup(from)
end
end
end
클라이언트로 전송
클라이언트 PID를 사용하여 메시지 반환. 선택한 어떤 토픽이든 {topic, data} JSON으로 래핑되어 WebSocket으로 전달됩니다. 프레임 타입은 페이로드 형식에 따라 결정됩니다: 문자열은 텍스트 프레임이 되고, 바이트는 바이너리 프레임이 됩니다 (JSON 래퍼 내부에서 base64 인코딩).
-- 구조화된 메시지 전송 (임의의 토픽 이름)
process.send(client_pid, "update", json.encode({event = "update", value = 42}))
-- 바이너리 전송
process.send(client_pid, "data", binary_content)
-- 연결 종료 (페이로드는 종료 사유 문자열)
process.send(client_pid, "ws.close", "Session ended")
서버 -> 클라이언트의 예약된 토픽은 ws.control (릴레이 재구성) 및 ws.close (연결 종료) 입니다.
브로드캐스팅
여러 클라이언트에 브로드캐스트하기 위해 클라이언트 PID 추적:
local clients = {}
-- join 시
clients[client_pid] = true
-- leave 시
clients[client_pid] = nil
-- 브로드캐스트
local function broadcast(message)
local data = json.encode(message)
for pid, _ in pairs(clients) do
process.send(pid, "broadcast", data)
end
end
참고
- 미들웨어 - 미들웨어 설정
- 프로세스 - 프로세스 메시징
- WebSocket 클라이언트 - 아웃바운드 WebSocket 연결