from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.date import DateTrigger from django.utils import timezone from datetime import datetime, date from .models import Task from .tasks import trigger_task_execution import logging logger = logging.getLogger(__name__) scheduler = BackgroundScheduler(timezone=None) # 使用本地时间 scheduler_started = False def start_scheduler(): global scheduler_started if scheduler_started: return scheduler_started = True scheduler.start() logger.info("APScheduler 启动成功") # 定期检查一次性任务,每30秒 scheduler.add_job(check_predefined_tasks, 'interval', seconds=30) # 定期检查新创建的每日定时任务,每30秒 scheduler.add_job(sync_scheduled_tasks, 'interval', seconds=30) def check_predefined_tasks(): """检查一次性任务并触发 Celery 异步执行""" logger.info("检查一次性任务: 开始") now = datetime.now() # 使用本地时间 tasks = Task.objects.filter(status='idle', execution_type='predefined') logger.debug(f"[Predefined] 检查 {len(tasks)} 个一次性任务, 当前时间 {now}") for task in tasks: exec_time = task.execution_time if not exec_time: logger.warning(f"Task {task.id} 没有设置 execution_time,跳过") continue # 数据库里已经是本地时间,不需要再做 timezone aware if exec_time <= now: try: # 异步调用 Celery 执行任务,只传 task.id trigger_task_execution.delay(task.id) logger.info(f"Task {task.id} 已触发 Celery 异步执行") # 更新任务状态为 done,避免重复触发 task.status = 'done' task.save(update_fields=['status']) except Exception as e: logger.error(f"触发 Task {task.id} 时出错: {e}") def sync_scheduled_tasks(): """同步每日定时任务到 APScheduler""" today = date.today() now = datetime.now() # 本地时间 tasks = Task.objects.filter(status='idle', execution_type='scheduled') logger.debug(f"[Scheduled] 检查 {len(tasks)} 个每日任务, 当前时间 {now}") for task in tasks: st = task.scheduled_time if not st: continue # 解析时间字符串 try: scheduled_time_obj = datetime.strptime(st, "%H:%M:%S").time() except ValueError: scheduled_time_obj = datetime.strptime(st, "%H:%M").time() last_run = task.last_run_date if last_run != today: # 直接用本地时间,不再 make_aware exec_datetime = datetime.combine(today, scheduled_time_obj) job_id = f"scheduled_task_{task.id}" if not scheduler.get_job(job_id): scheduler.add_job( run_scheduled_task, trigger=DateTrigger(run_date=exec_datetime), id=job_id, args=[task.id], replace_existing=True, misfire_grace_time=1 # 只允许 1 秒的延迟,超过就跳过 ) def run_scheduled_task(task_id): """执行每日定时任务""" try: task = Task.objects.get(id=task_id) except Task.DoesNotExist: logger.warning(f"[Scheduled] Task {task_id} 不存在") return try: trigger_task_execution.delay(task.id) logger.info(f"[Scheduled] Task {task.id} 已触发 Celery 执行") task.last_run_date = date.today() task.save(update_fields=['last_run_date']) except Exception as e: logger.error(f"[Scheduled] 执行 Task {task.id} 出错: {e}")