selenium_keyan/selenium_django/api/tasks.py


# tasks.py
import threading
import time
from datetime import datetime, date

import requests
from celery import shared_task
from django.db import transaction, DatabaseError
from django.utils import timezone

from .models import Task, TaskDetail, CrawlQueue
from selenium_django.settings import CRAWL_API_URL


def safe_dict_get(d, key, default=None):
    """Safely read a key from a dict; return default if d is not a dict."""
    if isinstance(d, dict):
        return d.get(key, default)
    return default

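# Example usage (illustrative values only): safe_dict_get is a defensive helper
# for reading fields from crawler payloads that may not be dicts, e.g.
#
#     safe_dict_get({"title": "paper"}, "title")      # -> "paper"
#     safe_dict_get(None, "title", default="")        # -> ""
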
@shared_task(bind=True, queue='crawler', max_retries=3, default_retry_delay=60)
def trigger_task_execution(self, task_id):
    """Receive a task and enqueue it for processing."""
    try:
        task = Task.objects.get(id=task_id)

        # Mark the task as queued
        task.status = 'queued'
        task.save(update_fields=['status'])
        print(f"Task {task_id} status updated to queued")

        # Store the task in CrawlQueue
        CrawlQueue.objects.create(
            task=task,
            texts=task.description,
            parse_flag=task.parse_flag,
            limit=task.limit,
            sort_options=[],
            status="pending",
        )
        print(f"Task {task_id} added to the crawl queue")

        # Trigger queue processing immediately
        process_crawl_queue.delay()

    except Task.DoesNotExist:
        print(f"Task {task_id} does not exist")
    except Exception as e:
        print(f"Failed to enqueue task {task_id}: {e}")
        raise self.retry(exc=e)

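# Usage sketch (assumed call site; the view or signal handler that enqueues
# tasks is not part of this module): a caller holding a saved Task instance
# would typically kick off the pipeline with Celery's standard delay API, e.g.
#
#     trigger_task_execution.delay(task.id)
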
@shared_task(bind=True, queue='crawl_worker', max_retries=3, default_retry_delay=60)
def process_crawl_queue(self):
    """
    Process queued crawl jobs sequentially, ensuring they run one after another.
    """
    item = None
    task = None
    try:
        # Fetch the oldest pending item (row lock with skip_locked avoids concurrent pickup)
        with transaction.atomic():
            item = (
                CrawlQueue.objects
                .select_for_update(skip_locked=True)
                .filter(status='pending')
                .order_by('created_at')
                .first()
            )
            if not item:
                return "no task"

            # Mark the queue item and task as in progress
            item.status = 'processing'
            item.save(update_fields=['status'])

            task = item.task
            task.status = 'running'
            task.save(update_fields=['status'])

        # Run the network request outside the transaction to keep row locks short
        payload = {
            "texts": item.texts,
            "parse": item.parse_flag,
            "limit": item.limit,
            "sort": item.sort_options,
        }

        print(f"Starting crawler request, task_id={task.id}")
        resp = requests.post(CRAWL_API_URL, json=payload, timeout=300)
        resp.raise_for_status()

        try:
            data = resp.json()
        except ValueError:
            print(f"Task {task.id} returned non-JSON data: {resp.text[:200]}")
            data = {}

        results = data.get("results", [])
        if not isinstance(results, list):
            results = []

        # Save results under transaction protection
        with transaction.atomic():
            for idx, r in enumerate(results, start=1):
                TaskDetail.objects.get_or_create(
                    task=task,
                    original_link=r.get("originalLink") or "",
                    defaults={
                        "author": ";".join(r.get("author", [])) if isinstance(r.get("author"), list) else (r.get("author") or ""),
                        "date": str(r.get("date")) if r.get("date") else None,
                        "download": int(r.get("download") or 0),
                        "keywords": ";".join(r.get("keywords", [])) if isinstance(r.get("keywords"), list) else (r.get("keywords") or ""),
                        "pdf_url": r.get("pdfUrl") or "",
                        "parsed_summary": r.get("parsed_summary") or {},
                        "quote": r.get("quote") or "",
                        "site": r.get("site") or "",
                        "source": r.get("source") or "",
                        "summary": r.get("summary") or "",
                        "title": r.get("title") or "",
                    }
                )
                print(f"Task {task.id}: saved result {idx}")

        # Mark as done
        with transaction.atomic():
            task.status = 'done'
            task.save(update_fields=['status'])
            item.status = 'done'
            item.save(update_fields=['status'])
        print(f"Task {task.id} completed")

    except requests.RequestException as e:
        print(f"Network request failed, task_id={item.task.id if item else 'N/A'}: {e}")
        if item:
            # Put the item back so it can be picked up again on retry
            with transaction.atomic():
                item.status = 'pending'
                item.save(update_fields=['status'])
        raise self.retry(exc=e)

    except DatabaseError as e:
        print(f"Database error, task_id={item.task.id if item else 'N/A'}: {e}")
        raise self.retry(exc=e)

    except Exception as e:
        print(f"Task execution failed, task_id={item.task.id if item else 'N/A'}: {e}")
        if item:
            with transaction.atomic():
                # task may still be unset if the failure happened before it was loaded
                failed_task = task or item.task
                failed_task.status = 'failed'
                failed_task.save(update_fields=['status'])
                item.status = 'failed'
                item.save(update_fields=['status'])
        raise self.retry(exc=e)

    finally:
        # Trigger the next queued item
        process_crawl_queue.apply_async(countdown=1)
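
# Deployment sketch (an assumption based on the queue names declared above and
# the `selenium_django` project name used in the settings import): a worker must
# consume both custom queues, or these tasks will sit unclaimed, e.g.
#
#     celery -A selenium_django worker -Q crawler,crawl_worker -l info
#
# Alternatively, run separate workers for `crawler` and `crawl_worker` so that
# enqueueing and crawling do not block each other.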