Queue
Wippy bietet ein Queue-System für asynchrone Nachrichtenverarbeitung mit konfigurierbaren Treibern und Konsumenten.
Architektur
flowchart LR
P[Publisher] --> D[Driver]
D --> Q[Queue]
Q --> C[Consumer]
C --> W[Worker Pool]
W --> F[Function]
- Driver - Backend-Implementierung (Memory, AMQP, Redis)
- Queue - Logische Queue gebunden an einen Driver
- Consumer - Verbindet Queue mit Handler mit Nebenläufigkeits-Einstellungen
- Worker Pool - Nebenläufige Nachrichtenverarbeiter
Mehrere Queues können einen Driver teilen. Mehrere Consumer können aus derselben Queue verarbeiten.
Entry-Typen
| Kind | Beschreibung |
|---|---|
queue.driver.memory |
In-Memory-Queue-Treiber |
queue.queue |
Queue-Deklaration mit Driver-Referenz |
queue.consumer |
Consumer der Nachrichten verarbeitet |
Driver-Konfiguration
Memory-Driver
In-Memory-Driver für Entwicklung und Tests.
- name: memory_driver
kind: queue.driver.memory
lifecycle:
auto_start: true
Queue-Konfiguration
- name: tasks
kind: queue.queue
driver: app.queue:memory_driver
| Feld | Typ | Erforderlich | Beschreibung |
|---|---|---|---|
driver |
Registry-ID | Ja | Referenz auf Queue-Driver |
options |
Map | Nein | Treiberspezifische Optionen |
Consumer-Konfiguration
- name: task_consumer
kind: queue.consumer
queue: app.queue:tasks
func: app.queue:task_handler
concurrency: 4
prefetch: 20
lifecycle:
auto_start: true
depends_on:
- app.queue:tasks
| Feld | Standard | Max | Beschreibung |
|---|---|---|---|
queue |
Erforderlich | - | Queue-Registry-ID |
func |
Erforderlich | - | Handler-Funktions-Registry-ID |
concurrency |
1 | 1000 | Parallele Worker-Anzahl |
prefetch |
10 | 10000 | Nachrichtenpuffer-Größe |
Worker-Pool
Worker laufen als nebenläufige Goroutinen:
concurrency: 3, prefetch: 10
1. Driver liefert bis zu 10 Nachrichten in den Puffer
2. 3 Worker holen nebenläufig aus dem Puffer
3. Wenn Worker fertig sind, füllt sich der Puffer nach
4. Gegendruck wenn alle Worker beschäftigt und Puffer voll
Handler-Funktion
Consumer-Funktionen empfangen Nachrichtendaten und geben Erfolg oder Fehler zurück:
local json = require("json")
local logger = require("logger")
local function handler(body)
local data = json.decode(body)
logger.info("Verarbeite", {task_id = data.id})
local result, err = process_task(data)
if err then
return nil, err -- Nack: Nachricht erneut einreihen
end
return result -- Ack: Aus Queue entfernen
end
return handler
- name: task_handler
kind: function.lua
source: file://task_handler.lua
modules:
- json
- logger
Bestätigung
| Handler-Ergebnis | Aktion | Effekt |
|---|---|---|
| Rückgabewert | Ack | Nachricht aus Queue entfernt |
| Fehler zurückgeben | Nack | Nachricht erneut eingereiht (treiberabhängig) |
Nachrichten veröffentlichen
Aus Lua-Code:
local queue = require("queue")
queue.publish("app.queue:tasks", {
id = "task-123",
action = "process",
data = payload
})
Siehe Queue-Modul für vollständige API.
Kontrolliertes Herunterfahren
Beim Stoppen des Consumers:
- Keine neuen Lieferungen mehr annehmen
- Worker-Kontexte abbrechen
- Auf laufende Nachrichten warten (mit Timeout)
- Fehler zurückgeben wenn Worker nicht rechtzeitig fertig werden
Siehe auch
- Queue-Modul - Lua-API-Referenz
- Queue-Konsumenten-Anleitung - Consumer-Muster und Worker-Pools
- Supervision - Consumer-Lebenszyklus-Verwaltung