"""Export mail from an IMAP account (QQ / 163) to an Excel sheet.

The script logs in over IMAP-SSL, walks every folder of the account,
optionally filters messages by their ``Date`` header, saves attachments to
``attachments/`` and writes one row per message to an ``.xlsx`` file.
"""

import base64
import email
import imaplib
import os
import re
from datetime import date, datetime
from email.errors import HeaderParseError
from email.header import decode_header
from email.utils import parsedate_to_datetime
from typing import Dict, List, Optional, Tuple

import pandas as pd

# SECURITY(review): credentials are hard-coded in source. They are kept so the
# script behaves as before, but they should be rotated and loaded from the
# environment or a secrets store instead of being committed.
MAIL_ACCOUNTS = {
    "qq": {
        "email_user": "510429074@qq.com",
        "email_pass": "ynlsbbrmhqsnbief",
        "imap_server": "imap.qq.com",
    },
    "163": {
        "email_user": "17279617782@163.com",
        "email_pass": "RDbwq3yRppmDn9x6",
        "imap_server": "imap.163.com",
    },
}


class MailFetcher:
    """Thin wrapper around one ``imaplib.IMAP4_SSL`` session.

    Typical use: ``connect()``, then ``fetch_all_emails(...)``, then
    ``logout()``. All methods report problems with ``print`` and keep going
    rather than raising, matching the script's best-effort style.
    """

    # Directory (relative to the working directory) where attachments land.
    ATTACHMENT_DIR = "attachments"

    # One IMAP LIST response line: (flags) delimiter mailbox-name
    _LIST_LINE = re.compile(
        r'^\((?P<flags>[^)]*)\)\s+'
        r'(?P<delim>"(?:\\.|[^"\\])*"|NIL)\s+'
        r'(?P<name>.+)$'
    )

    def __init__(self, email_user, email_pass, imap_server, port=993):
        self.email_user = email_user
        self.email_pass = email_pass
        self.imap_server = imap_server
        self.port = port
        self.connection = None

    # ------------------------------------------------------------------
    # connection handling
    # ------------------------------------------------------------------
    def connect(self):
        """Open the SSL connection and log in.

        On any failure the error is printed and ``self.connection`` is left
        as ``None``; nothing is raised.
        """
        conn = None
        try:
            conn = imaplib.IMAP4_SSL(self.imap_server, self.port)
            conn.login(self.email_user, self.email_pass)
        except Exception as e:  # boundary: connect() has never raised
            print(f"❌ 邮箱登录失败: {e}")
            if conn is not None:
                # The socket was opened but login failed: do not leak it.
                try:
                    conn.shutdown()
                except OSError:
                    pass
            self.connection = None
            return
        self.connection = conn
        print(f"✅ 登录成功: {self.email_user}")

    def logout(self):
        """Close the selected mailbox (if any) and end the session."""
        if not self.connection:
            return
        try:
            try:
                # close() fails when no mailbox is selected; that is harmless.
                self.connection.close()
            except (imaplib.IMAP4.error, OSError):
                pass
            self.connection.logout()
            print("🔌 已断开连接")
        except Exception:
            print("🔌 断开连接失败")
        finally:
            self.connection = None

    # ------------------------------------------------------------------
    # folder listing
    # ------------------------------------------------------------------
    @staticmethod
    def _decode_folder_name(raw_name: str) -> str:
        """Decode an IMAP modified-UTF-7 mailbox name into readable text.

        ``&...-`` sections hold base64 (with ``,`` instead of ``/``) of
        UTF-16BE text and ``&-`` is a literal ``&``. Sections that cannot be
        decoded are passed through unchanged.
        """
        out = []
        pos = 0
        while pos < len(raw_name):
            amp = raw_name.find("&", pos)
            if amp == -1:
                out.append(raw_name[pos:])
                break
            out.append(raw_name[pos:amp])
            dash = raw_name.find("-", amp + 1)
            if dash == -1:
                # Unterminated shift sequence: keep the tail verbatim.
                out.append(raw_name[amp:])
                break
            chunk = raw_name[amp + 1:dash]
            if not chunk:
                out.append("&")
            else:
                b64 = chunk.replace(",", "/")
                b64 += "=" * (-len(b64) % 4)
                try:
                    out.append(base64.b64decode(b64).decode("utf-16-be"))
                except ValueError:  # binascii.Error / UnicodeDecodeError
                    out.append(raw_name[amp:dash + 1])
            pos = dash + 1
        return "".join(out)

    @staticmethod
    def _parse_list_line(line: str) -> str:
        """Return the raw (still UTF-7 encoded) mailbox name of a LIST line."""
        match = MailFetcher._LIST_LINE.match(line.strip())
        if not match:
            # Fall back to the simple split the script used originally.
            return line.split(' "/" ')[-1].strip('"')
        name = match.group("name").strip()
        if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
            # Quoted string: drop the quotes and undo backslash escapes.
            name = re.sub(r"\\(.)", r"\1", name[1:-1])
        return name

    def list_mailboxes(self):
        """Print all folders of the account and return their raw names.

        Returns:
            The mailbox names exactly as the server encodes them (modified
            UTF-7); these are what ``SELECT`` needs. The list is empty when
            there is no connection or the LIST command fails.
        """
        folders = []
        if not self.connection:
            return folders
        status, mailboxes = self.connection.list()
        if status != "OK":
            return folders
        print("📂 邮箱文件夹列表:")
        for entry in mailboxes:
            if not isinstance(entry, bytes):
                # imaplib returns tuples for literal-encoded names; skip them.
                continue
            line = entry.decode("utf-8", errors="replace")
            raw_name = MailFetcher._parse_list_line(line)
            if not raw_name:
                print(line)
                continue
            decoded_name = MailFetcher._decode_folder_name(raw_name)
            print(f"- 原始: {raw_name} | 解码: {decoded_name}")
            folders.append(raw_name)
        return folders

    # ------------------------------------------------------------------
    # decoding helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _decode_header_value(value) -> str:
        """Decode an RFC 2047 encoded header into a plain string."""
        if not value:
            return ""
        try:
            chunks = decode_header(value)
        except HeaderParseError:
            return str(value)
        pieces = []
        for part, enc in chunks:
            if isinstance(part, bytes):
                pieces.append(MailFetcher._decode_payload(part, enc))
            else:
                pieces.append(part)
        return "".join(pieces)

    def _decode_str(self, s):
        """Decode a possibly RFC 2047 encoded header; ``None``/empty -> ``""``."""
        return MailFetcher._decode_header_value(s)

    @staticmethod
    def _decode_payload(payload: Optional[bytes], charset: Optional[str]) -> str:
        """Decode bytes with *charset*, falling back to UTF-8 if it is unknown."""
        if payload is None:
            return ""
        try:
            return payload.decode(charset or "utf-8", errors="ignore")
        except LookupError:  # charset name Python does not know
            return payload.decode("utf-8", errors="ignore")

    @staticmethod
    def _parse_mail_date(value) -> Optional[datetime]:
        """Parse a ``Date`` header; return ``None`` when it is missing or invalid."""
        if not value:
            return None
        try:
            return parsedate_to_datetime(str(value))
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _parse_bound(value: Optional[str]) -> Optional[date]:
        """Turn a ``YYYY-MM-DD`` string into a ``date`` (``None`` stays ``None``)."""
        if not value:
            return None
        return datetime.strptime(value, "%Y-%m-%d").date()

    @staticmethod
    def _save_attachment(filename: str, payload: bytes) -> str:
        """Write *payload* under ``ATTACHMENT_DIR`` and return the file path.

        The name comes from the message and is therefore untrusted: only its
        final path component is used and characters that are unsafe in file
        names are replaced. An existing file of the same name is overwritten.
        """
        safe = os.path.basename(filename.replace("\\", "/")).strip()
        safe = re.sub(r'[\x00-\x1f<>:"|?*]', "_", safe)
        if safe in ("", ".", ".."):
            safe = "attachment"
        os.makedirs(MailFetcher.ATTACHMENT_DIR, exist_ok=True)
        save_path = os.path.join(MailFetcher.ATTACHMENT_DIR, safe)
        with open(save_path, "wb") as f:
            f.write(payload)
        return save_path

    @staticmethod
    def _extract_content(msg, download_attachments: bool) -> Tuple[str, List[str]]:
        """Return ``(plain_text_body, saved_attachment_paths)`` for *msg*."""
        attachments: List[str] = []
        if not msg.is_multipart():
            body = MailFetcher._decode_payload(
                msg.get_payload(decode=True), msg.get_content_charset()
            )
            return body, attachments

        body_parts = []
        for part in msg.walk():
            filename = part.get_filename()
            if filename and download_attachments:
                payload = part.get_payload(decode=True)
                if payload is None:
                    # Container parts (e.g. an attached message) have no bytes.
                    continue
                name = MailFetcher._decode_header_value(filename)
                attachments.append(MailFetcher._save_attachment(name, payload))
            elif part.get_content_type() == "text/plain" and not filename:
                body_parts.append(
                    MailFetcher._decode_payload(
                        part.get_payload(decode=True), part.get_content_charset()
                    )
                )
        return "".join(body_parts), attachments

    def _build_record(self, raw: bytes, start: Optional[date],
                      end: Optional[date], download_attachments: bool) -> Optional[Dict[str, str]]:
        """Parse one raw message; return its row, or ``None`` if filtered out."""
        msg = email.message_from_bytes(raw)
        mail_date = msg.get("Date")
        sent_at = MailFetcher._parse_mail_date(mail_date)
        if sent_at is not None:
            # Compare calendar days in the sender's own timezone; both bounds
            # are inclusive. Messages without a usable date are never dropped.
            sent_day = sent_at.date()
            if start and sent_day < start:
                return None
            if end and sent_day > end:
                return None
        mail_body, attachments = MailFetcher._extract_content(msg, download_attachments)
        return {
            "发送时间": mail_date,
            "发送人": MailFetcher._decode_header_value(msg.get("From")),
            "接收人": self.email_user,
            "原文内容": mail_body,
            "附件": ", ".join(attachments) if attachments else "无",
        }

    # ------------------------------------------------------------------
    # main fetch loop
    # ------------------------------------------------------------------
    def fetch_all_emails(self, limit=None, download_attachments=True, start_date=None, end_date=None):
        """Fetch messages from every folder of the account.

        Args:
            limit: If truthy, only the newest *limit* messages of EACH folder
                are fetched; otherwise all of them.
            download_attachments: Save attachments to ``attachments/``.
            start_date: Optional ``YYYY-MM-DD`` lower bound (inclusive).
            end_date: Optional ``YYYY-MM-DD`` upper bound (inclusive).

        Returns:
            A list of dicts with the keys ``发送时间``, ``发送人``, ``接收人``,
            ``原文内容`` and ``附件``, newest first within each folder. Empty
            when there is no connection or a date bound is malformed.
        """
        emails = []
        if not self.connection:
            print("⚠️ 未建立IMAP连接")
            return emails

        try:
            start = MailFetcher._parse_bound(start_date)
            end = MailFetcher._parse_bound(end_date)
        except ValueError as e:
            print(f"❌ 日期格式错误 (应为 YYYY-MM-DD): {e}")
            return emails

        for folder in self.list_mailboxes():
            try:
                # Quote the name for SELECT, escaping backslashes and quotes.
                quoted = '"' + folder.replace("\\", "\\\\").replace('"', '\\"') + '"'
                status, _ = self.connection.select(quoted)
                if status != "OK":
                    print(f"❌ 无法选择文件夹: {folder}")
                    continue
                print(f"✅ 选择文件夹: {folder}")

                status, data = self.connection.search(None, "ALL")
                if status != "OK" or not data or not data[0]:
                    print(f"📭 文件夹 {folder} 没有邮件")
                    continue

                email_ids = data[0].split()
                latest_ids = email_ids[-limit:] if limit else email_ids
                for eid in reversed(latest_ids):
                    status, msg_data = self.connection.fetch(eid, "(RFC822)")
                    if status != "OK":
                        continue
                    # The message bytes are the second item of the first tuple.
                    raw = next(
                        (item[1] for item in msg_data
                         if isinstance(item, tuple) and len(item) >= 2),
                        None,
                    )
                    if raw is None:
                        continue
                    try:
                        record = self._build_record(raw, start, end, download_attachments)
                    except Exception as e:
                        # One malformed message must not abort the whole folder.
                        print(f"⚠️ 跳过无法解析的邮件 {eid!r}: {e}")
                        continue
                    if record is not None:
                        emails.append(record)
            except Exception as e:
                print(f"❌ 处理文件夹 {folder} 失败: {e}")
        return emails


