sdk/oapiSdk/main.py

368 lines
12 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import csv
import io
import os
from flask import Flask, jsonify, redirect, request
from flask import Blueprint
import time
import requests
from lark_oapi import Client, LogLevel, logger
from lark_oapi.api.drive.v1 import ListFileRequest
import chardet
from flask import Flask, request, jsonify
import lark_oapi as lark
from attendance import get_user_ids_and_names,get_attendance_data,save_to_csv,get_all_department_ids
from message import fetch_messages_for_chat,fetch_chats,save_messages,str_to_timestamp_seconds
from bitable import read_csv_with_auto_encoding_from_bytes,get_table_ids,extract_app_token,get_all_records
from files import list_files_recursive,extract_folder_token_from_url
from calendars import generate_code_verifier,generate_code_challenge
from OaConfig import APP_ID,APP_SECRET,REDIRECT_URI,TOKEN_STORE_FILE
app_oapi = Blueprint("oapi", __name__)
token_data = {}
code_verifier_store = {}
user_access_token = None
token_expires_at = 0
def save_token(app_id, data):
store = {}
if os.path.exists(TOKEN_STORE_FILE):
with open(TOKEN_STORE_FILE, "r") as f:
store = json.load(f)
store[app_id] = data
with open(TOKEN_STORE_FILE, "w") as f:
json.dump(store, f)
def load_token(app_id):
if os.path.exists(TOKEN_STORE_FILE):
with open(TOKEN_STORE_FILE, "r") as f:
store = json.load(f)
return store.get(app_id, {})
return {}
def ensure_token_valid():
global token_data
now = time.time()
if not token_data or not token_data.get("access_token") or now > token_data.get("expires_at", 0):
print("[ensure_token_valid] Access token expired or missing, try refresh token...")
if not refresh_access_token():
print("[ensure_token_valid] Refresh token failed, need user authorization")
return False
return True
def refresh_access_token():
global token_data, APP_ID, APP_SECRET
if not token_data.get("refresh_token"):
print("[refresh_access_token] No refresh_token available")
return False
resp = requests.post(
"https://open.feishu.cn/open-apis/authen/v2/oauth/token",
json={
"grant_type": "refresh_token",
"client_id": APP_ID,
"client_secret": APP_SECRET,
"refresh_token": token_data["refresh_token"]
}
)
data = resp.json()
print(f"[refresh_access_token] Response: {data}")
if data.get("code") == 0:
token_data["access_token"] = data["access_token"]
token_data["refresh_token"] = data["refresh_token"]
token_data["expires_at"] = time.time() + data["expires_in"] - 60
save_token(APP_ID, token_data)
print("[refresh_access_token] Refresh success")
return True
else:
print("[refresh_access_token] Refresh failed")
return False
#获取聊天消息
@app_oapi.route("/fetch_messages", methods=["POST"])
def fetch_messages_api():
data = request.json
if not data:
return jsonify({"error": "请求体必须是 JSON 格式"}), 400
app_id = APP_ID
app_secret = APP_SECRET
start_time_str = data.get("start_time")
end_time_str = data.get("end_time")
if not start_time_str or not end_time_str:
return jsonify({"error": "必须提供 start_time 和 end_time格式YYYY-MM-DD HH:MM:SS"}), 400
try:
start_time = str_to_timestamp_seconds(start_time_str)
end_time = str_to_timestamp_seconds(end_time_str)
except ValueError as e:
return jsonify({"error": str(e)}), 400
if start_time > end_time:
return jsonify({"error": "start_time 不能晚于 end_time"}), 400
client = lark.Client.builder() \
.app_id(app_id) \
.app_secret(app_secret) \
.log_level(lark.LogLevel.ERROR) \
.build()
chats = fetch_chats(client)
if not chats:
return jsonify({"error": "获取群聊失败"}), 500
all_messages = []
for chat in chats:
msgs = fetch_messages_for_chat(client, chat.chat_id, chat.name, start_time, end_time)
all_messages.extend(msgs)
if all_messages:
save_messages(all_messages)
return jsonify({"msg": f"已保存消息,共计 {len(all_messages)}"}), 200
else:
return jsonify({"msg": "该时间区间内无消息"}), 200
#获取文档
@app_oapi.route("/list_folder", methods=["POST"])
def list_folder():
app_id = APP_ID
app_secret = APP_SECRET
csv_file = request.files.get("csv_file")
if not all([app_id, app_secret, csv_file]): return jsonify({"error": "缺少参数 app_id, app_secret 或 csv_file"}), 400
client = lark.Client.builder().app_id(app_id).app_secret(app_secret).log_level(lark.LogLevel.ERROR).build()
option = lark.RequestOption.builder().build()
raw_bytes = csv_file.stream.read()
encoding = chardet.detect(raw_bytes)["encoding"] or "utf-8"
reader = csv.reader(io.StringIO(raw_bytes.decode(encoding)))
tree_result = []
next(reader, None) # Skip header
for row in reader:
if len(row) < 2: continue
folder_token = extract_folder_token_from_url(row[1].strip())
if folder_token: list_files_recursive(client, folder_token, option, tree_result)
return jsonify(tree_result)
#获取打卡记录
@app_oapi.route('/attendance', methods=['POST'])
def attendance():
data = request.get_json()
app_id = APP_ID
app_secret = APP_SECRET
start_time_str = data.get("start_time")
end_time_str = data.get("end_time")
if not all([app_id, app_secret, start_time_str, end_time_str]):
return jsonify({"error": "缺少必要的参数: app_id, app_secret, start_time, end_time"}), 400
start_time = str_to_timestamp_seconds(start_time_str)
end_time = str_to_timestamp_seconds(end_time_str)
client = lark.Client.builder() \
.app_id(app_id) \
.app_secret(app_secret) \
.log_level(lark.LogLevel.ERROR) \
.build()
# 获取所有部门 ID
department_ids = get_all_department_ids(client)
print(f"共获取到 {len(department_ids)} 个部门")
for idx, dep_id in enumerate(department_ids, start=1):
print(f"{idx}. {dep_id}")
all_check_in_records = []
for dep_id in department_ids:
user_ids, user_names = get_user_ids_and_names(client, dep_id)
print(f"\n部门 {dep_id} 员工列表(共 {len(user_ids)} 人):")
for uid in user_ids:
print(f" - {uid} : {user_names.get(uid, '未知姓名')}")
dep_records = get_attendance_data(client, user_ids, start_time, end_time, user_names)
all_check_in_records.extend(dep_records)
structured_data, file_path = save_to_csv(all_check_in_records)
return jsonify({
"message": "打卡统计已保存",
"file_path": file_path,
"data": structured_data
}), 200
#获取多维表格数据
@app_oapi.route('/fetch_records', methods=['POST'])
def fetch_records():
try:
app_id = APP_ID
app_secret = APP_SECRET
file = request.files.get("file")
if not all([app_id, app_secret, file]):
return jsonify({"error": "缺少参数 app_id, app_secret 或 上传的文件"}), 400
csv_bytes = file.read()
df = read_csv_with_auto_encoding_from_bytes(csv_bytes)
client = Client.builder().app_id(app_id).app_secret(app_secret).log_level(LogLevel.INFO).build()
results = []
for _, row in df.iterrows():
url = row.get("url", "")
app_token = extract_app_token(url)
if not app_token:
continue
table_ids = get_table_ids(client, app_token)
if not table_ids:
continue
tables = []
for table_id in table_ids:
items = get_all_records(client, app_token, table_id)
tables.append({
"table_id": table_id,
"items": items
})
results.append({
"app_token": app_token,
"tables": tables
})
return jsonify(results)
except Exception as e:
return jsonify({"error": str(e)}), 500
#获取日历日程安排
token_data = {}
# 1⃣ 生成 PKCE 参数
@app_oapi.route("/pkce", methods=["GET"])
def get_pkce():
global code_verifier_store
code_verifier = generate_code_verifier()
code_challenge = generate_code_challenge(code_verifier)
# 保存,方便 callback 时使用
code_verifier_store["verifier"] = code_verifier
code_verifier_store["challenge"] = code_challenge
return jsonify({
"code_verifier": code_verifier,
"code_challenge": code_challenge
})
# 2⃣ 回调换取 token
@app_oapi.route("/oauth/callback")
def oauth_callback():
"""
PKCE 回调接口:
1⃣ 获取 URL 里的 code
2⃣ 使用 code + code_verifier 换取 token
3⃣ 保存 token
"""
global token_data
# 1⃣ 获取授权码
code = request.args.get("code")
if not code:
return jsonify({"error": "Missing code"}), 400
# 2⃣ 检查配置信息
if not all([APP_ID, APP_SECRET, REDIRECT_URI]):
return jsonify({"error": "Missing config"}), 400
# 3⃣ 获取本地存储的 code_verifier
verifier = code_verifier_store.get("verifier")
if not verifier:
return jsonify({"error": "Missing code_verifier, 请先生成 PKCE code_verifier"}), 400
# 4⃣ 使用 code + verifier 换 token
try:
resp = requests.post(
"https://open.feishu.cn/open-apis/authen/v2/oauth/token",
json={
"grant_type": "authorization_code",
"client_id": APP_ID,
"client_secret": APP_SECRET,
"code": code,
"redirect_uri": REDIRECT_URI,
"code_verifier": verifier
},
timeout=10
)
data = resp.json()
print("[callback] token resp:", data)
except Exception as e:
return jsonify({"error": "请求 token 失败", "exception": str(e)}), 500
# 5⃣ 处理返回结果
if data.get("code") == 0:
token_data = {
"access_token": data["access_token"],
"refresh_token": data["refresh_token"],
"expires_at": time.time() + data["expires_in"] - 60
}
save_token(APP_ID, token_data)
return "授权成功,可以回到终端调用接口了。"
else:
return jsonify({"error": "Token获取失败", "raw": data}), 400
# 3⃣ 设置配置参数
@app_oapi.route("/set_config", methods=["GET"])
def set_config():
global token_data
token_data = load_token(APP_ID)
return jsonify({"message": "配置成功", "token_data": token_data})
# 4⃣ 获取日历和日程
@app_oapi.route("/calendar_events", methods=["POST"])
def calendar_events():
global token_data
if not ensure_token_valid():
challenge = code_verifier_store.get("challenge")
if not challenge:
return jsonify({"error": "请先访问 /pkce 获取 PKCE 参数"}), 400
auth_url = (
f"https://accounts.feishu.cn/open-apis/authen/v1/authorize"
f"?client_id={APP_ID}"
f"&redirect_uri={REDIRECT_URI}"
f"&scope=calendar:calendar.event:read%20calendar:calendar:readonly%20offline_access"
f"&state=auth"
f"&response_type=code"
f"&code_challenge={challenge}"
f"&code_challenge_method=S256"
)
return jsonify({"message": "需要授权", "auth_url": auth_url}), 401
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
calendars_resp = requests.get("https://open.feishu.cn/open-apis/calendar/v4/calendars", headers=headers).json()
if calendars_resp.get("code") != 0:
return jsonify({"error": "获取日历列表失败", "raw": calendars_resp}), 500
calendars = calendars_resp.get("data", {}).get("calendar_list", [])
all_events = []
for c in calendars:
cid = c["calendar_id"]
ev_resp = requests.get(
f"https://open.feishu.cn/open-apis/calendar/v4/calendars/{cid}/events",
headers=headers
).json()
if ev_resp.get("code") == 0:
all_events.extend(ev_resp.get("data", {}).get("items", []))
return jsonify({"events": all_events})