Процессы и обмен сообщениями
Создание изолированных процессов и взаимодействие через передачу сообщений.
Обзор
Процессы предоставляют изолированные единицы выполнения, которые взаимодействуют через передачу сообщений. Каждый процесс имеет собственный почтовый ящик и может подписываться на конкретные темы сообщений.
Основные концепции:
- Создание процессов с помощью
process.spawn()и его вариантов - Отправка сообщений по PID или зарегистрированным именам через темы
- Получение сообщений через
process.listen()илиprocess.inbox() - Мониторинг жизненного цикла процессов через события
- Связывание процессов для координированной обработки отказов
Создание процессов
Создание нового процесса по ссылке на запись.
local pid, err = process.spawn("app.test.process:echo_worker", "app:processes", "hello")
if err then
return false, "spawn failed: " .. err
end
-- pid is a string identifier for the spawned process
print("Started worker:", pid)
Параметры:
- Ссылка на запись (например,
"app.test.process:echo_worker") - Ссылка на хост (например,
"app:processes") - Необязательные аргументы, передаваемые в функцию main воркера
Получение собственного PID
local my_pid = process.pid()
-- Returns string PID of current process
Передача сообщений
Сообщения используют маршрутизацию на основе тем. Отправляйте сообщения на PID с указанием темы, затем получайте через подписку на тему или через почтовый ящик.
Отправка сообщений
-- Send to process by PID
local sent, err = process.send(worker_pid, "messages", "hello from parent")
if err then
return false, "send failed: " .. err
end
-- send returns (bool, error)
Получение через подписку на тему
Подписывайтесь на конкретные темы с помощью process.listen():
-- Worker that listens for messages on "messages" topic
local function main()
local ch = process.listen("messages")
local msg = ch:receive()
if msg then
-- msg is the payload directly
print("Received:", msg)
return true
end
return false
end
return { main = main }
Получение через почтовый ящик
Почтовый ящик получает сообщения, не соответствующие ни одному слушателю тем:
local function main()
local inbox_ch = process.inbox()
local specific_ch = process.listen("specific_topic")
while true do
local result = channel.select({
specific_ch:case_receive(),
inbox_ch:case_receive()
})
if result.channel == specific_ch then
-- Messages to "specific_topic" arrive here
local payload = result.value
elseif result.channel == inbox_ch then
-- Messages to any OTHER topic arrive here
local msg = result.value
print("Inbox got:", msg.topic, msg.payload)
end
end
end
Режим сообщений для информации об отправителе
Используйте { message = true } для доступа к PID отправителя и теме:
-- Worker that echoes messages back to sender
local function main()
local ch = process.listen("echo", { message = true })
local msg = ch:receive()
if msg then
local sender = msg:from()
local payload = msg:payload()
if sender then
process.send(sender, "reply", payload)
end
return true
end
return false
end
return { main = main }
Мониторинг процессов
Мониторинг процессов для получения событий EXIT при их завершении.
Создание с мониторингом
local events_ch = process.events()
local worker_pid, err = process.spawn_monitored(
"app.test.process:events_exit_worker",
"app:processes"
)
if err then
return false, "spawn failed: " .. err
end
-- Wait for EXIT event
local timeout = time.after("3s")
local result = channel.select {
events_ch:case_receive(),
timeout:case_receive(),
}
if result.channel == timeout then
return false, "timeout waiting for EXIT event"
end
local event = result.value
if event.kind == process.event.EXIT then
print("Worker exited:", event.from)
if event.error then
print("Exit error:", event.error)
end
-- Access return value via event.result
end
Явный мониторинг
Мониторинг уже запущенного процесса:
local events_ch = process.events()
-- Spawn without monitoring
local worker_pid, err = process.spawn("app.test.process:long_worker", "app:processes")
if err then
return false, "spawn failed: " .. err
end
-- Add monitoring explicitly
local ok, monitor_err = process.monitor(worker_pid)
if monitor_err then
return false, "monitor failed: " .. monitor_err
end
-- Now will receive EXIT events for this worker
Прекращение мониторинга:
local ok, err = process.unmonitor(worker_pid)
Связывание процессов
Связывание процессов для координированного управления жизненным циклом. Связанные процессы получают события LINK_DOWN, когда связанный процесс завершается с ошибкой.
Создание связанного процесса
-- Child terminates if parent crashes (unless trap_links is set)
local pid, err = process.spawn_linked("app.test.process:child_worker", "app:processes")
if err then
return false, "spawn_linked failed: " .. err
end
Явное связывание
-- Link to existing process
local ok, err = process.link(target_pid)
if err then
return false, "link failed: " .. err
end
-- Unlink
local ok, err = process.unlink(target_pid)
Обработка событий LINK_DOWN
По умолчанию LINK_DOWN приводит к завершению процесса с ошибкой. Включите trap_links для получения события вместо падения:
local function main()
-- Enable trap_links to receive LINK_DOWN events instead of crashing
local ok, err = process.set_options({ trap_links = true })
if not ok then
return false, "set_options failed: " .. err
end
-- Verify trap_links is enabled
local opts = process.get_options()
if not opts.trap_links then
return false, "trap_links should be true"
end
local events_ch = process.events()
-- Spawn a linked process that will fail
local error_pid, err2 = process.spawn_linked(
"app.test.process:error_exit_worker",
"app:processes"
)
if err2 then
return false, "spawn error worker failed: " .. err2
end
-- Wait for LINK_DOWN event
local timeout = time.after("2s")
local result = channel.select {
events_ch:case_receive(),
timeout:case_receive(),
}
if result.channel == timeout then
return false, "timeout waiting for LINK_DOWN"
end
local event = result.value
if event.kind == process.event.LINK_DOWN then
print("Linked process died:", event.from)
-- Handle gracefully instead of crashing
return true
end
return false, "expected LINK_DOWN, got: " .. tostring(event.kind)
end
return { main = main }
Реестр процессов
Регистрация имён для процессов для поиска и обмена сообщениями по имени.
Регистрация имён
local function main()
local test_name = "my_service_" .. tostring(os.time())
-- Register current process with a name
local ok, err = process.registry.register(test_name)
if err then
return false, "register failed: " .. err
end
-- Lookup the registered name
local pid, lookup_err = process.registry.lookup(test_name)
if lookup_err then
return false, "lookup failed: " .. lookup_err
end
-- Verify it resolves to our PID
if pid ~= process.pid() then
return false, "lookup returned wrong pid"
end
return true
end
return { main = main }
Снятие регистрации
-- Unregister explicitly
local unregistered = process.registry.unregister(test_name)
if not unregistered then
print("Name was not registered")
end
-- Lookup after unregister returns nil + error
local pid, err = process.registry.lookup(test_name)
-- pid will be nil, err will be non-nil
Имена автоматически освобождаются при завершении процесса.
Полный пример: пул воркеров с мониторингом
Этот пример показывает родительский процесс, создающий несколько воркеров с мониторингом и отслеживающий их завершение.
-- Parent process
local time = require("time")
local function main()
local events_ch = process.events()
-- Track spawned workers
local workers = {}
local worker_count = 5
-- Spawn multiple monitored workers
for i = 1, worker_count do
local worker_pid, err = process.spawn_monitored(
"app.test.process:task_worker",
"app:processes",
{ task_id = i, value = i * 10 }
)
if err then
return false, "spawn worker " .. i .. " failed: " .. err
end
workers[worker_pid] = { task_id = i, started = os.time() }
end
-- Wait for all workers to complete
local completed = 0
local timeout = time.after("10s")
while completed < worker_count do
local result = channel.select {
events_ch:case_receive(),
timeout:case_receive(),
}
if result.channel == timeout then
return false, "timeout waiting for workers"
end
local event = result.value
if event.kind == process.event.EXIT then
local worker = workers[event.from]
if worker then
if event.error then
print("Worker " .. worker.task_id .. " failed:", event.error)
else
print("Worker " .. worker.task_id .. " completed:", event.result)
end
completed = completed + 1
end
end
end
return true
end
return { main = main }
Процесс-воркер:
-- task_worker.lua
local time = require("time")
local function main(task)
-- Simulate work
time.sleep("100ms")
-- Process task
local result = task.value * 2
return result
end
return { main = main }
Сводка
Создание процессов:
process.spawn()- Базовое создание, возвращает PIDprocess.spawn_monitored()- Создание с автоматическим мониторингомprocess.spawn_linked()- Создание со связыванием жизненных цикловprocess.pid()- Получить PID текущего процесса
Обмен сообщениями:
process.send(pid, topic, payload)- Отправить сообщение на PIDprocess.listen(topic)- Подписаться на тему, получать полезную нагрузкуprocess.listen(topic, { message = true })- Получать полное сообщение с:from(),:payload(),:topic()process.inbox()- Получать сообщения, не соответствующие слушателям
Мониторинг:
process.events()- Канал для событий EXIT и LINK_DOWNprocess.monitor(pid)- Мониторить существующий процессprocess.unmonitor(pid)- Прекратить мониторинг
Связывание:
process.link(pid)- Связать с процессомprocess.unlink(pid)- Отвязать от процессаprocess.set_options({ trap_links = true })- Получать LINK_DOWN как событие вместо паденияprocess.get_options()- Получить текущие параметры процесса
Реестр:
process.registry.register(name)- Зарегистрировать имя для текущего процессаprocess.registry.lookup(name)- Найти PID по имениprocess.registry.unregister(name)- Удалить регистрацию имени
Смотрите также
- Справочник модуля процессов - Полная документация API
- Каналы - Операции с каналами для обработки сообщений