sdk/qq_mail/qqmail.py

201 lines
7.9 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import email
import imaplib
import os
import re
from datetime import datetime
from email.header import decode_header
from email.utils import parsedate_to_datetime

import pandas as pd
# Per-provider IMAP settings, keyed by a short account name. Each entry holds
# the login name, the login secret and the IMAP host for that provider.
#
# SECURITY: the literal fallbacks below are real credentials committed to the
# source tree. Supply them through the environment variables named here
# instead, then rotate the exposed secrets and delete the literals.
MAIL_ACCOUNTS = {
    "qq": {
        "email_user": os.environ.get("MAIL_QQ_USER", "510429074@qq.com"),
        "email_pass": os.environ.get("MAIL_QQ_PASS", "ynlsbbrmhqsnbief"),
        "imap_server": "imap.qq.com",
    },
    "163": {
        "email_user": os.environ.get("MAIL_163_USER", "17279617782@163.com"),
        "email_pass": os.environ.get("MAIL_163_PASS", "RDbwq3yRppmDn9x6"),
        "imap_server": "imap.163.com",
    },
}
class MailFetcher:
    """Fetch messages (and optionally attachments) from an IMAP mailbox over SSL.

    Typical use is ``connect()``, then ``fetch_all_emails()``, then
    ``logout()``. Progress and most failures are reported with ``print``
    rather than raised.
    """

    # Directory (relative to the working directory) where attachments are saved.
    ATTACHMENT_DIR = "attachments"

    # One line of an IMAP LIST response: (flags) "delimiter" name
    _LIST_LINE_RE = re.compile(
        r'\((?P<flags>[^)]*)\)\s+(?P<delim>"(?:[^"\\]|\\.)*"|NIL)\s+(?P<name>.+)'
    )
    # A shifted run in IMAP modified UTF-7: "&" + modified base64 + "-".
    _UTF7_SHIFT_RE = re.compile(r"&([^-]*)-")

    def __init__(self, email_user, email_pass, imap_server, port=993):
        """Store the login settings; no network I/O happens here.

        Args:
            email_user: Login name, also reported as the recipient of every
                fetched message.
            email_pass: Login secret passed to ``IMAP4.login``.
            imap_server: IMAP host name.
            port: IMAP-over-SSL port.
        """
        self.email_user = email_user
        self.email_pass = email_pass
        self.imap_server = imap_server
        self.port = port
        self.connection = None

    def connect(self):
        """Open the SSL connection and log in.

        On any failure the error is printed and ``self.connection`` is left
        as ``None``; nothing is raised.
        """
        self.connection = None
        try:
            conn = imaplib.IMAP4_SSL(self.imap_server, self.port)
        except Exception as e:
            print(f"❌ 邮箱登录失败: {e}")
            return
        try:
            conn.login(self.email_user, self.email_pass)
        except Exception as e:
            print(f"❌ 邮箱登录失败: {e}")
            # The socket is already open at this point; release it instead of
            # leaking it when only the login step failed.
            try:
                conn.logout()
            except (imaplib.IMAP4.error, OSError):
                pass
            return
        self.connection = conn
        print(f"✅ 登录成功: {self.email_user}")

    @staticmethod
    def _decode_modified_utf7(name):
        """Decode an IMAP mailbox name from modified UTF-7 (RFC 3501, 5.1.3).

        Raises:
            ValueError: If a shifted run is not valid base64 / UTF-16.
        """

        def _unshift(match):
            chunk = match.group(1)
            if not chunk:
                return "&"  # "&-" is the escape for a literal ampersand
            # Modified base64 uses "," instead of "/" and omits the padding.
            b64 = chunk.replace(",", "/")
            b64 += "=" * (-len(b64) % 4)
            return base64.b64decode(b64).decode("utf-16-be")

        return MailFetcher._UTF7_SHIFT_RE.sub(_unshift, name)

    @staticmethod
    def _parse_folder_name(line):
        """Extract the raw (still encoded) mailbox name from one LIST line."""
        text = line.decode("utf-8", errors="replace")
        match = MailFetcher._LIST_LINE_RE.match(text)
        # Fall back to the "/"-delimiter split when the line has an
        # unexpected shape.
        name = match.group("name") if match else text.split(' "/" ')[-1]
        return name.strip().strip('"')

    def list_mailboxes(self):
        """Print and return the raw names of all folders on the server.

        Returns:
            A list of folder names exactly as the server reports them (the
            form ``select`` expects). Empty when there is no connection or
            the LIST command does not return ``OK``.
        """
        folders = []
        if not self.connection:
            return folders
        status, mailboxes = self.connection.list()
        if status != "OK":
            return folders
        print("📂 邮箱文件夹列表:")
        for line in mailboxes:
            if not isinstance(line, bytes):
                # imaplib may yield None or tuples (literals); nothing to parse.
                continue
            raw_name = self._parse_folder_name(line)
            try:
                decoded_name = self._decode_modified_utf7(raw_name)
            except ValueError:
                # The decoded form is for display only; keep the folder.
                decoded_name = raw_name
            print(f"- 原始: {raw_name} | 解码: {decoded_name}")
            folders.append(raw_name)
        return folders

    def _decode_str(self, s):
        """Decode an RFC 2047 encoded header value into plain text.

        Returns ``""`` for an empty or missing value. Parts that declare a
        charset Python does not know are decoded as UTF-8 instead of raising.
        """
        if not s:
            return ""
        pieces = []
        for part, enc in decode_header(s):
            if not isinstance(part, bytes):
                pieces.append(part)
                continue
            try:
                pieces.append(part.decode(enc or "utf-8", errors="ignore"))
            except LookupError:
                pieces.append(part.decode("utf-8", errors="ignore"))
        return "".join(pieces)

    @staticmethod
    def _parse_bound(value):
        """Turn a ``YYYY-MM-DD`` string into a ``date``; falsy input gives None."""
        return datetime.strptime(value, "%Y-%m-%d").date() if value else None

    @staticmethod
    def _parse_mail_date(raw):
        """Parse a Date header; return None when it is missing or unparseable."""
        if not raw:
            return None
        try:
            return parsedate_to_datetime(str(raw))
        except (TypeError, ValueError, IndexError):
            return None

    @staticmethod
    def _within_date_range(parsed_date, start_day, end_day):
        """Tell whether a message date lies in ``[start_day, end_day]``.

        Both bounds are inclusive calendar days, compared against the
        message's own wall-clock date. A message whose date could not be
        parsed (``None``) is always kept.
        """
        if parsed_date is None:
            return True
        day = parsed_date.date()
        if start_day and day < start_day:
            return False
        if end_day and day > end_day:
            return False
        return True

    @staticmethod
    def _safe_attachment_name(filename):
        """Reduce a filename taken from a message to a bare file name.

        The name comes from untrusted mail, so directory components are
        dropped to keep the file inside the attachment directory.
        """
        name = os.path.basename(filename.replace("\\", "/")).strip()
        return name if name not in ("", ".", "..") else "attachment"

    @staticmethod
    def _decode_payload(part):
        """Return the decoded text payload of a message part ("" if none)."""
        payload = part.get_payload(decode=True)
        if payload is None:
            return ""
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="ignore")
        except LookupError:
            return payload.decode("utf-8", errors="ignore")

    def _save_attachment(self, part, filename):
        """Write one attachment to ATTACHMENT_DIR; return its path or None."""
        payload = part.get_payload(decode=True)
        if payload is None:
            return None
        safe_name = self._safe_attachment_name(self._decode_str(filename))
        os.makedirs(self.ATTACHMENT_DIR, exist_ok=True)
        save_path = os.path.join(self.ATTACHMENT_DIR, safe_name)
        with open(save_path, "wb") as f:
            f.write(payload)
        return save_path

    def _extract_content(self, msg, download_attachments):
        """Return ``(plain_text_body, attachment_paths)`` for a message."""
        if not msg.is_multipart():
            return self._decode_payload(msg), []
        body_parts = []
        attachments = []
        for part in msg.walk():
            filename = part.get_filename()
            if filename:
                if download_attachments:
                    saved = self._save_attachment(part, filename)
                    if saved:
                        attachments.append(saved)
            elif part.get_content_type() == "text/plain":
                body_parts.append(self._decode_payload(part))
        return "".join(body_parts), attachments

    def _build_record(self, raw_bytes, download_attachments, start_day, end_day):
        """Parse one raw message into a result dict, or None if filtered out."""
        msg = email.message_from_bytes(raw_bytes)
        mail_date = msg.get("Date")
        parsed_date = self._parse_mail_date(mail_date)
        if not self._within_date_range(parsed_date, start_day, end_day):
            return None
        mail_body, attachments = self._extract_content(msg, download_attachments)
        return {
            "发送时间": mail_date,
            "发送人": self._decode_str(msg.get("From")),
            "接收人": self.email_user,
            "原文内容": mail_body,
            "附件": ", ".join(attachments),
        }

    def fetch_all_emails(self, limit=None, download_attachments=True, start_date=None, end_date=None):
        """Fetch messages from every folder of the connected mailbox.

        Args:
            limit: If truthy, only the last ``limit`` message ids returned by
                the search in each folder are fetched. The date filter is
                applied after this cut.
            download_attachments: Save parts that carry a filename under
                ``ATTACHMENT_DIR``.
            start_date: Optional ``YYYY-MM-DD`` lower bound (inclusive).
            end_date: Optional ``YYYY-MM-DD`` upper bound (inclusive).

        Returns:
            A list of dicts with the keys ``发送时间``, ``发送人``, ``接收人``,
            ``原文内容`` and ``附件``. Empty when there is no connection or a
            date bound is malformed.
        """
        emails = []
        if not self.connection:
            print("⚠️ 未建立IMAP连接")
            return emails
        # Parse the bounds once up front instead of once per message.
        try:
            start_day = self._parse_bound(start_date)
            end_day = self._parse_bound(end_date)
        except (TypeError, ValueError) as e:
            print(f"❌ 日期格式无效, 应为 YYYY-MM-DD: {e}")
            return emails
        for folder in self.list_mailboxes():
            try:
                # Read-only, so that fetching does not mark messages as seen.
                status, _ = self.connection.select(f'"{folder}"', readonly=True)
                if status != "OK":
                    print(f"❌ 无法选择文件夹: {folder}")
                    continue
                print(f"✅ 选择文件夹: {folder}")
                status, data = self.connection.search(None, "ALL")
                if status != "OK" or not data or not data[0]:
                    print(f"📭 文件夹 {folder} 没有邮件")
                    continue
                email_ids = data[0].split()
                latest_ids = email_ids[-limit:] if limit else email_ids
                for eid in reversed(latest_ids):
                    status, msg_data = self.connection.fetch(eid, "(RFC822)")
                    if status != "OK":
                        continue
                    raw_bytes = next(
                        (item[1] for item in msg_data
                         if isinstance(item, tuple) and len(item) >= 2),
                        None,
                    )
                    if raw_bytes is None:
                        continue
                    # A single malformed message must not abort the folder.
                    try:
                        record = self._build_record(
                            raw_bytes, download_attachments, start_day, end_day
                        )
                    except Exception as e:
                        print(f"⚠️ 跳过邮件 {eid.decode(errors='replace')}: {e}")
                        continue
                    if record is not None:
                        emails.append(record)
            except Exception as e:
                print(f"❌ 处理文件夹 {folder} 失败: {e}")
        return emails

    def logout(self):
        """Close the selected folder (best effort) and log out.

        Safe to call without a connection. The connection is cleared
        afterwards, so a second call is a no-op.
        """
        if not self.connection:
            return
        try:
            try:
                self.connection.close()
            except (imaplib.IMAP4.error, OSError):
                # close() is rejected when no folder is selected; ignore that.
                pass
            self.connection.logout()
            print("🔌 已断开连接")
        except Exception:
            print("🔌 断开连接失败")
        finally:
            self.connection = None
