2025-10-12 05:43:20 +00:00
|
|
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
|
|
|
|
from apscheduler.triggers.date import DateTrigger
|
|
|
|
|
|
from django.utils import timezone
|
|
|
|
|
|
from datetime import datetime, date
|
|
|
|
|
|
from .models import Task,CrawlQueue
|
|
|
|
|
|
from .tasks import trigger_task_execution
|
|
|
|
|
|
import logging
|
|
|
|
|
|
# Module logger, named after this module's import path.
logger = logging.getLogger(name=__name__)

# Process-wide background scheduler shared by every function below.
# timezone=None is passed deliberately; the original author's note says the
# intent is to use local time.
scheduler = BackgroundScheduler(timezone=None)

# Checked and set by start_scheduler() so repeated calls do not start the
# scheduler a second time.
scheduler_started: bool = False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def start_scheduler():
    """Start the shared background scheduler and register the polling jobs.

    Calling this again after a successful start is a no-op.  Two interval
    jobs are registered, each firing every 30 seconds:

    * ``check_predefined_tasks`` -- enqueues one-off ("predefined") tasks
      whose execution time has arrived.
    * ``sync_scheduled_tasks`` -- adds date-triggered jobs for daily
      ("scheduled") tasks.
    """
    global scheduler_started
    if scheduler_started:
        return

    scheduler.start()
    # Mark as started only after start() succeeded.  Setting the flag first
    # meant a failed start left it True, so every later call silently
    # returned while no scheduler was running.
    scheduler_started = True
    logger.info("APScheduler 启动成功")

    # Stable ids plus replace_existing keep registration idempotent: a repeat
    # registration replaces the job instead of adding a duplicate poller.

    # Poll one-off tasks every 30 seconds.
    scheduler.add_job(
        check_predefined_tasks,
        'interval',
        seconds=30,
        id='check_predefined_tasks',
        replace_existing=True,
    )

    # Poll for newly created daily tasks every 30 seconds.
    scheduler.add_job(
        sync_scheduled_tasks,
        'interval',
        seconds=30,
        id='sync_scheduled_tasks',
        replace_existing=True,
    )
