245 lines
8.2 KiB
Python
Executable File
245 lines
8.2 KiB
Python
Executable File
# coding=utf-8
|
|
import csv
|
|
import json
|
|
import sys
|
|
import os
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
import random
|
|
import time
|
|
from bs4 import BeautifulSoup
|
|
|
|
import traceback
|
|
from config import create_browser,_scroll_into_view
|
|
from selenium.webdriver import ActionChains
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support.select import Select
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.common.exceptions import (
|
|
TimeoutException,
|
|
NoSuchElementException,
|
|
)
|
|
|
|
# coding=utf-8
|
|
import csv
|
|
import json
|
|
from bs4 import BeautifulSoup
|
|
|
|
import os
|
|
import random
|
|
import time
|
|
import traceback
|
|
from config import create_browser,_scroll_into_view
|
|
from selenium.webdriver import ActionChains
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support.select import Select
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.common.exceptions import (
|
|
TimeoutException,
|
|
NoSuchElementException,
|
|
)
|
|
|
|
def extract_row_info(row, driver):
    """Scrape one search-result entry and return its fields as a dict.

    The listing fields (link, title, authors, submission date) are read from
    ``row``. The entry's link is then clicked to open its detail page, the
    PDF link and abstract text are read there, and the browser is sent back
    to the listing afterwards.

    Args:
        row: Element of one result entry; queried with ``find_element``.
        driver: Browser driver currently showing the result listing.

    Returns:
        dict with the keys ``title``, ``author`` (list of names), ``site``
        (always ``"arxiv"``), ``originalLink``, ``pdfUrl``, ``date`` and
        ``summary``. A field that could not be read is an empty string
        (an empty list for ``author``).
    """
    _scroll_into_view(driver, row)

    # Link to the detail page. The element itself is kept because it is
    # clicked further down; it stays None when the row has no such link.
    original_elem = None
    try:
        original_elem = row.find_element(By.CSS_SELECTOR, "div > p > a")
        original_link = original_elem.get_attribute("href") or ""
    except Exception:
        original_link = ""

    # Title
    try:
        title = row.find_element(
            By.CSS_SELECTOR, "p.title.is-5.mathjax"
        ).text.strip()
    except Exception:
        title = ""

    # Authors: one entry per <a> inside the authors paragraph.
    try:
        authors_elem = row.find_element(By.CSS_SELECTOR, "p.authors")
        authors = [
            a.text.strip()
            for a in authors_elem.find_elements(By.TAG_NAME, "a")
        ]
    except Exception:
        authors = []

    # Submission date: the first ";"-separated part of the info line that
    # contains the word "Submitted", with that word removed.
    date = ""
    try:
        info_text = row.find_element(By.CSS_SELECTOR, "p.is-size-7").text
        for part in info_text.split(";"):
            if "Submitted" in part:
                date = part.replace("Submitted", "").strip()
                break
    except Exception:
        date = ""

    print("原文url:", original_link)
    print("论文标题:", title)
    print("作者列表:", authors)
    print("提交时间:", date)
    time.sleep(1)

    pdf_link = ""
    summary_text = ""
    if original_elem is None:
        # Nothing to click, so the browser never leaves the listing and
        # must not be sent back either.
        print("[错误] 获取摘要失败:", "未找到原文链接")
    else:
        listing_url = None
        try:
            # Remember where we are so the cleanup below can tell whether
            # the click really navigated away.
            listing_url = driver.current_url

            # Open the detail page.
            original_elem.click()
            time.sleep(2)

            # PDF link: optional, a failure here must not lose the abstract.
            try:
                pdf_elem = WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located(
                        (By.CSS_SELECTOR, "a.abs-button.download-pdf")
                    )
                )
                pdf_link = pdf_elem.get_attribute("href") or ""
            except Exception:
                pdf_link = ""
            print("PDF 链接:", pdf_link)

            # Abstract: take the innerHTML, turn <br> into newlines, then
            # let BeautifulSoup strip every remaining tag.
            abstract_elem = driver.find_element(
                By.CSS_SELECTOR, "blockquote.abstract.mathjax"
            )
            html_text = (
                abstract_elem.get_attribute("innerHTML")
                .replace("<br>", "\n")
                .strip()
            )
            summary_text = (
                BeautifulSoup(html_text, "html.parser").get_text().strip()
            )
        except Exception as e:
            # pdf_link is deliberately left as is: a PDF link that was
            # already read stays valid even if the abstract lookup failed.
            summary_text = ""
            print("[错误] 获取摘要失败:", e)
        finally:
            # Return to the listing, but only if the browser actually left
            # it; going back otherwise would abandon the result page.
            try:
                if (listing_url is not None
                        and driver.current_url != listing_url):
                    driver.back()
                    time.sleep(1)
            except Exception as e:
                print("[警告] 页面回退失败:", e)

    print("摘要:", summary_text)
    time.sleep(1)

    return {
        "title": title,
        "author": authors,
        "site": "arxiv",
        "originalLink": original_link,
        "pdfUrl": pdf_link,
        "date": date,
        "summary": summary_text,
    }