# ------------------- Main logic -------------------
def fetch_emails(account_key, limit=None, start_date=None, end_date=None, download_attachments=True, save_excel="emails.xlsx"):
    """Fetch mail for one configured account and optionally export it to Excel.

    Args:
        account_key: Key into ``MAIL_ACCOUNTS`` selecting the account.
        limit: Forwarded to ``MailFetcher.fetch_all_emails``.
        start_date: Forwarded to ``MailFetcher.fetch_all_emails``.
        end_date: Forwarded to ``MailFetcher.fetch_all_emails``.
        download_attachments: Forwarded to ``MailFetcher.fetch_all_emails``.
        save_excel: Path of the spreadsheet to write. A falsy value skips the
            export and only returns the messages.

    Returns:
        The list of message dicts produced by the fetcher, or ``[]`` when the
        account key is unknown.
    """
    if account_key not in MAIL_ACCOUNTS:
        print(f"❌ 未找到邮箱配置: {account_key}")
        return []
    fetcher = MailFetcher(**MAIL_ACCOUNTS[account_key])
    fetcher.connect()
    # Always release the connection, even if fetching raises part-way through.
    try:
        emails = fetcher.fetch_all_emails(
            limit=limit,
            download_attachments=download_attachments,
            start_date=start_date,
            end_date=end_date,
        )
    finally:
        fetcher.logout()
    if not emails:
        print(" 没有符合条件的邮件导出")
        return emails
    if save_excel:
        df = pd.DataFrame(emails, columns=["发送时间", "发送人", "接收人", "原文内容", "附件"])
        df.to_excel(save_excel, index=False)
        print(f"✅ 已导出 {len(emails)} 封邮件到 {save_excel}")
    return emails
if __name__ == "__main__":
    # Settings for a manual run of the exporter.
    EMAIL_TYPE = "163"  # account key: "qq" or "163"
    LIMIT_COUNT = 20  # cap on the number of messages fetched
    DOWNLOAD_ATTACHMENTS = True  # whether to download attachments
    START_DATE = "2025-07-01"  # start date
    END_DATE = "2025-08-14"  # end date
    result = fetch_emails(
        EMAIL_TYPE,
        limit=LIMIT_COUNT,
        start_date=START_DATE,
        end_date=END_DATE,
        download_attachments=DOWNLOAD_ATTACHMENTS,
    )
    # An empty result simply produces no output.
    for mail in result:
        print(
            f"📅 发送时间: {mail['发送时间']}",
            f"📧 发送人: {mail['发送人']}",
            f"📨 接收人: {mail['接收人']}",
            f"📜 原文内容: {mail['原文内容'][:1000]}...",
            f"📎 附件: {mail['附件']}",
            "-" * 50,
            sep="\n",
        )