|
|
|
|
|
|
|
|
|
|
|
|
def check_predefined_tasks():
    """Enqueue one-off ("predefined") tasks whose execution time has arrived.

    For every idle predefined task whose ``execution_time`` is not in the
    future:

    1. mark it ``running``;
    2. create a pending ``CrawlQueue`` entry;
    3. kick the queue worker via ``process_crawl_queue.delay()``.

    Tasks without an ``execution_time`` are skipped with a warning.  Errors
    are logged per task (with traceback) and do not stop the loop.
    """
    from django.db import transaction

    logger.info("检查一次性任务: 开始")
    now = datetime.now()
    aware_now = timezone.now()
    tasks = Task.objects.filter(status='idle', execution_type='predefined')
    logger.debug(f"[Predefined] 检查 {len(tasks)} 个一次性任务, 当前时间 {now}")

    for task in tasks:
        exec_time = task.execution_time
        if not exec_time:
            logger.warning(f"Task {task.id} 没有设置 execution_time,跳过")
            continue

        # Compare like with like.  An aware execution_time (USE_TZ=True)
        # compared with the naive local ``now`` raises TypeError, which
        # previously escaped the loop and aborted the whole check.
        current = aware_now if timezone.is_aware(exec_time) else now
        if exec_time > current:
            continue  # not due yet

        try:
            # process_crawl_queue was called but never imported, so every
            # enqueue failed with NameError after the DB writes.  Importing it
            # before touching the DB means a failure leaves the task idle.
            from .tasks import process_crawl_queue

            # Status change and queue entry are committed together, so a
            # failed insert cannot leave the task stuck in 'running'.
            with transaction.atomic():
                task.status = 'running'  # shown as "in progress" in the UI
                task.save(update_fields=['status'])
                CrawlQueue.objects.create(
                    task=task,
                    texts=task.description,
                    parse_flag=task.parse_flag,
                    limit=task.limit,
                    sort_options=[],
                    status="pending",
                )

            process_crawl_queue.delay()
            logger.info(f"Task {task.id} 已加入队列")
        except Exception as e:
            # Keep the original message but also record the traceback.
            logger.exception(f"触发 Task {task.id} 时出错: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def sync_scheduled_tasks():
    """Register today's one-shot APScheduler job for each idle daily task.

    A job is added for an idle ``scheduled`` task when all of these hold:

    * it has a ``scheduled_time``;
    * it has not run today (``last_run_date``);
    * today's run time has not already passed beyond the misfire grace;
    * no job exists yet.

    The job uses a ``DateTrigger`` and id ``scheduled_task_<id>``, and calls
    ``run_scheduled_task(task.id)`` at today's scheduled time.

    ``scheduled_time`` may be a ``datetime.time`` or a ``"HH:MM:SS"`` /
    ``"HH:MM"`` string.  Unparseable values are logged and skipped.
    """
    from datetime import time as dt_time

    # Derive "today" from the same instant as "now" so they cannot disagree
    # around midnight.
    now = datetime.now()
    today = now.date()
    # Jobs more than this many seconds late are dropped by APScheduler.
    misfire_grace_seconds = 1

    tasks = Task.objects.filter(status='idle', execution_type='scheduled')
    logger.debug(f"[Scheduled] 检查 {len(tasks)} 个每日任务, 当前时间 {now}")

    for task in tasks:
        st = task.scheduled_time
        if not st:
            continue

        # Accept a time object directly (e.g. from a TimeField); otherwise try
        # the supported string formats.  A bad value previously raised an
        # uncaught ValueError and aborted the sync for every task.
        if isinstance(st, dt_time):
            scheduled_time_obj = st
        else:
            scheduled_time_obj = None
            for fmt in ("%H:%M:%S", "%H:%M"):
                try:
                    scheduled_time_obj = datetime.strptime(st, fmt).time()
                    break
                except ValueError:
                    continue
            if scheduled_time_obj is None:
                logger.warning(
                    f"[Scheduled] Task {task.id} 的 scheduled_time 格式无效: {st!r},跳过"
                )
                continue

        if task.last_run_date == today:
            continue  # already ran today

        exec_datetime = datetime.combine(today, scheduled_time_obj)

        # A date job whose run time is already past the grace period is
        # discarded as "missed" as soon as it is added.  The next sync then
        # finds no job and re-adds it, every 30 s for the rest of the day.
        # Skip it instead; the next day's sync schedules the next run.
        if (now - exec_datetime).total_seconds() > misfire_grace_seconds:
            continue

        job_id = f"scheduled_task_{task.id}"
        if scheduler.get_job(job_id):
            continue  # already scheduled

        scheduler.add_job(
            run_scheduled_task,
            trigger=DateTrigger(run_date=exec_datetime),
            id=job_id,
            args=[task.id],
            replace_existing=True,
            # Allow at most this much delay; later than that, skip the run.
            misfire_grace_time=misfire_grace_seconds,
        )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_scheduled_task(task_id):
    """Enqueue one daily ("scheduled") task; called by its DateTrigger job.

    Performs these steps:

    1. mark the task ``running`` and record today as its ``last_run_date``
       (committed together with a new pending ``CrawlQueue`` entry);
    2. kick the queue worker via ``process_crawl_queue.delay()``.

    :param task_id: primary key of the ``Task`` to run.

    A missing task is logged as a warning and ignored.  Any other error is
    logged with its traceback and not re-raised.
    """
    from django.db import transaction

    try:
        task = Task.objects.get(id=task_id)
    except Task.DoesNotExist:
        logger.warning(f"[Scheduled] Task {task_id} 不存在")
        return

    try:
        # process_crawl_queue was called but never imported (NameError).
        # Import it before any DB write so a failure changes nothing.
        from .tasks import process_crawl_queue

        with transaction.atomic():
            task.status = 'running'
            # Record today's run in the same save as the status change.
            # Previously last_run_date was written only after delay(), so a
            # failure there left the task 'running' with no record of the run.
            task.last_run_date = date.today()
            task.save(update_fields=['status', 'last_run_date'])
            CrawlQueue.objects.create(
                task=task,
                texts=task.description,
                parse_flag=task.parse_flag,
                limit=task.limit,
                sort_options=[],
                status="pending",
            )

        process_crawl_queue.delay()
        logger.info(f"[Scheduled] Task {task.id} 已加入队列")
    except Exception as e:
        # Keep the original message but also record the traceback.
        logger.exception(f"[Scheduled] 执行 Task {task.id} 出错: {e}")
|
|
|
|
|
|
|