sdk/dingding-sdk/dingding_message.py

106 lines
3.8 KiB
Python
Raw Normal View History

2025-08-18 09:05:41 +00:00
# -*- coding: utf-8 -*-
import requests
import json
import time
from typing import List
class DingTalkGroupFetcher:
@staticmethod
def get_access_token(app_key: str, app_secret: str) -> str:
url = "https://oapi.dingtalk.com/gettoken"
params = {"appkey": app_key, "appsecret": app_secret}
resp = requests.get(url, params=params)
r = resp.json()
if r.get("errcode") == 0:
print(f"✅ AccessToken 获取成功: {r['access_token']}")
return r["access_token"]
print(f"❌ AccessToken 获取失败: {r}")
return ""
@staticmethod
def get_userid_by_mobile(access_token: str, mobile: str) -> str:
url = "https://oapi.dingtalk.com/topapi/v2/user/getbymobile"
params = {"access_token": access_token}
data = {"mobile": mobile}
resp = requests.post(url, params=params, json=data)
r = resp.json()
if r.get("errcode") == 0:
print(f"✅ 获取 {mobile} 的 userId: {r['result']['userid']}")
return r["result"]["userid"]
print(f"❌ 获取 userId 失败: {r}")
return ""
@staticmethod
def create_group(access_token: str, name: str, owner: str, user_ids: List[str]) -> str:
url = "https://oapi.dingtalk.com/chat/create"
headers = {"Content-Type": "application/json"}
params = {"access_token": access_token}
data = {
"name": name,
"owner": owner,
"useridlist": user_ids
}
resp = requests.post(url, params=params, data=json.dumps(data), headers=headers)
r = resp.json()
if r.get("errcode") == 0:
print(f"✅ 群聊创建成功: chatid = {r['chatid']}")
return r["chatid"]
print(f"❌ 创建群聊失败: {r}")
return ""
@staticmethod
def get_chat_info(access_token: str, chatid: str):
url = "https://oapi.dingtalk.com/chat/get"
params = {"access_token": access_token, "chatid": chatid}
resp = requests.get(url, params=params)
r = resp.json()
if r.get("errcode") == 0:
print(f"✅ 群聊信息获取成功:")
print(json.dumps(r, indent=2, ensure_ascii=False))
else:
print(f"❌ 群聊信息获取失败: {r}")
@staticmethod
def create_chat_flow():
app_key = "dinguetojbaxvvhzpk3d"
app_secret = "lMFqns_ceLIcXvfLL8GKfa3ZiPKHcaZq0VbGtJXJlDuK8AEJ2WV3-PN8zv61ajm3"
mobile_list = [
"17279617782",
"18175540355"
]
group_name = "测试自动创建群聊"
access_token = DingTalkGroupFetcher.get_access_token(app_key, app_secret)
user_ids = []
for mobile in mobile_list:
uid = DingTalkGroupFetcher.get_userid_by_mobile(access_token, mobile)
if uid:
user_ids.append(uid)
if not user_ids:
print("❌ 无有效用户,无法创建群聊")
return
owner = user_ids[0]
chatid = DingTalkGroupFetcher.create_group(access_token, group_name, owner, user_ids)
if chatid:
print(f"📌 请记录下此 chatid 用于后续获取群聊信息: {chatid}")
@staticmethod
def get_chat_info_flow(chatid: str):
app_key = "dinguetojbaxvvhzpk3d"
app_secret = "lMFqns_ceLIcXvfLL8GKfa3ZiPKHcaZq0VbGtJXJlDuK8AEJ2WV3-PN8zv61ajm3"
access_token = DingTalkGroupFetcher.get_access_token(app_key, app_secret)
DingTalkGroupFetcher.get_chat_info(access_token, chatid)
if __name__ == '__main__':
# ✅ 调用创建群聊流程
# DingTalkGroupFetcher.create_chat_flow()
# ✅ 单独调用获取群聊信息流程
chat_id = "chat3c356e41e2cfc0bee60afb8830b74c53" # <-- 替换为已有 chatid
DingTalkGroupFetcher.get_chat_info_flow(chatid=chat_id)