Python 库之 Celery 详解
Python 库之 Celery 详解
Celery 是一个用于分布式任务队列的强大 Python 库,可以帮助开发者轻松实现异步任务调度、高并发、任务重试等功能。它常用于 Web 开发、定时任务处理和后台作业。
本文将详细讲解 Celery 的核心概念、安装与配置,以及代码示例和图解,帮助你快速掌握 Celery 的使用。
一、什么是 Celery?
1. 核心概念
- 分布式任务队列:Celery 使用生产者-消费者模型,将任务推送到队列中,由多个工作者(worker)异步执行。
- 异步任务:任务可以独立于主进程执行,不阻塞主程序。
- 高并发:Celery 能处理大量任务,支持任务优先级和调度。
2. 工作流程
- 任务生产者(Producer):定义并发送任务。
- 消息代理(Broker):管理任务队列。常用 RabbitMQ、Redis 等。
- 任务执行者(Worker):从队列中取出任务并执行。
- 结果后端(Backend):存储任务的执行结果。
图解:
+----------------+ +----------------+
| Task Producer | ---> | Message |
| (e.g., Web App)| | Broker |
+----------------+ +----------------+
↓
+----------------+
| Worker |
| (Executor) |
+----------------+
↓
+----------------+
| Result Backend |
+----------------+
二、Celery 的安装与配置
1. 安装 Celery
使用 pip
安装:
pip install celery
安装 Redis 作为消息代理:
pip install redis
确保 Redis 服务已启动:
redis-server
2. 配置 Celery
创建一个名为 tasks.py
的文件:
from celery import Celery
# 配置 Celery 应用
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
三、使用 Celery 执行任务
1. 启动 Celery Worker
在终端运行以下命令,启动 Worker:
celery -A tasks worker --loglevel=info
Worker 会监听任务队列并执行任务。
2. 发送任务
在另一个 Python 脚本中调用任务:
from tasks import add
# 调用任务
result = add.delay(4, 6) # 异步调用
print("任务提交完成,结果:", result.id)
# 检查任务结果
print("任务结果:", result.get(timeout=10))
3. 任务结果查看
运行代码后,你会在 Worker 的日志中看到类似以下的输出:
[2024-11-27 12:00:00,000: INFO/MainProcess] Received task: tasks.add[1234abcd]
[2024-11-27 12:00:00,010: INFO/MainProcess] Task tasks.add[1234abcd] succeeded in 0.01s: 10
四、进阶使用
1. 定时任务
结合 celery-beat
实现定时任务:
pip install celery[redis] celery[django] django-celery-beat
定义周期性任务:
from celery import Celery
from celery.schedules import crontab
app = Celery('periodic_tasks', broker='redis://localhost:6379/0')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10 seconds')
sender.add_periodic_task(
crontab(minute='*/1'),
test.s('world'),
name='say hello every minute',
)
@app.task
def test(arg):
print(arg)
启动 Worker 和 celery-beat
调度:
celery -A periodic_tasks worker --loglevel=info
celery -A periodic_tasks beat --loglevel=info
2. 任务重试
任务失败时可以重试:
from celery import Celery
app = Celery('retry_task', broker='redis://localhost:6379/0')
@app.task(bind=True, max_retries=3)
def retry_task(self):
try:
# 模拟错误
raise ValueError("模拟任务失败")
except Exception as exc:
raise self.retry(exc=exc, countdown=5) # 5 秒后重试
五、Celery 优势和应用场景
1. 优势
- 异步执行:主程序不必等待任务完成。
- 高扩展性:支持分布式任务调度。
- 灵活性:支持多种消息代理和结果存储后端。
2. 应用场景
- Web 应用:处理后台作业(如邮件发送、图片处理)。
- 数据处理:处理批量任务(如数据清洗、ETL 操作)。
- 定时任务:定时触发特定任务。
六、总结
Celery 是一个功能强大的分布式任务调度库,其灵活性和高效性使其成为异步任务处理的首选工具。从简单的异步任务到复杂的定时任务,Celery 都能胜任。
通过本文的图解和代码示例,你可以快速上手 Celery 并应用到实际项目中。进一步学习可以深入研究 Celery 的任务优先级、路由和监控工具(如 Flower)。
推荐阅读:
评论已关闭