Procesos y Mensajería

Genere procesos aislados y comuníquese mediante paso de mensajes.

Resumen

Los procesos proporcionan unidades de ejecución aisladas que se comunican mediante paso de mensajes. Cada proceso tiene su propio inbox y puede suscribirse a temas de mensajes específicos.

Esta página es una introducción: cada fragmento muestra una API de forma aislada. Para una aplicación completa y ejecutable que conecta generación, monitoreo y mensajería, consulta el tutorial de Echo Service.

Conceptos clave:

  • Generar procesos con process.spawn() y sus variantes
  • Enviar mensajes a PIDs o nombres registrados mediante temas
  • Recibir mensajes usando process.listen() o process.inbox()
  • Monitorear el ciclo de vida del proceso con eventos
  • Enlazar procesos para el manejo coordinado de fallos

Generación de Procesos

Genere un nuevo proceso desde una referencia de entrada.

local pid, err = process.spawn("app.test.process:echo_worker", "app:processes", "hello")
if err then
    return false, "spawn failed: " .. err
end

-- pid es un identificador string para el proceso generado
print("Started worker:", pid)

Parámetros:

  • Referencia de entrada (p. ej., "app.test.process:echo_worker")
  • Referencia al host (p. ej., "app:processes")
  • Argumentos opcionales pasados a la función main del worker

Obtener tu Propio PID

local my_pid = process.pid()
-- Devuelve el PID del proceso actual como string

Paso de Mensajes

Los mensajes usan un sistema de enrutamiento basado en temas. Envíe mensajes a PIDs con un tema y luego recíbalos por suscripción a tema o por inbox.

Envío de Mensajes

-- Enviar a proceso por PID
local sent, err = process.send(worker_pid, "messages", "hello from parent")
if err then
    return false, "send failed: " .. err
end

-- send devuelve (bool, error)

Recepción por Suscripción a Tema

Suscríbase a temas específicos usando process.listen():

-- Worker que escucha mensajes en el tema "messages"
local function main()
    local ch = process.listen("messages")

    local msg = ch:receive()
    if msg then
        -- msg es el payload directamente
        print("Received:", msg)
        return true
    end

    return false
end

return { main = main }

Recepción por Inbox

El inbox recibe mensajes que no coinciden con ningún listener de tema:

local function main()
    local inbox_ch = process.inbox()
    local specific_ch = process.listen("specific_topic")

    while true do
        local result = channel.select({
            specific_ch:case_receive(),
            inbox_ch:case_receive()
        })

        if result.channel == specific_ch then
            -- Los mensajes a "specific_topic" llegan aquí
            local payload = result.value
        elseif result.channel == inbox_ch then
            -- Los mensajes a CUALQUIER otro tema llegan aquí
            local msg = result.value
            print("Inbox got:", msg:topic(), msg:payload():data())
        end
    end
end

Modo Mensaje para Información del Remitente

Use { message = true } para acceder al PID del remitente y al tema:

-- Worker que devuelve los mensajes al remitente
local function main()
    local ch = process.listen("echo", { message = true })

    local msg = ch:receive()
    if msg then
        local sender = msg:from()
        local data = msg:payload():data()

        if sender then
            process.send(sender, "reply", data)
        end
        return true
    end

    return false
end

return { main = main }

Monitoreo de Procesos

Monitoree procesos para recibir eventos EXIT cuando terminen.

Generar con Monitoreo

local events_ch = process.events()

local worker_pid, err = process.spawn_monitored(
    "app.test.process:events_exit_worker",
    "app:processes"
)
if err then
    return false, "spawn failed: " .. err
end

-- Esperar evento EXIT
local timeout = time.after("3s")
local result = channel.select {
    events_ch:case_receive(),
    timeout:case_receive(),
}

if result.channel == timeout then
    return false, "timeout waiting for EXIT event"
end

local event = result.value
if event.kind == process.event.EXIT then
    print("Worker exited:", event.from)
    if event.result and event.result.error then
        print("Exit error:", event.result.error)
    elseif event.result then
        print("Return value:", event.result.value)
    end
end

Monitoreo Explícito

Monitorear un proceso ya en ejecución:

local events_ch = process.events()

-- Generar sin monitoreo
local worker_pid, err = process.spawn("app.test.process:long_worker", "app:processes")
if err then
    return false, "spawn failed: " .. err
end

-- Añadir monitoreo explícitamente
local ok, monitor_err = process.monitor(worker_pid)
if monitor_err then
    return false, "monitor failed: " .. monitor_err
end

-- Ahora recibirá eventos EXIT de este worker

Detener el monitoreo:

local ok, err = process.unmonitor(worker_pid)

Enlace de Procesos

