selenium_keyan/selenium_django/api/tasks.py

140 lines
5.0 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# tasks.py
import requests
from datetime import datetime, date
from django.db import transaction
from .models import Task, TaskDetail
from django.utils import timezone
import threading
import time
from celery import shared_task
from selenium_django.settings import CRAWL_API_URL
def safe_dict_get(d, key, default=None):
    """Look up *key* in *d*, tolerating inputs that are not dicts.

    Args:
        d: Any object; only ``dict`` instances (and subclasses) are queried.
        key: Key to look up.
        default: Value returned when *d* is not a dict or lacks *key*.

    Returns:
        ``d[key]`` when present, otherwise *default*. Never raises for a
        non-dict *d*.
    """
    if not isinstance(d, dict):
        return default
    return d.get(key, default)
def _join_values(values):
    """Join list elements into one ``';'``-separated string.

    ``None`` entries are skipped and every other element goes through
    ``str()``, so a stray non-string element cannot raise ``TypeError``.
    """
    return ';'.join(str(v) for v in values if v is not None)


def _normalize_result_item(item):
    """Map one crawler result dict onto ``TaskDetail`` field values.

    Args:
        item: A single element of the crawler's ``results`` list (a dict).

    Returns:
        A ``(original_link, defaults)`` tuple. ``original_link`` is the
        lookup key used together with the task in ``get_or_create``, and
        ``defaults`` holds the remaining column values.
    """
    # download: coerce to int; missing, empty or unparseable values become 0.
    download_val = item.get("download") or 0
    try:
        download_val = int(download_val)
    except (ValueError, TypeError):
        download_val = 0

    # date: stored in its string form; falsy values are stored as None.
    date_val = str(item.get("date")) if item.get("date") else None

    # author: a list is joined, None becomes '', anything else passes through.
    author_val = item.get("author")
    if isinstance(author_val, list):
        author_val = _join_values(author_val)
    elif author_val is None:
        author_val = ''

    # keywords: a list is joined, a plain string is kept as-is,
    # anything else becomes ''.
    keywords_val = item.get("keywords")
    if isinstance(keywords_val, list):
        keywords_val = _join_values(keywords_val)
    elif not isinstance(keywords_val, str):
        keywords_val = ''

    defaults = {
        'author': author_val,
        'date': date_val,
        'download': download_val,
        'keywords': keywords_val,
        'pdf_url': item.get("pdfUrl") or '',
        'parsed_summary': item.get("parsed_summary") or {},
        'quote': item.get("quote") or '',
        'site': item.get("site") or '',
        'source': item.get("source") or '',
        'summary': item.get("summary") or '',
        'title': item.get("title") or '',
    }
    return item.get("originalLink") or '', defaults


def _save_results(task, task_id, results):
    """Persist crawler results as ``TaskDetail`` rows belonging to *task*.

    Non-dict items are skipped. Each remaining item is normalized and saved
    inside its own savepoint (nested ``transaction.atomic()``). A malformed
    item or a database error on one row is therefore logged and skipped. It
    does not break the enclosing transaction or roll back rows already saved.
    """
    with transaction.atomic():
        for idx, item in enumerate(results, start=1):
            if not isinstance(item, dict):
                print(f"Task {task_id} results 第 {idx} 个元素不是字典,跳过")
                continue
            try:
                original_link, defaults = _normalize_result_item(item)
                # Savepoint: catching a DB error inside an outer atomic block
                # without one leaves the transaction unusable.
                with transaction.atomic():
                    TaskDetail.objects.get_or_create(
                        task=task,
                        original_link=original_link,
                        defaults=defaults,
                    )
            except Exception as e:
                print(f"Task {task_id} 保存第 {idx} 条结果失败: {e}")
            else:
                print(f"Task {task_id} 保存第 {idx} 条结果成功")


@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def trigger_task_execution(self, task_id):
    """Run the crawl for one ``Task`` asynchronously and store its results.

    Flow:

    1. Mark the task ``running``.
    2. POST its description, parse flag and limit to ``CRAWL_API_URL``.
    3. Save each returned result as a ``TaskDetail``.
    4. Mark the task ``done``.

    Args:
        task_id: Primary key of the ``Task`` to execute.

    Returns:
        ``{"success": False, "message": ...}`` when the crawler answers with
        ``code == 20000`` (nothing could be extracted). In that case the
        task is marked ``failed``. Returns ``None`` otherwise, including when
        the task does not exist.

    Raises:
        celery.exceptions.Retry: On any other error, including crawler
            request failures. The task is first marked ``failed``, then
            exactly one retry is requested via ``self.retry``. Once
            ``max_retries`` is exhausted, Celery re-raises the original
            exception instead.
    """
    task = None
    try:
        task = Task.objects.get(id=task_id)
        task.status = 'running'
        task.save(update_fields=['status'])
        print(f"任务 {task_id} 状态更新为 running")

        payload = {
            "texts": task.description,
            "parse": task.parse_flag,
            "limit": task.limit
        }
        try:
            # NOTE(review): requests' timeout is in seconds, so 30000 allows
            # ~8.3 hours per connect/read wait -- confirm this was not meant
            # as milliseconds.
            resp = requests.post(CRAWL_API_URL, json=payload, timeout=30000)
            resp.raise_for_status()
        except requests.RequestException as e:
            print(f"Task {task_id} 爬虫请求失败: {e}")
            # Re-raise rather than calling self.retry() here. Retry is an
            # Exception subclass, so the generic handler below would catch
            # it and call self.retry() again, scheduling a second retry.
            raise

        # Parse the response defensively: non-JSON or non-dict bodies
        # degrade to an empty dict.
        try:
            data = resp.json()
            if not isinstance(data, dict):
                print(f"Task {task_id} 返回数据不是字典,用空 dict 代替")
                data = {}
        except ValueError:
            print(f"Task {task_id} 返回非 JSON 数据: {resp.text[:200]}")
            data = {}

        # code == 20000 signals that the crawler could not extract anything.
        if safe_dict_get(data, "code") == 20000:
            print(f"Task {task_id} 爬虫返回 code=20000, message={data.get('message')}")
            # Finish the task explicitly so it is not left stuck in 'running'.
            task.status = 'failed'
            task.save(update_fields=['status'])
            return {"success": False, "message": data.get("message", "提取不到关键词")}

        results = safe_dict_get(data, "results", [])
        if not isinstance(results, list):
            results = []
        _save_results(task, task_id, results)

        task.status = 'done'
        task.save(update_fields=['status'])
        print(f"任务 {task_id} 执行完成")
    except Task.DoesNotExist:
        print(f"Task {task_id} 不存在")
    except Exception as e:
        print(f"Task {task_id} 执行失败: {e}")
        # Best-effort status update; a failure here must not mask the retry.
        try:
            if task:
                task.status = 'failed'
                task.save(update_fields=['status'])
        except Exception as e2:
            print(f"更新任务失败状态失败: {e2}")
        raise self.retry(exc=e)