sdk/oapiSdk/bitable.py

147 lines
4.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, request, jsonify
import pandas as pd
from lark_oapi import Client, LogLevel, logger
from lark_oapi.api.bitable.v1 import (
ListAppTableRequest,
SearchAppTableRecordRequest,
SearchAppTableRecordRequestBody,
SearchAppTableRecordResponse,
)
import re
from urllib.parse import urlparse
from lark_oapi import JSON # 确保导入这个工具
import io
# Flask application object; the route handlers in this module register on it.
app = Flask(__name__)
def read_csv_with_auto_encoding_from_bytes(csv_bytes):
    """Parse raw CSV bytes into a DataFrame, retrying as GBK on decode failure.

    The first attempt uses pandas' default encoding. If that raises
    ``UnicodeDecodeError`` the same bytes are parsed again with
    ``encoding='gbk'``.

    Args:
        csv_bytes: The complete CSV file content as ``bytes``.

    Returns:
        The ``pandas.DataFrame`` produced by ``pd.read_csv``.
    """
    try:
        frame = pd.read_csv(io.BytesIO(csv_bytes))
    except UnicodeDecodeError:
        # Default decoding failed; no real detection is done, GBK is simply
        # tried as the one fallback. A fresh buffer is used because the
        # first attempt may have consumed the previous one.
        frame = pd.read_csv(io.BytesIO(csv_bytes), encoding='gbk')
    return frame
def extract_app_token(url):
    """Extract the token that follows ``/base/`` or ``/wiki/`` in a URL path.

    Args:
        url: The URL to inspect. Anything that is not a ``str`` (for example
            the float NaN that pandas yields for an empty CSV cell, or
            ``None``) is treated as "no URL".

    Returns:
        The path segment immediately after the first ``/base/`` or ``/wiki/``
        component, or ``None`` when the input is not a string or the path has
        no such component.
    """
    # Non-string input would otherwise blow up inside urlparse / re.search.
    if not isinstance(url, str):
        return None

    # Strip surrounding whitespace so it cannot end up inside the token.
    path = urlparse(url.strip()).path
    # NOTE(review): for /wiki/ URLs the captured segment is presumably a wiki
    # node token rather than a bitable app token -- confirm against the API.
    match = re.search(r'/(base|wiki)/([^/]+)', path)
    return match.group(2) if match else None
def get_table_ids(client, app_token):
    """Return the IDs of all tables in the bitable app ``app_token``.

    Calls ``client.bitable.v1.app_table.list`` with a page size of 50 and
    keeps following ``page_token`` while the response reports ``has_more``,
    so apps with more than one page of tables are not truncated.

    Args:
        client: Client object exposing ``bitable.v1.app_table.list``.
        app_token: Token of the bitable app whose tables are listed.

    Returns:
        A list of table IDs. Empty when the app has no tables or the first
        request fails; if a later page fails, the IDs collected so far.
        Failures are logged, not raised.
    """
    table_ids = []
    page_token = None
    while True:
        builder = ListAppTableRequest.builder() \
            .app_token(app_token) \
            .page_size(50)
        if page_token:
            builder = builder.page_token(page_token)
        response = client.bitable.v1.app_table.list(builder.build())

        if not response.success() or not response.data:
            logger.error(f"获取 table_id 失败: {response.code}, {response.msg}")
            break

        # `items` may be missing/None on an empty page; treat it as empty.
        table_ids.extend(item.table_id for item in response.data.items or [])

        page_token = response.data.page_token
        # Also stop when has_more is set without a token, otherwise the
        # first page would be requested again forever.
        if not response.data.has_more or not page_token:
            break
    return table_ids
def get_all_records_dict(client, app_token, table_id):
    """Return every record of a table as ``{"record_id", "fields"}`` dicts.

    ``get_all_records`` already yields plain dicts, so the entries are read
    by key; attribute access (``item.record_id``) would raise
    ``AttributeError`` on them.

    Args:
        client: Client object passed through to ``get_all_records``.
        app_token: Token of the bitable app.
        table_id: ID of the table whose records are fetched.

    Returns:
        A new list of dicts, each holding the ``record_id`` and ``fields``
        of one record.
    """
    return [
        {
            "record_id": item["record_id"],
            "fields": item["fields"],
        }
        for item in get_all_records(client, app_token, table_id)
    ]
