アクティビティ

アクティビティは非決定論的な操作を実行する関数です。任意のfunction.luaまたはprocess.luaエントリはメタデータを追加することでTemporalアクティビティとして登録できます。

アクティビティの登録

関数をアクティビティとして登録するにはmeta.temporal.activityを追加:

- name: charge_payment
  kind: function.lua
  source: file://payment.lua
  method: charge
  modules:
    - http_client
    - json
  meta:
    temporal:
      activity:
        worker: app:worker

メタデータフィールド

フィールド 必須 説明
worker はい temporal.workerエントリへの参照
local いいえ ローカルアクティビティとして実行(デフォルト: false)

実装

アクティビティは通常のLua関数です:

-- payment.lua
local http = require("http_client")
local json = require("json")

local function charge(input)
    local response, err = http.post("https://api.stripe.com/v1/charges", {
        headers = {
            ["Authorization"] = "Bearer " .. input.api_key,
            ["Content-Type"] = "application/json"
        },
        body = json.encode({
            amount = input.amount,
            currency = input.currency,
            source = input.token
        })
    })

    if err then
        return nil, err
    end

    return json.decode(response:body())
end

return { charge = charge }

アクティビティの呼び出し

ワークフローからfuncsモジュールを使用:

local funcs = require("funcs")

local result, err = funcs.call("app:charge_payment", {
    amount = 5000,
    currency = "usd",
    token = "tok_visa",
    api_key = ctx.stripe_key
})

if err then
    return nil, err
end

アクティビティオプション

executor ビルダーを使用して、タイムアウト、リトライ動作、その他の実行パラメータを設定:

local funcs = require("funcs")

local executor = funcs.new():with_options({
    ["activity.start_to_close_timeout"] = "30s",
    ["activity.schedule_to_close_timeout"] = "5m",
    ["activity.heartbeat_timeout"] = "10s",
    ["activity.retry_policy"] = {
        maximum_attempts = 3,
        initial_interval = 1000,
        backoff_coefficient = 2.0,
        maximum_interval = 60000,
    }
})

local result, err = executor:call("app:charge_payment", input)

executor は不変で再利用可能です。一度構築すれば複数の呼び出しに使用できます:

local reliable = funcs.new():with_options({
    ["activity.start_to_close_timeout"] = "60s",
    ["activity.retry_policy"] = {
        maximum_attempts = 5,
        initial_interval = 2000,
        backoff_coefficient = 2.0,
        maximum_interval = 120000,
    }
})

local a, err = reliable:call("app:step_one", input)
local b, err = reliable:call("app:step_two", a)

オプションリファレンス

オプション デフォルト 説明
activity.start_to_close_timeout duration 10m アクティビティ実行の最大時間
activity.schedule_to_close_timeout duration - スケジューリングから完了までの最大時間
activity.schedule_to_start_timeout duration - アクティビティ開始までの最大時間
activity.heartbeat_timeout duration - ハートビート間の最大時間
activity.id string - カスタムアクティビティ実行ID
activity.task_queue string - この呼び出しのタスクキューをオーバーライド
activity.wait_for_cancellation boolean false アクティビティキャンセルを待機
activity.disable_eager_execution boolean false イーガー実行を無効化
activity.retry_policy table - リトライ設定(下記参照)

duration値は文字列("5s""10m""1h")またはミリ秒の数値を受け付けます。

リトライポリシー

失敗したアクティビティの自動リトライ動作を設定:

["activity.retry_policy"] = {
    initial_interval = 1000,         -- ms before first retry
    backoff_coefficient = 2.0,       -- multiplier for each retry
    maximum_interval = 300000,       -- max interval between retries (ms)
    maximum_attempts = 10,           -- max retry attempts (0 = unlimited)
    non_retryable_error_types = {    -- errors that skip retries
        "INVALID",
        "PERMISSION_DENIED"
    }
}
フィールド デフォルト 説明
initial_interval number 1000 最初のリトライまでのミリ秒
backoff_coefficient number 2.0 リトライごとに間隔に乗算される係数
maximum_interval number - リトライ間隔の上限(ミリ秒)
maximum_attempts number 0 最大試行回数(0 = 無制限)
non_retryable_error_types array - リトライをバイパスするエラー種別

