# selenium_keyan/selenium/utils/pubmed.py
#
# NOTE(review): the lines that previously appeared here ("254 lines / 9.4 KiB /
# Python / Executable File / Raw Permalink Blame History" and the ambiguous-
# Unicode warning) were repository-web-UI scaffolding captured by the paste,
# not source code. They are preserved here as a comment so the file parses.
# The ambiguous-Unicode warning refers to the emoji (✅ ❌ 📄) used in log
# messages below, which are intentional.
# coding=utf-8
import csv
import json
import time
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import random
import traceback
from selenium.webdriver.common.by import By
from selenium.webdriver.support.select import Select
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
TimeoutException, NoSuchElementException,
)
from selenium.webdriver.common.action_chains import ActionChains
from config import create_browser,_scroll_into_view
# --------- row extraction ---------
def _resolve_pmc_pdf(driver):
    """Follow the free-PMC link on the current article page and return the PDF URL.

    Handles both outcomes of clicking the PMC link — a new browser window or an
    in-place navigation — and always restores focus to the original window
    before returning.

    Args:
        driver: the Selenium WebDriver, currently on an article detail page.

    Returns:
        The PDF href string, or "" when no PMC link / PDF button was found.
    """
    pdf_url = ""
    original_handle = driver.current_window_handle
    original_handles = driver.window_handles.copy()
    print("[步骤] 跳转下载界面 ...")
    try:
        pdf_selector = WebDriverWait(driver, 5).until(
            EC.presence_of_element_located(
                (By.XPATH, "//a[contains(@class,'link-item') and contains(@class,'pmc')]"))
        )
    except Exception:
        print("❌ 找不到 PMC PDF 链接,跳过下载")
        return pdf_url
    try:
        pdf_selector.click()
    except Exception:
        # Fall back to a JS click when the element is obscured/not interactable.
        driver.execute_script("arguments[0].click();", pdf_selector)
    print("[步骤] 点击完成,等待新窗口/页面...")
    # Did the click open a new window?
    try:
        WebDriverWait(driver, 5).until(lambda d: len(d.window_handles) > len(original_handles))
        new_handle = [h for h in driver.window_handles if h not in original_handles][0]
        driver.switch_to.window(new_handle)
        print("[步骤] 已切换到新窗口:", new_handle)
    except TimeoutException:
        print("[步骤] 没有新窗口,在当前窗口继续处理。")
    time.sleep(1)
    # Re-locate the PDF button after the (possible) window switch.
    try:
        print("[步骤] 等待 PDF 按钮出现...")
        pdf_a = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located(
                (By.XPATH, "//a[contains(@class,'usa-button') and contains(@href,'pdf/')]")
            )
        )
        pdf_url = pdf_a.get_attribute("href")
        if pdf_url:
            print("📄 PDF 链接:", pdf_url)
    except Exception as e:
        print("❌ 获取 PDF 失败:", e)
    finally:
        # Close the PMC window (if one was opened) and switch back.
        if driver.current_window_handle != original_handle:
            driver.close()
            driver.switch_to.window(original_handle)
            time.sleep(1)
            print("[步骤] 已切回原窗口。")
    return pdf_url


def extract_row_info(row, driver):
    """Scrape one PubMed search-result row and return its metadata as a dict.

    Reads title/authors/citation from the result row, clicks into the article
    detail page for abstract and keywords, resolves a free-PMC PDF link when
    available, then navigates back to the result list.

    Args:
        row: the <article> WebElement for one search result.
        driver: the shared Selenium WebDriver.

    Returns:
        dict with keys: title, author, site, originalLink, citation, pdfUrl,
        keywords, summary. Fields that could not be scraped default to "".
    """
    # BUG FIX: the original left `url_elem` unbound when the lookup below
    # failed, and then crashed with NameError at the unconditional click.
    url_elem = None
    title = ""
    originalLink = ""
    try:
        url_elem = row.find_element(By.CSS_SELECTOR, "a.docsum-title")
        title = url_elem.text.strip()
        originalLink = url_elem.get_attribute("href")
    except Exception as e:
        print("[错误] 获取论文标题或链接失败:", e)
    # Authors and full journal citation (either may be absent).
    try:
        authors = row.find_element(By.XPATH,
            ".//span[contains(@class,'docsum-authors') and contains(@class,'full-authors')]").text
        citation = row.find_element(By.XPATH,
            ".//span[contains(@class,'docsum-journal-citation') and contains(@class,'full-journal-citation')]").text
    except Exception:  # was a bare `except:` — narrowed
        citation = ""
        authors = ""
    print("论文原处:", originalLink)
    print("论文标题:", title)
    print("作者列表:", authors)
    print("论文引用:", citation)
    # Full-citation text is used below to detect "Free PMC article".
    try:
        downloadElem = row.find_element(By.XPATH, ".//div[contains(@class,'docsum-citation') and contains(@class,'full-citation')]")
        downloadText = downloadElem.text
    except Exception:  # was a bare `except:` — narrowed
        downloadText = ""
    abstract_text = ""
    keyword_text = ""
    pdf_url = ""
    if url_elem is not None:
        time.sleep(1)
        url_elem.click()
        time.sleep(3)  # wait for the detail page to load
        # Abstract
        try:
            abstract_elem = driver.find_element(By.CSS_SELECTOR, "#eng-abstract p")
            abstract_text = abstract_elem.text.strip()
        except NoSuchElementException:
            abstract_text = ""
        print("摘要:", abstract_text)
        # Keywords (may be absent)
        try:
            keyword_elem = driver.find_element(By.CSS_SELECTOR, "#abstract > p")
            keyword_text = keyword_elem.text.replace("Keywords:", "").strip()
        except NoSuchElementException:
            keyword_text = ""
        print("关键词:", keyword_text)
        if "Free PMC article" in downloadText:
            print("✅ 该文章是免费文章,可以下载")
            pdf_url = _resolve_pmc_pdf(driver)
        else:
            print("❌ 该文章不是免费文章")
        # Return to the search-result list.
        driver.back()
        time.sleep(2)  # wait for the list page to reload
    return {
        "title": title,
        "author": authors,
        "site": "pubmed",
        "originalLink": originalLink,
        "citation": citation,
        "pdfUrl": pdf_url,
        "keywords": keyword_text,
        "summary": abstract_text
    }
