# -*- coding: utf-8 -*-
"""Print the primary-calendar events of every user found in a DingTalk organisation.

The script obtains an access token, walks the department tree, collects the
user IDs of each department, resolves each user's ``unionid`` and lists the
events of that user's ``primary`` calendar.
"""
import os
import sys
from typing import Dict, List, Optional, Set

import requests
from alibabacloud_dingtalk.calendar_1_0.client import Client as dingtalkcalendar_1_0Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_dingtalk.calendar_1_0 import models as dingtalkcalendar_1_0_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient

# Upper bound (seconds) on any single HTTP call; without it `requests` may
# block indefinitely on an unresponsive server.
REQUEST_TIMEOUT = 10

# NOTE(review): DingTalk documents the root department ID as 1 -- confirm for
# this tenant. Used so that members attached directly to the root are included.
ROOT_DEPT_ID = 1


class DingTalkHelper:
    """Stateless wrappers around the DingTalk HTTP and calendar SDK calls.

    Every helper is best-effort: on failure it prints a diagnostic and returns
    an empty value (``""``, ``[]`` or ``{}``) instead of raising.
    """

    @staticmethod
    def _request_json(method: str, url: str, params: Dict,
                      payload: Optional[Dict] = None) -> Dict:
        """Send one HTTP request and return the decoded JSON object.

        Transport failures and undecodable bodies are converted into a dict
        whose ``errcode`` is ``None`` so that callers' ``errcode == 0`` checks
        route them through the normal error branch.

        Args:
            method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
            url: Endpoint URL.
            params: Query-string parameters.
            payload: JSON body, or ``None`` to send no body.

        Returns:
            The response JSON as a dict, or a synthetic error dict.
        """
        try:
            resp = requests.request(
                method, url, params=params, json=payload, timeout=REQUEST_TIMEOUT
            )
            data = resp.json()
        except (requests.RequestException, ValueError) as exc:
            # Only the exception type is reported: the message text of
            # `requests` errors can embed the full URL, whose query string
            # carries the app secret / access token.
            return {"errcode": None, "errmsg": f"request failed: {type(exc).__name__}"}
        if not isinstance(data, dict):
            return {"errcode": None, "errmsg": "unexpected non-object JSON response"}
        return data

    @staticmethod
    def get_access_token(app_key: str, app_secret: str) -> str:
        """Fetch an access token for the given application credentials.

        Returns:
            The token string, or ``""`` when the call fails.
        """
        url = "https://oapi.dingtalk.com/gettoken"
        params = {"appkey": app_key, "appsecret": app_secret}
        data = DingTalkHelper._request_json("GET", url, params)
        if data.get("errcode") == 0:
            # The token itself is deliberately not printed: it is a credential.
            print("✅ AccessToken 获取成功")
            return data.get("access_token") or ""
        print("❌ 获取 AccessToken 失败:", data)
        return ""

    @staticmethod
    def get_sub_departments(token: str, dept_id: Optional[int] = None) -> List[Dict]:
        """List the direct sub-departments of ``dept_id``.

        Args:
            token: Access token.
            dept_id: Parent department ID; when ``None`` no ``dept_id`` is sent
                and the server chooses the parent.

        Returns:
            The ``result`` list of the response, or ``[]`` on failure.
        """
        url = "https://oapi.dingtalk.com/topapi/v2/department/listsub"
        params = {"access_token": token}
        payload = {}
        if dept_id is not None:
            payload["dept_id"] = dept_id
        data = DingTalkHelper._request_json("POST", url, params, payload)
        if data.get("errcode") == 0:
            # `or []` also covers an explicit null result.
            return data.get("result") or []
        print(f"❌ 获取部门失败 dept_id={dept_id}: {data}")
        return []

    @staticmethod
    def get_all_departments(token: str) -> List[Dict]:
        """Return every department below the default parent, in pre-order.

        The tree is walked with an explicit stack; a seen-set prevents a
        department from being visited twice.
        """
        all_depts: List[Dict] = []
        seen: Set[int] = set()
        # Reversed so that popping from the end yields the server's order.
        pending = list(reversed(DingTalkHelper.get_sub_departments(token)))
        while pending:
            dept = pending.pop()
            dept_id = dept["dept_id"]
            if dept_id in seen:
                continue
            seen.add(dept_id)
            all_depts.append(dept)
            pending.extend(reversed(DingTalkHelper.get_sub_departments(token, dept_id)))
        return all_depts

    @staticmethod
    def get_users_by_dept(token: str, dept_id: int) -> List[str]:
        """Return the user IDs of one department, or ``[]`` on failure."""
        url = "https://oapi.dingtalk.com/topapi/user/listid"
        params = {"access_token": token}
        payload = {"dept_id": dept_id}
        data = DingTalkHelper._request_json("POST", url, params, payload)
        if data.get("errcode") == 0:
            return (data.get("result") or {}).get("userid_list") or []
        print(f"❌ 获取部门用户失败 dept_id={dept_id}: {data}")
        return []

    @staticmethod
    def get_user_detail(token: str, userid: str) -> Dict:
        """Return the ``result`` object describing one user, or ``{}`` on failure."""
        url = "https://oapi.dingtalk.com/topapi/v2/user/get"
        params = {"access_token": token}
        payload = {"userid": userid}
        data = DingTalkHelper._request_json("POST", url, params, payload)
        if data.get("errcode") == 0:
            return data.get("result") or {}
        print(f"❌ 获取用户详情失败 userid={userid}: {data}")
        return {}

    @staticmethod
    def create_calendar_client() -> dingtalkcalendar_1_0Client:
        """Build a calendar SDK client configured for HTTPS and region ``central``."""
        config = open_api_models.Config()
        config.protocol = 'https'
        config.region_id = 'central'
        return dingtalkcalendar_1_0Client(config)

    @staticmethod
    def get_user_calendar_events(client, access_token: str, unionid: str) -> List[Dict]:
        """List the events of a user's ``primary`` calendar.

        Follows the response's ``next_token`` (when the SDK model exposes one)
        so that more than the first page is returned.

        Args:
            client: Calendar SDK client from :meth:`create_calendar_client`.
            access_token: Access token sent in the request headers.
            unionid: The user's union ID.

        Returns:
            A list of event objects as produced by the SDK; always a list,
            never ``None``. If a request fails, the error is printed and the
            events gathered so far (possibly none) are returned.
        """
        headers = dingtalkcalendar_1_0_models.ListEventsHeaders()
        headers.x_acs_dingtalk_access_token = access_token
        request = dingtalkcalendar_1_0_models.ListEventsRequest()
        runtime = util_models.RuntimeOptions()

        events: List = []
        seen_tokens = set()
        try:
            while True:
                resp = client.list_events_with_options(
                    unionid, 'primary', request, headers, runtime
                )
                body = resp.body if resp else None
                if body is None:
                    break
                # The attribute can exist yet hold None; normalise to a list.
                events.extend(getattr(body, "events", None) or [])
                next_token = getattr(body, "next_token", None)
                # Stop on the last page, or if the server repeats a token.
                if not next_token or next_token in seen_tokens:
                    break
                seen_tokens.add(next_token)
                request.next_token = next_token
        except Exception as e:
            print(f"❌ 获取日程失败 unionid={unionid},错误: {e}")
        return events


