sdk/dingdingSdk/dingding_wiki_file.py

145 lines
5.3 KiB
Python
Raw Normal View History

2025-08-19 10:20:23 +00:00
# -*- coding: utf-8 -*-
import requests
from alibabacloud_dingtalk.wiki_2_0.client import Client as dingtalkwiki_2_0Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_dingtalk.wiki_2_0 import models as wiki_models
from alibabacloud_tea_util import models as util_models
APP_KEY = "dinguetojbaxvvhzpk3d"
APP_SECRET = "lMFqns_ceLIcXvfLL8GKfa3ZiPKHcaZq0VbGtJXJlDuK8AEJ2WV3-PN8zv61ajm3"
# ------------------- DingTalk 基础 -------------------
def get_access_token():
url = "https://oapi.dingtalk.com/gettoken"
resp = requests.get(url, params={"appkey": APP_KEY, "appsecret": APP_SECRET}).json()
if resp.get("errcode") == 0:
return resp["access_token"]
raise Exception(f"获取token失败: {resp}")
def get_department_list(token, parent_id=1):
"""递归获取所有部门 ID"""
url = "https://oapi.dingtalk.com/topapi/v2/department/listsub"
resp = requests.post(url, params={"access_token": token}, json={"dept_id": parent_id}).json()
if resp.get("errcode") != 0:
raise Exception(f"获取部门失败: {resp}")
dept_list = resp["result"]
all_depts = [d["dept_id"] for d in dept_list]
for d in dept_list:
all_depts.extend(get_department_list(token, d["dept_id"]))
return all_depts
def get_department_users(token, dept_id):
"""获取部门成员 userid 列表"""
url = "https://oapi.dingtalk.com/topapi/user/listid"
resp = requests.post(url, params={"access_token": token}, json={"dept_id": dept_id}).json()
if resp.get("errcode") == 0:
return resp.get("result", {}).get("userid_list", [])
else:
print(f"❌ 获取部门用户失败 dept_id={dept_id}: {resp}")
return []
def get_unionid(token, userid):
"""通过 userId 获取 unionid"""
url = "https://oapi.dingtalk.com/topapi/v2/user/get"
resp = requests.post(url, params={"access_token": token}, json={"userid": userid}).json()
if resp.get("errcode") != 0:
raise Exception(f"获取unionid失败: {resp}")
return resp["result"]["unionid"]
# ------------------- Wiki 部分 -------------------
def create_wiki_client():
config = open_api_models.Config()
config.protocol = 'https'
config.region_id = 'central'
return dingtalkwiki_2_0Client(config)
def get_user_root_node_id(client, token, operator_id):
"""获取用户个人空间 rootNodeId"""
headers = wiki_models.GetMineWorkspaceHeaders()
headers.x_acs_dingtalk_access_token = token
req = wiki_models.GetMineWorkspaceRequest(operator_id=operator_id)
try:
result = client.get_mine_workspace_with_options(req, headers, util_models.RuntimeOptions())
if hasattr(result.body, "workspace") and hasattr(result.body.workspace, "root_node_id"):
return result.body.workspace.root_node_id
return None
except Exception as e:
print(f"获取 {operator_id} rootNodeId 失败: {e}")
return None
def list_all_nodes(client, operator_id, parent_node_id, token):
"""递归获取 Wiki 节点,返回有层级结构的列表"""
headers = wiki_models.ListNodesHeaders()
headers.x_acs_dingtalk_access_token = token
request = wiki_models.ListNodesRequest(
operator_id=operator_id,
parent_node_id=parent_node_id
)
try:
resp = client.list_nodes_with_options(request, headers, util_models.RuntimeOptions())
except Exception as e:
print(f"请求 ListNodes 出错: {e}")
return []
if not hasattr(resp.body, "nodes") or not resp.body.nodes:
return []
results = []
for node in resp.body.nodes:
node_dict = {
"name": node.name,
"node_id": node.node_id,
"type": node.type,
"url": getattr(node, "url", None)
}
if getattr(node, "has_children", False):
# 递归获取子节点放入children
node_dict["children"] = list_all_nodes(client, operator_id, node.node_id, token)
else:
node_dict["children"] = []
results.append(node_dict)
return results
# ------------------- 主逻辑 -------------------
if __name__ == "__main__":
token = get_access_token()
print("✅ AccessToken:", token)
dept_ids = [1] + get_department_list(token)
print("✅ 部门总数:", len(dept_ids))
all_users = set()
for dept_id in dept_ids:
users = get_department_users(token, dept_id)
all_users.update(users)
print("✅ 成员总数:", len(all_users))
client = create_wiki_client()
for userid in all_users:
try:
operator_id = get_unionid(token, userid)
if not operator_id:
print(f"⚠️ 用户 {userid} 未获取到 unionId跳过。")
continue
root_node_id = get_user_root_node_id(client, token, operator_id)
if not root_node_id:
print(f"⚠️ 用户 {userid} ({operator_id}) 未获取到 rootNodeId跳过。")
continue
wiki_files = list_all_nodes(client, operator_id, root_node_id, token)
print(f"\n📂 用户 {userid} ({operator_id}) ({root_node_id})- 共 {len(wiki_files)} 个文件/文件夹:")
for f in wiki_files:
print(f" - [{f['type']}] {f['name']} ({f['url']})")
except Exception as e:
print(f"❌ 处理用户 {userid} 出错: {e}")