sdk/oapiSdk/message.py

250 lines
9.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import csv
import json
import time
from datetime import datetime
import lark_oapi as lark
from lark_oapi.api.im.v1 import ListChatRequest, ListChatResponse, ListMessageRequest, ListMessageResponse, GetMessageResourceRequest
from lark_oapi.api.contact.v3 import GetUserRequest
from OaConfig import APP_ID,APP_SECRET,OUTPUT_CSV,DOWNLOAD_DIR
# Module-level cache keyed by open_id; get_user_name stores the names it
# resolves here and checks it before calling the contact API again.
USER_CACHE = {}
def timestamp_to_str(ms):
    """Format a millisecond Unix timestamp as local ``YYYY-MM-DD HH:MM:SS``.

    Args:
        ms: Milliseconds since the epoch; any value ``int()`` accepts
            (for example an int or a numeric string).

    Returns:
        The formatted local-time string, or ``""`` when ``ms`` cannot be
        converted to a representable timestamp.
    """
    try:
        seconds = int(ms) / 1000
        return datetime.fromtimestamp(seconds).strftime("%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError, OverflowError, OSError):
        # Only conversion failures map to ""; a bare ``except`` would also
        # swallow KeyboardInterrupt and SystemExit.
        return ""
def str_to_timestamp_seconds(dt_str):
    """Convert a ``YYYY-MM-DD HH:MM:SS`` string to a Unix timestamp in seconds.

    The string is parsed as a naive datetime, so ``timestamp()`` interprets
    it in the local timezone of the machine running the script.

    Args:
        dt_str: Date-time text in the exact format ``%Y-%m-%d %H:%M:%S``.

    Returns:
        Whole seconds since the epoch as an int.

    Raises:
        ValueError: If ``dt_str`` is not a string in the expected format or
            the resulting date cannot be represented as a timestamp. The
            original error is attached as ``__cause__``.
    """
    try:
        parsed = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
        return int(parsed.timestamp())
    except (TypeError, ValueError, OverflowError, OSError) as e:
        # Keep the single ValueError contract for callers, but chain the
        # underlying error so the real reason is not lost.
        raise ValueError(f"时间格式错误,应为 'YYYY-MM-DD HH:MM:SS', 但收到: {dt_str}") from e
def get_user_name(client, open_id):
    """Resolve a user's display name from an open_id, caching successful lookups.

    Args:
        client: Lark client; ``client.contact.v3.user.get`` is called with a
            ``GetUserRequest`` built for the given open_id.
        open_id: The user's open_id. Falsy values are not looked up.

    Returns:
        ``"未知用户"`` when ``open_id`` is falsy; the cached or freshly fetched
        name when available; otherwise ``open_id`` itself (request failed,
        raised, or the response carried no name).
    """
    if not open_id:
        return "未知用户"
    if open_id in USER_CACHE:
        return USER_CACHE[open_id]
    try:
        request = (
            GetUserRequest.builder()
            .user_id(open_id)
            .user_id_type("open_id")
            .department_id_type("open_department_id")
            .build()
        )
        response = client.contact.v3.user.get(request)
        if not response.success():
            return open_id
        user = getattr(response.data, "user", None)
        name = getattr(user, "name", None)
        if not name:
            # A successful response without a name used to cache and return
            # None; fall back to the id like every other failure path and do
            # not cache, so a later call can still pick the name up.
            return open_id
        USER_CACHE[open_id] = name
        return name
    except Exception as e:
        # Best-effort lookup: a failed name resolution must not abort the export.
        print(f"❌ 获取用户名异常: {e}")
        return open_id
def download_file(client, message_id, file_key, file_name, file_type):
    """Download one message resource into DOWNLOAD_DIR and return its path.

    Args:
        client: Lark client; ``client.im.v1.message_resource.get`` is called.
        message_id: Id of the message that owns the resource.
        file_key: Key of the resource inside that message.
        file_name: Desired file name. Only its final path component is used.
        file_type: Resource type passed to the request builder's ``type``
            (callers in this file pass ``"image"`` or ``"file"``).

    Returns:
        The path of the written file, or ``""`` when the request fails or
        any exception occurs.
    """
    # file_name originates from message content, so drop any directory parts
    # (both separator styles) to keep the write inside DOWNLOAD_DIR.
    safe_name = os.path.basename(str(file_name or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        safe_name = f"{message_id}_{file_key}"
    try:
        request = (
            GetMessageResourceRequest.builder()
            .message_id(message_id)
            .file_key(file_key)
            .type(file_type)
            .build()
        )
        response = client.im.v1.message_resource.get(request)
        if not response.success():
            return ""
        os.makedirs(DOWNLOAD_DIR, exist_ok=True)
        file_path = os.path.join(DOWNLOAD_DIR, safe_name)
        with open(file_path, "wb") as f:
            f.write(response.file.read())
        return file_path
    except Exception as e:
        # Still best-effort, but report the reason instead of hiding it.
        print(f"❌ 下载文件异常: {e}")
        return ""
def save_messages(messages):
    """Append message rows to the CSV file at OUTPUT_CSV.

    The header row is written when the file does not exist yet or is empty.
    The file is opened with ``utf-8-sig`` encoding.

    Args:
        messages: Iterable of dicts whose keys are the column names below.
    """
    fieldnames = [
        "chat_id", "chat_name", "message_id", "sender_id", "sender_name", "send_time",
        "msg_type", "content", "file_path",
    ]
    out_dir = os.path.dirname(OUTPUT_CSV)
    if out_dir:
        # dirname is "" for a bare file name, and os.makedirs("") raises
        # FileNotFoundError, so only create a directory when there is one.
        os.makedirs(out_dir, exist_ok=True)
    needs_header = not os.path.exists(OUTPUT_CSV) or os.path.getsize(OUTPUT_CSV) == 0
    with open(OUTPUT_CSV, "a", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if needs_header:
            writer.writeheader()
        writer.writerows(messages)
def fetch_chats(client):
    """List chats through ``client.im.v1.chat.list``, following pagination.

    Args:
        client: Lark client used to issue the list requests.

    Returns:
        A list with the items of every page, or ``None`` as soon as any page
        request reports failure (items collected so far are discarded).
    """
    chats = []
    page_token = None
    while True:
        builder = (
            ListChatRequest.builder()
            .user_id_type("user_id")
            .sort_type("ByCreateTimeAsc")
            .page_size(50)
        )
        if page_token:
            builder = builder.page_token(page_token)
        response: ListChatResponse = client.im.v1.chat.list(builder.build())
        if not response.success():
            print(f"❌ 获取群聊列表失败: code={response.code}, msg={response.msg}")
            return None
        # items may be absent on an empty page; extend(None) would raise.
        chats.extend(response.data.items or [])
        page_token = response.data.page_token
        # Also stop when no token is returned: re-sending the request without
        # one would fetch the first page again forever.
        if not response.data.has_more or not page_token:
            break
    return chats
def fetch_messages_for_chat(client, chat_id, chat_name, start_time=None, end_time=None):
    """Fetch all messages of one chat and flatten them into CSV-ready rows.

    Pages through ``client.im.v1.message.list`` in ascending creation order.
    System messages are skipped. Attachments found in post, file, image,
    audio, video and sticker messages are downloaded via ``download_file``.

    Args:
        client: Lark client used for listing, name lookup and downloads.
        chat_id: Id of the chat, sent as ``container_id``.
        chat_name: Chat name copied into every row.
        start_time: Optional lower bound passed to the request builder's
            ``start_time``; omitted from the request when ``None``.
            Presumably a second-level Unix timestamp - TODO confirm.
        end_time: Optional upper bound, handled like ``start_time``.

    Returns:
        A list of dicts with the keys chat_id, chat_name, message_id,
        sender_id, sender_name, send_time, msg_type, content, file_path.
        If a page request fails, the rows collected so far are returned.
    """
    messages = []
    page_token = ""
    while True:
        builder = (
            ListMessageRequest.builder()
            .container_id_type("chat")
            .container_id(chat_id)
            .sort_type("ByCreateTimeAsc")
            .page_size(50)
        )
        if start_time is not None:
            builder = builder.start_time(start_time)
        if end_time is not None:
            builder = builder.end_time(end_time)
        if page_token:
            builder = builder.page_token(page_token)
        response: ListMessageResponse = client.im.v1.message.list(builder.build())
        if not response.success():
            print(f"❌ 获取消息失败 ({chat_id}): code={response.code}, msg={response.msg}")
            break
        # items may be absent on an empty page; iterating None would raise.
        for message in response.data.items or []:
            row = _message_to_row(client, message, chat_id, chat_name)
            if row is not None:
                messages.append(row)
        page_token = response.data.page_token
        # Stop as well when no token comes back, otherwise the first page
        # would be requested again forever.
        if not response.data.has_more or not page_token:
            break
    return messages


def _parse_message_body(raw):
    """Return the JSON-decoded body, or ``raw`` unchanged when it is not JSON."""
    try:
        return json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        # Not JSON (the original comment cites recalled messages) or no content.
        return raw


def _extract_post(client, msg_id, fields):
    """Return (joined text, ';'-joined download paths) for a rich-text post body."""
    texts = []
    file_paths = []
    seen_keys = set()
    for block in fields.get("content") or []:
        for item in block:
            tag = item.get("tag", "")
            if tag == "text":
                texts.append(item.get("text", ""))
                continue
            if tag == "img":
                file_key = item.get("image_key")
                file_name = f"{msg_id}_{file_key}.png"
                resource_type = "image"
            elif tag == "file":
                file_key = item.get("file_key")
                file_name = item.get("name") or f"{msg_id}_{file_key}"
                resource_type = "file"
            else:
                # Other tags (emotion, etc.) are not handled.
                continue
            # Download each distinct resource of the message only once.
            if not file_key or file_key in seen_keys:
                continue
            seen_keys.add(file_key)
            path = download_file(client, msg_id, file_key, file_name, resource_type)
            if path:
                file_paths.append(path)
    return "".join(texts), ";".join(file_paths)


def _extract_attachment(client, msg_id, msg_type, fields):
    """Return (content, download path) for a file/image/audio/video/sticker message."""
    if msg_type == "image":
        file_key = fields.get("image_key", "")
        # Images carry no file name; default to a .png name.
        file_name = f"{msg_id}_{file_key}.png"
        # Images must be requested with resource type "image", as the post
        # branch already does; they were previously requested as "file".
        resource_type = "image"
    else:
        file_key = fields.get("file_key", "")
        file_name = fields.get("file_name") or f"{msg_id}_{file_key}.bin"
        resource_type = "file"
    file_path = ""
    if file_key:
        file_path = download_file(client, msg_id, file_key, file_name, resource_type)
    return f"[{msg_type} 文件] {file_name}", file_path


def _message_to_row(client, message, chat_id, chat_name):
    """Convert one listed message into a row dict, or None for system messages."""
    msg_type = message.msg_type
    if msg_type == "system":
        # Skip before the sender lookup so discarded messages cost no API call.
        return None
    msg_id = message.message_id
    sender_id = message.sender.id
    sender_name = get_user_name(client, sender_id)
    send_time = int(message.create_time) // 1000
    body = _parse_message_body(message.body.content)
    # Branches that read keys need a mapping; a non-JSON body would otherwise
    # raise AttributeError on ``.get``.
    fields = body if isinstance(body, dict) else {}
    content = ""
    file_path = ""
    if msg_type == "text":
        content = body.get("text", "") if isinstance(body, dict) else str(body)
    elif msg_type == "post":
        content, file_path = _extract_post(client, msg_id, fields)
    elif msg_type in ("file", "image", "audio", "video", "sticker"):
        content, file_path = _extract_attachment(client, msg_id, msg_type, fields)
    elif msg_type == "share_chat":
        shared_chat_id = fields.get("chat_id", "")
        content = f"[分享群聊] chat_id: {shared_chat_id}"
    elif msg_type == "share_user":
        shared_user_id = fields.get("user_id", "")
        content = f"[分享用户] user_id: {shared_user_id}"
    elif msg_type == "media":
        # NOTE(review): the "medias" key is taken from the original code;
        # confirm it against the documented payload of media messages.
        media_list = fields.get("medias") or []
        content = "[媒体合成消息] 包含资源:" + ", ".join(media.get("name", "") for media in media_list)
    elif msg_type == "message":
        quoted = fields.get("text", "")
        content = f"[引用消息] {quoted}"
    else:
        content = f"[未处理类型: {msg_type}] 原始内容: {json.dumps(body, ensure_ascii=False)}"
    return {
        "chat_id": chat_id,
        "chat_name": chat_name,
        "message_id": msg_id,
        "sender_id": sender_id,
        "sender_name": sender_name,
        "send_time": send_time,
        "msg_type": msg_type,
        "content": content,
        "file_path": file_path,
    }
def main():
    """Export the messages of every listed chat to the CSV file.

    Builds a Lark client from APP_ID/APP_SECRET, lists the chats, collects
    the rows of each chat and passes them to ``save_messages`` in one call.
    Progress is reported with ``print``.
    """
    client = (
        lark.Client.builder()
        .app_id(APP_ID)
        .app_secret(APP_SECRET)
        .log_level(lark.LogLevel.ERROR)
        .build()
    )
    chats = fetch_chats(client)
    if chats is None:
        # fetch_chats returns None when a list request fails; len(None)
        # below would otherwise raise TypeError.
        print("❌ 获取群聊列表失败")
        return
    print(f"共获取群聊数量: {len(chats)}")
    all_messages = []
    for chat in chats:
        print(f"开始抓取群聊: {chat.name} ({chat.chat_id})")
        # NOTE(review): no time window is passed here; confirm that this
        # three-argument call matches fetch_messages_for_chat's signature.
        msgs = fetch_messages_for_chat(client, chat.chat_id, chat.name)
        all_messages.extend(msgs)
    if all_messages:
        save_messages(all_messages)
        print(f"✅ 已保存所有消息,共计 {len(all_messages)}")
    else:
        print("没有消息需要保存")
# Run the export only when executed as a script, not when imported.
if __name__ == "__main__":
    main()