243 lines
9.2 KiB
Python
Executable File
243 lines
9.2 KiB
Python
Executable File
# coding=utf-8
|
|
import json
|
|
import time
|
|
import random
|
|
import sys
|
|
import os
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
import traceback
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.common.exceptions import TimeoutException
|
|
from selenium.webdriver.common.action_chains import ActionChains
|
|
from config import create_browser,_scroll_into_view
|
|
def find_valid_detail_tab(driver, origin_handle, timeout=10):
    """Pick out a valid, newly opened tab that is not the original window.

    Polls ``driver.window_handles`` until some handle other than
    ``origin_handle`` points at a real page (not a ``chrome://`` URL and not
    ``about:blank``), switches the driver to it, and returns it.

    Args:
        driver: Selenium WebDriver owning the windows.
        origin_handle: handle of the original (search-results) window.
        timeout: seconds to keep polling before giving up.

    Returns:
        The handle of the detail tab; the driver is left switched to it.

    Raises:
        RuntimeError: if no valid detail window appears within ``timeout``.
            (Was a bare ``Exception``; RuntimeError is still caught by the
            callers' ``except Exception`` handlers.)
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        for handle in driver.window_handles:
            if handle == origin_handle:
                continue  # never pick the original window
            try:
                driver.switch_to.window(handle)
                current_url = driver.current_url
                # A freshly opened tab may briefly report about:blank or a
                # chrome:// URL before the real page loads — skip those.
                if not current_url.startswith("chrome://") and current_url != "about:blank":
                    print(f"[万方切换窗口] 成功 → {driver.title}")
                    return handle
            except Exception:
                # The handle may have been closed between enumeration and
                # switching; just try the next one.
                pass
        time.sleep(0.5)
    raise RuntimeError("未能在规定时间内找到有效详情页窗口")
|
|
# --------- per-row extraction ---------
|
|
def extract_row_info(row, driver):
    """Scrape one result-table row plus its detail page into a dict.

    Reads the metadata cells of a single ``<tr>``, opens the paper's detail
    page in a new tab to collect the URL / abstract / keywords, then closes
    that tab and switches back to the original results window.

    Args:
        row: Selenium WebElement for one ``<tr>`` of the result table.
        driver: WebDriver currently showing the results window.

    Returns:
        A dict with title/author/source/date/site/quote/originalLink/
        download/keywords/summary fields, or None when the row is not a
        journal paper ("期刊论文") or no detail window could be opened.
    """
    # Column 6 holds the record type; a missing cell means "unknown".
    try:
        type_text = row.find_element(By.XPATH, 'td[6]').text.strip()
    except Exception:
        type_text = ""
    # Skip anything that is not a journal paper.
    if type_text != "期刊论文":
        return None

    title_element= row.find_element(By.XPATH, 'td[2]/span[1]')
    title = row.find_element(By.XPATH, 'td[2]/span[1]').text.strip()

    # Author spans; the [not(contains(text(),'年'))] filter drops a
    # year-bearing span that shares the same 'authors' class.
    author_area=row.find_element(By.XPATH,'td[3]')
    authors = author_area.find_elements(By.XPATH, ".//span[@class='authors'][not(contains(text(),'年'))]")
    author_names = [a.text for a in authors]
    # Journal source.
    source = row.find_element(By.XPATH, "td[4]/span").text
    # Publication date.
    date = row.find_element(By.XPATH, 'td[5]').text
    # Citation count.
    quote= row.find_element(By.XPATH, 'td[7]').text
    # Download count.
    download = row.find_element(By.XPATH, 'td[8]').text

    print("类型:", type_text)
    print("论文名称", title)
    print("作者列表:", author_names)
    print("期刊来源:", source)
    print("时间:", date)
    print("引用次数:", quote)
    print("下载次数:", download)
    time.sleep(1)

    origin = driver.current_window_handle
    existing_handles = driver.window_handles
    # Open the detail page: plain click first, then an ActionChains click,
    # finally a JS click (the link may be covered or off-screen).
    try:
        _scroll_into_view(driver, title_element)
        title_element.click()
    except Exception:
        try:
            ActionChains(driver).move_to_element(title_element).pause(0.1).click(title_element).perform()
        except Exception:
            driver.execute_script("arguments[0].click();", title_element)

    # Wait for the detail page to open as a new window/tab.
    try:
        WebDriverWait(driver, 10).until(EC.new_window_is_opened(existing_handles))
    except TimeoutException:
        print("[警告] 未检测到新窗口,跳过")
        return None

    try:
        detail_tab = find_valid_detail_tab(driver, origin)
        if detail_tab not in driver.window_handles:
            return None
        driver.switch_to.window(detail_tab)
        time.sleep(1)

        originalLink = driver.current_url
        print("详情页链接:", originalLink)

        # Try to fetch the abstract.
        summary_text = ""
        try:
            summary_container = driver.find_element(By.CSS_SELECTOR, "#essential > div.detailList > div.summary.list")
            text_span = summary_container.find_element(By.CSS_SELECTOR, "span.text-overflow > span > span")
            summary_text = text_span.text
        except Exception:
            # Element not found — keep summary_text empty.
            print("[警告] 摘要信息未找到")

        # Expand a collapsed abstract when a "查看全部" (show all) toggle
        # exists, then re-read the (now full) text.
        # NOTE(review): if the lookup above failed, summary_container /
        # text_span are unbound and the NameError is swallowed here.
        try:
            expand_btn = summary_container.find_element(By.CSS_SELECTOR,
                                                        "span.slot-box > span.abstractIcon.btn[title='查看全部']")
            driver.execute_script("arguments[0].click();", expand_btn)
            time.sleep(1)
            summary_text = text_span.text
        except Exception:
            pass
        print("摘要:", summary_text)

        # Keyword information.
        # Locate the keyword container...
        keyword_container = driver.find_element(By.CSS_SELECTOR, "#essential > div.detailList > div.keyword.list")
        # ...then every keyword <span> inside it...
        keyword_elements = keyword_container.find_elements(By.CSS_SELECTOR, "div.itemKeyword a span")
        # ...and extract the text. A failure here propagates to the caller
        # (after the finally block restores the original window).
        keywords = [el.text.strip() for el in keyword_elements]
        print("关键词:", keywords)
        time.sleep(1)
    finally:
        # Close only the detail tab, never the original results window.
        if driver.current_window_handle != origin:
            driver.close()
        driver.switch_to.window(origin)
        time.sleep(random.uniform(0.5, 1.5))

    return {
        "title": title,  # defined earlier in this function
        "author": author_names,
        "source": source,
        "date": date,
        "site":"万方",
        "quote": quote,
        "originalLink": originalLink,
        "download": download,
        "keywords": keywords,
        "summary": summary_text
    }
|
|
def crawl_current_sort(driver, limit):
    """Crawl up to ``limit`` journal records under the current sort order.

    Walks the result pages, extracting each journal row through
    extract_row_info(), then follows the "next page" control until the
    limit is reached or paging fails.

    Args:
        driver: WebDriver already showing the result table.
        limit: maximum number of records to collect.

    Returns:
        list of per-record dicts (see extract_row_info()).
    """
    fetched_count = 0
    results = []

    while fetched_count < limit:
        # Wait for a page landmark so the result table has time to render.
        try:
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, '#anxs-logoName_sns'))
            )
        except TimeoutException:
            print("[警告] 本页结果表格未出现,尝试继续")
        time.sleep(2)

        rows = driver.find_elements(By.XPATH, '/ html / body / div[5] / div / div[3] / div[2] / div / div[4] / div[2] / div[1] / table / tbody / tr')

        for row in rows:
            if fetched_count >= limit:
                break
            try:
                info = extract_row_info(row, driver)
                # Count only rows that actually produced a record (journal
                # papers); extract_row_info returns None for the rest.
                if info:
                    results.append(info)
                    time.sleep(2)
                    fetched_count += 1
            except Exception as e:
                print(f"[错误] {e}")
                traceback.print_exc()
                # Best effort: get back onto the first window so the next
                # rows can still be processed.
                try:
                    if driver.window_handles:
                        driver.switch_to.window(driver.window_handles[0])
                except Exception:
                    pass

        # Pagination.
        try:
            next_btn = driver.find_element(By.XPATH, "/html/body/div[5]/div/div[3]/div[2]/div/div[3]/div[2]/div[4]/span[3]")
            # NOTE(review): is_enabled() is True for plain <span> elements,
            # so the "last page" exit mostly relies on the except branch
            # below — confirm against the live page.
            if not next_btn.is_enabled() or fetched_count >= limit:
                break
            _scroll_into_view(driver, next_btn)
            # Same click-fallback cascade as in extract_row_info().
            try:
                next_btn.click()
            except Exception:
                try:
                    ActionChains(driver).move_to_element(next_btn).pause(0.1).click(next_btn).perform()
                except Exception:
                    driver.execute_script("arguments[0].click();", next_btn)
            time.sleep(1)
        except Exception:
            print("[INFO] 已到最后一页或翻页失败")
            break

    return results
|
|
def wangfang(keyword, limit, sort_options=None):
    """Main entry point: search Wanfang and crawl results per sort order.

    Opens a browser, searches for ``keyword``, switches the result list to
    table display, then for each requested sort order clicks the matching
    column header and crawls up to ``limit`` records.

    Args:
        keyword: search query string.
        limit: maximum number of records per sort order.
        sort_options: iterable drawn from {"relevance", "download_count",
            "publication_time"}; falsy -> ["relevance"].

    Returns:
        dict mapping sort-option name -> list of record dicts.
    """
    driver = create_browser()
    wait = WebDriverWait(driver, 15)
    all_results = {}
    if not sort_options:
        sort_options = ["relevance"]  # default: relevance ordering
    try:
        driver.get("https://www.wanfangdata.com.cn/index.html")
        wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "#search-input"))).send_keys(keyword)
        driver.find_element(By.CLASS_NAME, "search-icon").click()
        time.sleep(1)
        # Switch the results to table display mode.
        element=driver.find_element(By.CLASS_NAME, "toggle-table-list")
        driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", element)
        time.sleep(2)
        element.click()
        for sort_name in sort_options:
            if sort_name == "relevance":
                print("[INFO] 使用相关性排序(默认)")
            elif sort_name == "download_count":
                print("[INFO] 使用下载量排序")
                # NOTE(review): this clicks the "被引频次" (citation count)
                # header even though the option is named download_count —
                # confirm which sort is actually intended.
                try:
                    driver.find_element(By.XPATH, '//span[text()="被引频次"]').click()
                except Exception:
                    print(f"[WARN] 点击排序 {sort_name} 失败")
            elif sort_name == "publication_time":
                print("[INFO] 使用时间排序")
                try:
                    driver.find_element(By.XPATH, '//span[text()="出版时间"]').click()
                except Exception:
                    print(f"[WARN] 点击排序 {sort_name} 失败")
            time.sleep(1)
            results = crawl_current_sort(driver, limit)
            all_results[sort_name] = results

    finally:
        # Always release the browser, even if crawling blew up.
        try:
            driver.quit()
        except Exception:
            pass

    print("[DONE] PDF处理完成")
    print(json.dumps(all_results, ensure_ascii=False, indent=2))
    return all_results
|
|
if __name__ == '__main__':
    # Demo run: crawl "知识图谱" papers under two sort orders.
    search_term = "知识图谱"
    max_records = 100
    wangfang(search_term, max_records, ["relevance", "publication_time"])