# selenium_keyan/selenium/utils/springerLink.py
# coding=utf-8
import csv
import json
import time
import random
import traceback
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
    StaleElementReferenceException,
    TimeoutException,
    NoSuchElementException,
)
from selenium.webdriver.common.action_chains import ActionChains
from config import create_browser, _scroll_into_view


def click_element_safe(driver, locator, retries=3, wait_time=10):
    """Click an element safely, guarding against StaleElementReferenceException."""
    for attempt in range(retries):
        try:
            elem = WebDriverWait(driver, wait_time).until(
                EC.element_to_be_clickable(locator)
            )
            elem.click()
            return elem
        except StaleElementReferenceException:
            print(f"StaleElementReferenceException, retry {attempt + 1}/{retries}")
            time.sleep(0.5)
    raise Exception("Click failed: element remained unavailable")
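
# Usage (illustrative only): locators are (By, selector) tuples, e.g.
#   click_element_safe(driver, (By.CSS_SELECTOR, "#search-submit"))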


# --------- main scraping logic ---------
def extract_row_info(row, driver):
    """Scrape a single result row and return its fields as a dict."""
    _scroll_into_view(driver, row)
    # Content type
    try:
        type_elem = row.find_element(
            By.CSS_SELECTOR,
            'div.app-card-open__main > div.app-entitlement > div > div > span'
        )
        type_text = type_elem.text.strip()
        print("Type:", type_text)
    except Exception:
        return None
    if type_text.lower() not in ["conference paper", "article"]:
        return None
    # Title
    try:
        title_element = row.find_element(
            By.CSS_SELECTOR, "div.app-card-open__main h3.app-card-open__heading a"
        )
        title = title_element.text.strip()
    except Exception:
        return None
    # Authors
    try:
        authors_elem = row.find_element(
            By.CSS_SELECTOR, "div.app-card-open__authors span[data-test='authors']"
        )
        authors = authors_elem.text.strip()
    except Exception:
        authors = None
    # Journal/book source
    try:
        source_elem = row.find_element(
            By.CSS_SELECTOR, "div.app-card-open__authors a[data-test='parent']"
        )
        source = source_elem.text.strip()
    except Exception:
        source = None
    # Publication date
    try:
        date_elem = row.find_element(
            By.CSS_SELECTOR, "div.app-card-open__meta [data-test='published']"
        )
        date = date_elem.text.strip()
    except Exception:
        date = None
    print("Title:", title)
    print("Authors:", authors)
    print("Source:", source)
    print("Date:", date)
    # Click the title to open the detail page and grab the abstract
    summary_text = ""
    try:
        title_locator = (By.CSS_SELECTOR, "div.app-card-open__main h3.app-card-open__heading a")
        click_element_safe(driver, title_locator)
        # Wait for the abstract section on the detail page
        try:
            abstract_elem = WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, 'section[data-title="Abstract"]'))
            )
            # If a "▽ More" link is present, expand the full abstract
            try:
                more_link = abstract_elem.find_element(By.XPATH, ".//a[contains(text(), 'More')]")
                driver.execute_script("arguments[0].click();", more_link)
                time.sleep(0.3)
            except NoSuchElementException:
                pass
            summary_text = abstract_elem.text.strip()
            if summary_text.startswith("Abstract"):
                summary_text = summary_text[len("Abstract"):].lstrip("\n").strip()
        except (TimeoutException, NoSuchElementException):
            summary_text = ""
        print("Abstract:", summary_text)
        time.sleep(1)
    finally:
        # Navigate back to the results list
        driver.back()
        time.sleep(random.uniform(1.5, 2.5))
    return {
        "title": title,
        "author": authors,
        "source": source,
        "date": date,
        "summary": summary_text,
    }
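

# A possible refactor (sketch only, not wired into extract_row_info): each of
# the repeated try/except text-extraction blocks above follows the same shape
# and could be collapsed into a helper. `_field_text` is a hypothetical name,
# not part of the original script.
def _field_text(row, css):
    """Return the stripped text of the first match for `css`, or None if absent."""
    try:
        return row.find_element(By.CSS_SELECTOR, css).text.strip()
    except Exception:
        return None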


def crawl_current_sort(driver, limit):
    """Scrape up to `limit` records under the current sort order."""
    fetched_count = 0
    results = []
    while fetched_count < limit:
        try:
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, '#search-submit'))
            )
        except TimeoutException:
            print("[WARN] Result list did not appear on this page, continuing anyway")
        time.sleep(2)
        rows = driver.find_elements(
            By.CSS_SELECTOR,
            '#main > div > div > div > div:nth-child(2) > div:nth-child(2) > ol > li'
        )
        for i in range(len(rows)):
            # Re-locate the row on each iteration: visiting the detail page and
            # navigating back invalidates the previously fetched references
            row = driver.find_elements(
                By.CSS_SELECTOR,
                '#main > div > div > div > div:nth-child(2) > div:nth-child(2) > ol > li'
            )[i]
            if fetched_count >= limit:
                break
            try:
                info = extract_row_info(row, driver)
                if info:
                    results.append(info)
                    time.sleep(2)
                    fetched_count += 1
            except Exception as e:
                print(f"[ERROR] {e}")
                traceback.print_exc()
                try:
                    if driver.window_handles:
                        driver.switch_to.window(driver.window_handles[0])
                except Exception:
                    pass
        # Pagination
        try:
            # Locate the single "next page" button (covers both pagination variants)
            next_btn = driver.find_element(
                By.CSS_SELECTOR,
                "ul.eds-c-pagination a[rel='next'], ul.eds-c-pagination a[data-test='next-page']"
            )
            # Stop if the button is disabled or the limit has been reached
            if not next_btn.is_enabled() or fetched_count >= limit:
                break
            _scroll_into_view(driver, next_btn)
            try:
                next_btn.click()
            except Exception:
                try:
                    ActionChains(driver).move_to_element(next_btn).pause(0.1).click(next_btn).perform()
                except Exception:
                    driver.execute_script("arguments[0].click();", next_btn)
            time.sleep(1)
        except Exception:
            print("[INFO] Reached the last page or pagination failed")
            break
    return results


def springerLink(keyword, limit, sort_options=None):
    """Main entry point: scrape SpringerLink results under the selected sort orders."""
    driver = create_browser()
    wait = WebDriverWait(driver, 15)
    all_results = {}
    if not sort_options:
        sort_options = ["relevance"]  # default: sort by relevance
    try:
        driver.get("http://link.springer.com/")
        print("Page title:", driver.title)
        print("Current URL:", driver.current_url)
        try:
            accept_cookies_btn = WebDriverWait(driver, 10).until(
                EC.element_to_be_clickable(
                    (By.CSS_SELECTOR, "button[data-cc-action='accept']")
                )
            )
            accept_cookies_btn.click()
            print("[INFO] Clicked 'Accept all cookies'")
        except Exception:
            print("[INFO] No cookie banner found")
        try:
            search_input = WebDriverWait(driver, 10).until(
                EC.visibility_of_element_located((By.CSS_SELECTOR, "#homepage-search"))
            )
        except TimeoutException:
            print("[ERROR] Search box did not finish loading")
        # Type the search keyword
        search_input = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, "#homepage-search"))
        )
        driver.execute_script("arguments[0].scrollIntoView(true);", search_input)
        search_input.clear()
        search_input.send_keys(keyword)
        time.sleep(2)
        driver.find_element(
            By.CSS_SELECTOR,
            "#main > div.app-homepage-hero > div > search > form > div > button"
        ).click()
        time.sleep(2)
        # Iterate over the user-selected sort orders
        for sort_name in sort_options:
            if sort_name == "relevance":
                print("[INFO] Using relevance sort (default)")
                # SpringerLink sorts by relevance by default; no extra click needed
            elif sort_name == "publication_time":
                print("[INFO] Switching to newest-first sort")
                try:
                    # Open the sort dropdown
                    wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, "#search-select"))).click()
                    time.sleep(1)
                    # Select the "Newest First" option
                    driver.find_element(By.CSS_SELECTOR, "#search-select > option:nth-child(2)").click()
                    time.sleep(2)
                except Exception as e:
                    print(f"[WARN] Failed to switch to newest-first sort: {e}")
            # Scrape results under the current sort order
            results = crawl_current_sort(driver, limit)
            all_results[sort_name] = results
    finally:
        try:
            driver.quit()
        except Exception:
            pass
    print("[DONE] SpringerLink scrape finished")
    print(json.dumps(all_results, ensure_ascii=False, indent=2))
    return all_results
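

# `csv` is imported above but never used. A minimal sketch of how the scraped
# records could be persisted; `save_results_csv` and the default output path
# are illustrative names, not part of the original script.
def save_results_csv(results, path="springer_results.csv"):
    """Write a list of record dicts (as returned by extract_row_info) to CSV."""
    fieldnames = ["title", "author", "source", "date", "summary"]
    with open(path, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)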


if __name__ == '__main__':
    keyword = "graphrag"
    limit = 100
    # Relevance only (default)
    # springerLink(keyword, limit, ["relevance"])
    # Newest first only
    # springerLink(keyword, limit, ["publication_time"])
    # Relevance first, then newest
    springerLink(keyword, limit, ["relevance", "publication_time"])