Asynchronous tasks and Celery: processing time-consuming background tasks

📂 Stage: Stage 5 - Advanced Topics (Performance and Architecture) 🔗 Related chapters: Redis Integration · RESTful API Development


0. A real-world slowdown

Last week, while uploading a profile picture to the blog backend, I casually picked a 12 MB HEIC photo taken on an iPhone (converted to PNG before submitting). The front-end spinner kept turning for more than 3 seconds before the link to the saved image came back, and I almost thought the API was down. The logs pointed to the culprit: PIL image compression, which took a full 2.8 seconds on that single image.

Many web requests hide similar time sinks: compressing images, sending emails, exporting reports, calling third-party APIs. When these run synchronously inside the request, the user is stuck watching a spinner. Each web worker also stays blocked until the slow step finishes, so the server can handle fewer requests at once.

A standard fix is to move this kind of work into an asynchronous task queue such as Celery.


1. Core idea: Turn "waiting for others" into "return first, work later"

In one sentence: split a synchronous flow where "the user must wait for the result" into an asynchronous one that "immediately returns a receipt and does the work in the background".

Taking the image upload mentioned earlier as an example, the modified process is as follows:

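flowchart LR
    A[User uploads original image] --> B{Web server receives it}
    B --> C[Save original as a temp file]
    C --> D[Submit compression task to the message queue]
    D --> E[Immediately return task ID + processing state to the front end]
    D --> F[Celery Worker listening on the queue]
    F --> G[Worker picks up the task and runs compression]
    G --> H[Delete temp image, update database]
    E --> I[Front end polls the status API with the task ID]
    H --> J[Status API returns done state + compressed image URL]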

Compared with the original flow, the user now gets submission feedback within about 100 ms instead of waiting more than 3 seconds. On the server side, the web process is freed immediately to handle the next request instead of being tied up by a blocking task.
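The "return first, work later" shape can be sketched without any infrastructure. The minimal, standard-library-only example below is an illustration, not Celery:

- a background thread stands in for the Celery Worker;
- queue.Queue stands in for the Redis broker;
- a dict stands in for the result backend.

The names submit, get_status, worker and slow_compress are hypothetical. Celery adds what this toy lacks: separate processes, persistence across restarts, retries and time limits.

```python
import queue
import threading
import time
import uuid

# queue.Queue plays the role of the message broker (Redis in this article)
task_queue: "queue.Queue[tuple[str, float]]" = queue.Queue()
# A plain dict plays the role of the result backend: task_id -> {"state": ..., "result": ...}
_results: dict[str, dict] = {}
_lock = threading.Lock()


def slow_compress(seconds: float) -> str:
    """Hypothetical stand-in for a slow job such as image compression."""
    time.sleep(seconds)
    return f"done after {seconds}s"


def worker() -> None:
    """Stand-in for a Celery Worker: take tasks off the queue forever."""
    while True:
        task_id, seconds = task_queue.get()
        with _lock:
            _results[task_id]["state"] = "STARTED"
        try:
            outcome = {"state": "SUCCESS", "result": slow_compress(seconds)}
        except Exception as exc:  # mirror Celery's FAILURE state
            outcome = {"state": "FAILURE", "result": str(exc)}
        with _lock:
            _results[task_id] = outcome
        task_queue.task_done()


def submit(seconds: float) -> str:
    """What the web route does: enqueue the job and return an ID immediately."""
    task_id = uuid.uuid4().hex
    with _lock:
        _results[task_id] = {"state": "PENDING", "result": None}
    task_queue.put((task_id, seconds))
    return task_id


def get_status(task_id: str) -> dict:
    """What the polling endpoint does: report a snapshot of the current state."""
    with _lock:
        return dict(_results[task_id])


threading.Thread(target=worker, daemon=True).start()

if __name__ == "__main__":
    start = time.perf_counter()
    tid = submit(0.5)
    print("submitted in", round(time.perf_counter() - start, 3), "s")  # returns almost instantly
    while get_status(tid)["state"] != "SUCCESS":
        time.sleep(0.1)  # the front end would poll the status API here
    print(get_status(tid))
```

submit returns almost instantly while the job keeps running, which is the behavior the upload route relies on. The loop at the bottom plays the role of the front end polling the status API.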


2. Infrastructure setup

We use Redis as both the message broker (Broker) and the result store (Backend). The reason is practical: most projects already run Redis for caching. Adding RabbitMQ or Kafka just for Celery would only increase maintenance overhead.

2.1 Install dependencies

Install directly in the virtual environment:

pip install celery redis pillow flask-mail

(pillow and flask-mail are extra libraries used only by the examples in this article; add or remove packages as needed.)

2.2 Minimal Celery configuration

Create app/celery_app.py. Keeping the Celery instance in its own module avoids circular imports and makes it easier to manage in production:

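# app/celery_app.py
from celery import Celery

# 1. Create the core Celery instance
celery_app = Celery(
    main="daoman",               # Project name, used in logs and in task naming
    broker="redis://localhost:6379/0",   # Broker: holds tasks waiting to be processed
    backend="redis://localhost:6379/1",  # Backend: stores task results and states
)

# 2. Common settings (in production, consider moving these into a separate config class)
celery_app.conf.update(
    # Serialization: JSON is the most portable, but cannot carry functions, classes or other complex objects
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    # Time zone: keep it consistent with the rest of the project
    timezone="Asia/Shanghai",
    enable_utc=False,
    # Report the STARTED state when a Worker begins a task
    task_track_started=True,
    # Timeout protection so a single stuck task cannot block a Worker forever
    task_time_limit=300,         # Hard limit: the task is killed after 5 minutes
    task_soft_time_limit=270,    # Soft limit: SoftTimeLimitExceeded is raised at 4.5 minutes and can be caught
)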
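The broker and backend URLs above are hard-coded to localhost. That stops working once the Worker runs in a container where Redis lives at a different hostname. One option is to read the URLs from environment variables. Below is a small standard-library-only sketch of that idea. The helper load_celery_urls and the variable names CELERY_BROKER_URL and CELERY_RESULT_BACKEND are assumptions for illustration. broker_url and result_backend are Celery's own setting names.

```python
import os
from typing import Mapping

# Development defaults, matching the hard-coded values in app/celery_app.py
DEFAULT_BROKER = "redis://localhost:6379/0"
DEFAULT_BACKEND = "redis://localhost:6379/1"


def load_celery_urls(env: Mapping[str, str] = os.environ) -> dict:
    """Return Celery connection settings; the (hypothetical) env vars override the defaults."""
    return {
        "broker_url": env.get("CELERY_BROKER_URL", DEFAULT_BROKER),
        "result_backend": env.get("CELERY_RESULT_BACKEND", DEFAULT_BACKEND),
    }
```

You could then write `urls = load_celery_urls()` and pass `broker=urls["broker_url"]` and `backend=urls["result_backend"]` to Celery(...) instead of the literal strings. In Docker Compose, set CELERY_BROKER_URL=redis://redis:6379/0 so the Worker reaches the redis service by name.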

3. Write reusable background tasks

A Celery task is essentially an ordinary function with a decorator on top. Common slow operations can be pulled out into dedicated task modules.

3.1 Image compression task

Create app/tasks/image_tasks.py. It includes basic fault tolerance, such as handling unreadable images and deleting the temporary file:

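# app/tasks/image_tasks.py
import os

from PIL import Image, UnidentifiedImageError  # UnidentifiedImageError is exported by the PIL package (Pillow >= 7.0)

from app.celery_app import celery_app

@celery_app.task(bind=True, name="tasks.compress_image")  # bind=True passes the task instance in as self
def compress_image(
    self,
    temp_path: str,
    output_path: str,
    quality: int = 85,
    max_width: int = 1920,
    max_height: int = 1080,
):
    """Compress an image in the background: convert to JPEG, cap its size, delete the temp original."""
    try:
        # 1. Read and convert the format
        with Image.open(temp_path) as img:
            # JPEG has no alpha channel or palette, so normalize every other mode (RGBA, P, LA...) to RGB
            if img.mode != "RGB":
                img = img.convert("RGB")
            # Shrink proportionally so neither side exceeds the limits (Image.Resampling needs Pillow >= 9.1)
            img.thumbnail((max_width, max_height), Image.Resampling.LANCZOS)
            # Save the compressed image
            img.save(output_path, "JPEG", quality=quality, optimize=True)

        # 2. Delete the temp original once compression has succeeded
        if os.path.exists(temp_path):
            os.remove(temp_path)

        # 3. Return the final result (stored in the Backend)
        return {"status": "success", "path": output_path}

    except UnidentifiedImageError as e:
        # Unreadable or non-image file: retry after 5 s, at most 2 times.
        # Retrying only helps with transient problems; a truly corrupted file fails every time,
        # and once retries are used up Celery re-raises e and marks the task FAILURE.
        raise self.retry(exc=e, countdown=5, max_retries=2)
    except Exception as e:
        # Other errors (disk full, permission denied...): return the error message directly
        return {"status": "error", "msg": str(e)}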
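Image.thumbnail only ever shrinks an image, and it keeps the aspect ratio. So a 256x256 limit does not necessarily produce a square avatar. The standard-library-only helper below approximates that sizing rule so you can predict output dimensions. fit_within is a hypothetical name, and Pillow's own rounding may differ from round() by a pixel in edge cases.

```python
def fit_within(width: int, height: int, max_width: int, max_height: int) -> tuple[int, int]:
    """Approximate the target size Image.thumbnail would pick.

    Shrink-only: an image already inside the box keeps its size.
    The aspect ratio is preserved by applying the tighter of the two scale factors.
    """
    if width <= max_width and height <= max_height:
        return width, height
    scale = min(max_width / width, max_height / height)
    # Never go below 1 pixel on either side
    return max(1, round(width * scale)), max(1, round(height * scale))


if __name__ == "__main__":
    print(fit_within(4032, 3024, 256, 256))    # 4:3 phone photo as avatar -> (256, 192)
    print(fit_within(4000, 3000, 1920, 1080))  # default limits in compress_image -> (1440, 1080)
    print(fit_within(800, 600, 1920, 1080))    # already small, unchanged -> (800, 600)
```

With max_width=256 and max_height=256, as in the upload route, a 4:3 photo therefore becomes a 256x192 avatar. If you need squares, one option is to crop to the center and resize in one step with PIL.ImageOps.fit instead of calling thumbnail.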

3.2 Send welcome email

Create app/tasks/mail_tasks.py. The Worker runs in a separate process from the Web app, so it has no Flask application context. Extensions that need one, such as Flask-Mail, require you to create and push that context manually:

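# app/tasks/mail_tasks.py
from flask_mail import Message
from app.celery_app import celery_app
from app.extensions import mail

@celery_app.task(name="tasks.send_welcome_email")
def send_welcome_email(user_email: str, username: str):
    """Send the registration welcome email in the background."""
    try:
        # current_app only works inside an existing app context, which a Worker does not have,
        # so build the app here. Assumes an application factory named create_app in app/__init__.py;
        # importing it inside the function avoids a circular import when the module loads.
        from app import create_app

        app = create_app()
        with app.app_context():  # Push the Flask application context manually
            msg = Message(
                subject=f"🎉 Welcome to {app.config['SITE_NAME']}!",
                recipients=[user_email],
                body=f"Dear {username},\n\nThank you for registering at Daoman Blog!\n\nYou can now start publishing posts and joining the discussion.\n\nThe Daoman Blog team",
            )
            mail.send(msg)
        return {"status": "success", "to": user_email}
    except Exception as e:
        return {"status": "error", "msg": str(e)}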

4. Call tasks from web routes

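Calling a task like a normal function, function(), runs it synchronously in the current web process, which defeats the purpose. To hand it to the Worker, use one of these instead:

  • function.delay(*args, **kwargs): a shortcut that only takes the task's own positional and keyword arguments
  • function.apply_async(args=..., kwargs=..., **options): the full version, which also accepts execution options such as countdown, retry policy and target queue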

4.1 Submit compression task

Suppose there is an upload blueprint in app/routes/upload.py:

# app/routes/upload.py
import os
import uuid

from flask import Blueprint, request, jsonify, current_app
from flask_login import login_required, current_user
from app.tasks.image_tasks import compress_image

upload_bp = Blueprint("upload", __name__, url_prefix="/api/upload")

@upload_bp.route("/avatar", methods=["POST"])
@login_required
def upload_avatar():
    # 1. Receive the upload and save it as a temp file
    file = request.files.get("avatar")
    if not file or not file.filename:
        return jsonify({"code": 400, "msg": "Please upload an avatar"}), 400

    # Temp filename: user ID + UUID only. Never embed the user-supplied filename in a path;
    # Pillow detects the format from the file contents, so no extension is needed.
    temp_name = f"temp_{current_user.id}_{uuid.uuid4().hex}"
    temp_path = os.path.join(current_app.config["TEMP_UPLOAD_FOLDER"], temp_name)
    file.save(temp_path)

    # 2. Build the name of the compressed output file
    compressed_name = f"avatar_{current_user.id}_{uuid.uuid4().hex}.jpg"
    compressed_path = os.path.join(current_app.config["AVATAR_UPLOAD_FOLDER"], compressed_name)

    # 3. Submit the background compression task (delay is the shortcut form)
    task = compress_image.delay(
        temp_path=temp_path,
        output_path=compressed_path,
        quality=90,
        max_width=256,
        max_height=256,
    )

    # 4. Return the task ID and current state right away (normally PENDING at this point)
    return jsonify({
        "code": 200,
        "msg": "Avatar is being processed, please refresh shortly",
        "data": {"task_id": task.id, "state": task.state},
    })

4.2 Query task status

Once the front end has the task ID, it can poll the endpoint below every 200 ms to 1 s:

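# app/routes/task.py
from flask import Blueprint, jsonify
from app.celery_app import celery_app

task_bp = Blueprint("task", __name__, url_prefix="/api/task")

@task_bp.route("/<task_id>", methods=["GET"])
def get_task_status(task_id):
    """Query the state and result of any task."""
    task = celery_app.AsyncResult(task_id)

    resp = {
        "code": 200,
        "data": {
            "task_id": task.id,
            # PENDING / STARTED / RETRY / SUCCESS / FAILURE (REVOKED if cancelled).
            # Note: an unknown task ID also reports PENDING.
            "state": task.state,
        },
    }

    # Only include a result once the task has finished
    if task.ready():
        if task.successful():
            resp["data"]["result"] = task.result
        else:
            # FAILURE / REVOKED: task.result holds an exception instance, which jsonify cannot serialize
            resp["data"]["error"] = str(task.result)

    return jsonify(resp)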
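On the client side, the polling loop can start fast and back off toward the 1 s upper bound, so long tasks do not hammer the endpoint. The sketch below is in Python to keep one language throughout. The following parts are assumptions:

- fetch_status is a hypothetical callable standing in for an HTTP GET to /api/task/<task_id>; it returns the "data" object built by get_task_status.
- The 1.5x growth factor and the 30 s timeout are arbitrary choices.
- TERMINAL_STATES mirrors celery.states.READY_STATES.

```python
import time
from typing import Callable

TERMINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def poll_task(
    fetch_status: Callable[[], dict],
    first_interval: float = 0.2,
    max_interval: float = 1.0,
    timeout: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll until the task reaches a terminal state or the timeout expires.

    The interval starts at first_interval and grows 1.5x per attempt,
    capped at max_interval (200 ms to 1 s, as suggested above).
    """
    interval = first_interval
    waited = 0.0  # counts sleep time only, not request latency
    while True:
        data = fetch_status()
        if data["state"] in TERMINAL_STATES:
            return data
        if waited >= timeout:
            raise TimeoutError(f"task still {data['state']} after {waited:.1f}s")
        sleep(interval)
        waited += interval
        interval = min(interval * 1.5, max_interval)
```

In a browser the same loop would be written with fetch and setTimeout. Injecting sleep and fetch_status keeps the sketch testable without a running server.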

5. Start the Celery service

Remember: the Worker and the Web app run as separate processes, so start them separately (for example, in two terminals).

5.1 Development environment

# Note: Celery has not officially supported Windows since 4.x; WSL2 or Docker is recommended.
# For quick local debugging on Windows, the single-process pool (--pool=solo) is a common workaround.
celery -A app.celery_app worker --loglevel=info --concurrency=2
  • -A app.celery_app: the module containing the Celery instance (Celery finds the celery_app object in it automatically)
  • --loglevel=info: verbose output, handy for debugging
  • --concurrency=2: run 2 worker child processes (the default is the number of CPU cores; a common rule of thumb is 1 to 2 times the core count)

5.2 Production environment

In production, do not run the Worker from an interactive terminal. Use systemd (Linux) or Docker to keep it running as a managed service. Here is a simple Docker Compose snippet:

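# docker-compose.yml
version: '3.8'
services:
  # Your web service, Redis, etc. are omitted here...
  celery-worker:
    build: .
    # Inside a container, localhost is the container itself: the broker/backend URLs in
    # app/celery_app.py must point at the redis service instead (e.g. redis://redis:6379/0)
    command: celery -A app.celery_app worker --loglevel=warning --concurrency=4
    volumes:
      - ./uploads:/app/uploads   # Mount the upload directory so the Worker sees the same files as the Web service
    depends_on:
      - redis
    environment:
      - FLASK_ENV=production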

6. Core process shorthand

A minimal checklist of the whole workflow:

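Celery in a nutshell
1. Prepare the environment: install celery + redis
2. Isolate the config: put it in its own celery_app.py
3. Define tasks: decorate ordinary functions with @celery_app.task
4. Submit tasks: task.delay() or apply_async()
5. Start consuming: run the Worker from a terminal or as a daemon
6. Check results: AsyncResult(task_id)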

7. When to use background tasks (and when not to)

✅ Good fits for background tasks

  • Image, video, audio and other media processing
  • Batch data export (Excel, PDF)
  • Send notifications such as emails, text messages, and push notifications
  • Third-party API calls that take more than 1 second
  • Background statistics, data cleaning and other scheduled tasks

❌ Poor fits for background tasks

  • Operations whose result the user needs immediately (such as login verification or simple queries)
  • Trivial operations that finish in under about 50 ms (pushing a task to Redis has its own overhead)

🔗 Further reading