210 lines
7.1 KiB
Python
210 lines
7.1 KiB
Python
import json
|
|
import csv
|
|
import pytz
|
|
import os
|
|
from datetime import datetime
|
|
import lark_oapi as lark
|
|
from lark_oapi.api.attendance.v1 import *
|
|
from lark_oapi.api.contact.v3 import *
|
|
from flask import Flask, request, jsonify, send_from_directory
|
|
|
|
# Flask app 配置
|
|
app = Flask(__name__)
|
|
|
|
|
|
def str_to_timestamp_seconds(dt_str):
|
|
"""将时间字符串转为秒级时间戳"""
|
|
try:
|
|
dt = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
|
|
return int(dt.timestamp())
|
|
except Exception as e:
|
|
raise ValueError(f"时间格式错误,应为 'YYYY-MM-DD HH:MM:SS', 但收到: {dt_str}")
|
|
|
|
|
|
def convert_timestamp_to_datetime(timestamp, timezone="Asia/Shanghai"):
|
|
"""将时间戳转换为本地日期时间格式"""
|
|
# 将时间戳转换为 UTC 时间
|
|
utc_time = datetime.utcfromtimestamp(int(timestamp)).replace(tzinfo=pytz.utc)
|
|
|
|
# 转换为指定的本地时区时间(默认为上海时间)
|
|
local_timezone = pytz.timezone(timezone)
|
|
local_time = utc_time.astimezone(local_timezone)
|
|
|
|
# 格式化为 'YYYY-MM-DD HH:MM:SS'
|
|
return local_time.strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
|
|
def get_user_ids_and_names(client, department_id):
|
|
"""获取部门内所有用户的 user_id 和 name"""
|
|
request: FindByDepartmentUserRequest = FindByDepartmentUserRequest.builder() \
|
|
.user_id_type("open_id") \
|
|
.department_id_type("open_department_id") \
|
|
.department_id(department_id) \
|
|
.page_size(10) \
|
|
.build()
|
|
|
|
# 发起请求
|
|
response: FindByDepartmentUserResponse = client.contact.v3.user.find_by_department(request)
|
|
|
|
if not response.success():
|
|
lark.logger.error(f"获取部门用户失败,错误信息: {response.msg}")
|
|
return [], {}
|
|
|
|
user_ids = []
|
|
user_names = {}
|
|
|
|
# 提取所有用户的 user_id 和 name
|
|
for item in response.data.items:
|
|
user_ids.append(item.user_id)
|
|
user_names[item.user_id] = item.name
|
|
|
|
return user_ids, user_names
|
|
|
|
|
|
def get_attendance_data(client, user_ids, start_time, end_time, user_names):
|
|
"""获取多个用户的打卡记录"""
|
|
all_records = []
|
|
|
|
# 构造请求对象
|
|
request: QueryUserFlowRequest = QueryUserFlowRequest.builder() \
|
|
.employee_type("employee_id") \
|
|
.include_terminated_user(True) \
|
|
.request_body(QueryUserFlowRequestBody.builder()
|
|
.user_ids(user_ids) # 传入所有的 user_id
|
|
.check_time_from(start_time)
|
|
.check_time_to(end_time)
|
|
.build()) \
|
|
.build()
|
|
|
|
# 发起请求
|
|
response: QueryUserFlowResponse = client.attendance.v1.user_flow.query(request)
|
|
|
|
if not response.success():
|
|
lark.logger.error(f"获取用户打卡记录失败,错误信息: {response.msg}")
|
|
return []
|
|
|
|
# 访问 user_flow_results 属性,而不是使用 .get() 方法
|
|
records = response.data.user_flow_results # 直接访问属性
|
|
|
|
if not records:
|
|
return all_records
|
|
|
|
# 将当前页的记录添加到所有记录中
|
|
for record in records:
|
|
try:
|
|
# 获取打卡时间、地点、照片链接
|
|
check_in_time = convert_timestamp_to_datetime(record.check_time)
|
|
location_name = record.location_name if hasattr(record, 'location_name') else '无位置'
|
|
photo_urls = record.photo_urls if hasattr(record, 'photo_urls') else []
|
|
photo_urls_str = ", ".join(photo_urls) # 将多个照片链接拼接为字符串
|
|
comment = record.comment if hasattr(record, 'comment') else ''
|
|
# 获取用户名(通过之前获取的 user_names 字典)
|
|
user_name = user_names.get(record.user_id, "未知用户")
|
|
|
|
# 将打卡记录添加到统计数据中
|
|
all_records.append([record.user_id, user_name, check_in_time, location_name, photo_urls_str, comment])
|
|
except Exception as e:
|
|
lark.logger.error(f"处理打卡记录时出错: {e}")
|
|
|
|
return all_records
|
|
|
|
|
|
def save_to_csv(data):
|
|
"""Save attendance records to a CSV file and return structured data"""
|
|
os.makedirs("output", exist_ok=True)
|
|
file_path = os.path.join("output", "attendance_report.csv")
|
|
|
|
header = ["user_id", "name", "check_in_time", "location_name", "photo_urls", "comment"]
|
|
|
|
# Debug: print data to be saved
|
|
print(f"Preparing to save the following data to {file_path}:")
|
|
for row in data:
|
|
print(row)
|
|
|
|
# Write data to CSV (overwrite if file exists)
|
|
with open(file_path, "w", newline="", encoding="utf-8-sig") as csvfile:
|
|
writer = csv.writer(csvfile)
|
|
writer.writerow(header)
|
|
for row in data:
|
|
writer.writerow(row)
|
|
|
|
# Return structured list of dicts
|
|
structured_data = [
|
|
{header[i]: row[i] for i in range(len(header))}
|
|
for row in data
|
|
]
|
|
|
|
return structured_data, file_path
|
|
def get_all_department_ids(client):
|
|
"""分页获取所有 open_department_id"""
|
|
all_department_ids = []
|
|
page_token = None
|
|
|
|
while True:
|
|
req_builder = ChildrenDepartmentRequest.builder() \
|
|
.department_id("0") \
|
|
.user_id_type("open_id") \
|
|
.department_id_type("department_id") \
|
|
.fetch_child(True) \
|
|
.page_size(50)
|
|
if page_token:
|
|
req_builder.page_token(page_token)
|
|
|
|
request = req_builder.build()
|
|
response: ChildrenDepartmentResponse = client.contact.v3.department.children(request)
|
|
|
|
if not response.success():
|
|
print(f"获取部门失败: {response.msg}")
|
|
break
|
|
|
|
for item in response.data.items:
|
|
all_department_ids.append(item.open_department_id)
|
|
|
|
if not response.data.has_more:
|
|
break
|
|
page_token = response.data.page_token
|
|
|
|
return all_department_ids
|
|
|
|
@app.route('/attendance', methods=['POST'])
|
|
def attendance():
|
|
data = request.get_json()
|
|
app_id = data.get("app_id")
|
|
app_secret = data.get("app_secret")
|
|
start_time_str = data.get("start_time")
|
|
end_time_str = data.get("end_time")
|
|
|
|
if not all([app_id, app_secret, start_time_str, end_time_str]):
|
|
return jsonify({"error": "缺少必要的参数: app_id, app_secret, start_time, end_time"}), 400
|
|
|
|
start_time = str_to_timestamp_seconds(start_time_str)
|
|
end_time = str_to_timestamp_seconds(end_time_str)
|
|
|
|
client = lark.Client.builder() \
|
|
.app_id(app_id) \
|
|
.app_secret(app_secret) \
|
|
.log_level(lark.LogLevel.ERROR) \
|
|
.build()
|
|
|
|
# 获取所有部门 ID
|
|
department_ids = get_all_department_ids(client)
|
|
print(f"共获取到 {len(department_ids)} 个部门")
|
|
|
|
all_check_in_records = []
|
|
|
|
for dep_id in department_ids:
|
|
user_ids, user_names = get_user_ids_and_names(client, dep_id)
|
|
p
|
|
dep_records = get_attendance_data(client, user_ids, start_time, end_time, user_names)
|
|
all_check_in_records.extend(dep_records)
|
|
|
|
structured_data, file_path = save_to_csv(all_check_in_records)
|
|
return jsonify({
|
|
"message": "打卡统计已保存",
|
|
"file_path": file_path,
|
|
"data": structured_data
|
|
}), 200
|
|
|
|
if __name__ == "__main__":
|
|
app.run(host="0.0.0.0", port=8789)
|