diff --git a/docker-compose.yaml b/docker-compose.yaml index a8f0e24..96eae4c 100755 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -14,7 +14,10 @@ services: container_name: selenium-crawl-container ports: - "5001:5000" - + environment: + API_MODEL: "glm-4.5" + API_BASE_URL: "https://open.bigmodel.cn/api/paas/v4" + API_KEY: "ce39bdd4fcf34ec0aec75072bc9ff988.hAp7HZTVUwy7vImn" # ---------- Django + Celery ---------- selenium_django: build: ./selenium_django @@ -24,7 +27,11 @@ services: CELERY_BROKER_URL: redis://redis:6379/0 CELERY_RESULT_BACKEND: redis://redis:6379/0 # Django 调用爬虫服务的地址 - CRAWL_API_URL: http://47.83.141.164:5001/crawl + CRAWL_API_URL: http://47.83.141.164:5001 + # API 配置 + API_MODEL: "glm-4-long" + API_BASE_URL: "https://open.bigmodel.cn/api/paas/v4" + API_KEY: "ce39bdd4fcf34ec0aec75072bc9ff988.hAp7HZTVUwy7vImn" volumes: - "./selenium_django:/app" depends_on: @@ -39,8 +46,8 @@ services: context: ./selenium_vue # 上一级目录 dockerfile: Dockerfile args: - VITE_API_BASE_URL: http://47.83.141.164:8002 - VITE_CRAWL_URL: http://47.83.141.164:5001/crawl + VITE_API_BASE_URL: http://47.83.141.164:8002 # 改为远程 IP + VITE_CRAWL_URL: http://47.83.141.164:5001 # 改为远程 IP container_name: selenium-vue-container environment: PORT: 80 diff --git a/selenium/config.py b/selenium/config.py index ea9fca8..de0f716 100755 --- a/selenium/config.py +++ b/selenium/config.py @@ -4,9 +4,9 @@ from selenium import webdriver from selenium.webdriver.chrome.service import Service import os api_info = { - "model": "glm-4.5", - "base_url": "https://open.bigmodel.cn/api/paas/v4", - "api_key": "ce39bdd4fcf34ec0aec75072bc9ff988.hAp7HZTVUwy7vImn" + "model": os.environ.get("API_MODEL", "glm-4.5"), # 默认值可选 + "base_url": os.environ.get("API_BASE_URL", "https://open.bigmodel.cn/api/paas/v4"), + "api_key": os.environ.get("API_KEY", ""), } # chrome浏览器以及驱动配置 diff --git a/selenium/main.py b/selenium/main.py index 93c042d..43b1740 100755 --- a/selenium/main.py +++ b/selenium/main.py @@ -39,26 +39,31 @@ def translate_text(text): return {"chinese": [], "english": []} # 构造 prompt - prompt = ( - "你是科研助手,输入是一句话或中文关键词列表。" - "请从输入中理解语义,提取与科研论文主题最相关、最核心的中文主题,并翻译为英文。" - "只保留1~2个最核心主题,不要加入无关内容。" - "输出必须严格遵守 JSON 格式,不允许有额外文字或符号:{\"chinese\": [...], \"english\": [...]}。\n" - "示例输入输出:\n" - "输入: '我想获取基于深度学习的图像识别方面的研究'\n" - "输出: {\"chinese\": [\"基于深度学习的图像识别\"], \"english\": [\"Deep Learning-based Image Recognition\"]}\n" - "输入: '图像识别在深度学习方面的研究'\n" - "输出: {\"chinese\": [\"基于深度学习的图像识别\"], \"english\": [\"Deep Learning-based Image Recognition\"]}\n" - "输入: '自然语言处理模型在文本分类中的应用'\n" - "输出: {\"chinese\": [\"自然语言处理文本分类\"], \"english\": [\"NLP Text Classification\"]}\n" - "输入: '强化学习在自动驾驶决策中的最新进展'\n" - "输出: {\"chinese\": [\"强化学习自动驾驶决策\"], \"english\": [\"Reinforcement Learning for Autonomous Driving Decision-Making\"]}\n" - "输入: '使用图神经网络进行社交网络分析的研究'\n" - "输出: {\"chinese\": [\"图神经网络社交网络分析\"], \"english\": [\"Graph Neural Networks for Social Network Analysis\"]}\n" - "输入: '我想研究深度强化学习在机器人控制中的应用'\n" - "输出: {\"chinese\": [\"深度强化学习机器人控制\"], \"english\": [\"Deep Reinforcement Learning for Robot Control\"]}\n" - f"现在请对输入提取核心主题:\n输入: {text}" - ) + system_prompt = """你是一名科研检索关键词提炼专家,任务是将用户输入的自然语言直接提炼为学术检索关键词。 + 【要求】 + 1. 提炼输入中的核心研究对象、问题、方法或应用场景。 + 2. 用学术化中文表达,避免口语化或宽泛词汇。 + 3. 给出对应英文表达,使用国际学术界常用专业术语。 + 4. 如果输入包含多个研究问题,请分别提炼关键词,每个字段最多 3 个关键词。 + 5. 删除无关修饰词或无检索价值的词。 + 6. 
输出严格 JSON 格式,仅包含 `chinese` 和 `english` 字段,值为列表。 + + 【示例】 + 输入: '我想研究深度强化学习在机器人控制中的应用' + 输出: {"chinese": ["深度强化学习", "机器人控制"], "english": ["Deep Reinforcement Learning", "Robot Control"]} + + 输入: '大模型多轮对话迷失的问题及解决方案' + 输出: {"chinese": ["大型语言模型", "多轮对话上下文漂移"], "english": ["Large Language Models", "Context Drift in Multi-turn Dialogue"]} + + 输入: '人工智能幻觉问题及多轮对话迷失的解决方法,包括意图识别工作' + 输出: {"chinese": ["人工智能幻觉", "多轮对话上下文漂移", "意图识别"], "english": ["AI Hallucination", "Context Drift in Multi-turn Dialogue", "Intent Recognition"]} + + 输入: '了解生态系统的能量流动和物种多样性' + 输出: {"chinese": ["生态系统能量流动", "物种多样性"], "english": ["Ecosystem Energy Flow", "Species Diversity"]} + """ + user_prompt=f"""输入:{text} + 请严格输出符合 JSON 格式的核心科研关键词: + """ url = f"{api_info['base_url']}/chat/completions" @@ -68,12 +73,21 @@ def translate_text(text): } payload = { "model": api_info["model"], - "messages": [{"role": "user", "content": prompt}], + "messages": [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt} + ], + "thinking": { + "type": "disabled" + }, "max_output_tokens": 512 } + + + try: - resp = requests.post(url, headers=headers, json=payload, timeout=30) + resp = requests.post(url, headers=headers, json=payload, timeout=60) resp.raise_for_status() result = resp.json() diff --git a/selenium/parseApi/api.py b/selenium/parseApi/api.py index e4a7d0b..9a3b275 100755 --- a/selenium/parseApi/api.py +++ b/selenium/parseApi/api.py @@ -25,6 +25,9 @@ async def call_model_api(prompt): payload = { "model": api_info["model"], "messages": [{"role": "user", "content": prompt}], + "thinking": { + "type": "disabled" + }, "max_output_tokens": 1024 } diff --git a/selenium_django/Dockerfile b/selenium_django/Dockerfile index 6df2474..b31f1f3 100755 --- a/selenium_django/Dockerfile +++ b/selenium_django/Dockerfile @@ -20,11 +20,6 @@ ENV CELERY_BROKER_URL=${CELERY_BROKER_URL:-redis://redis:6379/0} ENV CELERY_RESULT_BACKEND=${CELERY_RESULT_BACKEND:-redis://redis:6379/0} ENV CRAWL_API_URL=${CRAWL_API_URL:-http://47.83.141.164:5001/crawl} -# 在构建时替换 settings.py 中的配置 -RUN sed -i "s#CELERY_BROKER_URL = .*#CELERY_BROKER_URL = '${CELERY_BROKER_URL}'#" selenium_django/settings.py && \ - sed -i "s#CELERY_RESULT_BACKEND = .*#CELERY_RESULT_BACKEND = '${CELERY_RESULT_BACKEND}'#" selenium_django/settings.py && \ - sed -i "s#CRAWL_API_URL = .*#CRAWL_API_URL = '${CRAWL_API_URL}'#" selenium_django/settings.py - # 入口脚本 COPY entrypoint.sh /entrypoint.sh RUN chmod +x /entrypoint.sh diff --git a/selenium_django/api/models.py b/selenium_django/api/models.py index 0067a5e..1c69683 100755 --- a/selenium_django/api/models.py +++ b/selenium_django/api/models.py @@ -1,55 +1,68 @@ -from django.db import models - -# Create your models here. 
-from django.db import models - -class Task(models.Model): - TASK_STATUS_CHOICES = [ - ('running', '进行中'), - ('idle', '空闲中'), - ('done', '完成'), - ('failed', '失败'), - ] - - EXECUTION_TYPE_CHOICES = [ - ('scheduled', '定期执行'), - ('predefined', '预定时间执行'), - ] - - task_id = models.CharField(max_length=64, unique=True) - name = models.CharField(max_length=200) - description = models.TextField(blank=True, null=True) - last_run_date = models.DateField(null=True, blank=True) - execution_type = models.CharField( - max_length=20, - choices=EXECUTION_TYPE_CHOICES, - blank=True, - null=True - ) - # 一次性执行使用 DateTimeField - execution_time = models.DateTimeField(blank=True, null=True) - # 每天执行使用 TimeField - scheduled_time = models.CharField(max_length=10, blank=True, null=True) # 改为字符串 HH:MM - parse_flag = models.BooleanField(default=False) - limit = models.IntegerField(default=60) # ⭐ 新增的字段,默认60 - status = models.CharField(max_length=20, choices=TASK_STATUS_CHOICES, default='idle') - created_at = models.DateTimeField(auto_now_add=True) - updated_at = models.DateTimeField(auto_now=True) - - def __str__(self): - return self.name -class TaskDetail(models.Model): - task = models.ForeignKey(Task, related_name="details", on_delete=models.CASCADE) - author = models.CharField(max_length=500, blank=True) - date = models.CharField(max_length=100, blank=True, null=True) # 改为字符串 - download = models.IntegerField(blank=True, null=True) - keywords = models.TextField(blank=True) # 存储 ; 分隔的关键字 - original_link = models.URLField(blank=True) - pdf_url = models.URLField(blank=True) - quote = models.TextField(blank=True) - source = models.CharField(max_length=200, blank=True) - site = models.CharField(max_length=200, blank=True) - summary = models.TextField(blank=True) - parsed_summary = models.JSONField(blank=True, null=True) # 存储 JSON - title = models.CharField(max_length=300, blank=True) - created_at = models.DateTimeField(auto_now_add=True) +from django.db import models + +# Create your models here. 
+from django.db import models + + + +class Task(models.Model): + TASK_STATUS_CHOICES = [ + ('running', '进行中'), + ('queued', '进行中'), + ('idle', '空闲中'), + ('done', '完成'), + ('failed', '失败'), + ] + EXECUTION_TYPE_CHOICES = [ + ('scheduled', '定期执行'), + ('predefined', '预定时间执行'), + ] + + task_id = models.CharField(max_length=64, unique=True) + name = models.CharField(max_length=200) + description = models.TextField(blank=True, null=True) + last_run_date = models.DateField(null=True, blank=True) + execution_type = models.CharField( + max_length=20, + choices=EXECUTION_TYPE_CHOICES, + blank=True, + null=True + ) + # 一次性执行使用 DateTimeField + execution_time = models.DateTimeField(blank=True, null=True) + # 每天执行使用 TimeField + scheduled_time = models.CharField(max_length=10, blank=True, null=True) # 改为字符串 HH:MM + parse_flag = models.BooleanField(default=False) + limit = models.IntegerField(default=60) # ⭐ 新增的字段,默认60 + status = models.CharField(max_length=20, choices=TASK_STATUS_CHOICES, default='idle') + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + def __str__(self): + return self.name +class TaskDetail(models.Model): + task = models.ForeignKey(Task, related_name="details", on_delete=models.CASCADE) + author = models.CharField(max_length=500, blank=True) + date = models.CharField(max_length=100, blank=True, null=True) # 改为字符串 + download = models.IntegerField(blank=True, null=True) + keywords = models.TextField(blank=True) # 存储 ; 分隔的关键字 + original_link = models.URLField(blank=True) + pdf_url = models.URLField(blank=True) + quote = models.TextField(blank=True) + source = models.CharField(max_length=200, blank=True) + site = models.CharField(max_length=200, blank=True) + summary = models.TextField(blank=True) + parsed_summary = models.JSONField(blank=True, null=True) # 存储 JSON + title = models.CharField(max_length=300, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + +class CrawlQueue(models.Model): + task = models.ForeignKey(Task, on_delete=models.CASCADE, related_name="queue") + texts = models.TextField() + parse_flag = models.BooleanField(default=True) + limit = models.IntegerField(default=10) + sort_options = models.JSONField(default=list) + status = models.CharField(max_length=20, default="pending") # pending / processing / done / failed + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + diff --git a/selenium_django/api/scheduler.py b/selenium_django/api/scheduler.py index b33901f..3528311 100755 --- a/selenium_django/api/scheduler.py +++ b/selenium_django/api/scheduler.py @@ -1,104 +1,126 @@ -from apscheduler.schedulers.background import BackgroundScheduler -from apscheduler.triggers.date import DateTrigger -from django.utils import timezone -from datetime import datetime, date -from .models import Task -from .tasks import trigger_task_execution -import logging -logger = logging.getLogger(__name__) -scheduler = BackgroundScheduler(timezone=None) # 使用本地时间 -scheduler_started = False - - -def start_scheduler(): - global scheduler_started - if scheduler_started: - return - scheduler_started = True - - scheduler.start() - logger.info("APScheduler 启动成功") - # 定期检查一次性任务,每30秒 - scheduler.add_job(check_predefined_tasks, 'interval', seconds=30) - - # 定期检查新创建的每日定时任务,每30秒 - scheduler.add_job(sync_scheduled_tasks, 'interval', seconds=30) - -def check_predefined_tasks(): - """检查一次性任务并触发 Celery 异步执行""" - logger.info("检查一次性任务: 开始") - now = datetime.now() # 使用本地时间 - tasks = 
Task.objects.filter(status='idle', execution_type='predefined') - logger.debug(f"[Predefined] 检查 {len(tasks)} 个一次性任务, 当前时间 {now}") - - for task in tasks: - exec_time = task.execution_time - if not exec_time: - logger.warning(f"Task {task.id} 没有设置 execution_time,跳过") - continue - - # 数据库里已经是本地时间,不需要再做 timezone aware - if exec_time <= now: - try: - # 异步调用 Celery 执行任务,只传 task.id - trigger_task_execution.delay(task.id) - logger.info(f"Task {task.id} 已触发 Celery 异步执行") - - # 更新任务状态为 done,避免重复触发 - task.status = 'done' - task.save(update_fields=['status']) - except Exception as e: - logger.error(f"触发 Task {task.id} 时出错: {e}") - - -def sync_scheduled_tasks(): - """同步每日定时任务到 APScheduler""" - today = date.today() - now = datetime.now() # 本地时间 - tasks = Task.objects.filter(status='idle', execution_type='scheduled') - logger.debug(f"[Scheduled] 检查 {len(tasks)} 个每日任务, 当前时间 {now}") - - for task in tasks: - st = task.scheduled_time - if not st: - continue - - # 解析时间字符串 - try: - scheduled_time_obj = datetime.strptime(st, "%H:%M:%S").time() - except ValueError: - scheduled_time_obj = datetime.strptime(st, "%H:%M").time() - - last_run = task.last_run_date - if last_run != today: - # 直接用本地时间,不再 make_aware - exec_datetime = datetime.combine(today, scheduled_time_obj) - - job_id = f"scheduled_task_{task.id}" - if not scheduler.get_job(job_id): - scheduler.add_job( - run_scheduled_task, - trigger=DateTrigger(run_date=exec_datetime), - id=job_id, - args=[task.id], - replace_existing=True, - misfire_grace_time=1 # 只允许 1 秒的延迟,超过就跳过 - ) - - - -def run_scheduled_task(task_id): - """执行每日定时任务""" - try: - task = Task.objects.get(id=task_id) - except Task.DoesNotExist: - logger.warning(f"[Scheduled] Task {task_id} 不存在") - return - - try: - trigger_task_execution.delay(task.id) - logger.info(f"[Scheduled] Task {task.id} 已触发 Celery 执行") - task.last_run_date = date.today() - task.save(update_fields=['last_run_date']) - except Exception as e: - logger.error(f"[Scheduled] 执行 Task {task.id} 出错: {e}") +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.date import DateTrigger +from django.utils import timezone +from datetime import datetime, date +from .models import Task,CrawlQueue +from .tasks import trigger_task_execution +import logging +logger = logging.getLogger(__name__) +scheduler = BackgroundScheduler(timezone=None) # 使用本地时间 +scheduler_started = False + + +def start_scheduler(): + global scheduler_started + if scheduler_started: + return + scheduler_started = True + + scheduler.start() + logger.info("APScheduler 启动成功") + # 定期检查一次性任务,每30秒 + scheduler.add_job(check_predefined_tasks, 'interval', seconds=30) + + # 定期检查新创建的每日定时任务,每30秒 + scheduler.add_job(sync_scheduled_tasks, 'interval', seconds=30) + +def check_predefined_tasks(): + """检查一次性任务并加入队列""" + logger.info("检查一次性任务: 开始") + now = datetime.now() + tasks = Task.objects.filter(status='idle', execution_type='predefined') + logger.debug(f"[Predefined] 检查 {len(tasks)} 个一次性任务, 当前时间 {now}") + + for task in tasks: + exec_time = task.execution_time + if not exec_time: + logger.warning(f"Task {task.id} 没有设置 execution_time,跳过") + continue + + if exec_time <= now: + try: + # 排队逻辑 + task.status = 'running' # 前端显示为进行中 + task.save(update_fields=['status']) + + CrawlQueue.objects.create( + task=task, + texts=task.description, + parse_flag=task.parse_flag, + limit=task.limit, + sort_options=[], + status="pending" + ) + process_crawl_queue.delay() + + logger.info(f"Task {task.id} 已加入队列") + except Exception as e: + logger.error(f"触发 Task 
{task.id} 时出错: {e}") + + + + + +def sync_scheduled_tasks(): + """同步每日定时任务到 APScheduler""" + today = date.today() + now = datetime.now() + tasks = Task.objects.filter(status='idle', execution_type='scheduled') + logger.debug(f"[Scheduled] 检查 {len(tasks)} 个每日任务, 当前时间 {now}") + + for task in tasks: + st = task.scheduled_time + if not st: + continue + + # 解析时间字符串 + try: + scheduled_time_obj = datetime.strptime(st, "%H:%M:%S").time() + except ValueError: + scheduled_time_obj = datetime.strptime(st, "%H:%M").time() + + last_run = task.last_run_date + if last_run != today: + exec_datetime = datetime.combine(today, scheduled_time_obj) + job_id = f"scheduled_task_{task.id}" + + if not scheduler.get_job(job_id): + scheduler.add_job( + run_scheduled_task, + trigger=DateTrigger(run_date=exec_datetime), + id=job_id, + args=[task.id], + replace_existing=True, + misfire_grace_time=1 # 只允许 1 秒的延迟,超过就跳过 + ) + + +def run_scheduled_task(task_id): + """执行每日定时任务""" + try: + task = Task.objects.get(id=task_id) + except Task.DoesNotExist: + logger.warning(f"[Scheduled] Task {task_id} 不存在") + return + + try: + # 排队逻辑 + task.status = 'running' + task.save(update_fields=['status']) + + CrawlQueue.objects.create( + task=task, + texts=task.description, + parse_flag=task.parse_flag, + limit=task.limit, + sort_options=[], + status="pending" + ) + process_crawl_queue.delay() + + logger.info(f"[Scheduled] Task {task.id} 已加入队列") + task.last_run_date = date.today() + task.save(update_fields=['last_run_date']) + except Exception as e: + logger.error(f"[Scheduled] 执行 Task {task.id} 出错: {e}") + diff --git a/selenium_django/api/tasks.py b/selenium_django/api/tasks.py index 79b9420..95e086e 100755 --- a/selenium_django/api/tasks.py +++ b/selenium_django/api/tasks.py @@ -2,9 +2,9 @@ import requests from datetime import datetime, date -from django.db import transaction +from django.db import transaction, DatabaseError -from .models import Task, TaskDetail +from .models import Task, TaskDetail,CrawlQueue from django.utils import timezone import threading import time @@ -18,123 +18,135 @@ def safe_dict_get(d, key, default=None): return d.get(key, default) return default -@shared_task(bind=True, max_retries=3, default_retry_delay=60) +@shared_task(bind=True, queue='crawler', max_retries=3, default_retry_delay=60) def trigger_task_execution(self, task_id): - """异步执行单个任务""" - task = None + """接收任务 → 入队等待处理""" try: - # 获取任务 task = Task.objects.get(id=task_id) - task.status = 'running' + # 标记为排队状态 + task.status = 'queued' task.save(update_fields=['status']) - print(f"任务 {task_id} 状态更新为 running") + print(f"任务 {task_id} 状态更新为 queued") - # 爬虫请求 - payload = { - "texts": task.description, - "parse": task.parse_flag, - "limit": task.limit - } + # 将任务存入 CrawlQueue + CrawlQueue.objects.create( + task=task, + texts=task.description, + parse_flag=task.parse_flag, + limit=task.limit, + sort_options=[], + status="pending", + ) + print(f"任务 {task_id} 已加入爬虫队列") - try: - resp = requests.post(CRAWL_API_URL, json=payload, timeout=30000) - resp.raise_for_status() - except requests.RequestException as e: - print(f"Task {task_id} 爬虫请求失败: {e}") - raise self.retry(exc=e) - - # 安全解析 JSON - try: - data = resp.json() - if not isinstance(data, dict): - print(f"Task {task_id} 返回数据不是字典,用空 dict 代替") - data = {} - except ValueError: - print(f"Task {task_id} 返回非 JSON 数据: {resp.text[:200]}") - data = {} - - # code==20000 说明提取失败 - if safe_dict_get(data, "code") == 20000: - print(f"Task {task_id} 爬虫返回 code=20000, message={data.get('message')}") - return {"success": 
False, "message": data.get("message", "提取不到关键词")} - - # 保存任务详情 - results = safe_dict_get(data, "results", []) - if not isinstance(results, list): - results = [] - - with transaction.atomic(): - for idx, item in enumerate(results, start=1): - if not isinstance(item, dict): - print(f"Task {task_id} results 第 {idx} 个元素不是字典,跳过") - continue - - download_val = item.get("download") or 0 - try: - download_val = int(download_val) - except (ValueError, TypeError): - download_val = 0 - - date_val = str(item.get("date")) if item.get("date") else None - - author_val = item.get("author") - if isinstance(author_val, list): - author_val = ';'.join(author_val) - elif author_val is None: - author_val = '' - - keywords_val = item.get("keywords") - if isinstance(keywords_val, list): - keywords_val = ';'.join(keywords_val) - else: - keywords_val = '' - - pdf_url = item.get("pdfUrl") or '' - parsed_summary = item.get("parsed_summary") or {} - quote_val = item.get("quote") or '' - site_val = item.get("site") or '' - source_val = item.get("source") or '' - summary_val = item.get("summary") or '' - title_val = item.get("title") or '' - original_link = item.get("originalLink") or '' - - # 保存 TaskDetail,单条失败不影响其他条 - try: - TaskDetail.objects.get_or_create( - task=task, - original_link=original_link, - defaults={ - 'author': author_val, - 'date': date_val, - 'download': download_val, - 'keywords': keywords_val, - 'pdf_url': pdf_url, - 'parsed_summary': parsed_summary, - 'quote': quote_val, - 'site': site_val, - 'source': source_val, - 'summary': summary_val, - 'title': title_val - } - ) - print(f"Task {task_id} 保存第 {idx} 条结果成功") - except Exception as e: - print(f"Task {task_id} 保存第 {idx} 条结果失败: {e}") - continue - - # 更新任务状态为 done - task.status = 'done' - task.save(update_fields=['status']) - print(f"任务 {task_id} 执行完成") + # 立即触发队列处理任务 + process_crawl_queue.delay() except Task.DoesNotExist: print(f"Task {task_id} 不存在") except Exception as e: - print(f"Task {task_id} 执行失败: {e}") + print(f"Task {task_id} 入队失败: {e}") + raise self.retry(exc=e) + +@shared_task(bind=True, queue='crawl_worker', max_retries=3, default_retry_delay=60) +def process_crawl_queue(self): + """ + 顺序执行队列任务,确保一个接着一个执行 + """ + item = None + try: + # 获取最早 pending 任务(加锁避免并发) + with transaction.atomic(): + item = ( + CrawlQueue.objects + .select_for_update(skip_locked=True) + .filter(status='pending') + .order_by('created_at') + .first() + ) + if not item: + return "no task" + + # 标记队列和任务状态 + item.status = 'processing' + item.save(update_fields=['status']) + + task = item.task + task.status = 'running' + task.save(update_fields=['status']) + + # 事务之外执行网络请求,减少锁表时间 + payload = { + "texts": item.texts, + "parse": item.parse_flag, + "limit": item.limit, + "sort": item.sort_options + } + print(f"开始请求爬虫 task_id={task.id}") + resp = requests.post(CRAWL_API_URL, json=payload, timeout=300) + resp.raise_for_status() try: - if task: + data = resp.json() + except ValueError: + print(f"Task {task.id} 返回非 JSON 数据: {resp.text[:200]}") + data = {} + + results = data.get("results", []) + if not isinstance(results, list): + results = [] + + # 保存结果,事务保护 + with transaction.atomic(): + for idx, r in enumerate(results, start=1): + TaskDetail.objects.get_or_create( + task=task, + original_link=r.get("originalLink") or "", + defaults={ + "author": ";".join(r.get("author", [])) if isinstance(r.get("author"), list) else (r.get("author") or ""), + "date": str(r.get("date")) if r.get("date") else None, + "download": int(r.get("download") or 0), + "keywords": 
";".join(r.get("keywords", [])) if isinstance(r.get("keywords"), list) else (r.get("keywords") or ""), + "pdf_url": r.get("pdfUrl") or "", + "parsed_summary": r.get("parsed_summary") or {}, + "quote": r.get("quote") or "", + "site": r.get("site") or "", + "source": r.get("source") or "", + "summary": r.get("summary") or "", + "title": r.get("title") or "", + } + ) + print(f"Task {task.id} 保存第 {idx} 条结果成功") + + # 标记完成 + with transaction.atomic(): + task.status = 'done' + task.save(update_fields=['status']) + item.status = 'done' + item.save(update_fields=['status']) + print(f"任务 {task.id} 执行完成") + + except requests.RequestException as e: + print(f"网络请求失败 task_id={item.task.id if item else 'N/A'}: {e}") + if item: + with transaction.atomic(): + item.status = 'pending' + item.save(update_fields=['status']) + raise self.retry(exc=e) + + except DatabaseError as e: + print(f"数据库异常 task_id={item.task.id if item else 'N/A'}: {e}") + raise self.retry(exc=e) + + except Exception as e: + print(f"任务执行失败 task_id={item.task.id if item else 'N/A'}: {e}") + if item: + with transaction.atomic(): task.status = 'failed' task.save(update_fields=['status']) - except Exception as e2: - print(f"更新任务失败状态失败: {e2}") - raise self.retry(exc=e) \ No newline at end of file + item.status = 'failed' + item.save(update_fields=['status']) + raise self.retry(exc=e) + + finally: + # 触发下一个队列任务 + process_crawl_queue.apply_async(countdown=1) \ No newline at end of file diff --git a/selenium_django/api/views.py b/selenium_django/api/views.py index c63d405..d65661a 100755 --- a/selenium_django/api/views.py +++ b/selenium_django/api/views.py @@ -10,13 +10,21 @@ from django_filters.rest_framework import DjangoFilterBackend # Create your views here. from rest_framework import viewsets, filters from rest_framework.pagination import PageNumberPagination -from .models import Task, TaskDetail +from .models import Task, TaskDetail,CrawlQueue from .serializers import TaskSerializer, TaskDetailSerializer, TaskListSerializer from rest_framework.decorators import action from rest_framework.response import Response from rest_framework import status -from .tasks import trigger_task_execution +from .tasks import trigger_task_execution,process_crawl_queue import threading +import logging +logger = logging.getLogger(__name__) + + +print(f'----------chat----------init---------') + + + # 分页设置 class StandardResultsSetPagination(PageNumberPagination): page_size = 10 @@ -36,6 +44,7 @@ def sync_stream(generator): # 获取异步生成器的下一条数据 chunk = loop.run_until_complete(async_gen.__anext__()) if chunk and chunk.strip(): + print(chunk) yield chunk except StopAsyncIteration: break @@ -52,6 +61,9 @@ async def call_model_stream(messages): "model": api_info["model"], "messages": messages, "max_output_tokens": 1024, + "thinking": { + "type": "disabled" + }, "stream": True } @@ -77,6 +89,12 @@ class TaskViewSet(viewsets.ModelViewSet): ordering_fields = ['created_at', 'updated_at'] def get_serializer_class(self): + + + print(f'----------get_serializer_class-------------------') + print(f'1111111111') + + if self.action == 'list': return TaskListSerializer # list 返回简化字段 return TaskSerializer # retrieve 返回完整字段,含 details @@ -86,14 +104,26 @@ class TaskViewSet(viewsets.ModelViewSet): task = self.get_object() try: - # 异步触发 Celery 任务 - async_result = trigger_task_execution.delay(task.id) - - # 直接返回任务已触发,不访问 async_result 的内容 + # 标记任务为排队状态(前端显示“进行中”) + task.status = 'running' # 前端仍然可理解为“进行中” + task.save(update_fields=['status']) + + # 创建队列记录 + CrawlQueue.objects.create( + 
task=task, + texts=task.description, + parse_flag=task.parse_flag, + limit=task.limit, + sort_options=[], + status="pending" + ) + + # 触发队列处理任务(异步,单 worker 串行执行) + process_crawl_queue.delay() + return Response({ "success": True, - "task_id": async_result.id, - "message": f"任务 {task.id} 已触发" + "message": f"任务 {task.id} 已加入队列" }, status=status.HTTP_200_OK) except Exception as e: @@ -104,12 +134,23 @@ class TaskViewSet(viewsets.ModelViewSet): @action(detail=True, methods=['post']) def chat(self, request, pk=None): + + print(f'----------chat-------------------') + print(f'222222222222222') + task = self.get_object() user_question = request.data.get("question", "") + + + print(f'----chat--------------user_question={user_question}--------------') + if not user_question: return Response({"success": False, "message": "question 参数不能为空"}, status=400) # 构造结构化文档 + + print(f'----chat--------------task={task}--------------') + all_docs = TaskDetail.objects.filter(task=task) all_docs_list = [] for doc in all_docs: @@ -124,6 +165,9 @@ class TaskViewSet(viewsets.ModelViewSet): "keywords": doc.keywords or "" }) all_docs_json = json.dumps(all_docs_list, ensure_ascii=False) + + + print(f'----chat--------------all_docs_json={all_docs_json}--------------') SYSTEM_PROMPT = """ 你是专业文献问答助手。请严格根据提供的任务文档回答用户问题。 @@ -144,6 +188,9 @@ class TaskViewSet(viewsets.ModelViewSet): # 使用 Django 的 StreamingHttpResponse 返回 response = StreamingHttpResponse(sync_stream(call_model_stream(messages)), content_type="text/event-stream") + + print(f'----chat--------------666666666--------------') + return response from rest_framework import status from rest_framework.response import Response @@ -156,6 +203,11 @@ class TaskDetailViewSet(viewsets.ModelViewSet): search_fields = ['title', 'author', 'site'] def get_queryset(self): + + + print(f'----------get_queryset-------------------') + print(f'33333333333333') + queryset = super().get_queryset() task_id = self.request.query_params.get('task') if task_id and task_id.isdigit(): diff --git a/selenium_django/db.sqlite3 b/selenium_django/db.sqlite3 index 55def6f..769b69c 100755 Binary files a/selenium_django/db.sqlite3 and b/selenium_django/db.sqlite3 differ diff --git a/selenium_django/entrypoint.sh b/selenium_django/entrypoint.sh index d30f480..d0706f7 100755 --- a/selenium_django/entrypoint.sh +++ b/selenium_django/entrypoint.sh @@ -1,10 +1,14 @@ #!/bin/bash # entrypoint.sh -# 启动 Celery Worker -echo "Starting Celery..." -celery -A selenium_django worker -l info --pool=solo & +# 启动 Celery 入队 Worker(可以多线程) +echo "Starting Celery crawler queue worker..." +celery -A selenium_django worker -Q crawler -l info --pool=threads -c 4 & -# 启动 Django +# 启动 Celery 爬虫处理 Worker(顺序执行,单线程) +echo "Starting Celery crawl_worker (sequential)..." +celery -A selenium_django worker -Q crawl_worker -l info --pool=prefork -c 1 & + +# 启动 Django Gunicorn echo "Starting Django..." exec gunicorn selenium_django.wsgi:application --log-level=info --bind 0.0.0.0:8000 diff --git a/selenium_django/selenium_django/settings.py b/selenium_django/selenium_django/settings.py index ae6f959..8e1b13b 100755 --- a/selenium_django/selenium_django/settings.py +++ b/selenium_django/selenium_django/settings.py @@ -11,26 +11,26 @@ https://docs.djangoproject.com/en/5.2/ref/settings/ """ from pathlib import Path - +import os # Build paths inside the project like this: BASE_DIR / 'subdir'. 
BASE_DIR = Path(__file__).resolve().parent.parent # Celery 配置 -CELERY_BROKER_URL = 'redis://redis:6379/0' -CELERY_RESULT_BACKEND = 'redis://redis:6379/0' +CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://redis:6379/0") +CELERY_RESULT_BACKEND = os.environ.get("CELERY_RESULT_BACKEND", "redis://redis:6379/0") CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = 'Asia/Shanghai' # 根据你本地时区调整 # 爬虫api地址 -CRAWL_API_URL = "http://47.83.141.164:5001/crawl" +CRAWL_API_URL = os.environ.get("CRAWL_API_URL", "http://selenium:5000/crawl") # 模型api配置 api_info = { - "model": "glm-4.5", - "base_url": "https://open.bigmodel.cn/api/paas/v4", - "api_key": "ce39bdd4fcf34ec0aec75072bc9ff988.hAp7HZTVUwy7vImn" + "model": os.environ.get("API_MODEL", "glm-4.5"), # 默认值可选 + "base_url": os.environ.get("API_BASE_URL", "https://open.bigmodel.cn/api/paas/v4"), + "api_key": os.environ.get("API_KEY", ""), } # Quick-start development settings - unsuitable for production diff --git a/selenium_vue/Dockerfile b/selenium_vue/Dockerfile index 05c5409..5642cfd 100755 --- a/selenium_vue/Dockerfile +++ b/selenium_vue/Dockerfile @@ -4,9 +4,12 @@ FROM node:18-alpine as builder # 设置工作目录 WORKDIR /app -# 设置构建时环境变量 -ARG VITE_API_BASE_URL=${VITE_API_BASE_URL:-http://localhost:8000/api} -ENV VITE_API_BASE_URL=$VITE_API_BASE_URL +ARG VITE_API_BASE_URL +ARG VITE_CRAWL_URL + +# 设置给构建时 Vite +ENV VITE_API_BASE_URL=${VITE_API_BASE_URL} +ENV VITE_CRAWL_URL=${VITE_CRAWL_URL} # 复制前端代码 COPY frontend-vite/package*.json ./ diff --git a/selenium_vue/frontend-vite/index.html b/selenium_vue/frontend-vite/index.html index 5838f16..ef4c3ea 100755 --- a/selenium_vue/frontend-vite/index.html +++ b/selenium_vue/frontend-vite/index.html @@ -12,6 +12,4 @@
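The compose file and selenium/config.py above move the model credentials into environment variables, with an empty string as the API_KEY fallback. Below is a minimal fail-fast variant of that idea, assuming the key should always be injected by docker-compose; the require_api_key helper is illustrative and not part of the patch.

import os

# Illustrative guard (not in the patch): refuse to start with an empty key
# instead of sending blank Bearer tokens to the model API.
def require_api_key() -> str:
    key = os.environ.get("API_KEY", "")
    if not key:
        raise RuntimeError("API_KEY is not set; define it in docker-compose.yaml")
    return key

api_info = {
    "model": os.environ.get("API_MODEL", "glm-4.5"),
    "base_url": os.environ.get("API_BASE_URL", "https://open.bigmodel.cn/api/paas/v4"),
    "api_key": require_api_key(),
}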
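translate_text in selenium/main.py now uses a system prompt that demands strict JSON with only `chinese` and `english` list fields. A sketch of parsing that reply defensively follows; it assumes an OpenAI-compatible response shape (choices[0].message.content), which is not shown in this diff, and the helper name extract_keywords is illustrative.

import json
import re

def extract_keywords(result: dict) -> dict:
    # Pull the assistant text out of the (assumed) chat-completions response.
    choices = result.get("choices") or [{}]
    content = choices[0].get("message", {}).get("content", "")
    # Tolerate stray text around the JSON object the prompt asks for.
    match = re.search(r"\{.*\}", content, re.S)
    if not match:
        return {"chinese": [], "english": []}
    try:
        data = json.loads(match.group(0))
    except ValueError:
        return {"chinese": [], "english": []}
    return {
        "chinese": list(data.get("chinese", [])),
        "english": list(data.get("english", [])),
    }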
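The same enqueue block (mark the Task as running, create a pending CrawlQueue row, trigger the worker) now appears in TaskViewSet.run, check_predefined_tasks and run_scheduled_task, and scheduler.py calls process_crawl_queue.delay() while importing only trigger_task_execution from .tasks. A small helper along the lines below, placed inside the api app, would keep the three call sites consistent and carry that missing import; the name enqueue_crawl is illustrative. Note also that the new CrawlQueue model and the added 'queued' status choice require a makemigrations/migrate run before the workers start.

from .models import CrawlQueue
from .tasks import process_crawl_queue

def enqueue_crawl(task):
    """Mark the task as running and queue it for the sequential crawl worker."""
    task.status = 'running'
    task.save(update_fields=['status'])
    CrawlQueue.objects.create(
        task=task,
        texts=task.description,
        parse_flag=task.parse_flag,
        limit=task.limit,
        sort_options=[],
        status="pending",
    )
    process_crawl_queue.delay()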
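process_crawl_queue posts a {"texts", "parse", "limit", "sort"} payload to CRAWL_API_URL with a 300-second timeout. The standalone smoke test below exercises that contract; it assumes the crawl service is reachable at whatever CRAWL_API_URL the compose file injects, and the topic string is only an example (normally it comes from Task.description).

import os
import requests

CRAWL_API_URL = os.environ.get("CRAWL_API_URL", "http://47.83.141.164:5001")

payload = {
    "texts": "基于深度学习的图像识别",  # illustrative topic; normally Task.description
    "parse": True,
    "limit": 10,
    "sort": [],
}

resp = requests.post(CRAWL_API_URL, json=payload, timeout=300)
resp.raise_for_status()
print(resp.json().get("results", []))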