Consumidores de Filas
Consumidores de filas processam mensagens de filas usando pools de workers.
Visão Geral
flowchart LR
subgraph Consumer[Consumidor]
QD[Driver de Fila] --> DC[Canal de Entrega
prefetch=10]
DC --> WP[Pool de Workers
concurrency]
WP --> FH[Handler de Função]
FH --> AN[Ack/Nack]
end
Configuração
| Opção | Padrão | Max | Descrição |
|---|---|---|---|
queue |
Obrigatório | - | ID do registro da fila |
func |
Obrigatório | - | ID do registro da função handler |
concurrency |
1 | 1000 | Quantidade de workers |
prefetch |
10 | 10000 | Tamanho do buffer de mensagens |
Definição de Entrada
- name: order_consumer
kind: queue.consumer
queue: app:orders
func: app:process_order
concurrency: 5
prefetch: 20
lifecycle:
auto_start: true
depends_on:
- app:orders
Função Handler
A função handler recebe o corpo da mensagem:
-- process_order.lua
local json = require("json")
local function handler(body)
local order = json.decode(body)
-- Processa o pedido
local result, err = process_order(order)
if err then
-- Retorna erro para disparar Nack (reenfileirar)
return nil, err
end
-- Sucesso dispara Ack
return result
end
return handler
- name: process_order
kind: function.lua
source: file://process_order.lua
modules:
- json
Reconhecimento
| Resultado | Ação | Efeito |
|---|---|---|
| Sucesso | Ack | Mensagem removida da fila |
| Erro | Nack | Mensagem reenfileirada (dependente do driver) |
Pool de Workers
- Workers executam como goroutines concorrentes
- Cada worker processa uma mensagem por vez
- Mensagens distribuídas round-robin do canal de entrega
- Buffer de prefetch permite driver entregar antecipadamente
Exemplo
concurrency: 3
prefetch: 10
Fluxo:
1. Driver entrega até 10 mensagens para o buffer
2. 3 workers pegam do buffer concorrentemente
3. Conforme workers terminam, buffer reabastece
4. Contrapressão quando todos workers ocupados e buffer cheio
Encerramento Gracioso
Ao parar:
- Para de aceitar novas entregas
- Cancela contextos de workers
- Aguarda mensagens em voo (com timeout)
- Retorna erro de timeout se workers não terminarem
Declaração de Fila
# Driver de fila (memória para dev/teste)
- name: queue_driver
kind: queue.driver.memory
lifecycle:
auto_start: true
# Definição de fila
- name: orders
kind: queue.queue
driver: app:queue_driver
options:
queue_name: orders # Sobrescreve nome (padrão: nome da entrada)
max_length: 10000 # Tamanho máximo da fila
durable: true # Sobrevive a reinicializações
| Opção | Descrição |
|---|---|
queue_name |
Sobrescreve nome da fila (padrão: nome do ID da entrada) |
max_length |
Tamanho máximo da fila |
durable |
Sobrevive a reinicializações (dependente do driver) |
Driver de Memória
Fila em memória embutida para desenvolvimento/testes:
- Tipo:
queue.driver.memory - Mensagens armazenadas em memória
- Nack reenfileira mensagem no início da fila
- Sem persistência entre reinicializações
Veja Também
- Fila de Mensagens - Referência do módulo de filas
- Configuração de Filas - Drivers de fila e definições de entrada
- Árvores de Supervisão - Ciclo de vida do consumidor
- Gerenciamento de Processos - Criação e comunicação de processos