Queue
Wippy provides a queue system for asynchronous message processing with configurable drivers and consumers.
Architecture
flowchart LR
P[Publisher] --> D[Driver]
D --> Q[Queue]
Q --> C[Consumer]
C --> W[Worker Pool]
W --> F[Function]
- Driver - Backend implementation (memory, AMQP, SQS)
- Queue - Logical queue bound to a driver
- Consumer - Connects queue to handler with concurrency settings
- Worker Pool - Concurrent message processors
Multiple queues can share a driver. Multiple consumers can process from the same queue.
Entry Kinds
| Kind | Description |
|---|---|
queue.driver.memory |
In-memory queue driver |
queue.driver.amqp |
AMQP (RabbitMQ) driver |
queue.driver.sqs |
AWS SQS driver (also LocalStack, ElasticMQ) |
queue.queue |
Queue declaration with driver reference |
queue.consumer |
Consumer that processes messages |
Driver Configuration
Memory Driver
In-process driver for development and single-node deployments. No external dependencies.
- name: memory_driver
kind: queue.driver.memory
lifecycle:
auto_start: true
AMQP Driver
For RabbitMQ and AMQP 0-9-1 compatible brokers.
- name: amqp_driver
kind: queue.driver.amqp
url: "amqp://guest:guest@localhost:5672/"
vhost: "/"
connection_name: "wippy-service"
heartbeat: "10s"
connection_timeout: "30s"
reconnect_delay: "1s"
reconnect_max_delay: "30s"
default_message_ttl: "1h"
default_queue_expiry: "24h"
prefetch_count: 10
lifecycle:
auto_start: true
| Field | Type | Default | Description |
|---|---|---|---|
url |
string | amqp://guest:guest@localhost:5672/ |
Broker URL |
vhost |
string | - | Virtual host override |
connection_name |
string | - | Identifier shown in broker UI |
auth_mechanism |
string | PLAIN |
PLAIN, EXTERNAL (mTLS), or AMQPLAIN |
heartbeat |
duration | - | Keep-alive interval |
connection_timeout |
duration | - | Dial timeout |
reconnect_delay |
duration | 1s |
Initial reconnect backoff |
reconnect_max_delay |
duration | 30s |
Max reconnect backoff |
default_message_ttl |
duration | - | Default message TTL applied to declared queues |
default_queue_ttl |
duration | - | Default TTL applied to declared queues |
default_queue_expiry |
duration | - | Default queue-expiry for declared queues |
prefetch_count |
int | - | Channel-level prefetch ceiling |
frame_size |
int | - | AMQP frame size limit |
channel_max |
int | - | Max channels per connection |
tls |
object | - | TLS settings (see below) |
TLS block:
tls:
enabled: true
server_name: "rabbit.example.com"
cert_env: "AMQP_CLIENT_CERT"
key_env: "AMQP_CLIENT_KEY"
ca_env: "AMQP_CA_CERT"
insecure_skip_verify: false
Inline cert/key/ca fields carry PEM content; *_env variants resolve through the env registry. The two sources are mutually exclusive per field. insecure_skip_verify disables certificate verification (development only).
SQS Driver
For AWS SQS and SQS-compatible endpoints (LocalStack, ElasticMQ). Credentials, region, and other AWS SDK settings come from a shared config.aws resource.
- name: aws_config
kind: config.aws
region: us-east-1
access_key_id_env: app:AWS_ACCESS_KEY_ID
secret_access_key_env: app:AWS_SECRET_ACCESS_KEY
- name: sqs_driver
kind: queue.driver.sqs
config: app:aws_config
endpoint: "http://localhost:9324"
message_retention_period: 345600
default_delay_seconds: 0
lifecycle:
auto_start: true
| Field | Type | Default | Description |
|---|---|---|---|
config |
Registry ID | required | config.aws resource providing region and credentials |
endpoint |
string | - | Custom endpoint URL (LocalStack, ElasticMQ); omit for real AWS |
message_retention_period |
int | 345600 (4d) |
Queue-level retention in seconds (60–1209600) |
default_delay_seconds |
int | 0 |
Default delivery delay applied on CreateQueue (0–900) |
disable_message_checksum_validation |
bool | false |
Disable SQS message checksum checks on send/receive |
use_fips |
bool | false |
Use FIPS-compliant endpoints |
use_dual_stack |
bool | false |
Use dual-stack (IPv4 + IPv6) endpoints |
Queues are auto-created by the driver on first use. Use SQS-prefixed headers (sqs.*) to address SQS-specific attributes on publish; neutral keys like correlation_id and content_type are translated to SQS system attributes where possible.
Queue Configuration
- name: tasks
kind: queue.queue
driver: app.queue:memory_driver
codec: json/plain
queue_name: "app_tasks"
driver_options:
memory:
max_length: 500
dead_letter:
queue: app.queue:tasks_dlq
max_attempts: 5
| Field | Type | Required | Description |
|---|---|---|---|
driver |
Registry ID | Yes | Queue driver |
codec |
string | No | Payload encoding (e.g. json/plain, msgpack/plain) |
queue_name |
string | No | External queue name (defaults to entry name) |
driver_options |
object | No | Per-driver sub-bag, keyed by driver kind |
dead_letter.queue |
Registry ID | No | Queue ID for failed messages |
dead_letter.max_attempts |
int | No | Attempts before routing to DLQ |
Driver Options
Keys under driver_options are scoped by driver name. A driver reads only its own sub-bag — other keys are dormant, which lets a single queue entry declare settings for multiple drivers if needed.
memory:
| Key | Description |
|---|---|
max_length |
Bounded buffer size (0 = unbounded) |
amqp:
| Key | Description |
|---|---|
durable |
Survive broker restart |
auto_delete |
Delete when last consumer detaches |
message_ttl |
Per-queue message TTL override |
queue_expiry |
Unused-queue expiration |
max_length |
Max messages retained |
Consumer Configuration
- name: task_consumer
kind: queue.consumer
queue: app.queue:tasks
func: app.queue:task_handler
concurrency: 4
prefetch: 20
auto_ack: false
driver_options:
amqp:
consumer_tag: "worker-1"
exclusive: false
lifecycle:
auto_start: true
depends_on:
- app.queue:tasks
| Field | Default | Description |
|---|---|---|
queue |
required | Queue registry ID |
func |
required | Handler function registry ID |
concurrency |
1 | Parallel worker count |
prefetch |
10 | Per-worker buffer size |
auto_ack |
false | When true, the runtime does not call broker ack; handler success/failure is the only settle signal |
driver_options |
- | Per-driver sub-bag (same structure as queue) |
amqp consumer options:
| Key | Description |
|---|---|
exclusive |
Single-consumer queue access |
no_local |
Reject messages published on the same connection |
no_wait |
Don't wait for broker confirmation on subscribe |
consumer_tag |
Identifier for this subscription |
Worker Pool
Workers run as concurrent goroutines:
concurrency: 3, prefetch: 10
1. Driver delivers up to 10 messages to buffer
2. 3 workers pull from buffer concurrently
3. As workers finish, buffer refills
4. Backpressure when all workers busy and buffer full
Handler Function
Consumer handlers receive the decoded message body as the first argument. Use queue.message() to access delivery metadata (id, headers).
local queue = require("queue")
local logger = require("logger")
local function main(body)
local msg = queue.message()
logger:info("processing", {
id = msg:id(),
correlation_id = msg:header("correlation_id")
})
local ok, err = process_task(body)
if err then
return false -- nack: redelivery or DLQ
end
return true -- ack: remove from queue
end
return { main = main }
- name: task_handler
kind: function.lua
source: file://task_handler.lua
method: main
modules:
- queue
- logger
Acknowledgment
The runtime auto-settles based on the handler return:
| Handler Result | Action |
|---|---|
true or non-false return |
Ack |
false |
Nack (redeliver or dead-letter per driver) |
| Raised error | Nack |
Call msg:ack() or msg:nack() explicitly only to settle early. Settlement is single-shot: whichever call lands first wins.
Dead-Letter Routing
When dead_letter is configured on the queue, a message that nacks beyond max_attempts is routed to the DLQ with x_dead_letter_reason and x_original_queue headers set by the driver. Publishers must not set any x_* header — those are reserved for DLQ bookkeeping.
Publishing Messages
From Lua code:
local queue = require("queue")
queue.publish("app.queue:tasks", {
id = "task-123",
action = "process",
data = payload
})
See Queue Module for full API.
Graceful Shutdown
On consumer stop:
- Stop accepting new deliveries
- Cancel worker contexts
- Wait for in-flight messages (with timeout)
- Return error if workers don't finish in time
See Also
- Queue Module - Lua API reference
- Queue Consumers Guide - Consumer patterns and worker pools
- Supervision - Consumer lifecycle management