Complete Guide to FastAPIasync-task-queue-celery

📂 Stage: Stage 6 - 2026 Featured Topics (AI Integration) 🔗 Related chapters: 流式响应 StreamingResponse · Redis 集成


Asynchronous task queue: why do we need it?

Suppose your user request hides these operations:

  • 🎨AI generates avatars in one second, and it takes 5 seconds to load the model
  • 📧 Send 5000 weekly emails in batches
  • 🎥 Transcode a 10GB HD video

If you use ordinaryasync defOr synchronous function processing, what happens?

  • The user stares at a blank browser and waits, or even reports an error after timeout
  • Server resources are heavily occupied by a single request, and subsequent requests are queued up.
  • Once you fail, you won’t even be able to try again.

Asynchronous task queue was born to solve this type of "time-consuming/high-risk operations". Its core idea is very simple: Extract complex work from the request thread and hand it over to the background's full-time worker (Worker) to slowly digest.

Use Celery + FastAPI to implement this process, which is roughly like this:

sequenceDiagram
    participant User as 用户
    participant FastAPI as FastAPI(Producer)
    participant Broker as Broker(Redis/RabbitMQ)
    participant Worker as Celery Worker
    participant Backend as Backend(Redis)
    
    User->>FastAPI: 生成一张动漫头像
    FastAPI->>Broker: 提交任务到队列
    FastAPI->>User: 立即返回「任务ID + 预估完成时间」
    Broker->>Worker: 分配空闲Worker
    Worker->>Backend: 存储进度/结果
    User->>FastAPI: 轮询或WebSocket查询状态
    FastAPI->>Backend: 获取最新状态
    Backend->>FastAPI: 返回头像URL
    FastAPI->>User: 展示结果

During the entire process, the user only submitted a task description and immediately received a "pickup order number". They could inquire about the progress at any time without having to wait. The server-side FastAPI application always remains lightweight and responsive.


Get started quickly: Set up the infrastructure in three steps

1. Install core dependencies

Using Redis as the Broker (message broker) and Backend (result storage) of the task queue is currently the most concise and efficient way:

pip install fastapi uvicorn celery[redis] redis

incelery[redis]All components required for Celery to interact with Redis are automatically installed.

2. Write a Celery application and a sample task

Create filecelery_app.py, which is the brain of the entire background task:

# celery_app.py
from celery import Celery
import time

# 创建 Celery 实例,broker 和 backend 可以分别指定不同的 Redis 数据库
celery = Celery(
    'daoman_fastapi_celery',
    broker='redis://localhost:6379/0',   # 任务队列存储在 Redis 0 号库
    backend='redis://localhost:6379/1'   # 结果存放在 Redis 1 号库(隔离,便于管理)
)

# 定义一个模拟邮件发送任务
@celery.task(bind=True, name="tasks.send_email")
def send_email(self, to: str, subject: str, body: str):
    """
    异步发送邮件,附带进度上报与自动重试
    """
    try:
        # 更新任务状态为 PROGRESS,并告知当前步骤
        self.update_state(state='PROGRESS', meta={'step': 'preparing'})
        time.sleep(1)   # 模拟准备阶段耗时

        self.update_state(state='PROGRESS', meta={'step': 'sending'})
        time.sleep(2)   # 模拟实际发送

        # 成功完成,返回结果字典
        return {'status': 'success', 'to': to, 'time': time.time()}
    except Exception as exc:
        # 失败时自动重试:延迟60秒,最多重试3次
        raise self.retry(exc=exc, countdown=60, max_retries=3)

3. Integrate FastAPI and start the entire service

Newmain.py, providing two interfaces: submitting tasks and querying status.

# main.py
from fastapi import FastAPI, HTTPException
from celery.result import AsyncResult
from celery_app import celery, send_email
from pydantic import BaseModel, EmailStr

app = FastAPI(title="道满FastAPI-Celery教程")

# 请求体模型,利用 Pydantic 自动校验入参
class EmailReq(BaseModel):
    to: EmailStr
    subject: str
    body: str