|
|
def crawl_current_sort(driver, limit):
    """Collect up to ``limit`` records from the result list, page by page.

    Every entry of the current result page is handed to
    ``extract_row_info``; when the page is exhausted the "next" pagination
    link is followed. Crawling stops once ``limit`` records were collected,
    when no usable "next" link is found, or when paging fails.

    Args:
        driver: Browser driver showing a search-result page.
        limit: Maximum number of records to collect.

    Returns:
        list of the dicts returned by ``extract_row_info``, in crawl order.
    """
    row_selector = "#main-container > div.content > ol > li"
    first_link_selector = row_selector + ":nth-child(1) > div > p > a"
    next_selector = (
        "#main-container > div.content > nav:nth-child(3) > a.pagination-next"
    )
    results = []

    while len(results) < limit:
        # Wait for the first entry's link as the sign that the list loaded.
        try:
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located(
                    (By.CSS_SELECTOR, first_link_selector)
                )
            )
        except TimeoutException:
            print("[警告] 本页结果表格未出现,尝试继续")
        time.sleep(2)

        row_count = len(driver.find_elements(By.CSS_SELECTOR, row_selector))
        for idx in range(row_count):
            if len(results) >= limit:
                break
            try:
                # extract_row_info leaves the page and comes back, which can
                # invalidate element handles obtained earlier, so each row
                # is looked up again by position right before it is used.
                row = _locate_row(driver, row_selector, idx)
                info = extract_row_info(row, driver)
                if isinstance(info, dict):
                    results.append(info)
                time.sleep(random.uniform(0.5, 1.2))
            except Exception as e:
                print(f"[错误] {e}")
                traceback.print_exc()
                # Best-effort recovery: put the focus back on the first
                # window in case the failure left it somewhere else.
                try:
                    if driver.window_handles:
                        driver.switch_to.window(driver.window_handles[0])
                except Exception as switch_err:
                    print(f"[警告] 切换窗口失败: {switch_err}")

        if len(results) >= limit:
            break

        # Pagination
        try:
            next_btn = driver.find_element(By.CSS_SELECTOR, next_selector)
            # A disabled or hidden link cannot be followed by a user. Without
            # the displayed check the JavaScript fallback below would still
            # click it (presumably relevant on the last page; TODO confirm
            # how the site renders it).
            if not next_btn.is_enabled() or not next_btn.is_displayed():
                break
            _scroll_into_view(driver, next_btn)
            # Three click strategies, from most to least native.
            try:
                next_btn.click()
            except Exception:
                try:
                    ActionChains(driver).move_to_element(next_btn).pause(
                        0.1
                    ).click(next_btn).perform()
                except Exception:
                    driver.execute_script("arguments[0].click();", next_btn)
            print("进入下一页")
            time.sleep(random.uniform(1, 1.5))
        except Exception:
            print("[INFO] 已到最后一页或翻页失败")
            break

    return results