def main():
    """Crawl all departments and users, then print each user's calendar events.

    Credentials are read from the ``DINGTALK_APP_KEY`` and
    ``DINGTALK_APP_SECRET`` environment variables when set.
    """
    # SECURITY: the fallback literals below are credentials committed to source
    # control. They are kept only so existing invocations keep working; rotate
    # the secret and delete these fallbacks.
    app_key = os.environ.get("DINGTALK_APP_KEY") or "dinguetojbaxvvhzpk3d"
    app_secret = (
        os.environ.get("DINGTALK_APP_SECRET")
        or "lMFqns_ceLIcXvfLL8GKfa3ZiPKHcaZq0VbGtJXJlDuK8AEJ2WV3-PN8zv61ajm3"
    )

    token = DingTalkHelper.get_access_token(app_key, app_secret)
    if not token:
        return

    print("获取全部部门...")
    departments = DingTalkHelper.get_all_departments(token)
    print(f"部门总数: {len(departments)}")

    # The department walk returns sub-departments only, so the root is queried
    # explicitly to pick up users attached directly to it.
    dept_ids = [ROOT_DEPT_ID]
    dept_ids.extend(dept["dept_id"] for dept in departments)

    all_userids = set()
    for dept_id in dept_ids:
        all_userids.update(DingTalkHelper.get_users_by_dept(token, dept_id))

    print(f"共获取用户数量: {len(all_userids)}")

    calendar_client = DingTalkHelper.create_calendar_client()

    # Sorted so that output order is reproducible between runs.
    for userid in sorted(all_userids):
        user_detail = DingTalkHelper.get_user_detail(token, userid)
        if not user_detail:
            continue
        unionid = user_detail.get("unionid")
        name = user_detail.get("name")
        print(f"用户: {name},userid: {userid},unionid: {unionid}")
        if unionid:
            events = DingTalkHelper.get_user_calendar_events(calendar_client, token, unionid)
            print(f"日程数: {len(events)}")
            for event in events:
                print(event)


if __name__ == '__main__':
    main()