@app.post("/tasks/email")
async def create_email_task(req: EmailReq):
    # 将任务推入队列,传入收件人、主题、正文
    task = send_email.delay(req.to, req.subject, req.body)
    return {
        "task_id": task.id,
        "status": "submitted",
        "message": "邮件已进入队列,预计3秒左右完成"
    }

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    task = AsyncResult(task_id, app=celery)
    # 根据任务状态返回不同的信息
    return {
        "task_id": task_id,
        "state": task.state,
        "info": task.info if task.state in ['PROGRESS', 'FAILURE'] else None,
        "result": task.result if task.state == 'SUCCESS' else None
    }

Start three processes in sequence (it is recommended to use three terminal windows for local debugging):

  1. Start the Redis service (if not already started):

    redis-server
  2. Start Celery Worker and specify the number of concurrencies:

    celery -A celery_app worker --loglevel=INFO --concurrency=4

    💡 --concurrency=4Indicates processing 4 tasks at the same time, which can be adjusted according to the number of CPU cores.

  3. Start the FastAPI application (enable hot reloading to facilitate development):

    uvicorn main:app --reload

At this time, visithttp://127.0.0.1:8000/docsYou can see the Swagger document, and you can directly test the submission task and query status.


Core Advanced: Enterprise-level common configurations

Although the previous example can run, the production environment requires more details to be considered: timeouts, retry strategies, queue isolation, memory leak protection, etc. We've compiled these best practices into a unified configuration class.

1. Complete Celery configuration file

Newconfig/celery_config.py

# config/celery_config.py
from kombu import Queue
import os

class CeleryConfig:
    # ---------- Broker & Backend ----------
    BROKER_URL = os.getenv('CELERY_BROKER', 'redis://localhost:6379/0')
    RESULT_BACKEND = os.getenv('CELERY_BACKEND', 'redis://localhost:6379/1')
    
    # ---------- 序列化方式 ----------
    TASK_SERIALIZER = 'json'
    ACCEPT_CONTENT = ['json']
    RESULT_SERIALIZER = 'json'
    
    # ---------- 时区 ----------
    TIMEZONE = 'Asia/Shanghai'
    ENABLE_UTC = False
    
    # ---------- 任务执行控制 ----------
    TASK_TRACK_STARTED = True          # 开启 STARTED 状态跟踪
    TASK_TIME_LIMIT = 300               # 硬超时5分钟(强制杀死子进程)
    TASK_SOFT_TIME_LIMIT = 240          # 软超时4分钟(会抛出 SoftTimeLimitExceeded 异常,可在任务内处理)
    
    # ---------- 重试策略 ----------
    TASK_RETRY_BACKOFF = True           # 指数退避:重试间隔逐渐增加,防止雪崩
    TASK_RETRY_BACKOFF_MAX = 700        # 最大退避间隔700秒
    TASK_RETRY_JITTER = True            # 随机抖动:避免大量任务在同一时刻集中重试
    
    # ---------- Worker 优化 ----------
    WORKER_PREFETCH_MULTIPLIER = 1      # 每个 Worker 一次只预取1个任务,防止任务积压在某个 Worker 上
    WORKER_MAX_TASKS_PER_CHILD = 1000   # 子进程处理1000个任务后重启,避免内存泄漏
    WORKER_DISABLE_RATE_LIMITS = True   # 在应用层不限制速率(利用 Redis 更快)
    
    # ---------- 队列路由:按任务类型分流 ----------
    TASK_QUEUES = (
        Queue('default', routing_key='default'),
        Queue('ai_tasks', routing_key='ai.#'),
        Queue('email_tasks', routing_key='email.#'),
    )
    TASK_ROUTES = {
        'tasks.ai.*': {'queue': 'ai_tasks'},      # AI相关任务进入 ai_tasks 队列
        'tasks.email.*': {'queue': 'email_tasks'}, # 邮件任务进入 email_tasks 队列
    }

existcelery_app.pyIntroduce configuration in:

from config.celery_config import CeleryConfig

celery = Celery('daoman_fastapi_celery')
celery.config_from_object(CeleryConfig)

This gives your Celery production-grade reliability and flexibility.

2. Scheduled task configuration (Celery Beat)

Many businesses need to perform tasks on a regular basis, such as cleaning up temporary files every early morning and checking queue health status every 15 minutes. This can be achieved through Celery Beat + scheduled scheduling.

