Task Queue
Build a REST API that enqueues tasks for background processing and persists the results to a database.
Overview
This tutorial builds a task management API that demonstrates:
- REST endpoints - POST tasks, GET results
- Queue publishing - Asynchronous job dispatch
- Queue consumers - Background workers
- Database persistence - SQLite storage
- Migrations - A one-shot process that runs once and exits
flowchart LR
    subgraph api["HTTP Server"]
        POST["/tasks POST"]
        GET["/tasks GET"]
    end
    subgraph queue["Queue"]
        Q[("task queue")]
    end
    subgraph workers["Workers"]
        W1["Consumer 1"]
        W2["Consumer 2"]
    end
    subgraph storage["Storage"]
        DB[(SQLite)]
    end
    POST -->|publish| Q
    Q --> W1
    Q --> W2
    W1 -->|INSERT| DB
    W2 -->|INSERT| DB
    GET -->|SELECT| DB
Project Structure
task-queue/
├── wippy.lock
└── src/
    ├── _index.yaml
    ├── migrate.lua
    ├── create_task.lua
    ├── list_tasks.lua
    └── process_task.lua
Entry Definitions
Create src/_index.yaml:
version: "1.0"
namespace: app

entries:
  # SQLite database
  - name: db
    kind: db.sql.sqlite
    file: "./data/tasks.db"
    lifecycle:
      auto_start: true

  # In-memory queue driver
  - name: queue_driver
    kind: queue.driver.memory
    lifecycle:
      auto_start: true

  # Task queue
  - name: tasks_queue
    kind: queue.queue
    driver: app:queue_driver

  # HTTP server
  - name: gateway
    kind: http.service
    addr: ":8080"
    lifecycle:
      auto_start: true

  # Router
  - name: router
    kind: http.router
    meta:
      server: app:gateway

  # Migration process (runs once, then exits)
  - name: migrate
    kind: process.lua
    source: file://migrate.lua
    method: main
    modules:
      - sql
      - logger

  # Migration service (auto-starts, exits on success)
  - name: migrate-service
    kind: process.service
    process: app:migrate
    host: app:processes
    lifecycle:
      auto_start: true

  # Process host
  - name: processes
    kind: process.host
    lifecycle:
      auto_start: true

  # API handlers
  - name: create_task
    kind: function.lua
    source: file://create_task.lua
    method: handler
    modules:
      - http
      - queue
      - uuid

  - name: list_tasks
    kind: function.lua
    source: file://list_tasks.lua
    method: handler
    modules:
      - http
      - sql

  # Queue worker
  - name: process_task
    kind: function.lua
    source: file://process_task.lua
    method: main
    modules:
      - sql
      - logger
      - json

  # Endpoints
  - name: create_task.endpoint
    kind: http.endpoint
    meta:
      router: app:router
    method: POST
    path: /tasks
    func: app:create_task

  - name: list_tasks.endpoint
    kind: http.endpoint
    meta:
      router: app:router
    method: GET
    path: /tasks
    func: app:list_tasks

  # Queue consumer
  - name: task_consumer
    kind: queue.consumer
    queue: app:tasks_queue
    func: app:process_task
    concurrency: 2
    prefetch: 5
    lifecycle:
      auto_start: true
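The entries above refer to one another with IDs of the form `namespace:name`, for example `app:queue_driver` or `app:tasks_queue`. The sketch below shows that convention in plain Lua. Both helper names (`split_entry_id`, `entry_id`) are hypothetical and are not part of any Wippy module; they only illustrate how the reference strings in this file are composed.

```lua
-- Hypothetical helper: split a "namespace:name" reference into its parts.
-- Returns namespace, name on success, or nil, nil, message on failure.
local function split_entry_id(id)
    if type(id) ~= "string" then
        return nil, nil, "entry id must be a string"
    end
    local namespace, name = id:match("^([^:]+):([^:]+)$")
    if not namespace then
        return nil, nil, "expected 'namespace:name', got '" .. id .. "'"
    end
    return namespace, name
end

-- Hypothetical helper: build the reference string used in fields such as
-- `driver:`, `queue:`, or `func:`.
local function entry_id(namespace, name)
    return namespace .. ":" .. name
end

local ns, name = split_entry_id("app:tasks_queue")
local ref = entry_id("app", "db")
```

A missing namespace (for example `tasks_queue` on its own) is reported as an error by this sketch rather than being resolved against a default.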
Migration Process
Create src/migrate.lua:
local sql = require("sql")
local logger = require("logger")

local function main()
    local db, err = sql.get("app:db")
    if err then
        logger:error("failed to connect", {error = tostring(err)})
        return 1
    end

    local _, exec_err = db:execute([[
        CREATE TABLE IF NOT EXISTS tasks (
            id TEXT PRIMARY KEY,
            payload TEXT NOT NULL,
            status TEXT NOT NULL DEFAULT 'pending',
            result TEXT,
            created_at INTEGER NOT NULL,
            processed_at INTEGER
        )
    ]])
    db:release()

    if exec_err then
        logger:error("migration failed", {error = tostring(exec_err)})
        return 1
    end

    logger:info("migration complete")
    return 0
end

return { main = main }
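If the schema grows beyond one statement, the same pattern extends to a list that runs in order and stops at the first failure. This is a minimal sketch, assuming only that the connection exposes `db:execute(sql)` returning a result and an error, which is the call shape migrate.lua uses. The `run_statements` helper, the two index statements, and `fake_db` are illustrative additions, not part of the tutorial's project.

```lua
-- Run each statement in order and stop at the first failure.
-- `db` is any object exposing db:execute(sql) -> result, err.
local function run_statements(db, statements)
    for i, stmt in ipairs(statements) do
        local _, err = db:execute(stmt)
        if err then
            return false, string.format("statement %d failed: %s", i, tostring(err))
        end
    end
    return true
end

-- Hypothetical extra statements: indexes for the columns list_tasks.lua
-- filters and sorts on. IF NOT EXISTS keeps reruns harmless.
local statements = {
    "CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks (status)",
    "CREATE INDEX IF NOT EXISTS idx_tasks_created_at ON tasks (created_at)",
}

-- Stand-in for a real connection so the sketch runs on its own.
local fake_db = { executed = {} }
function fake_db:execute(stmt)
    table.insert(self.executed, stmt)
    return {}, nil
end

local ok, err = run_statements(fake_db, statements)
```

In migrate.lua, the result of `run_statements` would decide between `return 0` and `return 1`, the same way `exec_err` does for the single statement.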
Create Task Endpoint
Create src/create_task.lua:
local http = require("http")
local queue = require("queue")
local uuid = require("uuid")

local function handler()
    local req = http.request()
    local res = http.response()

    -- Reject bodies that fail to parse or are not a JSON object,
    -- so the field access below never indexes a non-table value.
    local body, parse_err = req:body_json()
    if parse_err or type(body) ~= "table" then
        res:set_status(http.STATUS.BAD_REQUEST)
        res:write_json({error = "invalid JSON"})
        return
    end

    if not body.action then
        res:set_status(http.STATUS.BAD_REQUEST)
        res:write_json({error = "action is required"})
        return
    end

    local task_id = uuid.v4()
    local task = {
        id = task_id,
        action = body.action,
        data = body.data or {},
        created_at = os.time()
    }

    -- Publish and return immediately; a worker stores the result later.
    local _, err = queue.publish("app:tasks_queue", task)
    if err then
        res:set_status(http.STATUS.INTERNAL_ERROR)
        res:write_json({error = "failed to enqueue task"})
        return
    end

    res:set_status(http.STATUS.ACCEPTED)
    res:write_json({
        id = task_id,
        status = "queued"
    })
end

return { handler = handler }
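The handler only checks that `action` is present. A stricter check could live in a small pure function, which is easy to test without an HTTP request. The sketch below is one possible version; `validate_task_body` and its error messages are hypothetical and not part of the tutorial's code.

```lua
-- Hypothetical validator for the decoded JSON body of POST /tasks.
-- Returns true on success, or false plus a message for the client.
local function validate_task_body(body)
    if type(body) ~= "table" then
        return false, "body must be a JSON object"
    end
    if type(body.action) ~= "string" or body.action == "" then
        return false, "action is required and must be a non-empty string"
    end
    -- `data` is optional, but the worker indexes it as a table.
    if body.data ~= nil and type(body.data) ~= "table" then
        return false, "data must be an object when present"
    end
    return true
end

local ok, message = validate_task_body({ action = "sum", data = { numbers = {1, 2} } })
local bad_ok, bad_message = validate_task_body({ data = {} })
```

The handler could call this right after `req:body_json()` and send `message` back with a 400 status whenever the first return value is false.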
List Tasks Endpoint
Create src/list_tasks.lua:
local http = require("http")
local sql = require("sql")

local function handler()
    local req = http.request()
    local res = http.response()

    local db, db_err = sql.get("app:db")
    if db_err then
        res:set_status(http.STATUS.INTERNAL_ERROR)
        res:write_json({error = "database unavailable"})
        return
    end

    -- Optional ?status=... filter
    local status_filter = req:query("status")

    local query = sql.builder.select("id", "payload", "status", "result", "created_at", "processed_at")
        :from("tasks")
        :order_by("created_at DESC")
        :limit(100)

    if status_filter then
        query = query:where({status = status_filter})
    end

    local rows, query_err = query:run_with(db):query()
    db:release()

    if query_err then
        res:set_status(http.STATUS.INTERNAL_ERROR)
        res:write_json({error = "query failed"})
        return
    end

    res:set_status(http.STATUS.OK)
    res:write_json({
        tasks = rows,
        count = #rows
    })
end

return { handler = handler }
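The `status` query parameter is passed to the query as-is, so an unknown value simply returns an empty list. If a 400 response is preferred for typos, the value can be checked first. The following is a sketch under two assumptions: that an absent parameter arrives as `nil` or an empty string, and that the only statuses are the two this tutorial produces (`pending` from the column default, `completed` from the worker). `parse_status_filter` is a hypothetical name.

```lua
-- Statuses that the tutorial's schema and worker actually produce.
local KNOWN_STATUSES = { pending = true, completed = true }

-- Returns the normalized status, or nil when no filter was given.
-- On invalid input returns nil plus an error message.
local function parse_status_filter(raw)
    if raw == nil or raw == "" then
        return nil
    end
    if type(raw) ~= "string" then
        return nil, "status must be a string"
    end
    local status = string.lower(raw)
    if not KNOWN_STATUSES[status] then
        return nil, "unknown status: " .. raw
    end
    return status
end

local status, err = parse_status_filter("Completed")
local none, none_err = parse_status_filter(nil)
local bad, bad_err = parse_status_filter("done")
```

Because "no filter" and "invalid filter" both return `nil` as the first value, callers need to check the second return value to tell them apart.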
Queue Worker
Create src/process_task.lua:
local sql = require("sql")
local logger = require("logger")
local json = require("json")

local function main(task)
    logger:info("processing task", {
        id = task.id,
        action = task.action
    })

    local result
    if task.action == "uppercase" then
        result = {output = string.upper(task.data.text or "")}
    elseif task.action == "sum" then
        local nums = task.data.numbers or {}
        local total = 0
        for _, n in ipairs(nums) do
            total = total + n
        end
        result = {output = total}
    else
        result = {output = "processed"}
    end

    local db, db_err = sql.get("app:db")
    if db_err then
        error("database unavailable: " .. tostring(db_err))
    end

    local _, exec_err = db:execute(
        "INSERT OR REPLACE INTO tasks (id, payload, status, result, created_at, processed_at) VALUES (?, ?, ?, ?, ?, ?)",
        { task.id, json.encode(task), "completed", json.encode(result), task.created_at, os.time() }
    )
    db:release()

    if exec_err then
        error("failed to store result: " .. tostring(exec_err))
    end

    logger:info("task completed", {id = task.id})
end

return { main = main }
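As more actions are added, the `if`/`elseif` chain can be replaced by a lookup table keyed by action name. The sketch below is one way to structure that, not the tutorial's implementation. The `actions` table and `run_action` are hypothetical names, and the numeric check in `sum` is an addition that the original worker does not perform.

```lua
-- One function per action; each receives task.data and returns output, err.
local actions = {}

function actions.uppercase(data)
    return string.upper(tostring(data.text or ""))
end

function actions.sum(data)
    local total = 0
    for _, n in ipairs(data.numbers or {}) do
        -- Reject non-numeric entries instead of raising inside the loop.
        if type(n) ~= "number" then
            return nil, "numbers must contain only numeric values"
        end
        total = total + n
    end
    return total
end

-- Look up the handler for task.action and wrap its output.
local function run_action(task)
    local handler = actions[task.action]
    if not handler then
        return { output = "processed" } -- same fallback as process_task.lua
    end
    local output, err = handler(task.data or {})
    if err then
        return nil, err
    end
    return { output = output }
end

local upper = run_action({ action = "uppercase", data = { text = "hello world" } })
local sum = run_action({ action = "sum", data = { numbers = {1, 2, 3} } })
```

With this layout, `main` would call `run_action(task)` and then store the result exactly as before; adding an action means adding one function to the table.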
Running the Service
Initialize and run:
mkdir -p data
wippy init
wippy run
Test the API:
# Create a task
curl -X POST http://localhost:8080/tasks \
  -H "Content-Type: application/json" \
  -d '{"action": "uppercase", "data": {"text": "hello world"}}'
# Response: {"id": "550e8400-...", "status": "queued"}
# Wait a moment for processing, then list tasks
curl http://localhost:8080/tasks
# Response: {"tasks": [...], "count": 1}
# Filter by status
curl "http://localhost:8080/tasks?status=completed"
Message Flow
- POST /tasks receives the request, generates a UUID, and publishes the task to the queue
- The queue consumer picks up the message (2 concurrent workers)
- A worker processes the task and writes the result to SQLite
- GET /tasks reads completed tasks from the database; a task does not appear there until a worker has stored it
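The four steps above can be traced with a small in-memory model. This sketch uses plain Lua tables as stand-ins for the queue and the tasks table, and a counter instead of a UUID, so every name in it (`queue_items`, `tasks_table`, `create_task`, `work_once`, `list_tasks`) is illustrative only. Its purpose is to show why a freshly created task is absent from the listing until a worker has run.

```lua
-- In-memory stand-ins for the queue and the tasks table (illustration only).
local queue_items = {} -- FIFO of published tasks
local tasks_table = {} -- id -> stored row
local next_id = 0

-- Step 1: accept a request and publish it; nothing is stored yet.
local function create_task(action, data)
    next_id = next_id + 1
    local task = { id = "task-" .. next_id, action = action, data = data or {} }
    table.insert(queue_items, task)
    return { id = task.id, status = "queued" }
end

-- Steps 2-3: a worker takes the oldest message and writes the result.
local function work_once()
    local task = table.remove(queue_items, 1)
    if not task then
        return false
    end
    local output = "processed"
    if task.action == "uppercase" then
        output = string.upper(task.data.text or "")
    end
    tasks_table[task.id] = { id = task.id, status = "completed", result = output }
    return true
end

-- Step 4: listing reads only what workers have already stored.
local function list_tasks()
    local rows = {}
    for _, row in pairs(tasks_table) do
        table.insert(rows, row)
    end
    return rows
end

local accepted = create_task("uppercase", { text = "hello world" })
local before = #list_tasks() -- queued but not yet processed
work_once()
local after = list_tasks()
```

In the real service the worker runs on its own schedule, which is why the curl example waits briefly between the POST and the GET.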
Next Steps
- HTTP Module - Request/response handling
- Queue Module - Message queue operations
- SQL Module - Database access
- Queue Consumers - Queue configuration