Celery: quando le tue API devono fare cose lente in background
Celery con Redis per eseguire task pesanti in modo asincrono: invio email, elaborazione AI, conversione file. Setup con FastAPI e Docker Compose in 20 minuti.
Pubblicato: 3 giugno 2025
L’utente carica un PDF. Tu vuoi estrarne il testo con un modello AI, generare un riassunto, e mandargli un’email con i risultati. Se fai tutto nel thread HTTP, la richiesta resta aperta 30-40 secondi. Il proxy va in timeout, il browser si spazientisce, l’utente clicca di nuovo e hai due elaborazioni in corso.
Celery risolve questo con tre componenti: la tua API riceve la richiesta e risponde subito con “preso in carico”, Redis tiene in coda i task da eseguire, il worker Celery li processa in background. L’utente vede “in elaborazione”, poi riceve l’email quando è pronto.
Architettura in 3 componenti
FastAPI → Redis (broker) → Celery Worker
│ │ │
riceve coda esegue
richiesta task task
→ risponde
subito
FastAPI non sa nulla di come il task viene eseguito. Redis non sa nulla del dominio. Il worker non sa nulla dell’HTTP. Ogni pezzo fa una cosa sola.
Task minimo: tasks.py
from celery import Celery
app = Celery(
"tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)
@app.task
def process_document(file_path: str, user_email: str):
# Chiama Ollama, processa, manda email
result = analyze_with_ai(file_path)
send_email(user_email, result)
return result
def analyze_with_ai(path: str) -> str:
from ollama import Client
client = Client()
with open(path) as f:
content = f.read()
response = client.chat(
model="qwen2.5:7b",
messages=[{"role": "user", "content": f"Riassumi: {content[:4000]}"}]
)
return response['message']['content']
Il decorator @app.task trasforma la funzione in un task Celery. broker è dove i task vengono messi in coda. backend è dove vengono salvati i risultati — serve se vuoi interrogare lo stato dopo.
Integrazione con FastAPI
from fastapi import FastAPI, UploadFile
from celery.result import AsyncResult
from tasks import process_document
import shutil, uuid
app = FastAPI()
@app.post("/upload")
async def upload(file: UploadFile, email: str):
# Salva il file
file_id = str(uuid.uuid4())
file_path = f"/tmp/{file_id}_{file.filename}"
with open(file_path, "wb") as f:
shutil.copyfileobj(file.file, f)
# Metti in coda e rispondi subito
task = process_document.delay(file_path, email)
return {"task_id": task.id, "status": "in elaborazione"}
@app.get("/status/{task_id}")
async def get_status(task_id: str):
result = AsyncResult(task_id)
return {
"status": result.status, # PENDING, STARTED, SUCCESS, FAILURE
"result": result.result if result.ready() else None
}
.delay() è la chiamata che mette il task in coda e ritorna immediatamente. Il frontend fa polling su /status/{task_id} ogni 2-3 secondi finché status non diventa SUCCESS.
Docker Compose: i 3 servizi insieme
services:
api:
build: .
ports:
- "8000:8000"
environment:
- REDIS_URL=redis://redis:6379/0
depends_on: [redis]
command: uvicorn main:app --host 0.0.0.0 --port 8000
worker:
build: .
environment:
- REDIS_URL=redis://redis:6379/0
depends_on: [redis]
command: celery -A tasks worker --loglevel=info --concurrency=2
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
volumes:
redis_data:
--concurrency=2 avvia due processi worker in parallelo. Su un server con 4 core e task CPU-bound, 2-3 worker è il punto di partenza giusto. Per task I/O-bound (richieste HTTP, lettura file), puoi alzarlo a 8-10.
Monitoraggio con Flower
Flower è la UI web per Celery: vedi i task in coda, quelli in esecuzione, quelli falliti, i tempi medi.
flower:
image: mher/flower
command: celery flower --broker=redis://redis:6379/0 --port=5555
ports:
- "5555:5555"
depends_on: [redis]
Apri http://localhost:5555. Nessuna configurazione aggiuntiva. Se un task fallisce vedi lo stack trace completo — indispensabile in produzione.
Cosa fare
- Usa Celery solo quando il task dura più di 2-3 secondi o quando non puoi permetterti di perdere un’operazione se il server si riavvia — per task brevi,
BackgroundTasksdi FastAPI è sufficiente. - Aggiungi Flower subito, anche in sviluppo: capire perché un task è in
FAILUREsenza UI significa leggere log grezzi, che è lento e frustrante. - Imposta
CELERY_TASK_SERIALIZER = "json"e passa solo dati serializzabili ai task (path, ID, stringhe) — mai oggetti Python complessi, o avrai problemi di pickle tra versioni.