Fila
O Wippy fornece um sistema de filas para processamento assíncrono de mensagens com drivers e consumidores configuráveis.
Arquitetura
flowchart LR
P[Publisher] --> D[Driver]
D --> Q[Queue]
Q --> C[Consumer]
C --> W[Worker Pool]
W --> F[Function]
- Driver - Implementação de backend (memória, AMQP, SQS)
- Queue - Fila lógica vinculada a um driver
- Consumer - Conecta fila ao handler com configurações de concorrência
- Worker Pool - Processadores de mensagens concorrentes
Múltiplas filas podem compartilhar um driver. Múltiplos consumidores podem processar da mesma fila.
Tipos de Entradas
| Tipo | Descrição |
|---|---|
queue.driver.memory |
Driver de fila em memória |
queue.driver.amqp |
Driver AMQP (RabbitMQ) |
queue.driver.sqs |
Driver AWS SQS (também LocalStack, ElasticMQ) |
queue.queue |
Declaração de fila com referência ao driver |
queue.consumer |
Consumidor que processa mensagens |
Configuração do Driver
Driver de Memória
Driver in-process para desenvolvimento e implantações de nó único. Sem dependências externas.
- name: memory_driver
kind: queue.driver.memory
lifecycle:
auto_start: true
Driver AMQP
Para RabbitMQ e brokers compatíveis com AMQP 0-9-1.
- name: amqp_driver
kind: queue.driver.amqp
url: "amqp://guest:guest@localhost:5672/"
vhost: "/"
connection_name: "wippy-service"
heartbeat: "10s"
connection_timeout: "30s"
reconnect_delay: "1s"
reconnect_max_delay: "30s"
default_message_ttl: "1h"
default_queue_expiry: "24h"
prefetch_count: 10
lifecycle:
auto_start: true
| Campo | Tipo | Padrão | Descrição |
|---|---|---|---|
url |
string | amqp://guest:guest@localhost:5672/ |
URL do broker |
vhost |
string | - | Override do virtual host |
connection_name |
string | - | Identificador exibido na UI do broker |
auth_mechanism |
string | PLAIN |
PLAIN, EXTERNAL (mTLS), ou AMQPLAIN |
heartbeat |
duration | - | Intervalo de keep-alive |
connection_timeout |
duration | - | Timeout de conexão |
reconnect_delay |
duration | 1s |
Backoff inicial de reconexão |
reconnect_max_delay |
duration | 30s |
Backoff máximo de reconexão |
default_message_ttl |
duration | - | TTL de mensagem padrão aplicado a filas declaradas |
default_queue_ttl |
duration | - | TTL padrão aplicado a filas declaradas |
default_queue_expiry |
duration | - | Expiração de fila padrão para filas declaradas |
prefetch_count |
int | - | Limite de prefetch no nível do canal |
frame_size |
int | - | Limite de tamanho de frame AMQP |
channel_max |
int | - | Máximo de canais por conexão |
tls |
object | - | Configurações TLS (ver abaixo) |
Bloco TLS:
tls:
enabled: true
server_name: "rabbit.example.com"
cert_env: "AMQP_CLIENT_CERT"
key_env: "AMQP_CLIENT_KEY"
ca_env: "AMQP_CA_CERT"
insecure_skip_verify: false
Os campos inline cert/key/ca carregam conteúdo PEM; as variantes *_env são resolvidas através do registro env. As duas fontes são mutuamente exclusivas por campo. insecure_skip_verify desativa a verificação de certificado (apenas desenvolvimento).
Driver SQS
Para AWS SQS e endpoints compatíveis com SQS (LocalStack, ElasticMQ). Credenciais, região e outras configurações do AWS SDK vêm de um recurso config.aws compartilhado.
- name: aws_config
kind: config.aws
region: us-east-1
access_key_id_env: app:AWS_ACCESS_KEY_ID
secret_access_key_env: app:AWS_SECRET_ACCESS_KEY
- name: sqs_driver
kind: queue.driver.sqs
config: app:aws_config
endpoint: "http://localhost:9324"
message_retention_period: 345600
default_delay_seconds: 0
lifecycle:
auto_start: true
| Campo | Tipo | Padrão | Descrição |
|---|---|---|---|
config |
ID do Registro | obrigatório | Recurso config.aws que fornece região e credenciais |
endpoint |
string | - | URL de endpoint personalizado (LocalStack, ElasticMQ); omita para AWS real |
message_retention_period |
int | 345600 (4d) |
Retenção no nível da fila em segundos (60–1209600) |
default_delay_seconds |
int | 0 |
Atraso de entrega padrão aplicado em CreateQueue (0–900) |
disable_message_checksum_validation |
bool | false |
Desativa verificações de checksum de mensagens SQS no envio/recebimento |
use_fips |
bool | false |
Usa endpoints compatíveis com FIPS |
use_dual_stack |
bool | false |
Usa endpoints dual-stack (IPv4 + IPv6) |
As filas são criadas automaticamente pelo driver no primeiro uso. Use headers com prefixo SQS (sqs.*) para endereçar atributos específicos do SQS na publicação; chaves neutras como correlation_id e content_type são traduzidas para atributos do sistema SQS quando possível.
Configuração de Fila
- name: tasks
kind: queue.queue
driver: app.queue:memory_driver
codec: json/plain
queue_name: "app_tasks"
driver_options:
memory:
max_length: 500
dead_letter:
queue: app.queue:tasks_dlq
max_attempts: 5
| Campo | Tipo | Obrigatório | Descrição |
|---|---|---|---|
driver |
ID do Registro | Sim | Driver de fila |
codec |
string | Não | Codificação do payload (e.g. json/plain, msgpack/plain) |
queue_name |
string | Não | Nome externo da fila (padrão é o nome da entrada) |
driver_options |
object | Não | Sub-bag por driver, indexado pelo kind do driver |
dead_letter.queue |
ID do Registro | Não | ID da fila para mensagens com falha |
dead_letter.max_attempts |
int | Não | Tentativas antes de rotear para a DLQ |
Opções do Driver
As chaves sob driver_options são agrupadas por nome do driver. Um driver lê apenas seu próprio sub-bag — as outras chaves ficam inativas, o que permite que uma única entrada de fila declare configurações para múltiplos drivers se necessário.
memory:
| Chave | Descrição |
|---|---|
max_length |
Tamanho de buffer limitado (0 = ilimitado) |
amqp:
| Chave | Descrição |
|---|---|
durable |
Sobrevive ao reinício do broker |
auto_delete |
Excluído quando o último consumidor se desconecta |
message_ttl |
Override de TTL de mensagem por fila |
queue_expiry |
Expiração de fila não utilizada |
max_length |
Máximo de mensagens retidas |
Configuração do Consumidor
- name: task_consumer
kind: queue.consumer
queue: app.queue:tasks
func: app.queue:task_handler
concurrency: 4
prefetch: 20
auto_ack: false
driver_options:
amqp:
consumer_tag: "worker-1"
exclusive: false
lifecycle:
auto_start: true
depends_on:
- app.queue:tasks
| Campo | Padrão | Descrição |
|---|---|---|
queue |
obrigatório | ID do registro da fila |
func |
obrigatório | ID do registro da função handler |
concurrency |
1 | Contagem de workers paralelos |
prefetch |
10 | Tamanho do buffer por worker |
auto_ack |
false | Quando true, o runtime não chama ack do broker; sucesso/falha do handler é o único sinal de settle |
driver_options |
- | Sub-bag por driver (mesma estrutura da fila) |
Opções de consumidor amqp:
| Chave | Descrição |
|---|---|
exclusive |
Acesso à fila de consumidor único |
no_local |
Rejeita mensagens publicadas na mesma conexão |
no_wait |
Não espera confirmação do broker ao se inscrever |
consumer_tag |
Identificador para esta inscrição |
Pool de Workers
Workers executam como goroutines concorrentes:
concurrency: 3, prefetch: 10
1. Driver entrega até 10 mensagens para o buffer
2. 3 workers pegam do buffer concorrentemente
3. Conforme workers terminam, buffer reabastece
4. Contrapressão quando todos workers ocupados e buffer cheio
Função Handler
Handlers de consumidor recebem o corpo da mensagem decodificado como primeiro argumento. Use queue.message() para acessar metadados de entrega (id, headers).
local queue = require("queue")
local logger = require("logger")
local function main(body)
local msg = queue.message()
logger:info("processing", {
id = msg:id(),
correlation_id = msg:header("correlation_id")
})
local ok, err = process_task(body)
if err then
return false -- nack: redelivery or DLQ
end
return true -- ack: remove from queue
end
return { main = main }
- name: task_handler
kind: function.lua
source: file://task_handler.lua
method: main
modules:
- queue
- logger
Reconhecimento
O runtime faz settle automaticamente baseado no retorno do handler:
| Resultado do Handler | Ação |
|---|---|
true ou retorno não-false |
Ack |
false |
Nack (redelivery ou dead-letter conforme o driver) |
| Erro lançado | Nack |
Chame msg:ack() ou msg:nack() explicitamente apenas para fazer settle antecipadamente. O settlement é de disparo único: vence a primeira chamada que chega.
Roteamento Dead-Letter
Quando dead_letter está configurado na fila, uma mensagem que é nack além de max_attempts é roteada para a DLQ com os headers x_dead_letter_reason e x_original_queue definidos pelo driver. Publicadores não devem definir nenhum header x_* — estes são reservados para registro da DLQ.
Publicando Mensagens
A partir de código Lua:
local queue = require("queue")
queue.publish("app.queue:tasks", {
id = "task-123",
action = "process",
data = payload
})
Veja Módulo Queue para API completa.
Encerramento Gracioso
Ao parar consumidor:
- Para de aceitar novas entregas
- Cancela contextos de workers
- Aguarda mensagens em voo (com timeout)
- Retorna erro se workers não terminarem a tempo
Veja Também
- Módulo Queue - Referência da API Lua
- Guia de Consumidores de Filas - Padrões de consumidor e pools de workers
- Supervisão - Gerenciamento de ciclo de vida do consumidor