Enlace procesos para la gestión coordinada del ciclo de vida. Los procesos enlazados reciben eventos LINK_DOWN cuando los procesos enlazados fallan.

Generar Proceso Enlazado

-- El hijo termina si el padre falla (a menos que trap_links esté activo)
local pid, err = process.spawn_linked("app.test.process:child_worker", "app:processes")
if err then
    return false, "spawn_linked failed: " .. err
end

Enlace Explícito

-- Enlazar con proceso existente
local ok, err = process.link(target_pid)
if err then
    return false, "link failed: " .. err
end

-- Desenlazar
local ok, err = process.unlink(target_pid)

Por defecto, LINK_DOWN hace que el proceso falle. Active trap_links para recibirlo como evento:

local function main()
    -- Activar trap_links para recibir eventos LINK_DOWN en vez de fallar
    local ok, err = process.set_options({ trap_links = true })
    if not ok then
        return false, "set_options failed: " .. err
    end

    -- Verificar que trap_links está activo
    local opts = process.get_options()
    if not opts.trap_links then
        return false, "trap_links should be true"
    end

    local events_ch = process.events()

    -- Generar un proceso enlazado que fallará
    local error_pid, err2 = process.spawn_linked(
        "app.test.process:error_exit_worker",
        "app:processes"
    )
    if err2 then
        return false, "spawn error worker failed: " .. err2
    end

    -- Esperar evento LINK_DOWN
    local timeout = time.after("2s")
    local result = channel.select {
        events_ch:case_receive(),
        timeout:case_receive(),
    }

    if result.channel == timeout then
        return false, "timeout waiting for LINK_DOWN"
    end

    local event = result.value
    if event.kind == process.event.LINK_DOWN then
        print("Linked process died:", event.from)
        -- Manejar el fallo sin hacer crash
        return true
    end

    return false, "expected LINK_DOWN, got: " .. tostring(event.kind)
end

return { main = main }

Registro de Procesos

Registre nombres para procesos para habilitar búsquedas y mensajería por nombre.

Registrar Nombres

local function main()
    local test_name = "my_service_" .. tostring(os.time())

    -- Registrar el proceso actual con un nombre
    local ok, err = process.registry.register(test_name)
    if err then
        return false, "register failed: " .. err
    end

    -- Buscar el nombre registrado
    local pid, lookup_err = process.registry.lookup(test_name)
    if lookup_err then
        return false, "lookup failed: " .. lookup_err
    end

    -- Verificar que resuelve a nuestro PID
    if pid ~= process.pid() then
        return false, "lookup returned wrong pid"
    end

    return true
end

return { main = main }

Desregistrar Nombres

-- Desregistrar explícitamente
local unregistered = process.registry.unregister(test_name)
if not unregistered then
    print("Name was not registered")
end

-- La búsqueda tras desregistrar devuelve nil + error
local pid, err = process.registry.lookup(test_name)
-- pid será nil, err será no-nil

Los nombres se liberan automáticamente cuando el proceso termina.

Ejemplo Completo: Pool de Workers Monitoreados

Este ejemplo muestra un proceso padre que genera múltiples workers monitoreados y rastrea su finalización.

-- Proceso padre
local time = require("time")

local function main()
    local events_ch = process.events()

    -- Rastrear workers generados
    local workers = {}
    local worker_count = 5

    -- Generar múltiples workers monitoreados
    for i = 1, worker_count do
        local worker_pid, err = process.spawn_monitored(
            "app.test.process:task_worker",
            "app:processes",
            { task_id = i, value = i * 10 }
        )

        if err then
            return false, "spawn worker " .. i .. " failed: " .. err
        end

        workers[worker_pid] = { task_id = i, started = os.time() }
    end

    -- Esperar a que todos los workers terminen
    local completed = 0
    local timeout = time.after("10s")

    while completed < worker_count do
        local result = channel.select {
            events_ch:case_receive(),
            timeout:case_receive(),
        }

        if result.channel == timeout then
            return false, "timeout waiting for workers"
        end

        local event = result.value
        if event.kind == process.event.EXIT then
            local worker = workers[event.from]
            if worker then
                if event.result and event.result.error then
                    print("Worker " .. worker.task_id .. " failed:", event.result.error)
                else
                    print("Worker " .. worker.task_id .. " completed:", event.result and event.result.value)
                end
                completed = completed + 1
            end
        end
    end

    return true
end

return { main = main }

Proceso worker:

-- task_worker.lua
local time = require("time")

local function main(task)
    -- Simular trabajo
    time.sleep("100ms")

    -- Procesar tarea
    local result = task.value * 2

    return result
end

return { main = main }

Siguientes Pasos