# Queue Consumers _Path: en/guides/queue-consumers_ ## Table of Contents - Queue Consumers ## Content # Queue Consumers Queue consumers process messages from queues using worker pools. ## Overview ```mermaid flowchart LR subgraph Consumer QD[Queue Driver] --> DC[Delivery Channel
prefetch=10] DC --> WP[Worker Pool
concurrency] WP --> FH[Function Handler] FH --> AN[Ack/Nack] end ``` ## Configuration | Option | Default | Max | Description | |--------|---------|-----|-------------| | `queue` | Required | - | Queue registry ID | | `func` | Required | - | Handler function registry ID | | `concurrency` | 1 | 1000 | Worker count | | `prefetch` | 10 | 10000 | Message buffer size | ## Entry Definition ```yaml - name: order_consumer kind: queue.consumer queue: app:orders func: app:process_order concurrency: 5 prefetch: 20 lifecycle: auto_start: true depends_on: - app:orders ``` ## Handler Function The handler function receives the message body: ```lua -- process_order.lua local json = require("json") local function handler(body) local order = json.decode(body) -- Process the order local result, err = process_order(order) if err then -- Return error to trigger Nack (requeue) return nil, err end -- Success triggers Ack return result end return handler ``` ```yaml - name: process_order kind: function.lua source: file://process_order.lua modules: - json ``` ## Acknowledgment | Result | Action | Effect | |--------|--------|--------| | Success | Ack | Message removed from queue | | Error | Nack | Message requeued (driver-dependent) | ## Worker Pool - Workers run as concurrent goroutines - Each worker processes one message at a time - Messages distributed round-robin from delivery channel - Prefetch buffer allows driver to deliver ahead ### Example ``` concurrency: 3 prefetch: 10 Flow: 1. Driver delivers up to 10 messages to buffer 2. 3 workers pull from buffer concurrently 3. As workers finish, buffer refills 4. Backpressure when all workers busy and buffer full ``` ## Graceful Shutdown On stop: 1. Stop accepting new deliveries 2. Cancel worker contexts 3. Wait for in-flight messages (with timeout) 4. Return timeout error if workers don't finish ## Queue Declaration ```yaml # Queue driver (memory for dev/test) - name: queue_driver kind: queue.driver.memory lifecycle: auto_start: true # Queue definition - name: orders kind: queue.queue driver: app:queue_driver options: queue_name: orders # Override name (default: entry name) max_length: 10000 # Maximum queue size durable: true # Survive restarts ``` | Option | Description | |--------|-------------| | `queue_name` | Override queue name (default: entry ID name) | | `max_length` | Maximum queue size | | `durable` | Survive restarts (driver-dependent) | ## Memory Driver Built-in in-memory queue for development/testing: - Kind: `queue.driver.memory` - Messages stored in memory - Nack requeues message to front of queue - No persistence across restarts ## See Also - [Message Queue](lua/storage/queue.md) - Queue module reference - [Queue Configuration](system/queue.md) - Queue drivers and entry definitions - [Supervision Trees](guides/supervision.md) - Consumer lifecycle - [Process Management](lua/core/process.md) - Process spawning and communication ## Navigation Previous: Observability (guides/observability) Next: Supervision (guides/supervision)