액티비티

액티비티는 비결정론적 작업을 실행하는 함수입니다. 모든 function.lua 또는 process.lua 엔트리는 메타데이터를 추가하여 Temporal 액티비티로 등록할 수 있습니다.

액티비티 등록

함수를 액티비티로 등록하려면 meta.temporal.activity를 추가하세요:

- name: charge_payment
  kind: function.lua
  source: file://payment.lua
  method: charge
  modules:
    - http_client
    - json
  meta:
    temporal:
      activity:
        worker: app:worker

메타데이터 필드

필드 필수 설명
worker temporal.worker 엔트리 참조
local 아니오 로컬 액티비티로 실행 (기본값: false)

구현

액티비티는 일반 Lua 함수입니다:

-- payment.lua
local http = require("http_client")
local json = require("json")

local function charge(input)
    local response, err = http.post("https://api.stripe.com/v1/charges", {
        headers = {
            ["Authorization"] = "Bearer " .. input.api_key,
            ["Content-Type"] = "application/json"
        },
        body = json.encode({
            amount = input.amount,
            currency = input.currency,
            source = input.token
        })
    })

    if err then
        return nil, err
    end

    return json.decode(response:body())
end

return { charge = charge }

액티비티 호출

워크플로우에서 funcs 모듈을 사용합니다:

local funcs = require("funcs")

local result, err = funcs.call("app:charge_payment", {
    amount = 5000,
    currency = "usd",
    token = "tok_visa",
    api_key = ctx.stripe_key
})

if err then
    return nil, err
end

액티비티 옵션

executor 빌더를 사용하여 타임아웃, 재시도 동작, 기타 실행 파라미터를 설정합니다:

local funcs = require("funcs")

local executor = funcs.new():with_options({
    ["activity.start_to_close_timeout"] = "30s",
    ["activity.schedule_to_close_timeout"] = "5m",
    ["activity.heartbeat_timeout"] = "10s",
    ["activity.retry_policy"] = {
        maximum_attempts = 3,
        initial_interval = 1000,
        backoff_coefficient = 2.0,
        maximum_interval = 60000,
    }
})

local result, err = executor:call("app:charge_payment", input)

executor는 불변이며 재사용 가능합니다. 한 번 빌드하고 여러 호출에 사용하세요:

local reliable = funcs.new():with_options({
    ["activity.start_to_close_timeout"] = "60s",
    ["activity.retry_policy"] = {
        maximum_attempts = 5,
        initial_interval = 2000,
        backoff_coefficient = 2.0,
        maximum_interval = 120000,
    }
})

local a, err = reliable:call("app:step_one", input)
local b, err = reliable:call("app:step_two", a)

옵션 레퍼런스

옵션 타입 기본값 설명
activity.start_to_close_timeout duration 10m 액티비티 실행 최대 시간
activity.schedule_to_close_timeout duration - 스케줄링부터 완료까지 최대 시간
activity.schedule_to_start_timeout duration - 액티비티 시작 전 최대 대기 시간
activity.heartbeat_timeout duration - heartbeat 사이 최대 시간
activity.id string - 커스텀 액티비티 실행 ID
activity.task_queue string - 이 호출의 태스크 큐 오버라이드
activity.wait_for_cancellation boolean false 액티비티 취소 대기
activity.disable_eager_execution boolean false 즉시 실행 비활성화
activity.retry_policy table - 재시도 설정 (아래 참조)

duration 값은 문자열 ("5s", "10m", "1h") 또는 밀리초 숫자를 허용합니다.

재시도 정책

실패한 액티비티의 자동 재시도 동작을 설정합니다:

["activity.retry_policy"] = {
    initial_interval = 1000,         -- 첫 재시도 전 ms
    backoff_coefficient = 2.0,       -- 재시도마다 적용되는 승수
    maximum_interval = 300000,       -- 재시도 간격 최대값 (ms)
    maximum_attempts = 10,           -- 최대 재시도 횟수 (0 = 무제한)
    non_retryable_error_types = {    -- 재시도를 건너뛰는 에러
        "INVALID",
        "PERMISSION_DENIED"
    }
}
필드 타입 기본값 설명
initial_interval number 1000 첫 번째 재시도 전 밀리초
backoff_coefficient number 2.0 재시도마다 간격에 적용되는 승수
maximum_interval number - 재시도 간격 상한 (ms)
maximum_attempts number 0 최대 시도 횟수 (0 = 무제한)
non_retryable_error_types array - 재시도를 건너뛰는 에러 종류

타임아웃 관계