# ------------------- main logic -------------------
def fetch_emails(account_key, limit=None, start_date=None, end_date=None,
                 download_attachments=True, save_excel="emails.xlsx"):
    """Fetch mail for a configured account and export it to Excel.

    Args:
        account_key: Key into ``MAIL_ACCOUNTS`` (``"qq"`` or ``"163"``).
        limit: Newest-N limit applied per folder (``None`` = no limit).
        start_date: Optional ``YYYY-MM-DD`` lower bound (inclusive).
        end_date: Optional ``YYYY-MM-DD`` upper bound (inclusive).
        download_attachments: Save attachments to ``attachments/``.
        save_excel: Output ``.xlsx`` path; a falsy value skips the export.

    Returns:
        The list of message dicts (empty if the account is unknown, login
        failed or nothing matched).
    """
    if account_key not in MAIL_ACCOUNTS:
        print(f"❌ 未找到邮箱配置: {account_key}")
        return []

    fetcher = MailFetcher(**MAIL_ACCOUNTS[account_key])
    fetcher.connect()
    try:
        emails = fetcher.fetch_all_emails(
            limit=limit,
            download_attachments=download_attachments,
            start_date=start_date,
            end_date=end_date,
        )
    finally:
        # Always release the session, even if fetching blew up.
        fetcher.logout()

    if emails and save_excel:
        df = pd.DataFrame(emails, columns=["发送时间", "发送人", "接收人", "原文内容", "附件"])
        df.to_excel(save_excel, index=False)
        print(f"✅ 已导出 {len(emails)} 封邮件到 {save_excel}")
    elif not emails:
        print("ℹ️ 没有符合条件的邮件导出")
    return emails


if __name__ == "__main__":
    EMAIL_TYPE = "163"           # "qq" or "163"
    LIMIT_COUNT = 20             # max messages fetched per folder
    DOWNLOAD_ATTACHMENTS = True  # whether to save attachments
    START_DATE = "2025-07-01"    # start date (inclusive)
    END_DATE = "2025-08-14"      # end date (inclusive)

    result = fetch_emails(
        EMAIL_TYPE,
        limit=LIMIT_COUNT,
        start_date=START_DATE,
        end_date=END_DATE,
        download_attachments=DOWNLOAD_ATTACHMENTS,
    )

    if result:
        for mail in result:
            print(f"📅 发送时间: {mail['发送时间']}")
            print(f"📧 发送人: {mail['发送人']}")
            print(f"📨 接收人: {mail['接收人']}")
            print(f"📜 原文内容: {mail['原文内容'][:1000]}...")
            print(f"📎 附件: {mail['附件']}")
            print("-" * 50)