redis+celery(python)解决fastapi(示例)异步任务,定时任务详解
from fastapi import FastAPI
from fastapi_utils.tasks import repeat_every_second
from celery import Celery
# 初始化FastAPI应用
app = FastAPI()
# 创建Celery实例
def create_celery_app(app: FastAPI):
celery = Celery('fastapi_celery', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
celery.conf.update(
result_expires=3600,
)
celery.conf.beat_schedule = {
'add-every-second': {
'task': 'fastapi_celery.tasks.add',
'schedule': 1.0,
'args': (1, 2)
},
}
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# 自定义定时任务
pass
return celery
# 创建Celery对象
celery_app = create_celery_app(app)
# 定义Celery任务
@celery_app.task
def add(x, y):
return x + y
# 定义FastAPI路由
@app.get("/")
async def root():
return {"message": "Hello World"}
# 启动FastAPI和Celery Worker
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
这个代码示例展示了如何在FastAPI应用中集成Redis和Celery来实现异步任务和定时任务。首先,我们创建了一个Celery实例,指定了Redis作为消息代理和结果存储。然后,我们定义了一个定时任务“add-every-second”,它会每秒调用一个名为fastapi_celery.tasks.add
的异步任务。最后,我们启动了FastAPI服务和Celery Worker。这个例子简洁明了,展示了如何将FastAPI与异步任务框架结合使用。
评论已关闭