# -*- coding: utf-8 -*-
"""Helper script for creating and inspecting DingTalk group chats.

All calls go to the DingTalk open API under ``https://oapi.dingtalk.com``.
Results are reported with ``print``; the methods signal failure by returning
an empty string (or ``None``) instead of raising.
"""
import json
import os
import time
from typing import List, Optional

import requests


class DingTalkGroupFetcher:
    """Namespace of static helpers wrapping a few DingTalk chat endpoints."""

    # NOTE(security): these credentials were hard-coded in the original script
    # and are kept only as a last-resort fallback so existing invocations keep
    # working. They should be rotated and supplied through the
    # DINGTALK_APP_KEY / DINGTALK_APP_SECRET environment variables instead.
    _DEFAULT_APP_KEY = "dinguetojbaxvvhzpk3d"
    _DEFAULT_APP_SECRET = "lMFqns_ceLIcXvfLL8GKfa3ZiPKHcaZq0VbGtJXJlDuK8AEJ2WV3-PN8zv61ajm3"

    # Defaults used by create_chat_flow() when the caller passes nothing.
    _DEFAULT_MOBILES = ("17279617782", "18175540355")
    _DEFAULT_GROUP_NAME = "测试自动创建群聊"

    # Seconds to wait for each HTTP call; without a timeout `requests` can
    # block forever on a stalled connection.
    _REQUEST_TIMEOUT = 10

    @staticmethod
    def _resolve_credentials(app_key: Optional[str], app_secret: Optional[str]):
        """Return ``(app_key, app_secret)``.

        Each value is taken from the explicit argument if given, otherwise
        from the environment, otherwise from the built-in fallback.
        """
        key = (
            app_key
            or os.environ.get("DINGTALK_APP_KEY")
            or DingTalkGroupFetcher._DEFAULT_APP_KEY
        )
        secret = (
            app_secret
            or os.environ.get("DINGTALK_APP_SECRET")
            or DingTalkGroupFetcher._DEFAULT_APP_SECRET
        )
        return key, secret

    @staticmethod
    def _call_api(method: str, url: str, **kwargs) -> dict:
        """Send one HTTP request and return the decoded JSON object.

        Transport errors and undecodable bodies are converted into a dict
        with ``errcode`` set to ``None`` so that callers can treat them like
        any other API failure (``errcode != 0``).
        """
        kwargs.setdefault("timeout", DingTalkGroupFetcher._REQUEST_TIMEOUT)
        try:
            payload = requests.request(method, url, **kwargs).json()
        except (requests.RequestException, ValueError) as exc:
            # Only the exception type is reported: the exception text may
            # embed the request URL, which carries the access token.
            return {"errcode": None, "errmsg": f"请求失败: {type(exc).__name__}"}
        if not isinstance(payload, dict):
            return {"errcode": None, "errmsg": "响应格式异常"}
        return payload

    @staticmethod
    def get_access_token(app_key: str, app_secret: str) -> str:
        """Fetch an access token for the given app credentials.

        Returns:
            The token string, or ``""`` when the request fails.
        """
        r = DingTalkGroupFetcher._call_api(
            "GET",
            "https://oapi.dingtalk.com/gettoken",
            params={"appkey": app_key, "appsecret": app_secret},
        )
        token = r.get("access_token")
        if r.get("errcode") == 0 and token:
            # Print only a short prefix: the token is a credential and must
            # not end up in full in console output or captured logs.
            print(f"✅ AccessToken 获取成功: {str(token)[:4]}***")
            return token
        print(f"❌ AccessToken 获取失败: {r}")
        return ""

    @staticmethod
    def get_userid_by_mobile(access_token: str, mobile: str) -> str:
        """Look up the userId registered for a mobile number.

        Returns:
            The userId, or ``""`` when the lookup fails.
        """
        r = DingTalkGroupFetcher._call_api(
            "POST",
            "https://oapi.dingtalk.com/topapi/v2/user/getbymobile",
            params={"access_token": access_token},
            json={"mobile": mobile},
        )
        result = r.get("result")
        userid = result.get("userid") if isinstance(result, dict) else None
        if r.get("errcode") == 0 and userid:
            print(f"✅ 获取 {mobile} 的 userId: {userid}")
            return userid
        print(f"❌ 获取 userId 失败: {r}")
        return ""

    @staticmethod
    def create_group(access_token: str, name: str, owner: str, user_ids: List[str]) -> str:
        """Create a group chat.

        Args:
            access_token: Token obtained from :meth:`get_access_token`.
            name: Display name of the new group.
            owner: userId sent as the group's ``owner``.
            user_ids: userIds sent as the group's ``useridlist``.

        Returns:
            The new chat's ``chatid``, or ``""`` when creation fails.
        """
        body = {
            "name": name,
            "owner": owner,
            "useridlist": user_ids,
        }
        r = DingTalkGroupFetcher._call_api(
            "POST",
            "https://oapi.dingtalk.com/chat/create",
            params={"access_token": access_token},
            data=json.dumps(body),
            headers={"Content-Type": "application/json"},
        )
        chatid = r.get("chatid")
        if r.get("errcode") == 0 and chatid:
            print(f"✅ 群聊创建成功: chatid = {chatid}")
            return chatid
        print(f"❌ 创建群聊失败: {r}")
        return ""

    @staticmethod
    def get_chat_info(access_token: str, chatid: str):
        """Fetch a group chat's details and print them as indented JSON.

        Nothing is returned; success or failure is reported on stdout.
        """
        r = DingTalkGroupFetcher._call_api(
            "GET",
            "https://oapi.dingtalk.com/chat/get",
            params={"access_token": access_token, "chatid": chatid},
        )
        if r.get("errcode") == 0:
            print(f"✅ 群聊信息获取成功:")
            print(json.dumps(r, indent=2, ensure_ascii=False))
        else:
            print(f"❌ 群聊信息获取失败: {r}")

    @staticmethod
    def create_chat_flow(
        mobile_list: Optional[List[str]] = None,
        group_name: Optional[str] = None,
        app_key: Optional[str] = None,
        app_secret: Optional[str] = None,
    ):
        """Resolve mobiles to userIds and create a group chat from them.

        Args:
            mobile_list: Mobile numbers of the members; defaults to the
                script's built-in list.
            group_name: Name of the group; defaults to the built-in name.
            app_key: App key override (see ``_resolve_credentials``).
            app_secret: App secret override (see ``_resolve_credentials``).

        The first successfully resolved user becomes the group owner.
        """
        fetcher = DingTalkGroupFetcher
        if mobile_list is None:
            mobile_list = list(fetcher._DEFAULT_MOBILES)
        if group_name is None:
            group_name = fetcher._DEFAULT_GROUP_NAME

        key, secret = fetcher._resolve_credentials(app_key, app_secret)
        access_token = fetcher.get_access_token(key, secret)
        if not access_token:
            # The failure has already been printed; every later call would
            # only fail again without a token.
            return

        resolved = [fetcher.get_userid_by_mobile(access_token, m) for m in mobile_list]
        user_ids = [uid for uid in resolved if uid]
        if not user_ids:
            print("❌ 无有效用户,无法创建群聊")
            return

        owner = user_ids[0]
        chatid = fetcher.create_group(access_token, group_name, owner, user_ids)
        if chatid:
            print(f"📌 请记录下此 chatid 用于后续获取群聊信息: {chatid}")

    @staticmethod
    def get_chat_info_flow(
        chatid: str,
        app_key: Optional[str] = None,
        app_secret: Optional[str] = None,
    ):
        """Obtain a token and print the details of the chat ``chatid``.

        ``app_key`` / ``app_secret`` optionally override the credentials
        (see ``_resolve_credentials``).
        """
        key, secret = DingTalkGroupFetcher._resolve_credentials(app_key, app_secret)
        access_token = DingTalkGroupFetcher.get_access_token(key, secret)
        if not access_token:
            # Token retrieval already reported the error.
            return
        DingTalkGroupFetcher.get_chat_info(access_token, chatid)


if __name__ == '__main__':
    # To run the group-creation flow instead, call:
    #     DingTalkGroupFetcher.create_chat_flow()

    # Fetch the details of an existing group chat.
    chat_id = "chat3c356e41e2cfc0bee60afb8830b74c53"  # <-- replace with an existing chatid
    DingTalkGroupFetcher.get_chat_info_flow(chatid=chat_id)