|--- schedule_to_close_timeout --------------------------------|
|--- schedule_to_start_timeout ---|--- start_to_close_timeout -|
     (waiting in queue)                (executing)
  • start_to_close_timeout: 액티비티 자체가 실행될 수 있는 시간. 가장 많이 사용되는 타임아웃입니다.
  • schedule_to_close_timeout: 액티비티가 스케줄링된 시점부터 완료까지의 총 시간으로, 큐 대기 시간과 재시도를 포함합니다.
  • schedule_to_start_timeout: 워커가 액티비티를 선택하기 전 태스크 큐에서 대기할 수 있는 최대 시간입니다.
  • heartbeat_timeout: 장기 실행 액티비티에서 heartbeat 보고 사이의 최대 시간입니다.

로컬 액티비티

로컬 액티비티는 별도의 태스크 큐 폴링 없이 워크플로우 워커 프로세스에서 실행됩니다:

- name: validate_input
  kind: function.lua
  source: file://validate.lua
  method: validate
  modules:
    - json
  meta:
    temporal:
      activity:
        worker: app:worker
        local: true

특징:

  • 워크플로우 워커 프로세스에서 실행
  • 낮은 지연 시간 (태스크 큐 왕복 없음)
  • 별도의 태스크 큐 오버헤드 없음
  • 짧은 실행 시간으로 제한 (local_activity_options.schedule_to_close_timeout에 의해 제한되며, 일반적으로 몇 초)
  • heartbeat 없음

입력 검증, 데이터 변환, 캐시 조회와 같은 빠르고 짧은 작업에 로컬 액티비티를 사용하세요. 장기 실행 작업에는 일반 액티비티를 사용하세요.

액티비티 명명

액티비티는 전체 엔트리 ID를 이름으로 등록됩니다:

namespace: app
entries:
  - name: charge_payment
    kind: function.lua
    # ...

액티비티 이름: app:charge_payment

컨텍스트 전파

워크플로우 스폰 시 설정된 컨텍스트 값은 액티비티 내에서 사용할 수 있습니다:

-- Spawner sets context
local spawner = process.with_context({
    user_id = "user-1",
    tenant = "tenant-1",
})
local pid = spawner:spawn("app:order_workflow", "app:worker", order)
-- Activity reads context
local ctx = require("ctx")

local function process_order(input)
    local user_id = ctx.get("user_id")   -- "user-1"
    local tenant = ctx.get("tenant")     -- "tenant-1"
    -- use context for authorization, logging, etc.
end

워크플로우 내에서 funcs.new():with_context()로 호출된 액티비티에도 컨텍스트가 전파됩니다:

-- Inside workflow
local executor = funcs.new():with_context({trace_id = "abc-123"})
local result, err = executor:call("app:charge_payment", input)

에러 처리

표준 Lua 패턴으로 에러를 반환합니다:

local errors = require("errors")

local function charge(input)
    if not input.amount or input.amount <= 0 then
        return nil, errors.new("INVALID", "amount must be positive")
    end

    local response, err = http.post(url, options)
    if err then
        return nil, errors.wrap(err, "payment API failed")
    end

    if response:status() >= 400 then
        return nil, errors.new("FAILED", "payment declined")
    end

    return json.decode(response:body())
end

에러 객체

워크플로우에 전파된 액티비티 에러는 구조화된 메타데이터를 포함합니다:

local result, err = funcs.call("app:charge_payment", input)
if err then
    err:kind()       -- error classification string
    err:retryable()  -- boolean, whether retry makes sense
    err:message()    -- human-readable error message
end

실패 모드

실패 에러 종류 재시도 가능 설명
애플리케이션 에러 액티비티가 반환한 것 반환된 에러에서 상속됨 return nil, err로 액티비티 코드가 반환한 에러
런타임 크래시 INTERNAL true 액티비티의 처리되지 않은 Lua 에러
누락된 액티비티 NOT_FOUND false 워커에 등록되지 않은 액티비티
타임아웃 TIMEOUT true 설정된 타임아웃을 초과한 액티비티
local executor = funcs.new():with_options({
    ["activity.retry_policy"] = {maximum_attempts = 1}
})

local result, err = executor:call("app:missing_activity", input)
if err then
    print(err:kind())      -- "NOT_FOUND"
    print(err:retryable())  -- false
end

프로세스 액티비티

process.lua 엔트리도 장기 실행 작업을 위해 액티비티로 등록할 수 있습니다:

- name: long_task
  kind: process.lua
  source: file://long_task.lua
  method: main
  modules:
    - http_client
  meta:
    temporal:
      activity:
        worker: app:worker

참고