# coding=utf-8
"""CNKI (知网) search-result crawler.

Searches cnki.net for a keyword, optionally re-sorts the result list
(publication time / downloads / citations / relevance), opens each result's
detail tab to grab keywords and abstract, and returns all records grouped
by sort order.
"""
import csv
import json
import sys
import os

# Make the project root importable so `config` resolves when run as a script.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import time
import random
import traceback

from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchWindowException, WebDriverException
from selenium.webdriver.common.action_chains import ActionChains

from config import create_browser, _scroll_into_view


def find_valid_detail_tab(driver, origin_handle, timeout=10):
    """Find a window handle that is a real CNKI detail page.

    Polls all handles other than *origin_handle* until one is neither a
    chrome:// page nor missing a CNKI-looking title. Leaves the driver
    switched to the returned handle.

    Raises:
        TimeoutException: no valid detail tab appeared within *timeout* s.
    """
    end_time = time.time() + timeout
    while time.time() < end_time:
        for handle in driver.window_handles:
            if handle == origin_handle:
                continue
            try:
                driver.switch_to.window(handle)
                current_url = driver.current_url
                current_title = driver.title
                if not current_url.startswith("chrome://") and (
                        "知网" in current_title or "CNKI" in current_title.upper()):
                    print(f"[知网切换窗口] 成功 → {current_title}")
                    return handle
            except Exception:
                # Handle may have closed mid-iteration; try the next one.
                pass
        time.sleep(0.5)
    # Was a bare Exception; TimeoutException is more precise and is still
    # caught by the `except Exception` in crawl_current_sort.
    raise TimeoutException("未能在规定时间内找到有效详情页窗口")


def _cell_text(row, selector):
    """Best-effort text of one table cell; empty string if the cell is absent."""
    try:
        return row.find_element(By.CSS_SELECTOR, selector).text
    except Exception:
        return ""


def _try_click(driver, elem):
    """Click *elem* with escalating fallbacks; return True on success.

    Order: native click → ActionChains click → JavaScript click.
    """
    try:
        elem.click()
        return True
    except Exception:
        pass
    try:
        ActionChains(driver).move_to_element(elem).pause(0.1).click(elem).perform()
        return True
    except Exception:
        pass
    try:
        driver.execute_script("arguments[0].click();", elem)
        return True
    except Exception:
        return False


# --------- main extraction ---------
def extract_row_info(row, driver):
    """Scrape a single result row and its detail page.

    Reads the list-view cells, opens the title link in a new tab, pulls
    keywords and abstract from the detail page, then closes the tab and
    switches back. Returns a record dict, or None when the row is unusable.
    """
    # The title cell can be momentarily stale while the grid renders; retry briefly.
    td_name = None
    for _ in range(3):
        try:
            td_name = row.find_element(By.CSS_SELECTOR, 'td.name')
            break
        except Exception:
            time.sleep(0.3)
    if not td_name:
        return None

    a_tags = td_name.find_elements(By.TAG_NAME, 'a')
    if not a_tags:
        return None
    link_elem = a_tags[0]
    title = (link_elem.text or "").strip()
    if not title:
        return None

    author = _cell_text(row, 'td.author')
    source = _cell_text(row, 'td.source')
    date = _cell_text(row, 'td.date')
    quote = _cell_text(row, 'td.quote')
    download = _cell_text(row, 'td.download')

    print(f"作者:{author}")
    print(f"来源:{source}")
    print(f"出版时间:{date}")
    print(f"被引频次:{quote}")
    print(f"下载次数:{download}")
    print("-" * 50)

    try:
        origin = driver.current_window_handle
    except Exception:
        print("[警告] 当前窗口不可用")
        return None

    # Snapshot handles so new_window_is_opened can detect the detail tab.
    existing_handles = driver.window_handles.copy()
    try:
        _scroll_into_view(driver, link_elem)
    except Exception:
        pass
    if not _try_click(driver, link_elem):
        print("[警告] 点击失败")
        return None

    try:
        WebDriverWait(driver, 10).until(EC.new_window_is_opened(existing_handles))
    except TimeoutException:
        print("[警告] 未检测到新窗口,跳过")
        return None

    originalLink = ""
    keywords = []
    summary = ""
    try:
        detail_tab = find_valid_detail_tab(driver, origin)
        if detail_tab not in driver.window_handles:
            print("[警告] 新窗口不存在")
            return None
        try:
            driver.switch_to.window(detail_tab)
            time.sleep(0.5)
            originalLink = driver.current_url
        except Exception:
            print("[警告] 无法切换到新窗口")
            return None
        try:
            keywords = [kw.text for kw in driver.find_elements(
                By.XPATH,
                "//span[@class='rowtit' and text()='关键词:']/following-sibling::p[@class='keywords']/a"
            )]
        except Exception:
            keywords = []
        try:
            summary = driver.find_element(By.XPATH, '//*[@id="ChDivSummary"]').text
        except Exception:
            summary = ""
        print(f"关键词{keywords}")
        print(f"摘要{summary}")
    finally:
        # Always close the detail tab (if we are on it) and hop back to the list.
        try:
            if driver.current_window_handle != origin:
                driver.close()
        except Exception:
            pass
        try:
            driver.switch_to.window(origin)
        except Exception:
            print("[警告] 无法切回原窗口")

    # Jittered delay to look less like a bot.
    time.sleep(random.uniform(0.5, 1.5))
    return {
        "title": title,
        "author": author,
        "source": source,
        "date": date,
        "site": "知网",
        "originalLink": originalLink,
        "quote": quote,
        "download": download,
        "keywords": keywords,
        "summary": summary,
    }


