Activities

Activities are functions that perform non-deterministic operations, such as network calls or other I/O, that workflow code cannot safely run directly. Any function.lua or process.lua entry can be registered as a Temporal activity by adding metadata.

Registering Activities

Add meta.temporal.activity to register a function as an activity:

- name: charge_payment
  kind: function.lua
  source: file://payment.lua
  method: charge
  modules:
    - http_client
    - json
  meta:
    temporal:
      activity:
        worker: app:worker

Metadata Fields

| Field | Required | Description |
|---|---|---|
| worker | Yes | Reference to temporal.worker entry |
| local | No | Execute as local activity (default: false) |

Implementation

Activities are regular Lua functions:

-- payment.lua
local http = require("http_client")
local json = require("json")

local function charge(input)
    local response, err = http.post("https://api.stripe.com/v1/charges", {
        headers = {
            ["Authorization"] = "Bearer " .. input.api_key,
            ["Content-Type"] = "application/json"
        },
        body = json.encode({
            amount = input.amount,
            currency = input.currency,
            source = input.token
        })
    })

    if err then
        return nil, err
    end

    return json.decode(response:body())
end

return { charge = charge }

Calling Activities

From workflows, use the funcs module:

local funcs = require("funcs")

local result, err = funcs.call("app:charge_payment", {
    amount = 5000,
    currency = "usd",
    token = "tok_visa",
    api_key = ctx.stripe_key
})

if err then
    return nil, err
end

Activity Options

Configure timeouts, retry behavior, and other execution parameters using the executor builder:

local funcs = require("funcs")

local executor = funcs.new():with_options({
    ["activity.start_to_close_timeout"] = "30s",
    ["activity.schedule_to_close_timeout"] = "5m",
    ["activity.heartbeat_timeout"] = "10s",
    ["activity.retry_policy"] = {
        maximum_attempts = 3,
        initial_interval = 1000,
        backoff_coefficient = 2.0,
        maximum_interval = 60000,
    }
})

local result, err = executor:call("app:charge_payment", input)

The executor is immutable and reusable. Build it once and use it for multiple calls:

local reliable = funcs.new():with_options({
    ["activity.start_to_close_timeout"] = "60s",
    ["activity.retry_policy"] = {
        maximum_attempts = 5,
        initial_interval = 2000,
        backoff_coefficient = 2.0,
        maximum_interval = 120000,
    }
})

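local a, err = reliable:call("app:step_one", input)
if err then
    return nil, err
end

-- step_two consumes the result of step_one
local b
b, err = reliable:call("app:step_two", a)
if err then
    return nil, err
end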
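
One way to picture "immutable and reusable" is a builder whose with_options call returns a new object and leaves the original untouched. The sketch below is plain Lua with a hypothetical Executor type; it is not the funcs module's implementation, only an illustration of the pattern under that assumption.

```lua
-- Hypothetical Executor: every with_options call returns a NEW object.
local Executor = {}
Executor.__index = Executor

local function new_executor(options)
    return setmetatable({ options = options or {} }, Executor)
end

function Executor:with_options(extra)
    -- Copy the existing options, then overlay the new ones.
    local merged = {}
    for key, value in pairs(self.options) do merged[key] = value end
    for key, value in pairs(extra) do merged[key] = value end
    return new_executor(merged)
end

local base = new_executor()
local fast = base:with_options({ ["activity.start_to_close_timeout"] = "5s" })
local slow = fast:with_options({ ["activity.start_to_close_timeout"] = "5m" })
```

Because deriving slow from fast does not modify fast, a configured executor can be shared across calls without one call site affecting another.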

Options Reference

| Option | Type | Default | Description |
|---|---|---|---|
| activity.start_to_close_timeout | duration | 10m | Max time for activity execution |
| activity.schedule_to_close_timeout | duration | - | Max time from scheduling to completion |
| activity.schedule_to_start_timeout | duration | - | Max time before activity starts |
| activity.heartbeat_timeout | duration | - | Max time between heartbeats |
| activity.id | string | - | Custom activity execution ID |
| activity.task_queue | string | - | Override task queue for this call |
| activity.wait_for_cancellation | boolean | false | Wait for a cancelled activity to finish before returning |
| activity.disable_eager_execution | boolean | false | Disable eager activity execution |
| activity.retry_policy | table | - | Retry configuration (see below) |

Duration values accept strings ("5s", "10m", "1h") or milliseconds as numbers.
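
To make the two accepted forms concrete, here is a minimal sketch of normalizing a duration to milliseconds. The to_millis helper is hypothetical and not part of the funcs module; it handles only the single-unit strings shown above (s, m, h) and makes no claim about how the runtime itself parses durations.

```lua
-- Hypothetical helper: convert a duration option to milliseconds.
local UNIT_MS = { s = 1000, m = 60 * 1000, h = 60 * 60 * 1000 }

local function to_millis(value)
    if type(value) == "number" then
        return value -- numbers are already milliseconds
    end
    if type(value) ~= "string" then
        return nil, "duration must be a string or a number"
    end
    -- Split "10m" into the digits and the unit suffix.
    local amount, unit = value:match("^(%d+)(%a+)$")
    local factor = unit and UNIT_MS[unit]
    if not factor then
        return nil, "unsupported duration: " .. value
    end
    return tonumber(amount) * factor
end
```

Under these assumptions, to_millis("5s") and the number 5000 describe the same timeout.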

Retry Policy

Configure automatic retry behavior for failed activities:

["activity.retry_policy"] = {
    initial_interval = 1000,         -- ms before first retry
    backoff_coefficient = 2.0,       -- multiplier for each retry
    maximum_interval = 300000,       -- max interval between retries (ms)
    maximum_attempts = 10,           -- max attempts, including the first (0 = unlimited)
    non_retryable_error_types = {    -- errors that skip retries
        "INVALID",
        "PERMISSION_DENIED"
    }
}

| Field | Type | Default | Description |
|---|---|---|---|
| initial_interval | number | 1000 | Milliseconds before first retry |
| backoff_coefficient | number | 2.0 | Multiplier applied to interval each retry |
| maximum_interval | number | - | Cap on retry interval (ms) |
| maximum_attempts | number | 0 | Max attempts (0 = unlimited) |
| non_retryable_error_types | array | - | Error kinds that bypass retries |
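
The fields above combine into an exponential backoff schedule. The sketch below computes the delay before each retry in plain Lua. The retry_delays helper is hypothetical, and it assumes Temporal's usual semantics: maximum_attempts counts the first attempt, and each interval is capped at maximum_interval.

```lua
-- Sketch: delay (ms) before each retry under a retry policy.
local function retry_delays(policy)
    local initial = policy.initial_interval or 1000
    local coefficient = policy.backoff_coefficient or 2.0
    local cap = policy.maximum_interval
    local attempts = policy.maximum_attempts or 0
    assert(attempts > 0, "this sketch needs a finite maximum_attempts")

    local delays = {}
    local interval = initial
    -- The first attempt is not a retry, so there are attempts - 1 delays.
    for i = 1, attempts - 1 do
        if cap and interval > cap then
            interval = cap
        end
        delays[i] = interval
        interval = interval * coefficient
    end
    return delays
end

local delays = retry_delays({
    initial_interval = 1000,
    backoff_coefficient = 2.0,
    maximum_interval = 5000,
    maximum_attempts = 5,
})
-- delays holds 1000, 2000, 4000, 5000: the last value is capped.
```

A small cap keeps late retries from drifting far apart, at the cost of more load on a struggling dependency.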

Timeout Relationships

|--- schedule_to_close_timeout --------------------------------|
|--- schedule_to_start_timeout ---|--- start_to_close_timeout -|
     (waiting in queue)                (executing)

  • start_to_close_timeout: How long a single execution attempt may run once a worker has started it. This is the most commonly used timeout.
  • schedule_to_close_timeout: Total time from when the activity is scheduled until it completes, including queue wait time and all retries.
  • schedule_to_start_timeout: Maximum time the activity can wait in the task queue before a worker picks it up.
  • heartbeat_timeout: For long-running activities, the maximum time allowed between heartbeat reports before the attempt is considered failed.
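
The relationships in the diagram can be checked before options are passed to an executor. The sketch below is a hypothetical check_timeouts helper in plain Lua; the short field names stand in for the activity.* options, all values are assumed to be milliseconds, and the heartbeat rule is a common-sense assumption rather than a documented constraint.

```lua
-- Sketch: sanity-check a set of activity timeouts given in milliseconds.
local function check_timeouts(t)
    local problems = {}
    local function flag(message)
        problems[#problems + 1] = message
    end

    -- Both inner timeouts must fit inside the overall budget.
    if t.schedule_to_close then
        if t.start_to_close and t.start_to_close > t.schedule_to_close then
            flag("start_to_close exceeds schedule_to_close")
        end
        if t.schedule_to_start and t.schedule_to_start > t.schedule_to_close then
            flag("schedule_to_start exceeds schedule_to_close")
        end
    end
    -- A heartbeat window as long as the attempt itself detects nothing early.
    if t.heartbeat and t.start_to_close and t.heartbeat >= t.start_to_close then
        flag("heartbeat is not shorter than start_to_close")
    end
    return problems
end

local ok = check_timeouts({
    start_to_close = 30000, schedule_to_close = 300000, heartbeat = 10000,
})
local bad = check_timeouts({
    start_to_close = 600000, schedule_to_close = 300000,
})
```

Here ok is empty, while bad reports that a 10-minute attempt cannot fit inside a 5-minute overall budget.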

Local Activities

Local activities execute in the workflow worker process without separate task queue polling:

- name: validate_input
  kind: function.lua
  source: file://validate.lua
  method: validate
  modules:
    - json
  meta:
    temporal:
      activity:
        worker: app:worker
        local: true

Characteristics:

  • Execute in workflow worker process
  • Lower latency (no task queue roundtrip)
  • No separate task queue overhead
  • Limited to short execution times
  • No heartbeating

Use local activities for fast, short operations like input validation, data transformation, or cache lookups.

Activity Naming

Activities are registered with their full entry ID as the name:

namespace: app
entries:
  - name: charge_payment
    kind: function.lua
    # ...

Activity name: app:charge_payment

Context Propagation

Context values set when spawning the workflow are available inside activities:

-- Spawner sets context
local spawner = process.with_context({
    user_id = "user-1",
    tenant = "tenant-1",
})
local pid = spawner:spawn("app:order_workflow", "app:worker", order)

-- Activity reads context
local ctx = require("ctx")

local function process_order(input)
    local user_id = ctx.get("user_id")   -- "user-1"
    local tenant = ctx.get("tenant")     -- "tenant-1"
    -- use context for authorization, logging, etc.
end

Context set inside a workflow with funcs.new():with_context() is likewise propagated to the activities that executor calls:

-- Inside workflow
local executor = funcs.new():with_context({trace_id = "abc-123"})
local result, err = executor:call("app:charge_payment", input)

Error Handling

Return errors using the standard Lua pattern of nil plus an error value:

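local errors = require("errors")
local http = require("http_client")
local json = require("json")

-- url and options are assumed to be defined elsewhere in this module
local function charge(input)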
    if not input.amount or input.amount <= 0 then
        return nil, errors.new("INVALID", "amount must be positive")
    end

    local response, err = http.post(url, options)
    if err then
        return nil, errors.wrap(err, "payment API failed")
    end

    if response:status() >= 400 then
        return nil, errors.new("FAILED", "payment declined")
    end

    return json.decode(response:body())
end

Error Objects

Activity errors propagated to workflows carry structured metadata:

local result, err = funcs.call("app:charge_payment", input)
if err then
    err:kind()       -- error classification string
    err:retryable()  -- boolean, whether retry makes sense
    err:message()    -- human-readable error message
end

Failure Modes

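| Failure | Error Kind | Retryable | Description |
|---|---|---|---|
| Application error | varies | varies | Error returned by activity code |
| Runtime crash | INTERNAL | true | Unhandled Lua error in activity |
| Missing activity | NOT_FOUND | false | Activity not registered with worker |
| Timeout | TIMEOUT | true | Activity exceeded configured timeout |

For example, calling an activity that is not registered, with the retry policy limited to a single attempt:
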
local executor = funcs.new():with_options({
    ["activity.retry_policy"] = {maximum_attempts = 1}
})

local result, err = executor:call("app:missing_activity", input)
if err then
    print(err:kind())      -- "NOT_FOUND"
    print(err:retryable())  -- false
end
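
Building on the kinds listed above, a workflow might branch on the error's classification instead of inspecting message text. The sketch below runs in plain Lua: fake_error is a hypothetical stand-in that exposes the same three accessors, and next_step with its return values is illustrative, not part of any module.

```lua
-- Hypothetical stand-in for an activity error with the three accessors.
local function fake_error(kind, retryable, message)
    return {
        kind = function() return kind end,
        retryable = function() return retryable end,
        message = function() return message end,
    }
end

-- Decide what a workflow might do after a failed activity call.
local function next_step(err)
    if err:kind() == "NOT_FOUND" then
        return "fix_registration" -- retrying cannot help here
    end
    if err:retryable() then
        return "retry_later"
    end
    return "fail_workflow"
end

local timeout = fake_error("TIMEOUT", true, "activity timed out")
local missing = fake_error("NOT_FOUND", false, "activity not registered")
local invalid = fake_error("INVALID", false, "amount must be positive")
```

Checking kind() before retryable() lets configuration problems surface with a distinct outcome rather than being folded into generic failures.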

Process Activities

process.lua entries can also be registered as activities for long-running operations:

- name: long_task
  kind: process.lua
  source: file://long_task.lua
  method: main
  modules:
    - http_client
  meta:
    temporal:
      activity:
        worker: app:worker

See Also