def get_all_records(client, app_token, table_id):
    """Fetch all records of one table, page by page, as plain dicts.

    Uses ``client.bitable.v1.app_table_record.search`` with an empty request
    body and a page size of 50, following ``page_token`` while the response
    reports ``has_more``.

    Args:
        client: Client object exposing ``bitable.v1.app_table_record.search``.
        app_token: Token of the bitable app.
        table_id: ID of the table to read.

    Returns:
        A list of ``{"record_id": ..., "fields": ...}`` dicts. If a request
        fails, the error is logged and the records collected so far are
        returned (possibly an empty list).
    """
    all_items = []
    page_token = None
    while True:
        req_builder = SearchAppTableRecordRequest.builder() \
            .app_token(app_token) \
            .table_id(table_id) \
            .page_size(50) \
            .request_body(SearchAppTableRecordRequestBody.builder().build())
        if page_token:
            req_builder = req_builder.page_token(page_token)
        request = req_builder.build()

        response: SearchAppTableRecordResponse = client.bitable.v1.app_table_record.search(request)
        if not response.success() or not response.data:
            logger.error(f"获取记录失败: {response.code}, {response.msg}")
            break

        # Copy record_id/fields into plain dicts; the values are stored
        # as-is, with no further serialization step. `items` may be None on
        # an empty page, so fall back to an empty list.
        all_items.extend(
            {
                "record_id": item.record_id,
                "fields": item.fields,
            }
            for item in response.data.items or []
        )

        page_token = response.data.page_token
        # Stop when there are no more pages, or when has_more is set without
        # a token (which would otherwise re-request the first page forever).
        if not response.data.has_more or not page_token:
            break
    return all_items
@app.route('/fetch_records', methods=['POST'])
def fetch_records():
    """Handle POST /fetch_records: dump all records of the apps listed in a CSV.

    Expects multipart form data with ``app_id``, ``app_secret`` and an
    uploaded CSV ``file``. The ``url`` column of each CSV row is parsed for
    an app token; for every app whose tables can be listed, all records of
    all tables are fetched.

    Returns:
        * 200 with a JSON list of ``{"app_token", "tables"}`` objects, where
          ``tables`` is a list of ``{"table_id", "items"}``. Rows whose URL
          is missing, not a string, or yields no token or no tables are
          skipped.
        * 400 with ``{"error": ...}`` when a form parameter or the file is
          missing.
        * 500 with ``{"error": ...}`` on any unexpected exception (which is
          also logged with its traceback).
    """
    try:
        app_id = request.form.get("app_id")
        app_secret = request.form.get("app_secret")
        upload = request.files.get("file")
        if not all([app_id, app_secret, upload]):
            return jsonify({"error": "缺少参数 app_id, app_secret 或 上传的文件"}), 400

        df = read_csv_with_auto_encoding_from_bytes(upload.read())
        client = Client.builder().app_id(app_id).app_secret(app_secret).log_level(LogLevel.INFO).build()

        results = []
        for _, row in df.iterrows():
            url = row.get("url", "")
            # Empty CSV cells come back as float NaN; skip them instead of
            # letting URL parsing raise and fail the whole request.
            if not isinstance(url, str):
                continue

            app_token = extract_app_token(url)
            if not app_token:
                continue

            table_ids = get_table_ids(client, app_token)
            if not table_ids:
                continue

            tables = [
                {
                    "table_id": table_id,
                    "items": get_all_records(client, app_token, table_id),
                }
                for table_id in table_ids
            ]
            results.append({
                "app_token": app_token,
                "tables": tables
            })
        return jsonify(results)
    except Exception as e:
        # Top-level request boundary: keep the JSON error response, but
        # record the traceback so the failure is not lost.
        logger.exception("fetch_records failed")
        return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
    # Script entry point: start Flask's built-in server, listening on all
    # network interfaces at port 8828.
    app.run(
        host="0.0.0.0",
        port=8828,
    )