sdk/oapiSdk/attendance.py

210 lines
7.1 KiB
Python

import json
import csv
import pytz
import os
from datetime import datetime
import lark_oapi as lark
from lark_oapi.api.attendance.v1 import *
from lark_oapi.api.contact.v3 import *
from flask import Flask, request, jsonify, send_from_directory
# Flask app 配置
app = Flask(__name__)
def str_to_timestamp_seconds(dt_str):
"""将时间字符串转为秒级时间戳"""
try:
dt = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
return int(dt.timestamp())
except Exception as e:
raise ValueError(f"时间格式错误,应为 'YYYY-MM-DD HH:MM:SS', 但收到: {dt_str}")
def convert_timestamp_to_datetime(timestamp, timezone="Asia/Shanghai"):
"""将时间戳转换为本地日期时间格式"""
# 将时间戳转换为 UTC 时间
utc_time = datetime.utcfromtimestamp(int(timestamp)).replace(tzinfo=pytz.utc)
# 转换为指定的本地时区时间(默认为上海时间)
local_timezone = pytz.timezone(timezone)
local_time = utc_time.astimezone(local_timezone)
# 格式化为 'YYYY-MM-DD HH:MM:SS'
return local_time.strftime('%Y-%m-%d %H:%M:%S')
def get_user_ids_and_names(client, department_id):
"""获取部门内所有用户的 user_id 和 name"""
request: FindByDepartmentUserRequest = FindByDepartmentUserRequest.builder() \
.user_id_type("open_id") \
.department_id_type("open_department_id") \
.department_id(department_id) \
.page_size(10) \
.build()
# 发起请求
response: FindByDepartmentUserResponse = client.contact.v3.user.find_by_department(request)
if not response.success():
lark.logger.error(f"获取部门用户失败,错误信息: {response.msg}")
return [], {}
user_ids = []
user_names = {}
# 提取所有用户的 user_id 和 name
for item in response.data.items:
user_ids.append(item.user_id)
user_names[item.user_id] = item.name
return user_ids, user_names
def get_attendance_data(client, user_ids, start_time, end_time, user_names):
"""获取多个用户的打卡记录"""
all_records = []
# 构造请求对象
request: QueryUserFlowRequest = QueryUserFlowRequest.builder() \
.employee_type("employee_id") \
.include_terminated_user(True) \
.request_body(QueryUserFlowRequestBody.builder()
.user_ids(user_ids) # 传入所有的 user_id
.check_time_from(start_time)
.check_time_to(end_time)
.build()) \
.build()
# 发起请求
response: QueryUserFlowResponse = client.attendance.v1.user_flow.query(request)
if not response.success():
lark.logger.error(f"获取用户打卡记录失败,错误信息: {response.msg}")
return []
# 访问 user_flow_results 属性,而不是使用 .get() 方法
records = response.data.user_flow_results # 直接访问属性
if not records:
return all_records
# 将当前页的记录添加到所有记录中
for record in records:
try:
# 获取打卡时间、地点、照片链接
check_in_time = convert_timestamp_to_datetime(record.check_time)
location_name = record.location_name if hasattr(record, 'location_name') else '无位置'
photo_urls = record.photo_urls if hasattr(record, 'photo_urls') else []
photo_urls_str = ", ".join(photo_urls) # 将多个照片链接拼接为字符串
comment = record.comment if hasattr(record, 'comment') else ''
# 获取用户名(通过之前获取的 user_names 字典)
user_name = user_names.get(record.user_id, "未知用户")
# 将打卡记录添加到统计数据中
all_records.append([record.user_id, user_name, check_in_time, location_name, photo_urls_str, comment])
except Exception as e:
lark.logger.error(f"处理打卡记录时出错: {e}")
return all_records
def save_to_csv(data):
"""Save attendance records to a CSV file and return structured data"""
os.makedirs("output", exist_ok=True)
file_path = os.path.join("output", "attendance_report.csv")
header = ["user_id", "name", "check_in_time", "location_name", "photo_urls", "comment"]
# Debug: print data to be saved
print(f"Preparing to save the following data to {file_path}:")
for row in data:
print(row)
# Write data to CSV (overwrite if file exists)
with open(file_path, "w", newline="", encoding="utf-8-sig") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(header)
for row in data:
writer.writerow(row)
# Return structured list of dicts
structured_data = [
{header[i]: row[i] for i in range(len(header))}
for row in data
]
return structured_data, file_path
def get_all_department_ids(client):
"""分页获取所有 open_department_id"""
all_department_ids = []
page_token = None
while True:
req_builder = ChildrenDepartmentRequest.builder() \
.department_id("0") \
.user_id_type("open_id") \
.department_id_type("department_id") \
.fetch_child(True) \
.page_size(50)
if page_token:
req_builder.page_token(page_token)
request = req_builder.build()
response: ChildrenDepartmentResponse = client.contact.v3.department.children(request)
if not response.success():
print(f"获取部门失败: {response.msg}")
break
for item in response.data.items:
all_department_ids.append(item.open_department_id)
if not response.data.has_more:
break
page_token = response.data.page_token
return all_department_ids
@app.route('/attendance', methods=['POST'])
def attendance():
data = request.get_json()
app_id = data.get("app_id")
app_secret = data.get("app_secret")
start_time_str = data.get("start_time")
end_time_str = data.get("end_time")
if not all([app_id, app_secret, start_time_str, end_time_str]):
return jsonify({"error": "缺少必要的参数: app_id, app_secret, start_time, end_time"}), 400
start_time = str_to_timestamp_seconds(start_time_str)
end_time = str_to_timestamp_seconds(end_time_str)
client = lark.Client.builder() \
.app_id(app_id) \
.app_secret(app_secret) \
.log_level(lark.LogLevel.ERROR) \
.build()
# 获取所有部门 ID
department_ids = get_all_department_ids(client)
print(f"共获取到 {len(department_ids)} 个部门")
all_check_in_records = []
for dep_id in department_ids:
user_ids, user_names = get_user_ids_and_names(client, dep_id)
p
dep_records = get_attendance_data(client, user_ids, start_time, end_time, user_names)
all_check_in_records.extend(dep_records)
structured_data, file_path = save_to_csv(all_check_in_records)
return jsonify({
"message": "打卡统计已保存",
"file_path": file_path,
"data": structured_data
}), 200
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8789)