큐 컨슈머

큐 컨슈머는 워커 풀을 사용하여 큐에서 메시지를 처리합니다.

개요

flowchart LR
    subgraph Consumer[컨슈머]
        QD[큐 드라이버] --> DC[배달 채널
prefetch=10] DC --> WP[워커 풀
concurrency] WP --> FH[함수 핸들러] FH --> AN[Ack/Nack] end

설정

옵션 기본값 최대 설명
queue 필수 - 큐 레지스트리 ID
func 필수 - 핸들러 함수 레지스트리 ID
concurrency 1 1000 워커 수
prefetch 10 10000 메시지 버퍼 크기
auto_ack false - 핸들러 실행 전 자동으로 Ack
driver_options {} - 드라이버별 컨슈머 옵션

엔트리 정의

- name: order_consumer
  kind: queue.consumer
  queue: app:orders
  func: app:process_order
  concurrency: 5
  prefetch: 20
  lifecycle:
    auto_start: true
    depends_on:
      - app:orders

핸들러 함수

핸들러 함수는 메시지 본문을 받습니다:

-- process_order.lua
local json = require("json")

local function handler(body)
    local order = json.decode(body)

    -- 주문 처리
    local result, err = process_order(order)
    if err then
        -- 에러 반환으로 Nack 트리거 (재큐잉)
        return nil, err
    end

    -- 성공은 Ack 트리거
    return result
end

return handler
- name: process_order
  kind: function.lua
  source: file://process_order.lua
  modules:
    - json

확인 응답

결과 액션 효과
성공 Ack 큐에서 메시지 제거
에러 Nack 메시지 재큐잉(드라이버에 따라 다름)

워커 풀

  • 워커는 동시에 고루틴으로 실행
  • 각 워커는 한 번에 하나의 메시지만 처리
  • 메시지는 딜리버리 채널에서 라운드 로빈으로 분배
  • 프리페치 버퍼를 통해 드라이버가 미리 메시지 전달 가능

예제

concurrency: 3
prefetch: 10

흐름:
1. 드라이버가 버퍼에 최대 10개 메시지 전달
2. 3개 워커가 버퍼에서 동시에 가져옴
3. 워커가 완료되면 버퍼 리필
4. 모든 워커가 바쁘고 버퍼가 가득 차면 백프레셔

정상 종료

중지 시:

  1. 새 메시지 수신 중지
  2. 워커 컨텍스트 취소
  3. 처리 중인 메시지 완료 대기(타임아웃 적용)
  4. 워커가 완료되지 않으면 타임아웃 오류 반환

큐 선언

# 큐 드라이버 (개발/테스트용 메모리)
- name: queue_driver
  kind: queue.driver.memory
  lifecycle:
    auto_start: true

# 큐 정의
- name: orders
  kind: queue.queue
  driver: app:queue_driver
  queue_name: orders        # 이름 오버라이드 (기본값: 엔트리 이름)
  codec: json               # 페이로드 코덱 (선택사항)
  dead_letter:              # 데드 레터 처리 (선택사항)
    queue: app:dlq
    max_attempts: 5
  driver_options:
    memory:
      max_length: 10000     # 메모리 드라이버: 제한된 큐 크기
필드 설명
queue_name 큐 이름 오버라이드 (기본값: 엔트리 ID 이름)
codec 페이로드 코덱 이름
dead_letter.queue 데드 레터 큐의 레지스트리 ID
dead_letter.max_attempts DLQ로 라우팅되기 전 최대 전달 시도 횟수
driver_options 드라이버 이름으로 키가 지정된 드라이버별 설정
데드 레터 라우팅은 드라이버에 따라 다릅니다. AMQP는 브로커 레벨의 DLX 설정을 따르며, 메모리 드라이버는 DLQ로 라우팅하지 않습니다.

메모리 드라이버

개발/테스트용 내장 인메모리 큐:

  • Kind: queue.driver.memory
  • 메시지는 메모리에 저장
  • Nack은 메시지를 큐의 끝에 재큐잉
  • 재시작 간 지속성 없음

참고