Task Queue
Build a REST API that enqueues tasks for background processing, with results persisted to a database.
Overview
This tutorial builds a task management API that demonstrates:
- REST endpoints - POST tasks, GET results
- Queue publishing - Asynchronous job dispatch
- Queue consumers - Background workers
- Database persistence - SQLite storage
- Migrations - One-shot process that runs and exits
flowchart LR
subgraph api["HTTP Server"]
POST["/tasks POST"]
GET["/tasks GET"]
end
subgraph queue["Queue"]
Q[("tasks queue")]
end
subgraph workers["Workers"]
W1["Consumer 1"]
W2["Consumer 2"]
end
subgraph storage["Storage"]
DB[(SQLite)]
end
POST -->|publish| Q
Q --> W1
Q --> W2
W1 -->|INSERT| DB
W2 -->|INSERT| DB
GET -->|SELECT| DB
Project Structure
task-queue/
├── wippy.lock
└── src/
    ├── _index.yaml
    ├── migrate.lua
    ├── create_task.lua
    ├── list_tasks.lua
    └── process_task.lua
Entry Definitions
Create src/_index.yaml:
version: "1.0"
namespace: app
entries:
  # SQLite database
  - name: db
    kind: db.sql.sqlite
    file: "./data/tasks.db"
    lifecycle:
      auto_start: true
  # In-memory queue driver
  - name: queue_driver
    kind: queue.driver.memory
    lifecycle:
      auto_start: true
  # Task queue
  - name: tasks_queue
    kind: queue.queue
    driver: app:queue_driver
  # HTTP server
  - name: gateway
    kind: http.service
    addr: ":8080"
    lifecycle:
      auto_start: true
  # Router
  - name: router
    kind: http.router
    meta:
      server: app:gateway
  # Migration process (runs once, then exits)
  - name: migrate
    kind: process.lua
    source: file://migrate.lua
    method: main
    modules:
      - sql
      - logger
  # Migration service (auto-starts, exits on success)
  - name: migrate-service
    kind: process.service
    process: app:migrate
    host: app:processes
    lifecycle:
      auto_start: true
  # Process host
  - name: processes
    kind: process.host
    lifecycle:
      auto_start: true
  # API handlers
  - name: create_task
    kind: function.lua
    source: file://create_task.lua
    method: handler
    modules:
      - http
      - queue
      - uuid
  - name: list_tasks
    kind: function.lua
    source: file://list_tasks.lua
    method: handler
    modules:
      - http
      - sql
  # Queue worker
  - name: process_task
    kind: function.lua
    source: file://process_task.lua
    method: main
    modules:
      - sql
      - logger
      - json
  # Endpoints
  - name: create_task.endpoint
    kind: http.endpoint
    meta:
      router: app:router
    method: POST
    path: /tasks
    func: app:create_task
  - name: list_tasks.endpoint
    kind: http.endpoint
    meta:
      router: app:router
    method: GET
    path: /tasks
    func: app:list_tasks
  # Queue consumer
  - name: task_consumer
    kind: queue.consumer
    queue: app:tasks_queue
    func: app:process_task
    concurrency: 2
    prefetch: 5
    lifecycle:
      auto_start: true
Migration Process
Create src/migrate.lua:
local sql = require("sql")
local logger = require("logger")
local function main()
    local db, err = sql.get("app:db")
    if err then
        logger:error("failed to connect", {error = tostring(err)})
        return 1
    end
    local _, exec_err = db:execute([[
        CREATE TABLE IF NOT EXISTS tasks (
            id TEXT PRIMARY KEY,
            payload TEXT NOT NULL,
            status TEXT NOT NULL DEFAULT 'pending',
            result TEXT,
            created_at INTEGER NOT NULL,
            processed_at INTEGER
        )
    ]])
    db:release()
    if exec_err then
        logger:error("migration failed", {error = tostring(exec_err)})
        return 1
    end
    logger:info("migration complete")
    return 0
end
return { main = main }
Task Creation Endpoint
Create src/create_task.lua:
local http = require("http")
local queue = require("queue")
local uuid = require("uuid")
local function handler()
    local req = http.request()
    local res = http.response()
    local body, parse_err = req:body_json()
    -- Also reject bodies that decode to something other than an object,
    -- so the body.action lookup below cannot fail on nil or a number
    if parse_err or type(body) ~= "table" then
        res:set_status(http.STATUS.BAD_REQUEST)
        res:write_json({error = "invalid JSON"})
        return
    end
    if not body.action then
        res:set_status(http.STATUS.BAD_REQUEST)
        res:write_json({error = "action required"})
        return
    end
    local task_id = uuid.v4()
    local task = {
        id = task_id,
        action = body.action,
        data = body.data or {},
        created_at = os.time()
    }
    local _, err = queue.publish("app:tasks_queue", task)
    if err then
        res:set_status(http.STATUS.INTERNAL_ERROR)
        res:write_json({error = "failed to queue task"})
        return
    end
    -- 202 Accepted: the task is queued, not yet processed
    res:set_status(http.STATUS.ACCEPTED)
    res:write_json({
        id = task_id,
        status = "queued"
    })
end
return { handler = handler }
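The handler only checks that action is present. A stricter check could run before anything is published. The sketch below is plain Lua with no Wippy modules; the name validate_task_body and its error messages are hypothetical, chosen for illustration.

```lua
-- Hypothetical helper: validates a decoded JSON body before it is queued.
-- Returns true on success, or nil plus an error message.
local function validate_task_body(body)
    if type(body) ~= "table" then
        return nil, "body must be a JSON object"
    end
    if type(body.action) ~= "string" or body.action == "" then
        return nil, "action required"
    end
    -- data is optional, but when present it must be an object
    if body.data ~= nil and type(body.data) ~= "table" then
        return nil, "data must be an object"
    end
    return true
end

print(validate_task_body({action = "uppercase", data = {text = "hi"}}))
print(validate_task_body({data = {}}))
```

A handler could call this right after decoding the body and answer with 400 and the returned message whenever the first return value is nil.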
Task Listing Endpoint
Create src/list_tasks.lua:
local http = require("http")
local sql = require("sql")
local function handler()
    local req = http.request()
    local res = http.response()
    local db, db_err = sql.get("app:db")
    if db_err then
        res:set_status(http.STATUS.INTERNAL_ERROR)
        res:write_json({error = "database unavailable"})
        return
    end
    local status_filter = req:query("status")
    local query = sql.builder.select("id", "payload", "status", "result", "created_at", "processed_at")
        :from("tasks")
        :order_by("created_at DESC")
        :limit(100)
    if status_filter then
        query = query:where({status = status_filter})
    end
    local rows, query_err = query:run_with(db):query()
    db:release()
    if query_err then
        res:set_status(http.STATUS.INTERNAL_ERROR)
        res:write_json({error = "query failed"})
        return
    end
    res:set_status(http.STATUS.OK)
    res:write_json({
        tasks = rows,
        count = #rows
    })
end
return { handler = handler }
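The status query parameter is used as given. Because only a few status values can appear in the table, one option is to normalize and whitelist it first. This is a minimal sketch in plain Lua; normalize_status and KNOWN_STATUSES are hypothetical names, and the whitelist assumes the only statuses are the schema default ('pending') and the value the worker writes ('completed').

```lua
-- Hypothetical whitelist: "pending" is the schema default,
-- "completed" is what the worker writes.
local KNOWN_STATUSES = { pending = true, completed = true }

-- Returns the normalized status, nil when no filter was requested,
-- or nil plus an error message when the value is not recognized.
local function normalize_status(value)
    if value == nil or value == "" then
        return nil
    end
    local status = string.lower(value)
    if KNOWN_STATUSES[status] then
        return status
    end
    return nil, "unknown status: " .. value
end

print(normalize_status("Completed"))
print(normalize_status("bogus"))
```

With this in place, a handler could return 400 for an unknown status instead of silently returning an empty list.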
Queue Worker
Create src/process_task.lua:
local sql = require("sql")
local logger = require("logger")
local json = require("json")
local function main(task)
    logger:info("processing task", {
        id = task.id,
        action = task.action
    })
    local result
    if task.action == "uppercase" then
        result = {output = string.upper(task.data.text or "")}
    elseif task.action == "sum" then
        local nums = task.data.numbers or {}
        local total = 0
        for _, n in ipairs(nums) do
            total = total + n
        end
        result = {output = total}
    else
        result = {output = "processed"}
    end
    local db, db_err = sql.get("app:db")
    if db_err then
        error("database unavailable: " .. tostring(db_err))
    end
    local _, exec_err = db:execute(
        "INSERT OR REPLACE INTO tasks (id, payload, status, result, created_at, processed_at) VALUES (?, ?, ?, ?, ?, ?)",
        { task.id, json.encode(task), "completed", json.encode(result), task.created_at, os.time() }
    )
    db:release()
    if exec_err then
        error("failed to store result: " .. tostring(exec_err))
    end
    logger:info("task completed", {id = task.id})
end
return { main = main }
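As more actions are added, the if/elseif chain in the worker could be replaced with a dispatch table. The following is a sketch in plain Lua, not part of the tutorial's code: the names actions and run_action are hypothetical. Unlike the worker, which would raise an error, it treats non-numeric entries in numbers as 0.

```lua
-- Hypothetical dispatch table: one function per supported action.
local actions = {
    uppercase = function(data)
        return string.upper(data.text or "")
    end,
    sum = function(data)
        local total = 0
        for _, n in ipairs(data.numbers or {}) do
            -- tonumber() returns nil for non-numeric entries; count them as 0
            total = total + (tonumber(n) or 0)
        end
        return total
    end,
}

-- Looks up the action and wraps its return value in the result shape
-- the worker stores ({output = ...}).
local function run_action(task)
    local fn = actions[task.action]
    if not fn then
        return {output = "processed"}  -- same fallback as the worker
    end
    return {output = fn(task.data or {})}
end

print(run_action({action = "uppercase", data = {text = "hello"}}).output)
print(run_action({action = "sum", data = {numbers = {1, 2, 3}}}).output)
```

Adding a new action then means adding one entry to the table, while the database code in main stays unchanged.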
Running the Service
Initialize and run:
mkdir -p data
wippy init
wippy run
Test the API:
# Create a task
curl -X POST http://localhost:8080/tasks \
  -H "Content-Type: application/json" \
  -d '{"action": "uppercase", "data": {"text": "hello world"}}'
# Response: {"id": "550e8400-...", "status": "queued"}
# Wait a moment for processing, then list the tasks
curl http://localhost:8080/tasks
# Response: {"tasks": [...], "count": 1}
# Filter by status
curl "http://localhost:8080/tasks?status=completed"
Message Flow
- POST /tasks receives the request, generates a UUID, and publishes the task to the queue
- The queue consumer picks up the message (2 concurrent workers)
- A worker processes the task and writes the result to SQLite
- GET /tasks reads completed tasks from the database
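The flow can be simulated in plain Lua to make one consequence explicit: a task does not show up in the listing until a worker has processed it. This is only an illustrative sketch. The queue and db tables and the publish, consume, and list functions are in-memory stand-ins, not Wippy APIs.

```lua
-- In-memory stand-ins for the queue and the tasks table (not Wippy APIs).
local queue, db = {}, {}

-- Step 1: POST /tasks publishes the task to the queue.
local function publish(task)
    queue[#queue + 1] = task
end

-- Steps 2-3: a worker takes one message and stores the result.
-- Returns false when the queue is empty.
local function consume()
    local task = table.remove(queue, 1)
    if not task then
        return false
    end
    db[task.id] = {
        id = task.id,
        status = "completed",
        result = string.upper(task.text),
    }
    return true
end

-- Step 4: GET /tasks reads whatever has been stored so far.
local function list()
    local rows = {}
    for _, row in pairs(db) do
        rows[#rows + 1] = row
    end
    return rows
end

publish({id = "t1", text = "hello world"})
print(#list())                 -- 0: the task is still waiting in the queue
while consume() do end         -- drain the queue
print(#list(), db.t1.result)   -- 1 and HELLO WORLD
```

This is why the curl example waits a moment between creating a task and listing it.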
Next Steps
- HTTP Module - Request/response handling
- Queue Module - Message queue operations
- SQL Module - Database access
- Queue Consumers - Queue configuration