216 lines
8.2 KiB
Python
216 lines
8.2 KiB
Python
![]() |
import imaplib
|
|||
|
import email
|
|||
|
from email.header import decode_header
|
|||
|
import os
|
|||
|
|
|||
|
class MailFetcher:
|
|||
|
def __init__(self, email_user, email_pass, imap_server, port=993):
|
|||
|
self.email_user = email_user
|
|||
|
self.email_pass = email_pass
|
|||
|
self.imap_server = imap_server
|
|||
|
self.port = port
|
|||
|
self.connection = None
|
|||
|
|
|||
|
def connect(self):
|
|||
|
try:
|
|||
|
self.connection = imaplib.IMAP4_SSL(self.imap_server, self.port)
|
|||
|
self.connection.login(self.email_user, self.email_pass)
|
|||
|
print(f"✅ 登录成功: {self.email_user}")
|
|||
|
except Exception as e:
|
|||
|
print(f"❌ 邮箱登录失败: {e}")
|
|||
|
self.connection = None
|
|||
|
|
|||
|
def list_mailboxes(self):
|
|||
|
if self.connection:
|
|||
|
status, mailboxes = self.connection.list()
|
|||
|
if status == "OK":
|
|||
|
print("📂 邮箱文件夹列表:")
|
|||
|
for m in mailboxes:
|
|||
|
try:
|
|||
|
raw_name = m.decode().split(' "/" ')[-1].strip('"')
|
|||
|
decoded_name = imaplib.IMAP4._decode_utf7(raw_name)
|
|||
|
print(f"- 原始: {raw_name} | 解码: {decoded_name}")
|
|||
|
except Exception:
|
|||
|
print(m.decode())
|
|||
|
|
|||
|
def _decode_str(self, s):
|
|||
|
if not s:
|
|||
|
return ""
|
|||
|
decoded_parts = decode_header(s)
|
|||
|
decoded_str = ""
|
|||
|
for part, enc in decoded_parts:
|
|||
|
if isinstance(part, bytes):
|
|||
|
decoded_str += part.decode(enc or "utf-8", errors="ignore")
|
|||
|
else:
|
|||
|
decoded_str += part
|
|||
|
return decoded_str
|
|||
|
|
|||
|
def _select_mailbox(self, folder="INBOX"):
|
|||
|
if not self.connection:
|
|||
|
return False
|
|||
|
|
|||
|
# 先尝试直接选择
|
|||
|
try:
|
|||
|
status, _ = self.connection.select(f'"{folder}"')
|
|||
|
if status == "OK":
|
|||
|
print(f"📂 已选择文件夹: {folder}")
|
|||
|
return True
|
|||
|
except:
|
|||
|
pass
|
|||
|
|
|||
|
# 遍历所有文件夹尝试匹配 INBOX / 收件箱
|
|||
|
try:
|
|||
|
status, mailboxes = self.connection.list()
|
|||
|
if status != "OK" or not mailboxes:
|
|||
|
print("❌ 无法获取文件夹列表")
|
|||
|
return False
|
|||
|
|
|||
|
for box in mailboxes:
|
|||
|
raw_name = box.decode().split(' "/" ')[-1].strip('"')
|
|||
|
try:
|
|||
|
decoded_name = imaplib.IMAP4._decode_utf7(raw_name)
|
|||
|
except:
|
|||
|
decoded_name = raw_name
|
|||
|
|
|||
|
if "收件箱" in decoded_name or "INBOX" in decoded_name.upper():
|
|||
|
# select 时使用原始名字加双引号
|
|||
|
status, _ = self.connection.select(f'"{raw_name}"')
|
|||
|
if status == "OK":
|
|||
|
print(f"📂 自动匹配文件夹: {decoded_name}")
|
|||
|
return True
|
|||
|
except Exception as e:
|
|||
|
print(f"❌ 遍历文件夹选择收件箱失败: {e}")
|
|||
|
return False
|
|||
|
|
|||
|
print("❌ 无法选择收件箱文件夹")
|
|||
|
return False
|
|||
|
|
|||
|
def fetch_from_all_folders(self, limit=5, download_attachments=False):
|
|||
|
if not self.connection:
|
|||
|
print("⚠️ 未建立IMAP连接")
|
|||
|
return []
|
|||
|
|
|||
|
emails = []
|
|||
|
try:
|
|||
|
status, mailboxes = self.connection.list()
|
|||
|
if status != "OK" or not mailboxes:
|
|||
|
print("❌ 无法获取文件夹列表")
|
|||
|
return emails
|
|||
|
|
|||
|
print("📂 邮箱文件夹列表:")
|
|||
|
for box in mailboxes:
|
|||
|
box_str = box.decode() if isinstance(box, bytes) else box
|
|||
|
parts = box_str.split(' "/" ')
|
|||
|
raw_name = parts[-1].strip('"')
|
|||
|
try:
|
|||
|
decoded_name = imaplib.IMAP4._decode_utf7(raw_name)
|
|||
|
except:
|
|||
|
decoded_name = raw_name
|
|||
|
print(f"- 原始: {raw_name} | 解码: {decoded_name}")
|
|||
|
|
|||
|
# 尝试选择这个文件夹
|
|||
|
try:
|
|||
|
status, _ = self.connection.select(raw_name)
|
|||
|
if status != "OK":
|
|||
|
continue
|
|||
|
print(f"✅ 成功选择文件夹: {decoded_name}")
|
|||
|
|
|||
|
# 获取邮件ID
|
|||
|
status, data = self.connection.search(None, "ALL")
|
|||
|
if status != "OK" or not data or not data[0]:
|
|||
|
print(f"📭 文件夹 {decoded_name} 没有邮件")
|
|||
|
continue
|
|||
|
|
|||
|
email_ids = data[0].split()
|
|||
|
latest_ids = email_ids[-limit:] if limit else email_ids
|
|||
|
|
|||
|
for eid in reversed(latest_ids):
|
|||
|
status, msg_data = self.connection.fetch(eid, "(RFC822)")
|
|||
|
if status != "OK":
|
|||
|
continue
|
|||
|
msg = email.message_from_bytes(msg_data[0][1])
|
|||
|
emails.append(msg) # 这里只是示例,你可以解析发件人、主题等
|
|||
|
|
|||
|
except Exception as e:
|
|||
|
print(f"❌ 无法选择或抓取 {decoded_name}: {e}")
|
|||
|
except Exception as e:
|
|||
|
print(f"❌ 遍历文件夹失败: {e}")
|
|||
|
|
|||
|
return emails
|
|||
|
|
|||
|
def fetch_latest_emails(self, limit=5, download_attachments=False, folder="INBOX"):
|
|||
|
emails = []
|
|||
|
if not self.connection:
|
|||
|
print("⚠️ 未建立IMAP连接,无法获取邮件")
|
|||
|
return emails
|
|||
|
|
|||
|
try:
|
|||
|
if not self._select_mailbox(folder):
|
|||
|
return []
|
|||
|
|
|||
|
status, data = self.connection.search(None, "ALL")
|
|||
|
if status != "OK" or not data or not data[0]:
|
|||
|
print("📭 没有找到邮件")
|
|||
|
return []
|
|||
|
|
|||
|
email_ids = data[0].split()
|
|||
|
latest_ids = email_ids[-limit:] if limit else email_ids
|
|||
|
|
|||
|
for eid in reversed(latest_ids):
|
|||
|
status, msg_data = self.connection.fetch(eid, "(RFC822)")
|
|||
|
if status != "OK":
|
|||
|
continue
|
|||
|
|
|||
|
msg = email.message_from_bytes(msg_data[0][1])
|
|||
|
mail_from = self._decode_str(msg.get("From"))
|
|||
|
mail_subject = self._decode_str(msg.get("Subject"))
|
|||
|
mail_date = msg.get("Date")
|
|||
|
mail_body = ""
|
|||
|
attachments = []
|
|||
|
|
|||
|
if msg.is_multipart():
|
|||
|
for part in msg.walk():
|
|||
|
content_type = part.get_content_type()
|
|||
|
filename = part.get_filename()
|
|||
|
if filename and download_attachments:
|
|||
|
filename = self._decode_str(filename)
|
|||
|
os.makedirs("attachments", exist_ok=True)
|
|||
|
save_path = os.path.join("attachments", filename)
|
|||
|
with open(save_path, "wb") as f:
|
|||
|
f.write(part.get_payload(decode=True))
|
|||
|
attachments.append(save_path)
|
|||
|
elif content_type == "text/plain" and not filename:
|
|||
|
charset = part.get_content_charset() or "utf-8"
|
|||
|
try:
|
|||
|
mail_body += part.get_payload(decode=True).decode(charset, errors="ignore")
|
|||
|
except:
|
|||
|
mail_body += part.get_payload(decode=True).decode("utf-8", errors="ignore")
|
|||
|
else:
|
|||
|
charset = msg.get_content_charset() or "utf-8"
|
|||
|
mail_body = msg.get_payload(decode=True).decode(charset, errors="ignore")
|
|||
|
|
|||
|
emails.append({
|
|||
|
"from": mail_from,
|
|||
|
"subject": mail_subject,
|
|||
|
"date": mail_date,
|
|||
|
"body": mail_body,
|
|||
|
"attachments": attachments
|
|||
|
})
|
|||
|
|
|||
|
except Exception as e:
|
|||
|
print(f"❌ 获取邮件失败: {e}")
|
|||
|
|
|||
|
return emails
|
|||
|
|
|||
|
def logout(self):
|
|||
|
if self.connection:
|
|||
|
try:
|
|||
|
try:
|
|||
|
self.connection.close()
|
|||
|
except:
|
|||
|
pass
|
|||
|
self.connection.logout()
|
|||
|
print("🔌 已断开连接")
|
|||
|
except Exception:
|
|||
|
print("🔌 断开连接失败")
|