Python实现定时任务的三种方案——schedule、APScheduler、Celery
以下是使用schedule、APScheduler、Celery实现定时任务的简单示例:
- 使用schedule库:
import schedule
import time
def task():
print("任务执行中...")
schedule.every(10).seconds.do(task) # 每10秒执行一次
while True:
schedule.run_pending()
time.sleep(1)
- 使用APScheduler库:
from apscheduler.schedulers.blocking import BlockingScheduler
def task():
print("任务执行中...")
scheduler = BlockingScheduler()
scheduler.add_job(task, 'interval', seconds=10) # 每10秒执行一次
scheduler.start()
- 使用Celery库:
首先需要配置Celery和消息中间件(例如:RabbitMQ或Redis)。
celery_tasks.py
:
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def task():
print("任务执行中...")
调度任务:
from celery_tasks import task
# 每10秒执行一次
task.delay()
以上代码仅展示了定时任务的简单调用方式,实际应用中可能需要根据具体需求进行更复杂的配置。
评论已关闭