Activities
Activities sind Funktionen, die nicht-deterministische Operationen ausführen. Jeder function.lua- oder process.lua-Eintrag kann als Temporal-Activity registriert werden, indem Metadaten hinzugefügt werden.
Activities registrieren
meta.temporal.activity hinzufügen, um eine Funktion als Activity zu registrieren:
- name: charge_payment
kind: function.lua
source: file://payment.lua
method: charge
modules:
- http_client
- json
meta:
temporal:
activity:
worker: app:worker
Metadaten-Felder
| Feld | Erforderlich | Beschreibung |
|---|---|---|
worker |
Ja | Referenz auf temporal.worker-Eintrag |
local |
Nein | Als lokale Activity ausführen (Standard: false) |
Implementierung
Activities sind reguläre Lua-Funktionen:
-- payment.lua
local http = require("http_client")
local json = require("json")
local function charge(input)
local response, err = http.post("https://api.stripe.com/v1/charges", {
headers = {
["Authorization"] = "Bearer " .. input.api_key,
["Content-Type"] = "application/json"
},
body = json.encode({
amount = input.amount,
currency = input.currency,
source = input.token
})
})
if err then
return nil, err
end
return json.decode(response:body())
end
return { charge = charge }
Activities aufrufen
Aus Workflows das funcs-Modul verwenden:
local funcs = require("funcs")
local result, err = funcs.call("app:charge_payment", {
amount = 5000,
currency = "usd",
token = "tok_visa",
api_key = ctx.stripe_key
})
if err then
return nil, err
end
Activity-Optionen
Timeouts, Retry-Verhalten und andere Ausführungsparameter mit dem Executor-Builder konfigurieren:
local funcs = require("funcs")
local executor = funcs.new():with_options({
["activity.start_to_close_timeout"] = "30s",
["activity.schedule_to_close_timeout"] = "5m",
["activity.heartbeat_timeout"] = "10s",
["activity.retry_policy"] = {
maximum_attempts = 3,
initial_interval = 1000,
backoff_coefficient = 2.0,
maximum_interval = 60000,
}
})
local result, err = executor:call("app:charge_payment", input)
Der Executor ist unveränderlich und wiederverwendbar. Einmal erstellen und für mehrere Aufrufe verwenden:
local reliable = funcs.new():with_options({
["activity.start_to_close_timeout"] = "60s",
["activity.retry_policy"] = {
maximum_attempts = 5,
initial_interval = 2000,
backoff_coefficient = 2.0,
maximum_interval = 120000,
}
})
local a, err = reliable:call("app:step_one", input)
local b, err = reliable:call("app:step_two", a)
Optionsreferenz
| Option | Typ | Standard | Beschreibung |
|---|---|---|---|
activity.start_to_close_timeout |
duration | 10m | Maximale Zeit für Activity-Ausführung |
activity.schedule_to_close_timeout |
duration | - | Maximale Zeit von Planung bis Abschluss |
activity.schedule_to_start_timeout |
duration | - | Maximale Zeit bis Activity startet |
activity.heartbeat_timeout |
duration | - | Maximale Zeit zwischen Heartbeats |
activity.id |
string | - | Benutzerdefinierte Activity-Ausführungs-ID |
activity.task_queue |
string | - | Task-Queue für diesen Aufruf überschreiben |
activity.wait_for_cancellation |
boolean | false | Auf Activity-Stornierung warten |
activity.disable_eager_execution |
boolean | false | Sofortige Ausführung deaktivieren |
activity.retry_policy |
table | - | Retry-Konfiguration (siehe unten) |
Duration-Werte akzeptieren Strings ("5s", "10m", "1h") oder Millisekunden als Zahlen.
Retry-Richtlinie
Automatisches Retry-Verhalten für fehlgeschlagene Activities konfigurieren:
["activity.retry_policy"] = {
initial_interval = 1000, -- ms before first retry
backoff_coefficient = 2.0, -- multiplier for each retry
maximum_interval = 300000, -- max interval between retries (ms)
maximum_attempts = 10, -- max retry attempts (0 = unlimited)
non_retryable_error_types = { -- errors that skip retries
"INVALID",
"PERMISSION_DENIED"
}
}
| Feld | Typ | Standard | Beschreibung |
|---|---|---|---|
initial_interval |
number | 1000 | Millisekunden vor dem ersten Retry |
backoff_coefficient |
number | 2.0 | Multiplikator, der bei jedem Retry auf das Intervall angewendet wird |
maximum_interval |
number | - | Obergrenze für Retry-Intervall (ms) |
maximum_attempts |
number | 0 | Maximale Versuche (0 = unbegrenzt) |
non_retryable_error_types |
array | - | Fehlerarten, die Retries umgehen |
Timeout-Beziehungen
|--- schedule_to_close_timeout --------------------------------|
|--- schedule_to_start_timeout ---|--- start_to_close_timeout -|
(waiting in queue) (executing)
start_to_close_timeout: Wie lange die Activity selbst laufen darf. Dies ist das am häufigsten verwendete Timeout.schedule_to_close_timeout: Gesamtzeit von der Planung der Activity bis zum Abschluss, einschließlich Wartezeit in der Queue und Retries.schedule_to_start_timeout: Maximale Zeit, die die Activity in der Task-Queue auf einen Worker warten kann.heartbeat_timeout: Für langlebige Activities die maximale Zeit zwischen Heartbeat-Meldungen.
Lokale Activities
Lokale Activities werden im Workflow-Worker-Prozess ohne separates Task-Queue-Polling ausgeführt:
- name: validate_input
kind: function.lua
source: file://validate.lua
method: validate
modules:
- json
meta:
temporal:
activity:
worker: app:worker
local: true
Eigenschaften:
- Werden im Workflow-Worker-Prozess ausgeführt
- Geringere Latenz (kein Task-Queue-Roundtrip)
- Kein separater Task-Queue-Overhead
- Beschränkt auf kurze Ausführungszeiten
- Kein Heartbeating
Lokale Activities eignen sich für schnelle, kurze Operationen wie Eingabevalidierung, Datentransformation oder Cache-Abfragen.
Activity-Benennung
Activities werden mit ihrer vollständigen Entry-ID als Namen registriert:
namespace: app
entries:
- name: charge_payment
kind: function.lua
# ...
Activity-Name: app:charge_payment
Kontextpropagierung
Kontextwerte, die beim Starten des Workflows gesetzt werden, sind innerhalb von Activities verfügbar:
-- spawner sets context
local spawner = process.with_context({
user_id = "user-1",
tenant = "tenant-1",
})
local pid = spawner:spawn("app:order_workflow", "app:worker", order)
-- activity reads context
local ctx = require("ctx")
local function process_order(input)
local user_id = ctx.get("user_id") -- "user-1"
local tenant = ctx.get("tenant") -- "tenant-1"
-- use context for authorization, logging, etc.
end
Activities, die aus einem Workflow mit funcs.new():with_context() aufgerufen werden, propagieren ebenfalls den Kontext:
-- inside workflow
local executor = funcs.new():with_context({trace_id = "abc-123"})
local result, err = executor:call("app:charge_payment", input)
Fehlerbehandlung
Fehler über das Standard-Lua-Muster zurückgeben:
local errors = require("errors")
local function charge(input)
if not input.amount or input.amount <= 0 then
return nil, errors.new("INVALID", "amount must be positive")
end
local response, err = http.post(url, options)
if err then
return nil, errors.wrap(err, "payment API failed")
end
if response:status() >= 400 then
return nil, errors.new("FAILED", "payment declined")
end
return json.decode(response:body())
end
Fehlerobjekte
An Workflows weitergegebene Activity-Fehler enthalten strukturierte Metadaten:
local result, err = funcs.call("app:charge_payment", input)
if err then
err:kind() -- error classification string
err:retryable() -- boolean, whether retry makes sense
err:message() -- human-readable error message
end
Fehlermodi
| Fehler | Fehlerart | Wiederholbar | Beschreibung |
|---|---|---|---|
| Anwendungsfehler | variiert | variiert | Von Activity-Code zurückgegebener Fehler |
| Laufzeitabsturz | INTERNAL |
ja | Unbehandelter Lua-Fehler in Activity |
| Fehlende Activity | NOT_FOUND |
nein | Activity nicht beim Worker registriert |
| Timeout | TIMEOUT |
ja | Activity hat konfiguriertes Timeout überschritten |
local executor = funcs.new():with_options({
["activity.retry_policy"] = {maximum_attempts = 1}
})
local result, err = executor:call("app:missing_activity", input)
if err then
print(err:kind()) -- "NOT_FOUND"
print(err:retryable()) -- false
end
Prozess-Activities
process.lua-Einträge können ebenfalls als Activities für langlebige Operationen registriert werden:
- name: long_task
kind: process.lua
source: file://long_task.lua
method: main
modules:
- http_client
meta:
temporal:
activity:
worker: app:worker
Siehe auch
- Übersicht - Konfiguration
- Workflows - Workflow-Implementierung
- Funktionen - Funktionsmodul
- Fehlerbehandlung - Fehlertypen und -muster