Celery: when your API needs to do slow things in the background
Celery with Redis for running heavy tasks asynchronously: email sending, AI processing, file conversion. Setup with FastAPI and Docker Compose in 20 minutes.
Published: June 3, 2025
The user uploads a PDF. You want to extract the text with an AI model, generate a summary, and send them an email with the results. If you do all of that in the HTTP thread, the request stays open for 30-40 seconds. The proxy times out, the browser loses patience, the user clicks again and now you have two concurrent jobs running.
Celery solves this with three components: your API receives the request and responds immediately with “received”, Redis queues the tasks to be executed, and the Celery worker processes them in the background. The user sees “processing”, then gets the email when it’s ready.
Architecture in 3 components
FastAPI      →   Redis (broker)   →   Celery Worker
   │                   │                    │
receives             task               executes
request              queue              task
→ responds
immediately
FastAPI knows nothing about how the task is executed. Redis knows nothing about the domain. The worker knows nothing about HTTP. Each piece does one thing.
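The separation of roles can be illustrated without Celery at all. What follows is a toy, in-process analogy and not how Celery is implemented: a queue.Queue stands in for the Redis broker, a dict stands in for the result backend, and a thread stands in for the worker. The names handle_request, worker_loop and slow_job are hypothetical placeholders.

```python
import queue
import threading
import uuid

broker = queue.Queue()   # stands in for Redis: it only stores messages
results = {}             # stands in for the result backend


def slow_job(text: str) -> str:
    """Placeholder for the expensive work (AI call, email, conversion)."""
    return text.upper()


def handle_request(text: str) -> dict:
    """Stands in for the API: enqueue a message and return right away."""
    task_id = str(uuid.uuid4())
    broker.put((task_id, text))
    return {"task_id": task_id, "status": "processing"}


def worker_loop() -> None:
    """Stands in for the worker: pull messages and execute them."""
    while True:
        message = broker.get()
        if message is None:  # sentinel used here to stop the toy worker
            break
        task_id, text = message
        results[task_id] = slow_job(text)


worker = threading.Thread(target=worker_loop)
worker.start()
response = handle_request("hello")  # returns before the job has run
broker.put(None)                    # ask the worker to exit after the job
worker.join()
print(response["status"], results[response["task_id"]])
```

Note that handle_request never calls slow_job directly; it only knows how to put a message on the queue, which is the same boundary the diagram draws between FastAPI and the worker.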
Minimal task: tasks.py
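import os

from celery import Celery

# Read the Redis URL from the environment so the same code works locally
# and inside Docker Compose, where the host is "redis", not "localhost".
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")

app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)


@app.task
def process_document(file_path: str, user_email: str) -> str:
    # Call Ollama, process, send email
    result = analyze_with_ai(file_path)
    # send_email is not shown here; it is assumed to be defined elsewhere
    send_email(user_email, result)
    return result


def analyze_with_ai(path: str) -> str:
    # Imported inside the function so it is loaded only when the task runs
    from ollama import Client

    client = Client()
    # Reads the file as text; a real PDF needs a text-extraction step first
    with open(path, encoding="utf-8") as f:
        content = f.read()
    response = client.chat(
        model="qwen2.5:7b",
        # Only the first 4000 characters are sent, to bound the prompt size
        messages=[{"role": "user", "content": f"Summarize: {content[:4000]}"}],
    )
    return response["message"]["content"]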
The @app.task decorator turns the function into a Celery task. broker is where tasks are queued. backend is where results are stored — needed if you want to check status afterwards.
FastAPI integration
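import shutil
import uuid
from pathlib import Path

from celery.result import AsyncResult
from fastapi import FastAPI, UploadFile

from tasks import app as celery_app, process_document

app = FastAPI()


# Plain "def": FastAPI runs it in a threadpool, so the blocking file copy
# does not stall the event loop.
@app.post("/upload")
def upload(file: UploadFile, email: str):
    # Save the file; keep only the base name so a crafted filename
    # cannot point outside /tmp
    file_id = str(uuid.uuid4())
    safe_name = Path(file.filename or "upload").name
    file_path = f"/tmp/{file_id}_{safe_name}"
    with open(file_path, "wb") as f:
        shutil.copyfileobj(file.file, f)
    # Queue and respond immediately
    task = process_document.delay(file_path, email)
    return {"task_id": task.id, "status": "processing"}


@app.get("/status/{task_id}")
def get_status(task_id: str):
    result = AsyncResult(task_id, app=celery_app)
    # PENDING, STARTED, SUCCESS, FAILURE
    # (STARTED is reported only when task_track_started is enabled)
    status = result.status
    if status == "SUCCESS":
        payload = result.result
    elif status == "FAILURE":
        # On failure result.result is the exception; return it as text
        payload = str(result.result)
    else:
        payload = None
    return {"status": status, "result": payload}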
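.delay() is the call that queues the task and returns immediately. The frontend polls /status/{task_id} every 2-3 seconds until the status becomes SUCCESS or FAILURE. Note that Celery also reports PENDING for task IDs it has never seen, so the client should give up after a timeout rather than poll forever.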
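The polling loop on the client side can be sketched as follows. This is a minimal sketch under assumptions: poll_until_done and its fetch_status parameter are hypothetical names, and the default interval and timeout are illustrative values. In a real client, fetch_status would perform the GET on /status/{task_id} and return the decoded JSON body; here a fake status source is used so the example runs on its own.

```python
import time
from typing import Callable


def poll_until_done(
    fetch_status: Callable[[], dict],
    interval: float = 2.0,
    timeout: float = 120.0,
) -> dict:
    """Call fetch_status repeatedly until the task reaches a final state."""
    deadline = time.monotonic() + timeout
    while True:
        body = fetch_status()
        # Stop on either final state, not only on SUCCESS
        if body["status"] in ("SUCCESS", "FAILURE"):
            return body
        if time.monotonic() >= deadline:
            raise TimeoutError("task did not finish in time")
        time.sleep(interval)


# Demo with a fake status source instead of a real HTTP call
_responses = iter([
    {"status": "PENDING", "result": None},
    {"status": "PENDING", "result": None},
    {"status": "SUCCESS", "result": "summary text"},
])
final = poll_until_done(lambda: next(_responses), interval=0.01)
print(final)
```

Passing the fetch function in as a parameter keeps the loop independent of any particular HTTP library and makes it easy to test.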
Docker Compose: 3 services together
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379/0
    volumes:
      # Shared with the worker so it can read the files the API saves in /tmp
      - uploads:/tmp
    depends_on: [redis]
    command: uvicorn main:app --host 0.0.0.0 --port 8000
  worker:
    build: .
    environment:
      - REDIS_URL=redis://redis:6379/0
    volumes:
      - uploads:/tmp
    depends_on: [redis]
    command: celery -A tasks worker --loglevel=info --concurrency=2
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
volumes:
  redis_data:
  uploads:
--concurrency=2 starts two worker processes in parallel (without the flag, Celery defaults to one process per CPU core). On a 4-core server with CPU-bound tasks, 2-3 processes is a reasonable starting point. For I/O-bound tasks (HTTP requests, file reads), you can raise it to 8-10.
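The rule of thumb above can be written down as a small helper. This is only a sketch that encodes the numbers from this article (about half the cores for CPU-bound work, about twice the cores for I/O-bound work); suggest_concurrency is a hypothetical function, not part of Celery, and the result is a starting value to tune against real measurements.

```python
import os


def suggest_concurrency(cores: int, io_bound: bool) -> int:
    """Rough starting value for --concurrency, per the rule of thumb above."""
    if cores < 1:
        raise ValueError("cores must be at least 1")
    if io_bound:
        # I/O-bound tasks mostly wait, so run more processes than cores
        return cores * 2
    # CPU-bound tasks: leave headroom for the API, Redis and the OS
    return max(1, cores // 2)


cores = os.cpu_count() or 1
value = suggest_concurrency(cores, io_bound=False)
print(f"celery -A tasks worker --loglevel=info --concurrency={value}")
```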
Monitoring with Flower
Flower is the web UI for Celery: see queued tasks, running tasks, failed tasks, average execution times.
  flower:
    image: mher/flower
    # Global options such as --broker go before the "flower" subcommand
    command: celery --broker=redis://redis:6379/0 flower --port=5555
    ports:
      - "5555:5555"
    depends_on: [redis]
Open http://localhost:5555. No extra configuration. When a task fails you see the full stack trace — essential in production.
What to do
- Use Celery only when the task takes more than 2-3 seconds or when you can’t afford to lose an operation if the server restarts. For short tasks, FastAPI’s built-in BackgroundTasks is enough.
- Add Flower immediately, even in development: understanding why a task is in FAILURE without a UI means reading raw logs, which is slow and frustrating.
- Set CELERY_TASK_SERIALIZER = "json" (task_serializer in the newer lowercase setting names; JSON has been the default since Celery 4.0) and pass only serializable data to tasks (paths, IDs, strings). Never pass complex Python objects: they fail to encode as JSON, and switching to pickle to work around that brings compatibility issues between versions.
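The last point can be seen with the standard library alone. The sketch below uses the stdlib json module as a stand-in for what happens when task arguments are encoded; Celery's own JSON encoder handles a few extra types, but an arbitrary object should fail in a similar way. The Document class is a hypothetical domain object.

```python
import json
from dataclasses import dataclass


@dataclass
class Document:  # hypothetical domain object
    doc_id: int
    path: str


doc = Document(doc_id=42, path="/tmp/report.pdf")

# Plain values (IDs, paths, strings) encode to JSON without trouble
good_args = json.dumps([doc.doc_id, doc.path])

# A rich Python object does not: json.dumps raises TypeError
try:
    json.dumps([doc])
    object_is_serializable = True
except TypeError:
    object_is_serializable = False

print(good_args, object_is_serializable)
```

In practice this means calling the task with doc.doc_id or doc.path and letting the worker load whatever else it needs on its side.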