큐
Wippy는 설정 가능한 드라이버와 컨슈머가 있는 비동기 메시지 처리를 위한 큐 시스템을 제공합니다.
아키텍처
flowchart LR
P[Publisher] --> D[Driver]
D --> Q[Queue]
Q --> C[Consumer]
C --> W[Worker Pool]
W --> F[Function]
- 드라이버 - 백엔드 구현 (memory, AMQP, Redis)
- 큐 - 드라이버에 바인딩된 논리적 큐
- 컨슈머 - 동시성 설정으로 큐를 핸들러에 연결
- 워커 풀 - 동시 메시지 프로세서
여러 큐가 드라이버를 공유할 수 있습니다. 여러 컨슈머가 같은 큐에서 처리할 수 있습니다.
엔트리 종류
| Kind | 설명 |
|---|---|
queue.driver.memory |
인메모리 큐 드라이버 |
queue.queue |
드라이버 참조가 있는 큐 선언 |
queue.consumer |
메시지를 처리하는 컨슈머 |
드라이버 설정
메모리 드라이버
개발 및 테스트용 인메모리 드라이버.
- name: memory_driver
kind: queue.driver.memory
lifecycle:
auto_start: true
큐 설정
- name: tasks
kind: queue.queue
driver: app.queue:memory_driver
| 필드 | 타입 | 필수 | 설명 |
|---|---|---|---|
driver |
레지스트리 ID | 예 | 큐 드라이버 참조 |
options |
Map | 아니오 | 드라이버별 옵션 |
컨슈머 설정
- name: task_consumer
kind: queue.consumer
queue: app.queue:tasks
func: app.queue:task_handler
concurrency: 4
prefetch: 20
lifecycle:
auto_start: true
depends_on:
- app.queue:tasks
| 필드 | 기본값 | 최대 | 설명 |
|---|---|---|---|
queue |
필수 | - | 큐 레지스트리 ID |
func |
필수 | - | 핸들러 함수 레지스트리 ID |
concurrency |
1 | 1000 | 병렬 워커 수 |
prefetch |
10 | 10000 | 메시지 버퍼 크기 |
워커 풀
워커는 동시 고루틴으로 실행됩니다:
concurrency: 3, prefetch: 10
1. 드라이버가 버퍼에 최대 10개 메시지 전달
2. 3개 워커가 버퍼에서 동시에 가져옴
3. 워커가 완료되면 버퍼 리필
4. 모든 워커가 바쁘고 버퍼가 가득 차면 백프레셔
핸들러 함수
컨슈머 함수는 메시지 데이터를 받고 성공 또는 에러를 반환합니다:
local json = require("json")
local logger = require("logger")
local function handler(body)
local data = json.decode(body)
logger.info("Processing", {task_id = data.id})
local result, err = process_task(data)
if err then
return nil, err -- Nack: 메시지 재큐잉
end
return result -- Ack: 큐에서 제거
end
return handler
- name: task_handler
kind: function.lua
source: file://task_handler.lua
modules:
- json
- logger
확인 응답
| 핸들러 결과 | 액션 | 효과 |
|---|---|---|
| 반환 값 | Ack | 큐에서 메시지 제거 |
| 반환 에러 | Nack | 메시지 재큐잉(드라이버에 따라 다름) |
메시지 발행
Lua 코드에서:
local queue = require("queue")
queue.publish("app.queue:tasks", {
id = "task-123",
action = "process",
data = payload
})
전체 API는 Queue 모듈을 참조하세요.
정상 종료
컨슈머 중지 시:
- 새 메시지 수신 중지
- 워커 컨텍스트 취소
- 처리 중인 메시지 완료 대기(타임아웃 적용)
- 워커가 제시간에 완료되지 않으면 오류 반환