106 lines
3.8 KiB
Python
106 lines
3.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
import requests
|
|
import json
|
|
import time
|
|
from typing import List
|
|
|
|
|
|
class DingTalkGroupFetcher:
|
|
@staticmethod
|
|
def get_access_token(app_key: str, app_secret: str) -> str:
|
|
url = "https://oapi.dingtalk.com/gettoken"
|
|
params = {"appkey": app_key, "appsecret": app_secret}
|
|
resp = requests.get(url, params=params)
|
|
r = resp.json()
|
|
if r.get("errcode") == 0:
|
|
print(f"✅ AccessToken 获取成功: {r['access_token']}")
|
|
return r["access_token"]
|
|
print(f"❌ AccessToken 获取失败: {r}")
|
|
return ""
|
|
|
|
@staticmethod
|
|
def get_userid_by_mobile(access_token: str, mobile: str) -> str:
|
|
url = "https://oapi.dingtalk.com/topapi/v2/user/getbymobile"
|
|
params = {"access_token": access_token}
|
|
data = {"mobile": mobile}
|
|
resp = requests.post(url, params=params, json=data)
|
|
r = resp.json()
|
|
if r.get("errcode") == 0:
|
|
print(f"✅ 获取 {mobile} 的 userId: {r['result']['userid']}")
|
|
return r["result"]["userid"]
|
|
print(f"❌ 获取 userId 失败: {r}")
|
|
return ""
|
|
|
|
@staticmethod
|
|
def create_group(access_token: str, name: str, owner: str, user_ids: List[str]) -> str:
|
|
url = "https://oapi.dingtalk.com/chat/create"
|
|
headers = {"Content-Type": "application/json"}
|
|
params = {"access_token": access_token}
|
|
data = {
|
|
"name": name,
|
|
"owner": owner,
|
|
"useridlist": user_ids
|
|
}
|
|
resp = requests.post(url, params=params, data=json.dumps(data), headers=headers)
|
|
r = resp.json()
|
|
if r.get("errcode") == 0:
|
|
print(f"✅ 群聊创建成功: chatid = {r['chatid']}")
|
|
return r["chatid"]
|
|
print(f"❌ 创建群聊失败: {r}")
|
|
return ""
|
|
|
|
@staticmethod
|
|
def get_chat_info(access_token: str, chatid: str):
|
|
url = "https://oapi.dingtalk.com/chat/get"
|
|
params = {"access_token": access_token, "chatid": chatid}
|
|
resp = requests.get(url, params=params)
|
|
r = resp.json()
|
|
if r.get("errcode") == 0:
|
|
print(f"✅ 群聊信息获取成功:")
|
|
print(json.dumps(r, indent=2, ensure_ascii=False))
|
|
else:
|
|
print(f"❌ 群聊信息获取失败: {r}")
|
|
|
|
@staticmethod
|
|
def create_chat_flow():
|
|
app_key = "dinguetojbaxvvhzpk3d"
|
|
app_secret = "lMFqns_ceLIcXvfLL8GKfa3ZiPKHcaZq0VbGtJXJlDuK8AEJ2WV3-PN8zv61ajm3"
|
|
mobile_list = [
|
|
"17279617782",
|
|
"18175540355"
|
|
]
|
|
group_name = "测试自动创建群聊"
|
|
|
|
access_token = DingTalkGroupFetcher.get_access_token(app_key, app_secret)
|
|
user_ids = []
|
|
|
|
for mobile in mobile_list:
|
|
uid = DingTalkGroupFetcher.get_userid_by_mobile(access_token, mobile)
|
|
if uid:
|
|
user_ids.append(uid)
|
|
|
|
if not user_ids:
|
|
print("❌ 无有效用户,无法创建群聊")
|
|
return
|
|
|
|
owner = user_ids[0]
|
|
chatid = DingTalkGroupFetcher.create_group(access_token, group_name, owner, user_ids)
|
|
if chatid:
|
|
print(f"📌 请记录下此 chatid 用于后续获取群聊信息: {chatid}")
|
|
|
|
@staticmethod
|
|
def get_chat_info_flow(chatid: str):
|
|
app_key = "dinguetojbaxvvhzpk3d"
|
|
app_secret = "lMFqns_ceLIcXvfLL8GKfa3ZiPKHcaZq0VbGtJXJlDuK8AEJ2WV3-PN8zv61ajm3"
|
|
access_token = DingTalkGroupFetcher.get_access_token(app_key, app_secret)
|
|
DingTalkGroupFetcher.get_chat_info(access_token, chatid)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# ✅ 调用创建群聊流程
|
|
# DingTalkGroupFetcher.create_chat_flow()
|
|
|
|
# ✅ 单独调用获取群聊信息流程
|
|
chat_id = "chat3c356e41e2cfc0bee60afb8830b74c53" # <-- 替换为已有 chatid
|
|
DingTalkGroupFetcher.get_chat_info_flow(chatid=chat_id)
|