105 lines
3.8 KiB
Python
105 lines
3.8 KiB
Python
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|||
|
from apscheduler.triggers.date import DateTrigger
|
|||
|
from django.utils import timezone
|
|||
|
from datetime import datetime, date
|
|||
|
from .models import Task
|
|||
|
from .tasks import trigger_task_execution
|
|||
|
import logging
|
|||
|
logger = logging.getLogger(__name__)
|
|||
|
scheduler = BackgroundScheduler(timezone=None) # 使用本地时间
|
|||
|
scheduler_started = False
|
|||
|
|
|||
|
|
|||
|
def start_scheduler():
|
|||
|
global scheduler_started
|
|||
|
if scheduler_started:
|
|||
|
return
|
|||
|
scheduler_started = True
|
|||
|
|
|||
|
scheduler.start()
|
|||
|
logger.info("APScheduler 启动成功")
|
|||
|
# 定期检查一次性任务,每30秒
|
|||
|
scheduler.add_job(check_predefined_tasks, 'interval', seconds=30)
|
|||
|
|
|||
|
# 定期检查新创建的每日定时任务,每30秒
|
|||
|
scheduler.add_job(sync_scheduled_tasks, 'interval', seconds=30)
|
|||
|
|
|||
|
def check_predefined_tasks():
|
|||
|
"""检查一次性任务并触发 Celery 异步执行"""
|
|||
|
logger.info("检查一次性任务: 开始")
|
|||
|
now = datetime.now() # 使用本地时间
|
|||
|
tasks = Task.objects.filter(status='idle', execution_type='predefined')
|
|||
|
logger.debug(f"[Predefined] 检查 {len(tasks)} 个一次性任务, 当前时间 {now}")
|
|||
|
|
|||
|
for task in tasks:
|
|||
|
exec_time = task.execution_time
|
|||
|
if not exec_time:
|
|||
|
logger.warning(f"Task {task.id} 没有设置 execution_time,跳过")
|
|||
|
continue
|
|||
|
|
|||
|
# 数据库里已经是本地时间,不需要再做 timezone aware
|
|||
|
if exec_time <= now:
|
|||
|
try:
|
|||
|
# 异步调用 Celery 执行任务,只传 task.id
|
|||
|
trigger_task_execution.delay(task.id)
|
|||
|
logger.info(f"Task {task.id} 已触发 Celery 异步执行")
|
|||
|
|
|||
|
# 更新任务状态为 done,避免重复触发
|
|||
|
task.status = 'done'
|
|||
|
task.save(update_fields=['status'])
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"触发 Task {task.id} 时出错: {e}")
|
|||
|
|
|||
|
|
|||
|
def sync_scheduled_tasks():
|
|||
|
"""同步每日定时任务到 APScheduler"""
|
|||
|
today = date.today()
|
|||
|
now = datetime.now() # 本地时间
|
|||
|
tasks = Task.objects.filter(status='idle', execution_type='scheduled')
|
|||
|
logger.debug(f"[Scheduled] 检查 {len(tasks)} 个每日任务, 当前时间 {now}")
|
|||
|
|
|||
|
for task in tasks:
|
|||
|
st = task.scheduled_time
|
|||
|
if not st:
|
|||
|
continue
|
|||
|
|
|||
|
# 解析时间字符串
|
|||
|
try:
|
|||
|
scheduled_time_obj = datetime.strptime(st, "%H:%M:%S").time()
|
|||
|
except ValueError:
|
|||
|
scheduled_time_obj = datetime.strptime(st, "%H:%M").time()
|
|||
|
|
|||
|
last_run = task.last_run_date
|
|||
|
if last_run != today:
|
|||
|
# 直接用本地时间,不再 make_aware
|
|||
|
exec_datetime = datetime.combine(today, scheduled_time_obj)
|
|||
|
|
|||
|
job_id = f"scheduled_task_{task.id}"
|
|||
|
if not scheduler.get_job(job_id):
|
|||
|
scheduler.add_job(
|
|||
|
run_scheduled_task,
|
|||
|
trigger=DateTrigger(run_date=exec_datetime),
|
|||
|
id=job_id,
|
|||
|
args=[task.id],
|
|||
|
replace_existing=True,
|
|||
|
misfire_grace_time=1 # 只允许 1 秒的延迟,超过就跳过
|
|||
|
)
|
|||
|
|
|||
|
|
|||
|
|
|||
|
def run_scheduled_task(task_id):
|
|||
|
"""执行每日定时任务"""
|
|||
|
try:
|
|||
|
task = Task.objects.get(id=task_id)
|
|||
|
except Task.DoesNotExist:
|
|||
|
logger.warning(f"[Scheduled] Task {task_id} 不存在")
|
|||
|
return
|
|||
|
|
|||
|
try:
|
|||
|
trigger_task_execution.delay(task.id)
|
|||
|
logger.info(f"[Scheduled] Task {task.id} 已触发 Celery 执行")
|
|||
|
task.last_run_date = date.today()
|
|||
|
task.save(update_fields=['last_run_date'])
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"[Scheduled] 执行 Task {task.id} 出错: {e}")
|