# tasks.py
import requests
from datetime import datetime, date
from django.db import transaction
from .models import Task, TaskDetail
from django.utils import timezone
import threading
import time
from celery import shared_task
from celery.exceptions import Retry
from selenium_django.settings import CRAWL_API_URL

# Timeout passed to requests.post() for the crawl API call. requests interprets
# this value in seconds, so 30000 is roughly 8.3 hours.
# NOTE(review): confirm this is intentional and not meant to be 30 s.
CRAWL_REQUEST_TIMEOUT = 30000


def safe_dict_get(d, key, default=None):
    """Return ``d.get(key, default)`` if *d* is a dict, otherwise *default*."""
    if isinstance(d, dict):
        return d.get(key, default)
    return default


def _join_values(value):
    """Normalize a list-or-scalar result field into a string.

    Lists are joined with ';' (None entries skipped, other entries converted
    with str() so non-string elements cannot raise), None becomes '', and any
    other value is converted with str() (so a plain string passes through).
    """
    if isinstance(value, list):
        return ';'.join(str(v) for v in value if v is not None)
    if value is None:
        return ''
    return str(value)


def _build_detail_defaults(item):
    """Map one crawl result dict onto TaskDetail field values.

    Returns the ``defaults`` dict for ``get_or_create``; the lookup key
    (``original_link``) is handled by the caller.
    """
    download_val = item.get("download") or 0
    try:
        download_val = int(download_val)
    except (ValueError, TypeError, OverflowError):
        # Non-numeric / non-finite download counts are stored as 0.
        download_val = 0

    raw_date = item.get("date")
    return {
        'author': _join_values(item.get("author")),
        'date': str(raw_date) if raw_date else None,
        'download': download_val,
        'keywords': _join_values(item.get("keywords")),
        'pdf_url': item.get("pdfUrl") or '',
        'parsed_summary': item.get("parsed_summary") or {},
        'quote': item.get("quote") or '',
        'site': item.get("site") or '',
        'source': item.get("source") or '',
        'summary': item.get("summary") or '',
        'title': item.get("title") or '',
    }


def _save_results(task, task_id, results):
    """Persist crawl results as TaskDetail rows; a bad row is logged and skipped."""
    with transaction.atomic():
        for idx, item in enumerate(results, start=1):
            if not isinstance(item, dict):
                print(f"Task {task_id} results 第 {idx} 个元素不是字典,跳过")
                continue
            # Save this TaskDetail; a failure on one row must not affect the
            # others, so normalization and the DB write share one try.
            try:
                defaults = _build_detail_defaults(item)
                # Per-row savepoint: Django requires catching DB errors outside
                # an atomic block, otherwise the outer transaction can become
                # unusable for the remaining rows.
                with transaction.atomic():
                    TaskDetail.objects.get_or_create(
                        task=task,
                        original_link=item.get("originalLink") or '',
                        defaults=defaults,
                    )
                print(f"Task {task_id} 保存第 {idx} 条结果成功")
            except Exception as e:
                print(f"Task {task_id} 保存第 {idx} 条结果失败: {e}")


@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def trigger_task_execution(self, task_id):
    """Execute a single crawl task asynchronously.

    Marks the Task ``running``, POSTs its description/parse flag/limit to
    ``CRAWL_API_URL``, stores each returned result as a TaskDetail, and marks
    the Task ``done``.

    Returns:
        None on success or when the Task does not exist;
        ``{"success": False, "message": ...}`` when the crawler answers with
        ``code == 20000`` (the Task is then marked ``failed``).

    Raises:
        celery.exceptions.Retry: when a retry has been scheduled (request
        failure or any other unexpected error).
        The original exception once ``max_retries`` is exhausted.
    """
    task = None
    try:
        # Load the task and mark it as running.
        task = Task.objects.get(id=task_id)
        task.status = 'running'
        task.save(update_fields=['status'])
        print(f"任务 {task_id} 状态更新为 running")

        # Call the crawler service.
        payload = {
            "texts": task.description,
            "parse": task.parse_flag,
            "limit": task.limit,
        }
        try:
            resp = requests.post(CRAWL_API_URL, json=payload, timeout=CRAWL_REQUEST_TIMEOUT)
            resp.raise_for_status()
        except requests.RequestException as e:
            print(f"Task {task_id} 爬虫请求失败: {e}")
            raise self.retry(exc=e)

        # Parse JSON defensively; anything other than a dict is treated as empty.
        try:
            data = resp.json()
            if not isinstance(data, dict):
                print(f"Task {task_id} 返回数据不是字典,用空 dict 代替")
                data = {}
        except ValueError:
            print(f"Task {task_id} 返回非 JSON 数据: {resp.text[:200]}")
            data = {}

        # code == 20000 means the crawler could not extract anything.
        if safe_dict_get(data, "code") == 20000:
            print(f"Task {task_id} 爬虫返回 code=20000, message={data.get('message')}")
            # Record the outcome instead of leaving the task stuck in 'running'.
            task.status = 'failed'
            task.save(update_fields=['status'])
            return {"success": False, "message": data.get("message", "提取不到关键词")}

        # Persist task details.
        results = safe_dict_get(data, "results", [])
        if not isinstance(results, list):
            results = []
        _save_results(task, task_id, results)

        # Mark the task as done.
        task.status = 'done'
        task.save(update_fields=['status'])
        print(f"任务 {task_id} 执行完成")
    except Task.DoesNotExist:
        print(f"Task {task_id} 不存在")
    except Retry:
        # self.retry() has already queued the retry and raised this signal.
        # Re-raise it untouched; falling into the generic handler below would
        # call self.retry() a second time and queue a duplicate retry.
        raise
    except Exception as e:
        print(f"Task {task_id} 执行失败: {e}")
        try:
            if task:
                task.status = 'failed'
                task.save(update_fields=['status'])
        except Exception as e2:
            print(f"更新任务失败状态失败: {e2}")
        raise self.retry(exc=e)