在Flask中使用Celery完成异步和定时任务(Flask、Celery、Redis)
from flask import Flask
from celery import Celery
# 创建Flask应用
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
# 创建Celery实例
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
celery.conf.update(app.config)
# 定义一个异步任务
@celery.task
def add_numbers(a, b):
return a + b
# 使用Flask blueprint注册Celery服务
from celery.utils.log import get_task_logger
from flask_celery import CeleryBlueprint
celery_blueprint = CeleryBlueprint(celery)
app.register_blueprint(celery_blueprint, url_prefix='/celery')
logger = get_task_logger(__name__)
# 定义一个定时任务,每5分钟执行一次
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# CELERYBEAT_SCHEDULE是Celery的配置项,用于定义定时任务
sender.add_periodic_task(5*60.0, my_regular_task.s('Hello'), name='add every 5 minutes')
@celery.task
def my_regular_task(arg):
logger.info(f"Executing regular task: {arg}")
# 运行应用
if __name__ == '__main__':
app.run(debug=True)
这段代码示例展示了如何在Flask应用中集成Celery完成异步任务和定时任务。首先创建了Flask应用实例,并配置了Celery的消息代理和结果存储后端。接着定义了一个异步任务add_numbers
,以及一个定时任务my_regular_task
。最后,使用@celery.on_after_configure.connect
装饰器设置周期性任务,并启动Flask应用进行调试。
评论已关闭