sdk/dingdingSdk/dingding_wiki_file.py

145 lines
5.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import requests
from alibabacloud_dingtalk.wiki_2_0.client import Client as dingtalkwiki_2_0Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_dingtalk.wiki_2_0 import models as wiki_models
from alibabacloud_tea_util import models as util_models
APP_KEY = "dinguetojbaxvvhzpk3d"
APP_SECRET = "lMFqns_ceLIcXvfLL8GKfa3ZiPKHcaZq0VbGtJXJlDuK8AEJ2WV3-PN8zv61ajm3"
# ------------------- DingTalk 基础 -------------------
def get_access_token():
url = "https://oapi.dingtalk.com/gettoken"
resp = requests.get(url, params={"appkey": APP_KEY, "appsecret": APP_SECRET}).json()
if resp.get("errcode") == 0:
return resp["access_token"]
raise Exception(f"获取token失败: {resp}")
def get_department_list(token, parent_id=1):
"""递归获取所有部门 ID"""
url = "https://oapi.dingtalk.com/topapi/v2/department/listsub"
resp = requests.post(url, params={"access_token": token}, json={"dept_id": parent_id}).json()
if resp.get("errcode") != 0:
raise Exception(f"获取部门失败: {resp}")
dept_list = resp["result"]
all_depts = [d["dept_id"] for d in dept_list]
for d in dept_list:
all_depts.extend(get_department_list(token, d["dept_id"]))
return all_depts
def get_department_users(token, dept_id):
"""获取部门成员 userid 列表"""
url = "https://oapi.dingtalk.com/topapi/user/listid"
resp = requests.post(url, params={"access_token": token}, json={"dept_id": dept_id}).json()
if resp.get("errcode") == 0:
return resp.get("result", {}).get("userid_list", [])
else:
print(f"❌ 获取部门用户失败 dept_id={dept_id}: {resp}")
return []
def get_unionid(token, userid):
"""通过 userId 获取 unionid"""
url = "https://oapi.dingtalk.com/topapi/v2/user/get"
resp = requests.post(url, params={"access_token": token}, json={"userid": userid}).json()
if resp.get("errcode") != 0:
raise Exception(f"获取unionid失败: {resp}")
return resp["result"]["unionid"]
# ------------------- Wiki 部分 -------------------
def create_wiki_client():
config = open_api_models.Config()
config.protocol = 'https'
config.region_id = 'central'
return dingtalkwiki_2_0Client(config)
def get_user_root_node_id(client, token, operator_id):
"""获取用户个人空间 rootNodeId"""
headers = wiki_models.GetMineWorkspaceHeaders()
headers.x_acs_dingtalk_access_token = token
req = wiki_models.GetMineWorkspaceRequest(operator_id=operator_id)
try:
result = client.get_mine_workspace_with_options(req, headers, util_models.RuntimeOptions())
if hasattr(result.body, "workspace") and hasattr(result.body.workspace, "root_node_id"):
return result.body.workspace.root_node_id
return None
except Exception as e:
print(f"获取 {operator_id} rootNodeId 失败: {e}")
return None
def list_all_nodes(client, operator_id, parent_node_id, token):
"""递归获取 Wiki 节点,返回有层级结构的列表"""
headers = wiki_models.ListNodesHeaders()
headers.x_acs_dingtalk_access_token = token
request = wiki_models.ListNodesRequest(
operator_id=operator_id,
parent_node_id=parent_node_id
)
try:
resp = client.list_nodes_with_options(request, headers, util_models.RuntimeOptions())
except Exception as e:
print(f"请求 ListNodes 出错: {e}")
return []
if not hasattr(resp.body, "nodes") or not resp.body.nodes:
return []
results = []
for node in resp.body.nodes:
node_dict = {
"name": node.name,
"node_id": node.node_id,
"type": node.type,
"url": getattr(node, "url", None)
}
if getattr(node, "has_children", False):
# 递归获取子节点放入children
node_dict["children"] = list_all_nodes(client, operator_id, node.node_id, token)
else:
node_dict["children"] = []
results.append(node_dict)
return results
# ------------------- 主逻辑 -------------------
if __name__ == "__main__":
token = get_access_token()
print("✅ AccessToken:", token)
dept_ids = [1] + get_department_list(token)
print("✅ 部门总数:", len(dept_ids))
all_users = set()
for dept_id in dept_ids:
users = get_department_users(token, dept_id)
all_users.update(users)
print("✅ 成员总数:", len(all_users))
client = create_wiki_client()
for userid in all_users:
try:
operator_id = get_unionid(token, userid)
if not operator_id:
print(f"⚠️ 用户 {userid} 未获取到 unionId跳过。")
continue
root_node_id = get_user_root_node_id(client, token, operator_id)
if not root_node_id:
print(f"⚠️ 用户 {userid} ({operator_id}) 未获取到 rootNodeId跳过。")
continue
wiki_files = list_all_nodes(client, operator_id, root_node_id, token)
print(f"\n📂 用户 {userid} ({operator_id}) ({root_node_id})- 共 {len(wiki_files)} 个文件/文件夹:")
for f in wiki_files:
print(f" - [{f['type']}] {f['name']} ({f['url']})")
except Exception as e:
print(f"❌ 处理用户 {userid} 出错: {e}")