タイムアウトの関係

|--- schedule_to_close_timeout --------------------------------|
|--- schedule_to_start_timeout ---|--- start_to_close_timeout -|
     (waiting in queue)                (executing)
  • start_to_close_timeout: アクティビティ本体の実行時間。最も一般的に使用されるタイムアウトです。
  • schedule_to_close_timeout: アクティビティがスケジュールされてから完了するまでの合計時間。キュー待機時間とリトライを含みます。
  • schedule_to_start_timeout: ワーカーがアクティビティを取得するまでのタスクキュー内の最大待機時間。
  • heartbeat_timeout: 長時間実行アクティビティにおけるハートビート報告間の最大時間。

ローカルアクティビティ

ローカルアクティビティはワークフローワーカープロセス内で、別のタスクキューポーリングなしで実行されます:

- name: validate_input
  kind: function.lua
  source: file://validate.lua
  method: validate
  modules:
    - json
  meta:
    temporal:
      activity:
        worker: app:worker
        local: true

特性:

  • ワークフローワーカープロセス内で実行
  • 低レイテンシー(タスクキューのラウンドトリップなし)
  • 別のタスクキューオーバーヘッドなし
  • 短い実行時間に限定
  • ハートビートなし

入力バリデーション、データ変換、キャッシュルックアップなどの高速で短い操作にローカルアクティビティを使用します。

アクティビティの命名

アクティビティはフルエントリIDを名前として登録されます:

namespace: app
entries:
  - name: charge_payment
    kind: function.lua
    # ...

アクティビティ名:app:charge_payment

コンテキスト伝播

ワークフローのスポーン時に設定されたコンテキスト値はアクティビティ内で利用可能です:

-- Spawner sets context
local spawner = process.with_context({
    user_id = "user-1",
    tenant = "tenant-1",
})
local pid = spawner:spawn("app:order_workflow", "app:worker", order)
-- Activity reads context
local ctx = require("ctx")

local function process_order(input)
    local user_id = ctx.get("user_id")   -- "user-1"
    local tenant = ctx.get("tenant")     -- "tenant-1"
    -- use context for authorization, logging, etc.
end

funcs.new():with_context()で呼び出されたアクティビティもコンテキストを伝播します:

-- Inside workflow
local executor = funcs.new():with_context({trace_id = "abc-123"})
local result, err = executor:call("app:charge_payment", input)

エラー処理

標準のLuaパターンでエラーを返す:

local errors = require("errors")

local function charge(input)
    if not input.amount or input.amount <= 0 then
        return nil, errors.new("INVALID", "amount must be positive")
    end

    local response, err = http.post(url, options)
    if err then
        return nil, errors.wrap(err, "payment API failed")
    end

    if response:status() >= 400 then
        return nil, errors.new("FAILED", "payment declined")
    end

    return json.decode(response:body())
end

エラーオブジェクト

ワークフローに伝播されるアクティビティエラーは構造化されたメタデータを持つ:

local result, err = funcs.call("app:charge_payment", input)
if err then
    err:kind()       -- error classification string
    err:retryable()  -- boolean, whether retry makes sense
    err:message()    -- human-readable error message
end

障害モード

障害 エラー種別 リトライ可能 説明
アプリケーションエラー 可変 可変 アクティビティコードが返したエラー
ランタイムクラッシュ INTERNAL はい アクティビティ内の未処理Luaエラー
アクティビティ未登録 NOT_FOUND いいえ ワーカーに登録されていないアクティビティ
タイムアウト TIMEOUT はい アクティビティが設定されたタイムアウトを超過
local executor = funcs.new():with_options({
    ["activity.retry_policy"] = {maximum_attempts = 1}
})

local result, err = executor:call("app:missing_activity", input)
if err then
    print(err:kind())      -- "NOT_FOUND"
    print(err:retryable())  -- false
end

プロセスアクティビティ

process.luaエントリも長時間実行操作用にアクティビティとして登録可能:

- name: long_task
  kind: process.lua
  source: file://long_task.lua
  method: main
  modules:
    - http_client
  meta:
    temporal:
      activity:
        worker: app:worker

関連項目