큐 컨슈머

큐 컨슈머는 워커 풀을 사용하여 큐에서 메시지를 처리합니다.

개요

flowchart LR
    subgraph Consumer[컨슈머]
        QD[큐 드라이버] --> DC[배달 채널
prefetch=10] DC --> WP[워커 풀
concurrency] WP --> FH[함수 핸들러] FH --> AN[Ack/Nack] end

설정

옵션 기본값 최대 설명
queue 필수 - 큐 레지스트리 ID
func 필수 - 핸들러 함수 레지스트리 ID
concurrency 1 1000 워커 수
prefetch 10 10000 메시지 버퍼 크기

엔트리 정의

- name: order_consumer
  kind: queue.consumer
  queue: app:orders
  func: app:process_order
  concurrency: 5
  prefetch: 20
  lifecycle:
    auto_start: true
    depends_on:
      - app:orders

핸들러 함수

핸들러 함수는 메시지 본문을 받습니다:

-- process_order.lua
local json = require("json")

local function handler(body)
    local order = json.decode(body)

    -- 주문 처리
    local result, err = process_order(order)
    if err then
        -- 에러 반환으로 Nack 트리거 (재큐잉)
        return nil, err
    end

    -- 성공은 Ack 트리거
    return result
end

return handler
- name: process_order
  kind: function.lua
  source: file://process_order.lua
  modules:
    - json

확인 응답

결과 액션 효과
성공 Ack 큐에서 메시지 제거
에러 Nack 메시지 재큐잉(드라이버에 따라 다름)

워커 풀

  • 워커는 동시에 고루틴으로 실행
  • 각 워커는 한 번에 하나의 메시지만 처리
  • 메시지는 딜리버리 채널에서 라운드 로빈으로 분배
  • 프리페치 버퍼를 통해 드라이버가 미리 메시지 전달 가능

예제

concurrency: 3
prefetch: 10

흐름:
1. 드라이버가 버퍼에 최대 10개 메시지 전달
2. 3개 워커가 버퍼에서 동시에 가져옴
3. 워커가 완료되면 버퍼 리필
4. 모든 워커가 바쁘고 버퍼가 가득 차면 백프레셔

정상 종료

중지 시:

  1. 새 메시지 수신 중지
  2. 워커 컨텍스트 취소
  3. 처리 중인 메시지 완료 대기(타임아웃 적용)
  4. 워커가 완료되지 않으면 타임아웃 오류 반환

큐 선언

# 큐 드라이버 (개발/테스트용 메모리)
- name: queue_driver
  kind: queue.driver.memory
  lifecycle:
    auto_start: true

# 큐 정의
- name: orders
  kind: queue.queue
  driver: app:queue_driver
  options:
    queue_name: orders      # 이름 오버라이드 (기본값: 엔트리 이름)
    max_length: 10000       # 최대 큐 크기
    durable: true           # 재시작 시 유지
옵션 설명
queue_name 큐 이름 오버라이드 (기본값: 엔트리 ID 이름)
max_length 최대 큐 크기
durable 재시작 시 유지(드라이버에 따라 다름)

메모리 드라이버

개발/테스트용 내장 인메모리 큐:

  • Kind: queue.driver.memory
  • 메시지는 메모리에 저장
  • Nack은 메시지를 큐 앞으로 재큐잉
  • 재시작 간 지속성 없음

참고