Queue-Konsumenten
Queue-Konsumenten verarbeiten Nachrichten aus Queues mittels Worker-Pools.
Übersicht
flowchart LR
subgraph Consumer[Konsument]
QD[Queue-Treiber] --> DC[Zustellkanal
prefetch=10]
DC --> WP[Worker-Pool
concurrency]
WP --> FH[Funktions-Handler]
FH --> AN[Ack/Nack]
end
Konfiguration
| Option | Standard | Max | Beschreibung |
|---|---|---|---|
queue |
Erforderlich | - | Queue-Registry-ID |
func |
Erforderlich | - | Handler-Funktions-Registry-ID |
concurrency |
1 | 1000 | Worker-Anzahl |
prefetch |
10 | 10000 | Nachrichtenpuffer-Größe |
auto_ack |
false | - | Automatisch bestätigen, bevor der Handler ausgeführt wird |
driver_options |
{} |
- | Treiberspezifische Consumer-Optionen |
Entry-Definition
- name: order_consumer
kind: queue.consumer
queue: app:orders
func: app:process_order
concurrency: 5
prefetch: 20
lifecycle:
auto_start: true
depends_on:
- app:orders
Handler-Funktion
Die Handler-Funktion empfängt den Nachrichteninhalt:
-- process_order.lua
local json = require("json")
local function handler(body)
local order = json.decode(body)
-- Bestellung verarbeiten
local result, err = process_order(order)
if err then
-- Fehler zurückgeben löst Nack aus (Requeue)
return nil, err
end
-- Erfolg löst Ack aus
return result
end
return handler
- name: process_order
kind: function.lua
source: file://process_order.lua
modules:
- json
Bestätigung
| Ergebnis | Aktion | Effekt |
|---|---|---|
| Erfolg | Ack | Nachricht aus Queue entfernt |
| Fehler | Nack | Nachricht erneut eingereiht (treiberabhängig) |
Worker-Pool
- Worker laufen als nebenläufige Goroutinen
- Jeder Worker verarbeitet eine Nachricht auf einmal
- Nachrichten werden Round-Robin aus dem Delivery-Channel verteilt
- Prefetch-Puffer ermöglicht es dem Treiber, voraus zu liefern
Beispiel
concurrency: 3
prefetch: 10
Ablauf:
1. Treiber liefert bis zu 10 Nachrichten in den Puffer
2. 3 Worker holen nebenläufig aus dem Puffer
3. Wenn Worker fertig sind, füllt sich der Puffer nach
4. Gegendruck wenn alle Worker beschäftigt und Puffer voll
Kontrolliertes Herunterfahren
Beim Stoppen:
- Keine neuen Lieferungen mehr annehmen
- Worker-Kontexte abbrechen
- Auf laufende Nachrichten warten (mit Timeout)
- Timeout-Fehler zurückgeben wenn Worker nicht fertig werden
Queue-Deklaration
# Queue-Treiber (Memory für Dev/Test)
- name: queue_driver
kind: queue.driver.memory
lifecycle:
auto_start: true
# Queue-Definition
- name: orders
kind: queue.queue
driver: app:queue_driver
queue_name: orders # Namen überschreiben (Standard: Entry-Name)
codec: json # Payload-Codec (optional)
dead_letter: # Dead-Letter-Behandlung (optional)
queue: app:dlq
max_attempts: 5
driver_options:
memory:
max_length: 10000 # Memory-Treiber: begrenzte Queue-Größe
| Feld | Beschreibung |
|---|---|
queue_name |
Queue-Namen überschreiben (Standard: Entry-ID-Name) |
codec |
Name des Payload-Codecs |
dead_letter.queue |
Registry-ID der Dead-Letter-Queue |
dead_letter.max_attempts |
Maximale Zustellversuche vor Weiterleitung an die DLQ |
driver_options |
Treiberspezifische Einstellungen, nach Treibernamen geschlüsselt |
Memory-Treiber
Eingebaute In-Memory-Queue für Entwicklung/Tests:
- Kind:
queue.driver.memory - Nachrichten im Speicher gehalten
- Nack reiht die Nachricht wieder am Ende der Queue ein
- Keine Persistenz über Neustarts hinweg
Siehe auch
- Message Queue - Queue-Modul-Referenz
- Queue-Konfiguration - Queue-Treiber und Entry-Definitionen
- Supervision-Bäume - Consumer-Lebenszyklus
- Prozessverwaltung - Prozess-Spawning und Kommunikation