def crawl_current_sort(driver, limit):
    """Scrape up to `limit` records under the currently selected sort order.

    Iterates the visible result rows, extracting each via extract_row_info(),
    and paginates until `limit` records are collected or no next page exists.

    Args:
        driver: the shared Selenium WebDriver, already on a result page.
        limit: maximum number of records to collect.

    Returns:
        list of per-article dicts (see extract_row_info).
    """
    fetched_count = 0
    results = []
    while fetched_count < limit:
        # Wait for page chrome (the PubMed logo) as a load indicator.
        try:
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, '#search-form > div.inner-wrap > a.pubmed-logo > img'))
            )
        except TimeoutException:
            print("[警告] 本页结果表格未出现,尝试继续")
        time.sleep(2)
        rows = driver.find_elements(By.XPATH, '//*[@id="search-results"]/section/div[2]/div/article')
        # ROBUSTNESS FIX: the original kept paging even when a page had zero
        # rows, which could loop indefinitely while a next button existed.
        if not rows:
            print("[警告] 本页没有结果行,停止抓取")
            break
        for row in rows:
            if fetched_count >= limit:
                break
            try:
                info = extract_row_info(row, driver)
                if info:  # only keep valid records
                    results.append(info)
                    fetched_count += 1
                time.sleep(2)
            except Exception as e:
                print(f"[错误] 抓取 row 失败: {e}")
                traceback.print_exc()
                # Force focus back to the main window to avoid a deadlock
                # when a detail/PDF window was left in front.
                try:
                    if driver.window_handles:
                        driver.switch_to.window(driver.window_handles[0])
                except Exception:
                    pass
                continue  # one bad row must not abort the whole page
        # Pagination
        try:
            next_btn = driver.find_element(By.CSS_SELECTOR, "#search-results > div.top-wrapper > div.top-pagination > button.button-wrapper.next-page-btn > img.chevron-icon.enabled-icon")
            if next_btn.is_enabled() and fetched_count < limit:
                _scroll_into_view(driver, next_btn)
                # Try a plain click, then an ActionChains click, then JS.
                try:
                    next_btn.click()
                except Exception:
                    try:
                        ActionChains(driver).move_to_element(next_btn).pause(0.1).click(next_btn).perform()
                    except Exception:
                        driver.execute_script("arguments[0].click();", next_btn)
                time.sleep(random.uniform(1, 2))
            else:
                break
        except NoSuchElementException:
            print("[INFO] 已到最后一页或翻页不存在")
            break
        except Exception as e:
            print(f"[错误] 翻页失败: {e}")
            break
    return results
def pubmed(keyword, limit, sort_options=None):
    """Search PubMed for `keyword` and scrape up to `limit` results per sort.

    Opens a fresh browser, submits the query, then runs one crawl pass per
    entry of `sort_options` ("relevance" and/or "publication_time"), quitting
    the browser when done.

    Args:
        keyword: search query string typed into the PubMed search box.
        limit: maximum number of records to collect per sort order.
        sort_options: list of sort names; defaults to ["relevance"].

    Returns:
        dict mapping each sort name to its list of article dicts.
    """
    driver = create_browser()
    waiter = WebDriverWait(driver, 15)
    collected = {}
    sort_options = sort_options or ["relevance"]  # relevance is PubMed's default
    try:
        driver.get("https://pubmed.ncbi.nlm.nih.gov/")
        search_box = waiter.until(EC.presence_of_element_located((By.CSS_SELECTOR, "#id_term")))
        search_box.send_keys(keyword)
        time.sleep(1)
        driver.find_element(By.CSS_SELECTOR, "#search-form > div > div.search-input > div > button").click()
        time.sleep(4)
        for sort_name in sort_options:
            try:
                if sort_name == "publication_time":
                    print("[INFO] 使用时间进行排序")
                    # Switch the sort dropdown to "Publication date".
                    Select(driver.find_element(By.ID, "id_sort")).select_by_value("pubdate")
                    time.sleep(2)
                elif sort_name == "relevance":
                    print("[INFO] 使用相关性排序(默认)")
            except Exception as e:
                print(f"[WARN] 点击排序 {sort_name} 失败:", e)
            # Crawl the result list under the current sort order.
            collected[sort_name] = crawl_current_sort(driver, limit)
    finally:
        try:
            driver.quit()
        except Exception:
            pass
    print("[DONE] PDF处理完成")
    print(json.dumps(collected, ensure_ascii=False, indent=2))
    return collected
if __name__ == '__main__':
    # Demo run. Other useful variants: ["relevance"] alone, or
    # ["publication_time"] alone for newest-first only.
    search_term = "bert"
    max_records = 100
    # Relevance pass first, then a newest-first pass.
    pubmed(search_term, max_records, ["relevance", "publication_time"])