existcelery_beat.pyConfigure beats in:

# celery_beat.py
from celery.schedules import crontab
from celery_app import celery
import os

celery.conf.beat_schedule = {
    # 每天凌晨3点清理过期临时文件
    "cleanup-temp-files": {
        "task": "tasks.maintenance.cleanup_temp",
        "schedule": crontab(hour=3, minute=0),
        "options": {"queue": "maintenance"}
    },
    # 每15分钟检查队列健康状态
    "check-queue-health": {
        "task": "tasks.monitoring.check_queue",
        "schedule": crontab(minute="*/15"),
        "options": {"queue": "monitoring"}
    }
}

Start the Beat service (requires a separate process):

celery -A celery_app beat --loglevel=INFO

⚠️ Note: Do not share the same process with Worker. Beat is only responsible for sending tasks on time, and Worker is responsible for execution.


Key points of production environment

1. Monitoring and alarming——Flower

Celery officially provides an extremely easy-to-use real-time monitoring panel Flower. You can intuitively see the status of all tasks, Worker load, success rate and other key indicators.

pip install flower
celery -A celery_app flower --port=5555 --basic_auth=admin:你的强密码

Access after startuphttp://your-server:5555, enter the username and password you set to see the overall task. It is strongly recommended to use a reverse proxy (such as Nginx) and enable HTTPS in the production environment.

2. Docker deployment

Containerized deployment can keep the Worker environment unified and expand quickly. An example of a safe Dockerfile is as follows:

# Dockerfile.worker
FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .

# 创建非 root 用户,提升安全性
RUN useradd -m celery
USER celery

# 启动命令,指定监听的队列
CMD ["celery", "-A", "celery_app", "worker", \
     "--loglevel=INFO", "--queues=default,ai_tasks"]

💡 passed--queuesParameters, you can let specialized Workers only consume certain queues, for example, let machines with GPUs only runai_tasks


Summarize

Celery is the most mature and flexible asynchronous task queue solution in the FastAPI ecosystem. Its value can be condensed into:

  • ✅ Completely decouple user requests from time-consuming operations
  • ✅ Worker supports horizontal expansion and can easily increase or decrease with traffic
  • ✅ Powerful retry, dead letter queue, and scheduled task functions
  • ✅ Mature monitoring tools (Flower / Prometheus integration)
  • ✅ Integration with FastAPI is elegant and natural

When you face scenarios such as AI reasoning, image/video processing, batch emails, report generation, etc., the combination of Celery + FastAPI will get you twice the result with half the effort.


FAQ

Q1: The task has not been executed after it was submitted?

Possible reasons:

  • The Redis service is not started, or the Broker address connected by the Worker is incorrect
  • The Worker multi-queue configuration is wrong. The task goes to a certain queue but no Worker is listening.
  • The task routing key was written incorrectly, causing the task to be discarded toceleryDefault queue, while Worker only listens to specific queues

Quick troubleshooting:

  1. Check whether it appears in the Worker logreceived taskwords
  2. Use the Flower panel to check queue accumulation
  3. Temporarily change all Workers to listendefaultqueue test

Q2: How to implement task priority?

Celery does not have a direct task priority field natively, but it can be indirectly implemented through multiple queues + different numbers of Workers:

  • Create one for high priority taskshigh_priorityqueue
  • Allocate more Worker processes or independent high-configuration servers to the queue
  • Low-priority queues can be consumed later or even flow-limited

Another more granular control method is through RedisBRPOPCooperateLPUSHManually insert the head of the queue, but over-reliance is generally not recommended. The multi-queue solution is sufficient for most scenarios.

Q3: Can the task be retried after timeout?

  • soft timeout (SOFT_TIME_LIMIT): Can be captured within the missionSoftTimeLimitExceededException, decide whether to retry or clean up.
  • Hard timeout (TIME_LIMIT): operating system directlySIGKILLKilling the child process is not aware of the child process and therefore cannot be retried within the task. But in this case, Celery itself will consider the task failed. You can configureautoretry_forOr use a global retry policy to let the Worker retry automatically.

It is recommended to always set a hard timeout that is slightly longer than the soft timeout as a last resort.


🔗 Related tutorials