def crawl_current_sort(driver, limit):
    """Collect up to *limit* records under the current result ordering.

    Walks the visible rows page by page, advancing with the "PageNext"
    button until the limit is reached or paging fails.
    """
    fetched_count = 0
    results = []
    while fetched_count < limit:
        try:
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, '#gridTable table tbody'))
            )
        except TimeoutException:
            print("[警告] 本页结果表格未出现,尝试继续")
            time.sleep(1)

        rows = driver.find_elements(
            By.CSS_SELECTOR, '#gridTable > div > div > div > table > tbody > tr')
        for row in rows:
            if fetched_count >= limit:
                break
            try:
                info = extract_row_info(row, driver)
                if info:
                    results.append(info)
                    fetched_count += 1
                    print(f"[{fetched_count}] {info['title']}")
            except Exception as e:
                print(f"[错误] {e}")
                traceback.print_exc()
                # A failed row may strand us on a dead tab; recover to the first window.
                try:
                    if driver.window_handles:
                        driver.switch_to.window(driver.window_handles[0])
                except Exception:
                    pass

        # Pagination
        try:
            next_btn = driver.find_element(By.ID, "PageNext")
            if not next_btn.is_enabled() or fetched_count >= limit:
                break
            _scroll_into_view(driver, next_btn)
            if not _try_click(driver, next_btn):
                print("[INFO] 已到最后一页或翻页失败")
                break
            time.sleep(1)
        except Exception:
            print("[INFO] 已到最后一页或翻页失败")
            break
    return results


def _apply_sort(driver, sort_name):
    """Switch the result-list ordering on the search page.

    "publication_time" is CNKI's default ordering and needs no click;
    unknown names are silently ignored (the current ordering is kept).
    """
    if sort_name == "publication_time":
        print("[INFO] 使用发表时间排序(默认)")
        return
    labels = {
        "download_count": ("下载", "[INFO] 使用下载量排序"),
        "cited_count": ("被引", "[INFO] 使用被引量排序"),
        "relevance": ("相关度", "[INFO] 使用相关度排序"),
    }
    if sort_name not in labels:
        return
    li_text, banner = labels[sort_name]
    print(banner)
    try:
        driver.find_element(
            By.XPATH, f'//ul[@id="orderList"]/li[text()="{li_text}"]').click()
    except Exception:
        print(f"[WARN] 点击排序 {sort_name} 失败")


def zhiwang(keyword, limit, sort_options=None):
    """Entry point: crawl *limit* results for *keyword* under each ordering.

    Args:
        keyword: search term typed into the CNKI search box.
        limit: max records to fetch per sort order.
        sort_options: iterable of sort names ("publication_time",
            "download_count", "cited_count", "relevance"); defaults to
            publication time only.

    Returns:
        dict mapping each sort name to its list of record dicts.
    """
    print(f"[DEBUG][zhiwang] Received parameters: keyword='{keyword}', limit={limit}, sort_options={sort_options}")
    driver = create_browser()
    wait = WebDriverWait(driver, 15)
    all_results = {}
    if not sort_options:
        sort_options = ["publication_time"]  # CNKI's default ordering
    try:
        driver.get("https://www.cnki.net")
        wait.until(EC.presence_of_element_located((By.ID, "txt_SearchText"))).send_keys(keyword)
        driver.find_element(By.CLASS_NAME, "search-btn").click()
        time.sleep(2)
        for sort_name in sort_options:
            _apply_sort(driver, sort_name)
            time.sleep(1)
            all_results[sort_name] = crawl_current_sort(driver, limit)
    finally:
        # Always release the browser, even on a crash mid-crawl.
        try:
            driver.quit()
        except Exception:
            pass
    print("[DONE] PDF处理完成")
    print(json.dumps(all_results, ensure_ascii=False, indent=2))
    return all_results


if __name__ == '__main__':
    keyword = "graphrag"
    limit = 10
    zhiwang(keyword, limit, ["relevance", "publication_time"])