Django 集成 Celery:状态监控与任务管理
from django.core.management.base import BaseCommand
from django_celery_beat.models import PeriodicTask, IntervalSchedule
from celery import current_app
from datetime import timedelta
class Command(BaseCommand):
help = '监控Celery任务队列状态和管理周期性任务'
def handle(self, *args, **options):
# 获取所有周期性任务
periodic_tasks = PeriodicTask.objects.all()
for task in periodic_tasks:
# 打印任务名称和下一次执行时间
self.stdout.write(f"任务名称: {task.name}")
self.stdout.write(f"下次执行时间: {task.get_next_run_time()}")
# 获取所有的Celery beat调度
intervals = IntervalSchedule.objects.all()
for interval in intervals:
# 打印调度频率
self.stdout.write(f"调度频率: {interval.every} {interval.period}")
# 获取当前Celery应用的任务列表
self.stdout.write("Celery任务列表:")
for task_name in current_app.tasks:
self.stdout.write(f"- {task_name}")
这段代码定义了一个Django管理命令,用于监控周期性任务的执行状态,并列出了所有Celery任务。它展示了如何通过Django ORM查询周期性任务和调度,以及如何获取和展示Celery应用的任务列表。这对于需要管理和监控Celery任务的开发者来说是一个很好的学习资源。
评论已关闭