# coding=utf-8
"""Wanfang (万方) journal-article scraper.

Searches wanfangdata.com.cn for a keyword, walks the result table under one
or more sort orders, opens each journal-article row's detail page in a new
tab, and collects title / authors / source / date / citation and download
counts plus the abstract and keyword list.
"""
import json
import os
import random
import sys
import time
import traceback

# Make the project root importable so `config` resolves when this script is
# run directly from its own directory.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.action_chains import ActionChains

from config import create_browser, _scroll_into_view


def find_valid_detail_tab(driver, origin_handle, timeout=10):
    """Return the handle of a usable tab other than *origin_handle*.

    Polls ``driver.window_handles`` for up to *timeout* seconds and switches
    to the first non-origin tab whose URL is neither ``chrome://…`` nor
    ``about:blank``.

    Raises:
        Exception: if no valid detail tab appears within *timeout* seconds.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        for handle in driver.window_handles:
            if handle == origin_handle:
                continue
            try:
                driver.switch_to.window(handle)
                url = driver.current_url
                if not url.startswith("chrome://") and url != "about:blank":
                    print(f"[万方切换窗口] 成功 → {driver.title}")
                    return handle
            except Exception:
                # The tab may have closed between enumeration and switch;
                # try the next handle.
                pass
        time.sleep(0.5)
    raise Exception("未能在规定时间内找到有效详情页窗口")


def _extract_summary(driver):
    """Best-effort scrape of the abstract on the detail page; '' if absent."""
    summary_text = ""
    try:
        container = driver.find_element(
            By.CSS_SELECTOR, "#essential > div.detailList > div.summary.list")
        text_span = container.find_element(
            By.CSS_SELECTOR, "span.text-overflow > span > span")
        summary_text = text_span.text
    except Exception:
        # Original code went on to reference an unbound `summary_container`
        # here (NameError masked by a bare except); bail out early instead.
        print("[警告] 摘要信息未找到")
        return summary_text
    # If the abstract is folded, click "查看全部" to expand, then re-read.
    try:
        expand_btn = container.find_element(
            By.CSS_SELECTOR,
            "span.slot-box > span.abstractIcon.btn[title='查看全部']")
        driver.execute_script("arguments[0].click();", expand_btn)
        time.sleep(1)
        summary_text = text_span.text
    except Exception:
        pass  # No fold button — abstract is short and already complete.
    return summary_text


def _extract_keywords(driver):
    """Best-effort scrape of the keyword list on the detail page; [] if absent."""
    try:
        container = driver.find_element(
            By.CSS_SELECTOR, "#essential > div.detailList > div.keyword.list")
        spans = container.find_elements(
            By.CSS_SELECTOR, "div.itemKeyword a span")
        return [el.text.strip() for el in spans]
    except Exception:
        # A missing keyword block used to abort the whole record via the
        # caller's except; degrade gracefully instead.
        print("[警告] 关键词信息未找到")
        return []


# --------- 主函数 ---------
def extract_row_info(row, driver):
    """Scrape a single result-table row; return a record dict or None.

    Returns None when the row is not a journal article ("期刊论文") or when
    its detail page never opens. Otherwise opens the detail page in a new
    tab, reads the abstract and keywords, closes the tab and switches back
    to the original window before returning.
    """
    try:
        type_text = row.find_element(By.XPATH, 'td[6]').text.strip()
    except Exception:
        type_text = ""
    # Only journal articles are collected; skip everything else.
    if type_text != "期刊论文":
        return None

    title_element = row.find_element(By.XPATH, 'td[2]/span[1]')
    title = title_element.text.strip()
    author_area = row.find_element(By.XPATH, 'td[3]')
    # Author spans share their class with a year badge; filter spans
    # containing '年' to keep only names.
    authors = author_area.find_elements(
        By.XPATH, ".//span[@class='authors'][not(contains(text(),'年'))]")
    author_names = [a.text for a in authors]
    source = row.find_element(By.XPATH, "td[4]/span").text    # journal source
    date = row.find_element(By.XPATH, 'td[5]').text           # publication date
    quote = row.find_element(By.XPATH, 'td[7]').text          # citation count
    download = row.find_element(By.XPATH, 'td[8]').text       # download count

    print("类型:", type_text)
    print("论文名称", title)
    print("作者列表:", author_names)
    print("期刊来源:", source)
    print("时间:", date)
    print("引用次数:", quote)
    print("下载次数:", download)
    time.sleep(1)

    origin = driver.current_window_handle
    existing_handles = driver.window_handles

    # Click the title; fall back to ActionChains, then raw JS, if the plain
    # click is intercepted or the element is off-screen.
    try:
        _scroll_into_view(driver, title_element)
        title_element.click()
    except Exception:
        try:
            ActionChains(driver).move_to_element(title_element).pause(0.1) \
                .click(title_element).perform()
        except Exception:
            driver.execute_script("arguments[0].click();", title_element)

    try:
        WebDriverWait(driver, 10).until(
            EC.new_window_is_opened(existing_handles))
    except TimeoutException:
        print("[警告] 未检测到新窗口,跳过")
        return None

    # Defaults so the returned record is well-formed even when detail-page
    # scraping is cut short (the originals could be unbound at return time).
    original_link = ""
    summary_text = ""
    keywords = []
    try:
        detail_tab = find_valid_detail_tab(driver, origin)
        if detail_tab not in driver.window_handles:
            return None
        driver.switch_to.window(detail_tab)
        time.sleep(1)
        original_link = driver.current_url
        print("详情页链接:", original_link)

        summary_text = _extract_summary(driver)
        print("摘要:", summary_text)

        keywords = _extract_keywords(driver)
        print("关键词:", keywords)
        time.sleep(1)
    finally:
        # Close the detail tab (never the original window) and switch back.
        if driver.current_window_handle != origin:
            driver.close()
        driver.switch_to.window(origin)
        time.sleep(random.uniform(0.5, 1.5))

    return {
        "title": title,
        "author": author_names,
        "source": source,
        "date": date,
        "site": "万方",
        "quote": quote,
        "originalLink": original_link,
        "download": download,
        "keywords": keywords,
        "summary": summary_text,
    }


def crawl_current_sort(driver, limit):
    """Scrape up to *limit* journal-article records under the current sort."""
    fetched_count = 0
    results = []
    while fetched_count < limit:
        try:
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located(
                    (By.CSS_SELECTOR, '#anxs-logoName_sns'))
            )
        except TimeoutException:
            print("[警告] 本页结果表格未出现,尝试继续")
        time.sleep(2)
        rows = driver.find_elements(
            By.XPATH,
            '/html/body/div[5]/div/div[3]/div[2]/div/div[4]/div[2]/div[1]'
            '/table/tbody/tr')
        for row in rows:
            if fetched_count >= limit:
                break
            try:
                info = extract_row_info(row, driver)
                if info:
                    results.append(info)
                    time.sleep(2)
                    fetched_count += 1
            except Exception as e:
                print(f"[错误] {e}")
                traceback.print_exc()
                # Recover to the first window so the next row can proceed
                # even if a detail tab was left focused.
                try:
                    if driver.window_handles:
                        driver.switch_to.window(driver.window_handles[0])
                except Exception:
                    pass
        # Pagination: click "next"; stop at the last page or on failure.
        try:
            next_btn = driver.find_element(
                By.XPATH,
                "/html/body/div[5]/div/div[3]/div[2]/div/div[3]/div[2]"
                "/div[4]/span[3]")
            if not next_btn.is_enabled() or fetched_count >= limit:
                break
            _scroll_into_view(driver, next_btn)
            try:
                next_btn.click()
            except Exception:
                try:
                    ActionChains(driver).move_to_element(next_btn).pause(0.1) \
                        .click(next_btn).perform()
                except Exception:
                    driver.execute_script("arguments[0].click();", next_btn)
            time.sleep(1)
        except Exception:
            print("[INFO] 已到最后一页或翻页失败")
            break
    return results


def wangfang(keyword, limit, sort_options=None):
    """Entry point: search *keyword* and scrape *limit* records per sort.

    Args:
        keyword: search term typed into the Wanfang search box.
        limit: maximum number of journal-article records per sort order.
        sort_options: subset of {"relevance", "download_count",
            "publication_time"}; defaults to ["relevance"].

    Returns:
        dict mapping sort name -> list of record dicts.
    """
    driver = create_browser()
    wait = WebDriverWait(driver, 15)
    all_results = {}
    if not sort_options:
        sort_options = ["relevance"]  # default: relevance sort
    try:
        driver.get("https://www.wanfangdata.com.cn/index.html")
        wait.until(EC.presence_of_element_located(
            (By.CSS_SELECTOR, "#search-input"))).send_keys(keyword)
        driver.find_element(By.CLASS_NAME, "search-icon").click()
        time.sleep(1)
        # Switch the result list to table display mode.
        toggle = driver.find_element(By.CLASS_NAME, "toggle-table-list")
        driver.execute_script(
            "arguments[0].scrollIntoView({block: 'center'});", toggle)
        time.sleep(2)
        toggle.click()
        for sort_name in sort_options:
            if sort_name == "relevance":
                print("[INFO] 使用相关性排序(默认)")
            elif sort_name == "download_count":
                print("[INFO] 使用下载量排序")
                try:
                    # NOTE(review): this clicks the "被引频次" (citation count)
                    # header even though the option is named download_count —
                    # confirm which column is actually intended.
                    driver.find_element(
                        By.XPATH, '//span[text()="被引频次"]').click()
                except Exception:
                    print(f"[WARN] 点击排序 {sort_name} 失败")
            elif sort_name == "publication_time":
                print("[INFO] 使用时间排序")
                try:
                    driver.find_element(
                        By.XPATH, '//span[text()="出版时间"]').click()
                except Exception:
                    print(f"[WARN] 点击排序 {sort_name} 失败")
            time.sleep(1)
            results = crawl_current_sort(driver, limit)
            all_results[sort_name] = results
    finally:
        # Always release the browser, even on an unexpected error.
        try:
            driver.quit()
        except Exception:
            pass
    print("[DONE] PDF处理完成")
    print(json.dumps(all_results, ensure_ascii=False, indent=2))
    return all_results


if __name__ == '__main__':
    keyword = "知识图谱"
    limit = 100
    wangfang(keyword, limit, ["relevance", "publication_time"])