# tasks.py
import requests
from django.db import transaction, DatabaseError
from celery import shared_task

from .models import Task, TaskDetail, CrawlQueue
from selenium_django.settings import CRAWL_API_URL


def safe_dict_get(d, key, default=None):
    """Safely read a key from a dict; returns default for non-dict values."""
    if isinstance(d, dict):
        return d.get(key, default)
    return default


@shared_task(bind=True, queue='crawler', max_retries=3, default_retry_delay=60)
def trigger_task_execution(self, task_id):
    """Receive a task and enqueue it for processing."""
    try:
        task = Task.objects.get(id=task_id)

        # Mark the task as queued
        task.status = 'queued'
        task.save(update_fields=['status'])
        print(f"Task {task_id} status updated to queued")

        # Store the task in the CrawlQueue
        CrawlQueue.objects.create(
            task=task,
            texts=task.description,
            parse_flag=task.parse_flag,
            limit=task.limit,
            sort_options=[],
            status="pending",
        )
        print(f"Task {task_id} added to the crawl queue")

        # Trigger queue processing immediately
        process_crawl_queue.delay()

    except Task.DoesNotExist:
        print(f"Task {task_id} does not exist")
    except Exception as e:
        print(f"Failed to enqueue task {task_id}: {e}")
        raise self.retry(exc=e)


@shared_task(bind=True, queue='crawl_worker', max_retries=3, default_retry_delay=60)
def process_crawl_queue(self):
    """
    Process queued tasks sequentially, ensuring they run one after another.
    """
    item = None
    try:
        # Fetch the oldest pending item (row lock avoids concurrent pickup)
        with transaction.atomic():
            item = (
                CrawlQueue.objects
                .select_for_update(skip_locked=True)
                .filter(status='pending')
                .order_by('created_at')
                .first()
            )
            if not item:
                return "no task"

            # Mark the queue item and its task as in progress
            item.status = 'processing'
            item.save(update_fields=['status'])
            task = item.task
            task.status = 'running'
            task.save(update_fields=['status'])

        # Run the network request outside the transaction to keep lock time short
        payload = {
            "texts": item.texts,
            "parse": item.parse_flag,
            "limit": item.limit,
            "sort": item.sort_options,
        }
        print(f"Requesting crawler, task_id={task.id}")
        resp = requests.post(CRAWL_API_URL, json=payload, timeout=300)
        resp.raise_for_status()
        try:
            data = resp.json()
        except ValueError:
            print(f"Task {task.id} returned non-JSON data: {resp.text[:200]}")
            data = {}
        results = data.get("results", [])
        if not isinstance(results, list):
            results = []

        # Save results inside a transaction
        with transaction.atomic():
            for idx, r in enumerate(results, start=1):
                TaskDetail.objects.get_or_create(
                    task=task,
                    original_link=r.get("originalLink") or "",
                    defaults={
                        "author": ";".join(r.get("author", [])) if isinstance(r.get("author"), list) else (r.get("author") or ""),
                        "date": str(r.get("date")) if r.get("date") else None,
                        "download": int(r.get("download") or 0),
                        "keywords": ";".join(r.get("keywords", [])) if isinstance(r.get("keywords"), list) else (r.get("keywords") or ""),
                        "pdf_url": r.get("pdfUrl") or "",
                        "parsed_summary": r.get("parsed_summary") or {},
                        "quote": r.get("quote") or "",
                        "site": r.get("site") or "",
                        "source": r.get("source") or "",
                        "summary": r.get("summary") or "",
                        "title": r.get("title") or "",
                    }
                )
                print(f"Task {task.id}: saved result {idx}")

        # Mark as done
        with transaction.atomic():
            task.status = 'done'
            task.save(update_fields=['status'])
            item.status = 'done'
            item.save(update_fields=['status'])
        print(f"Task {task.id} finished")

    except requests.RequestException as e:
        print(f"Network request failed, task_id={item.task.id if item else 'N/A'}: {e}")
        if item:
            with transaction.atomic():
                # Put the item back so a later run can pick it up again
                item.status = 'pending'
                item.save(update_fields=['status'])
        raise self.retry(exc=e)
    except DatabaseError as e:
        print(f"Database error, task_id={item.task.id if item else 'N/A'}: {e}")
        raise self.retry(exc=e)
    except Exception as e:
        print(f"Task execution failed, task_id={item.task.id if item else 'N/A'}: {e}")
        if item:
            with transaction.atomic():
                # Use item.task here: the local `task` may be unbound if the
                # failure happened before it was assigned.
                item.task.status = 'failed'
                item.task.save(update_fields=['status'])
                item.status = 'failed'
                item.save(update_fields=['status'])
        raise self.retry(exc=e)
    finally:
        # Chain the next queued item. Only reschedule when an item was
        # actually claimed; otherwise an empty queue would re-trigger this
        # task forever.
        if item is not None:
            process_crawl_queue.apply_async(countdown=1)
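

# --- Usage sketch ----------------------------------------------------------
# A minimal illustration of how these tasks are expected to be driven,
# assuming Task rows are created elsewhere (e.g. in a view or management
# command). The field values below are hypothetical, not part of this module:
#
#     task = Task.objects.create(
#         description="deep learning survey",  # hypothetical example fields
#         parse_flag=True,
#         limit=20,
#     )
#     trigger_task_execution.delay(task.id)
#
# trigger_task_execution enqueues a CrawlQueue row and kicks off
# process_crawl_queue, which drains the queue one item at a time and chains
# itself until no pending items remain.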