Complete Guide to FastAPIasync-task-queue-celery
📂 Stage: Stage 6 - 2026 Featured Topics (AI Integration) 🔗 Related chapters: 流式响应 StreamingResponse · Redis 集成
Asynchronous task queue: why do we need it?
Suppose your user request hides these operations:
- 🎨AI generates avatars in one second, and it takes 5 seconds to load the model
- 📧 Send 5000 weekly emails in batches
- 🎥 Transcode a 10GB HD video
If you use ordinaryasync defOr synchronous function processing, what happens?
- The user stares at a blank browser and waits, or even reports an error after timeout
- Server resources are heavily occupied by a single request, and subsequent requests are queued up.
- Once you fail, you won’t even be able to try again.
Asynchronous task queue was born to solve this type of "time-consuming/high-risk operations". Its core idea is very simple: Extract complex work from the request thread and hand it over to the background's full-time worker (Worker) to slowly digest.
Use Celery + FastAPI to implement this process, which is roughly like this:
During the entire process, the user only submitted a task description and immediately received a "pickup order number". They could inquire about the progress at any time without having to wait. The server-side FastAPI application always remains lightweight and responsive.
Get started quickly: Set up the infrastructure in three steps
1. Install core dependencies
Using Redis as the Broker (message broker) and Backend (result storage) of the task queue is currently the most concise and efficient way:
incelery[redis]All components required for Celery to interact with Redis are automatically installed.
2. Write a Celery application and a sample task
Create filecelery_app.py, which is the brain of the entire background task:
3. Integrate FastAPI and start the entire service
Newmain.py, providing two interfaces: submitting tasks and querying status.
Start three processes in sequence (it is recommended to use three terminal windows for local debugging):
-
Start the Redis service (if not already started):
-
Start Celery Worker and specify the number of concurrencies:
💡
--concurrency=4Indicates processing 4 tasks at the same time, which can be adjusted according to the number of CPU cores. -
Start the FastAPI application (enable hot reloading to facilitate development):
At this time, visithttp://127.0.0.1:8000/docsYou can see the Swagger document, and you can directly test the submission task and query status.
Core Advanced: Enterprise-level common configurations
Although the previous example can run, the production environment requires more details to be considered: timeouts, retry strategies, queue isolation, memory leak protection, etc. We've compiled these best practices into a unified configuration class.
1. Complete Celery configuration file
Newconfig/celery_config.py:
existcelery_app.pyIntroduce configuration in:
This gives your Celery production-grade reliability and flexibility.
2. Scheduled task configuration (Celery Beat)
Many businesses need to perform tasks on a regular basis, such as cleaning up temporary files every early morning and checking queue health status every 15 minutes. This can be achieved through Celery Beat + scheduled scheduling.
existcelery_beat.pyConfigure beats in:
Start the Beat service (requires a separate process):
⚠️ Note: Do not share the same process with Worker. Beat is only responsible for sending tasks on time, and Worker is responsible for execution.
Key points of production environment
1. Monitoring and alarming——Flower
Celery officially provides an extremely easy-to-use real-time monitoring panel Flower. You can intuitively see the status of all tasks, Worker load, success rate and other key indicators.
Access after startuphttp://your-server:5555, enter the username and password you set to see the overall task.
It is strongly recommended to use a reverse proxy (such as Nginx) and enable HTTPS in the production environment.
2. Docker deployment
Containerized deployment can keep the Worker environment unified and expand quickly. An example of a safe Dockerfile is as follows:
💡 passed
--queuesParameters, you can let specialized Workers only consume certain queues, for example, let machines with GPUs only runai_tasks。
Summarize
Celery is the most mature and flexible asynchronous task queue solution in the FastAPI ecosystem. Its value can be condensed into:
- ✅ Completely decouple user requests from time-consuming operations
- ✅ Worker supports horizontal expansion and can easily increase or decrease with traffic
- ✅ Powerful retry, dead letter queue, and scheduled task functions
- ✅ Mature monitoring tools (Flower / Prometheus integration)
- ✅ Integration with FastAPI is elegant and natural
When you face scenarios such as AI reasoning, image/video processing, batch emails, report generation, etc., the combination of Celery + FastAPI will get you twice the result with half the effort.
FAQ
Q1: The task has not been executed after it was submitted?
Possible reasons:
- The Redis service is not started, or the Broker address connected by the Worker is incorrect
- The Worker multi-queue configuration is wrong. The task goes to a certain queue but no Worker is listening.
- The task routing key was written incorrectly, causing the task to be discarded to
celeryDefault queue, while Worker only listens to specific queues
Quick troubleshooting:
- Check whether it appears in the Worker log
received taskwords - Use the Flower panel to check queue accumulation
- Temporarily change all Workers to listen
defaultqueue test
Q2: How to implement task priority?
Celery does not have a direct task priority field natively, but it can be indirectly implemented through multiple queues + different numbers of Workers:
- Create one for high priority tasks
high_priorityqueue - Allocate more Worker processes or independent high-configuration servers to the queue
- Low-priority queues can be consumed later or even flow-limited
Another more granular control method is through RedisBRPOPCooperateLPUSHManually insert the head of the queue, but over-reliance is generally not recommended. The multi-queue solution is sufficient for most scenarios.
Q3: Can the task be retried after timeout?
- soft timeout (
SOFT_TIME_LIMIT): Can be captured within the missionSoftTimeLimitExceededException, decide whether to retry or clean up. - Hard timeout (
TIME_LIMIT): operating system directlySIGKILLKilling the child process is not aware of the child process and therefore cannot be retried within the task. But in this case, Celery itself will consider the task failed. You can configureautoretry_forOr use a global retry policy to let the Worker retry automatically.
It is recommended to always set a hard timeout that is slightly longer than the soft timeout as a last resort.
🔗 Related tutorials

