sdk/dingdingSdk/dingding_calendars.py

136 lines
4.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import sys
import requests
from typing import List, Dict, Set
from alibabacloud_dingtalk.calendar_1_0.client import Client as dingtalkcalendar_1_0Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_dingtalk.calendar_1_0 import models as dingtalkcalendar_1_0_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class DingTalkHelper:
@staticmethod
def get_access_token(app_key: str, app_secret: str) -> str:
url = "https://oapi.dingtalk.com/gettoken"
params = {"appkey": app_key, "appsecret": app_secret}
resp = requests.get(url, params=params)
data = resp.json()
if data.get("errcode") == 0:
print("✅ AccessToken 获取成功:", data.get("access_token"))
return data.get("access_token")
print("❌ 获取 AccessToken 失败:", data)
return ""
@staticmethod
def get_sub_departments(token: str, dept_id: int = None) -> List[Dict]:
url = "https://oapi.dingtalk.com/topapi/v2/department/listsub"
params = {"access_token": token}
payload = {}
if dept_id is not None:
payload["dept_id"] = dept_id
resp = requests.post(url, params=params, json=payload)
data = resp.json()
if data.get("errcode") == 0:
return data.get("result", [])
else:
print(f"❌ 获取部门失败 dept_id={dept_id}: {data}")
return []
@staticmethod
def get_all_departments(token: str) -> List[Dict]:
all_depts = []
def recurse(dept_id: int = None):
depts = DingTalkHelper.get_sub_departments(token, dept_id)
for d in depts:
all_depts.append(d)
recurse(d["dept_id"])
recurse()
return all_depts
@staticmethod
def get_users_by_dept(token: str, dept_id: int) -> List[str]:
url = "https://oapi.dingtalk.com/topapi/user/listid"
params = {"access_token": token}
payload = {"dept_id": dept_id}
resp = requests.post(url, params=params, json=payload)
data = resp.json()
if data.get("errcode") == 0:
return data.get("result", {}).get("userid_list", [])
else:
print(f"❌ 获取部门用户失败 dept_id={dept_id}: {data}")
return []
@staticmethod
def get_user_detail(token: str, userid: str) -> Dict:
url = "https://oapi.dingtalk.com/topapi/v2/user/get"
params = {"access_token": token}
payload = {"userid": userid}
resp = requests.post(url, params=params, json=payload)
data = resp.json()
if data.get("errcode") == 0:
return data.get("result", {})
else:
print(f"❌ 获取用户详情失败 userid={userid}: {data}")
return {}
@staticmethod
def create_calendar_client() -> dingtalkcalendar_1_0Client:
config = open_api_models.Config()
config.protocol = 'https'
config.region_id = 'central'
return dingtalkcalendar_1_0Client(config)
@staticmethod
def get_user_calendar_events(client, access_token: str, unionid: str) -> List[Dict]:
headers = dingtalkcalendar_1_0_models.ListEventsHeaders()
headers.x_acs_dingtalk_access_token = access_token
request = dingtalkcalendar_1_0_models.ListEventsRequest()
try:
resp = client.list_events_with_options(unionid, 'primary', request, headers, util_models.RuntimeOptions())
return resp.body.events if resp and resp.body and hasattr(resp.body, "events") else []
except Exception as e:
print(f"❌ 获取日程失败 unionid={unionid},错误: {e}")
return []
def main():
app_key = "dinguetojbaxvvhzpk3d"
app_secret = "lMFqns_ceLIcXvfLL8GKfa3ZiPKHcaZq0VbGtJXJlDuK8AEJ2WV3-PN8zv61ajm3"
token = DingTalkHelper.get_access_token(app_key, app_secret)
if not token:
return
print("获取全部部门...")
departments = DingTalkHelper.get_all_departments(token)
print(f"部门总数: {len(departments)}")
all_userids = set()
for dept in departments:
userids = DingTalkHelper.get_users_by_dept(token, dept["dept_id"])
all_userids.update(userids)
print(f"共获取用户数量: {len(all_userids)}")
calendar_client = DingTalkHelper.create_calendar_client()
for userid in all_userids:
user_detail = DingTalkHelper.get_user_detail(token, userid)
if not user_detail:
continue
unionid = user_detail.get("unionid")
name = user_detail.get("name")
print(f"用户: {name}userid: {userid}unionid: {unionid}")
if unionid:
events = DingTalkHelper.get_user_calendar_events(calendar_client, token, unionid)
print(f"日程数: {len(events)}")
for event in events:
print(event)
if __name__ == '__main__':
main()