def _locate_row(driver, selector, index, timeout=10):
    """Return the ``index``-th element matching ``selector``, found afresh.

    Waits up to ``timeout`` seconds for that many elements to exist and
    raises ``TimeoutException`` if they do not appear in time.
    """
    def _find(d):
        found = d.find_elements(By.CSS_SELECTOR, selector)
        return found[index] if index < len(found) else False

    return WebDriverWait(driver, timeout).until(_find)
|
|
def arxiv(keyword, limit, sort_options=None):
    """Search arxiv.org for ``keyword`` and scrape the results per sort order.

    A browser is created, the keyword is submitted once through the search
    box on the start page, and then every entry of ``sort_options`` is
    handled in turn on that same browser session: the ordering is adjusted
    where needed and ``crawl_current_sort`` collects the records. The
    browser is always closed at the end, and the collected data is printed
    as JSON.

    Args:
        keyword: Search term typed into the search box.
        limit: Maximum number of records to collect per sort option.
        sort_options: Iterable of sort names; ``"publication_time"`` and
            ``"relevance"`` are recognised. Defaults to
            ``["publication_time"]`` when empty or None.

    Returns:
        dict mapping each sort name to the list of records crawled for it.
    """
    driver = create_browser()
    wait = WebDriverWait(driver, 15)
    all_results = {}
    if not sort_options:
        sort_options = ["publication_time"]  # default: order by time

    try:
        driver.get("https://arxiv.org/")
        search_box = wait.until(
            EC.presence_of_element_located(
                (
                    By.CSS_SELECTOR,
                    "#header > div.search-block.level-right > form > div"
                    " > div:nth-child(1) > input",
                )
            )
        )
        search_box.send_keys(keyword)
        driver.find_element(
            By.CSS_SELECTOR,
            "#header > div.search-block.level-right > form > div > button",
        ).click()
        time.sleep(5)

        for sort_name in sort_options:
            if sort_name == "publication_time":
                # The original author notes the site already orders by time
                # by default, so nothing is clicked here.
                print("[INFO] 使用时间排序(默认)")

            elif sort_name == "relevance":
                print("[INFO] 切换到相关性排序")
                try:
                    # The ordering is chosen through the <select id="order">.
                    order_select_elem = WebDriverWait(driver, 10).until(
                        EC.presence_of_element_located((By.ID, "order"))
                    )
                    order_select = Select(order_select_elem)
                    time.sleep(1)
                    target_text = "Relevance"
                    for option in order_select.options:
                        label = option.text
                        if label.strip().lower() == target_text.lower():
                            # Read the option before selecting it, so that
                            # nothing is read from it after the page reacts.
                            value = option.get_attribute("value")
                            order_select.select_by_value(value)
                            print(f"已选择排序: {label} -> {value}")
                            break
                    else:
                        print(f"[WARN] 未找到排序选项: {target_text}")
                    time.sleep(2)
                except Exception as e:
                    print(f"[WARN] 切换相关性排序失败: {e}")

            else:
                # Unknown names are still crawled, with whatever ordering
                # the page currently has; say so instead of staying silent.
                print(f"[WARN] 未知排序方式: {sort_name},沿用当前排序")

            # Crawl the results under the ordering that is now active.
            all_results[sort_name] = crawl_current_sort(driver, limit)

    finally:
        # Closing the browser is best effort and must not mask an error
        # raised above.
        try:
            driver.quit()
        except Exception as e:
            print(f"[WARN] 关闭浏览器失败: {e}")
        print("[DONE] arxiv 抓取完成")

    print(json.dumps(all_results, ensure_ascii=False, indent=2))
    return all_results
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run one sample crawl of up to 100 records for the
    # term "graphrag", using the relevance ordering.
    arxiv(keyword="graphrag", limit=100, sort_options=["relevance"])
|
|
|