Salta al contenuto
AImpact
IT EN
AI per sviluppatori 6 min di lettura

Celery: quando le tue API devono fare cose lente in background

Celery con Redis per eseguire task pesanti in modo asincrono: invio email, elaborazione AI, conversione file. Setup con FastAPI e Docker Compose in 20 minuti.

Pubblicato: 3 giugno 2025

L’utente carica un PDF. Tu vuoi estrarne il testo con un modello AI, generare un riassunto, e mandargli un’email con i risultati. Se fai tutto nel thread HTTP, la richiesta resta aperta 30-40 secondi. Il proxy va in timeout, il browser si spazientisce, l’utente clicca di nuovo e hai due elaborazioni in corso.

Celery risolve questo con tre componenti: la tua API riceve la richiesta e risponde subito con “preso in carico”, Redis tiene in coda i task da eseguire, il worker Celery li processa in background. L’utente vede “in elaborazione”, poi riceve l’email quando è pronto.

Architettura in 3 componenti

FastAPI  →  Redis (broker)  →  Celery Worker
  │              │                   │
riceve         coda               esegue
richiesta      task               task
→ risponde
  subito

FastAPI non sa nulla di come il task viene eseguito. Redis non sa nulla del dominio. Il worker non sa nulla dell’HTTP. Ogni pezzo fa una cosa sola.

Task minimo: tasks.py

from celery import Celery

app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

@app.task
def process_document(file_path: str, user_email: str):
    # Chiama Ollama, processa, manda email
    result = analyze_with_ai(file_path)
    send_email(user_email, result)
    return result

def analyze_with_ai(path: str) -> str:
    from ollama import Client
    client = Client()
    with open(path) as f:
        content = f.read()
    response = client.chat(
        model="qwen2.5:7b",
        messages=[{"role": "user", "content": f"Riassumi: {content[:4000]}"}]
    )
    return response['message']['content']

Il decorator @app.task trasforma la funzione in un task Celery. broker è dove i task vengono messi in coda. backend è dove vengono salvati i risultati — serve se vuoi interrogare lo stato dopo.

Integrazione con FastAPI

from fastapi import FastAPI, UploadFile
from celery.result import AsyncResult
from tasks import process_document
import shutil, uuid

app = FastAPI()

@app.post("/upload")
async def upload(file: UploadFile, email: str):
    # Salva il file
    file_id = str(uuid.uuid4())
    file_path = f"/tmp/{file_id}_{file.filename}"
    with open(file_path, "wb") as f:
        shutil.copyfileobj(file.file, f)

    # Metti in coda e rispondi subito
    task = process_document.delay(file_path, email)
    return {"task_id": task.id, "status": "in elaborazione"}

@app.get("/status/{task_id}")
async def get_status(task_id: str):
    result = AsyncResult(task_id)
    return {
        "status": result.status,       # PENDING, STARTED, SUCCESS, FAILURE
        "result": result.result if result.ready() else None
    }

.delay() è la chiamata che mette il task in coda e ritorna immediatamente. Il frontend fa polling su /status/{task_id} ogni 2-3 secondi finché status non diventa SUCCESS.

Docker Compose: i 3 servizi insieme

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on: [redis]
    command: uvicorn main:app --host 0.0.0.0 --port 8000

  worker:
    build: .
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on: [redis]
    command: celery -A tasks worker --loglevel=info --concurrency=2

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

volumes:
  redis_data:

--concurrency=2 avvia due processi worker in parallelo. Su un server con 4 core e task CPU-bound, 2-3 worker è il punto di partenza giusto. Per task I/O-bound (richieste HTTP, lettura file), puoi alzarlo a 8-10.

Monitoraggio con Flower

Flower è la UI web per Celery: vedi i task in coda, quelli in esecuzione, quelli falliti, i tempi medi.

  flower:
    image: mher/flower
    command: celery flower --broker=redis://redis:6379/0 --port=5555
    ports:
      - "5555:5555"
    depends_on: [redis]

Apri http://localhost:5555. Nessuna configurazione aggiuntiva. Se un task fallisce vedi lo stack trace completo — indispensabile in produzione.

Cosa fare

  • Usa Celery solo quando il task dura più di 2-3 secondi o quando non puoi permetterti di perdere un’operazione se il server si riavvia — per task brevi, BackgroundTasks di FastAPI è sufficiente.
  • Aggiungi Flower subito, anche in sviluppo: capire perché un task è in FAILURE senza UI significa leggere log grezzi, che è lento e frustrante.
  • Imposta CELERY_TASK_SERIALIZER = "json" e passa solo dati serializzabili ai task (path, ID, stringhe) — mai oggetti Python complessi, o avrai problemi di pickle tra versioni.