selenium_keyan/selenium_django/api/scheduler.py

127 lines
4.0 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.date import DateTrigger
from django.utils import timezone
from datetime import datetime, date
from .models import Task,CrawlQueue
from .tasks import trigger_task_execution
import logging
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Scheduler instance used by start_scheduler() and sync_scheduled_tasks().
scheduler = BackgroundScheduler(timezone=None) # use local time
# Set to True by start_scheduler() so that repeated calls return early.
scheduler_started = False
def start_scheduler():
    """Start the module-level scheduler and register the two polling jobs.

    The first call sets the ``scheduler_started`` module flag, starts
    ``scheduler`` and adds two interval jobs. Any later call finds the flag
    already set and does nothing.
    """
    global scheduler_started
    if not scheduler_started:
        # The flag is raised before start() so later calls return early.
        scheduler_started = True
        scheduler.start()
        logger.info("APScheduler 启动成功")
        # Every 30 seconds: first look for due one-off tasks, then for daily
        # tasks that still need to be registered.
        for polling_job in (check_predefined_tasks, sync_scheduled_tasks):
            scheduler.add_job(polling_job, 'interval', seconds=30)
def check_predefined_tasks():
    """Queue every idle one-off ('predefined') task whose execution time has passed.

    For each due task this sets ``status`` to ``'running'``, creates a pending
    ``CrawlQueue`` row from the task's fields and calls
    ``process_crawl_queue.delay()``. Tasks without an ``execution_time`` are
    skipped with a warning. An exception raised while queueing one task is
    logged with its traceback and does not stop the remaining tasks.
    """
    logger.info("检查一次性任务: 开始")
    now = datetime.now()
    tasks = Task.objects.filter(status='idle', execution_type='predefined')
    logger.debug(f"[Predefined] 检查 {len(tasks)} 个一次性任务, 当前时间 {now}")
    for task in tasks:
        exec_time = task.execution_time
        if not exec_time:
            logger.warning(f"Task {task.id} 没有设置 execution_time跳过")
            continue
        # Comparing an aware datetime with the naive `now` raises TypeError,
        # which would abort the whole poll. Build the reference time with the
        # same awareness as the stored value instead.
        if exec_time.utcoffset() is not None:
            reference_now = datetime.now(tz=exec_time.tzinfo)
        else:
            reference_now = now
        if exec_time > reference_now:
            continue  # not due yet
        try:
            # Queueing logic.
            task.status = 'running'  # shown as "in progress" by the frontend
            task.save(update_fields=['status'])
            CrawlQueue.objects.create(
                task=task,
                texts=task.description,
                parse_flag=task.parse_flag,
                limit=task.limit,
                sort_options=[],
                status="pending"
            )
            # The module header never imports process_crawl_queue, so the call
            # below used to fail with NameError. Imported here, inside the
            # try, so that a failed import is logged like any other error.
            # NOTE(review): assumed to be a Celery task defined in .tasks — confirm.
            from .tasks import process_crawl_queue
            process_crawl_queue.delay()
            logger.info(f"Task {task.id} 已加入队列")
        except Exception as e:
            # Broad catch is deliberate: one failing task must not stop the
            # poll. logger.exception keeps the traceback.
            logger.exception(f"触发 Task {task.id} 时出错: {e}")
def sync_scheduled_tasks():
    """Register today's run of each idle daily ('scheduled') task with APScheduler.

    A task is registered when all of the following hold:

    * it has a ``scheduled_time`` that parses as ``HH:MM:SS`` or ``HH:MM``;
    * its ``last_run_date`` is not today;
    * today's slot is not already further in the past than the misfire grace
      period;
    * no job with id ``scheduled_task_<task.id>`` exists yet.

    The registered job calls ``run_scheduled_task(task.id)`` once, at today's
    date combined with the parsed time.
    """
    # Lateness (seconds) that APScheduler tolerates before dropping a run.
    misfire_grace_seconds = 1
    today = date.today()
    now = datetime.now()
    tasks = Task.objects.filter(status='idle', execution_type='scheduled')
    logger.debug(f"[Scheduled] 检查 {len(tasks)} 个每日任务, 当前时间 {now}")
    for task in tasks:
        st = task.scheduled_time
        if not st:
            continue
        if task.last_run_date == today:
            continue  # already executed today

        # Parse the time string, trying the longer format first. A value that
        # matches neither format is logged and skipped, so one bad row cannot
        # abort the sync for every task after it.
        scheduled_time_obj = None
        for time_format in ("%H:%M:%S", "%H:%M"):
            try:
                scheduled_time_obj = datetime.strptime(st, time_format).time()
                break
            except (ValueError, TypeError):
                continue
        if scheduled_time_obj is None:
            logger.warning(f"[Scheduled] Task {task.id} 的 scheduled_time 无法解析: {st!r}, 跳过")
            continue

        exec_datetime = datetime.combine(today, scheduled_time_obj)
        # A slot already past the grace period would be dropped by APScheduler
        # as a misfire and then re-added on the next poll, over and over.
        # Skip it up front.
        if (now - exec_datetime).total_seconds() > misfire_grace_seconds:
            continue

        job_id = f"scheduled_task_{task.id}"
        if not scheduler.get_job(job_id):
            scheduler.add_job(
                run_scheduled_task,
                trigger=DateTrigger(run_date=exec_datetime),
                id=job_id,
                args=[task.id],
                replace_existing=True,
                # Allow at most 1 second of lateness; later runs are skipped.
                misfire_grace_time=misfire_grace_seconds
            )
def run_scheduled_task(task_id):
    """Queue one daily task for crawling and record that it ran today.

    Looks up the ``Task`` with the given id, sets its ``status`` to
    ``'running'``, creates a pending ``CrawlQueue`` row from the task's
    fields, calls ``process_crawl_queue.delay()`` and finally stores today's
    date in ``last_run_date``.

    Args:
        task_id: Primary key of the ``Task`` to run.

    Returns:
        None. A missing task is logged as a warning. Any other exception is
        logged with its traceback and not re-raised.
    """
    try:
        task = Task.objects.get(id=task_id)
    except Task.DoesNotExist:
        logger.warning(f"[Scheduled] Task {task_id} 不存在")
        return
    try:
        # Queueing logic.
        task.status = 'running'
        task.save(update_fields=['status'])
        CrawlQueue.objects.create(
            task=task,
            texts=task.description,
            parse_flag=task.parse_flag,
            limit=task.limit,
            sort_options=[],
            status="pending"
        )
        # The module header never imports process_crawl_queue, so the call
        # below used to fail with NameError and last_run_date was never
        # written. Imported here, inside the try, so that a failed import is
        # logged like any other error.
        # NOTE(review): assumed to be a Celery task defined in .tasks — confirm.
        from .tasks import process_crawl_queue
        process_crawl_queue.delay()
        logger.info(f"[Scheduled] Task {task.id} 已加入队列")
        task.last_run_date = date.today()
        task.save(update_fields=['last_run_date'])
    except Exception as e:
        # Broad catch is deliberate: this runs on a scheduler thread.
        # logger.exception keeps the traceback.
        logger.exception(f"[Scheduled] 执行 Task {task.id} 出错: {e}")