Django定时任务之django_apscheduler使用
import pymysql
from apscheduler.schedulers.blocking import BlockingScheduler
from django.utils import timezone
from django.core.management import call_command
# 连接数据库
pymysql.install_as_MySQLdb()
# 定义调度器
scheduler = BlockingScheduler()
# 定义定时任务的函数
def my_job():
# 执行Django管理命令
call_command('migrate')
print('执行数据库迁移...')
# 添加定时任务
@scheduler.scheduled_job('cron', hour=2, minute=30) # 每天凌晨2点30分执行数据库迁移
def scheduled_job():
my_job()
# 启动调度器
scheduler.start()
这段代码首先导入了必要的模块,并通过pymysql.install_as_MySQLdb()
连接了MySQL数据库。然后定义了一个调度器,并定义了一个定时任务函数my_job
,该函数执行Django的数据库迁移命令,并打印一条消息。接着,定义了一个装饰器scheduled_job
,用于添加APScheduler的定时任务(在每天凌晨2点30分执行my_job
函数)。最后,调用scheduler.start()
来启动调度器,使得定时任务开始工作。
评论已关闭