631 lines
23 KiB
Python
631 lines
23 KiB
Python
# Code generated by Lark OpenAPI.
|
|
|
|
import io
|
|
from typing import Any, Optional, Union, Dict, List, Set, IO, Callable, Type
|
|
from lark_oapi.core.const import UTF_8, CONTENT_TYPE, APPLICATION_JSON
|
|
from lark_oapi.core import JSON
|
|
from lark_oapi.core.token import verify
|
|
from lark_oapi.core.http import Transport
|
|
from lark_oapi.core.model import Config, RequestOption, RawResponse
|
|
from lark_oapi.core.utils import Files
|
|
from requests_toolbelt import MultipartEncoder
|
|
from ..model.copy_file_request import CopyFileRequest
|
|
from ..model.copy_file_response import CopyFileResponse
|
|
from ..model.create_folder_file_request import CreateFolderFileRequest
|
|
from ..model.create_folder_file_response import CreateFolderFileResponse
|
|
from ..model.create_shortcut_file_request import CreateShortcutFileRequest
|
|
from ..model.create_shortcut_file_response import CreateShortcutFileResponse
|
|
from ..model.delete_file_request import DeleteFileRequest
|
|
from ..model.delete_file_response import DeleteFileResponse
|
|
from ..model.delete_subscribe_file_request import DeleteSubscribeFileRequest
|
|
from ..model.delete_subscribe_file_response import DeleteSubscribeFileResponse
|
|
from ..model.download_file_request import DownloadFileRequest
|
|
from ..model.download_file_response import DownloadFileResponse
|
|
from ..model.get_subscribe_file_request import GetSubscribeFileRequest
|
|
from ..model.get_subscribe_file_response import GetSubscribeFileResponse
|
|
from ..model.list_file_request import ListFileRequest
|
|
from ..model.list_file_response import ListFileResponse
|
|
from ..model.move_file_request import MoveFileRequest
|
|
from ..model.move_file_response import MoveFileResponse
|
|
from ..model.subscribe_file_request import SubscribeFileRequest
|
|
from ..model.subscribe_file_response import SubscribeFileResponse
|
|
from ..model.task_check_file_request import TaskCheckFileRequest
|
|
from ..model.task_check_file_response import TaskCheckFileResponse
|
|
from ..model.upload_all_file_request import UploadAllFileRequest
|
|
from ..model.upload_all_file_response import UploadAllFileResponse
|
|
from ..model.upload_finish_file_request import UploadFinishFileRequest
|
|
from ..model.upload_finish_file_response import UploadFinishFileResponse
|
|
from ..model.upload_part_file_request import UploadPartFileRequest
|
|
from ..model.upload_part_file_response import UploadPartFileResponse
|
|
from ..model.upload_prepare_file_request import UploadPrepareFileRequest
|
|
from ..model.upload_prepare_file_response import UploadPrepareFileResponse
|
|
|
|
|
|
class File(object):
    """Synchronous and asynchronous entry points for the ``file`` resource.

    Every public operation exists twice: a blocking method and an awaitable
    twin whose name carries an ``a`` prefix.  All of them run the same
    pipeline, implemented once in the private helpers below:

    1. substitute a fresh ``RequestOption`` when the caller passed ``None``;
    2. call ``verify`` (authentication / token acquisition);
    3. sync variants only: set the ``Content-Type`` header when the request
       carries a body (JSON for most calls, multipart for the uploads);
    4. hand the request to ``Transport.execute`` / ``Transport.aexecute``;
    5. decode the raw reply into the operation's response model and attach
       the raw reply to it as ``.raw``.

    Side effects visible to callers: a caller-supplied ``option`` may have
    its ``headers`` mutated, the sync upload methods replace ``request.body``
    with a ``MultipartEncoder``, and the async upload methods assign
    ``request.files``.

    NOTE(review): the async variants never set a Content-Type header here;
    presumably ``Transport.aexecute`` derives it from the payload -- confirm
    before changing.
    """

    def __init__(self, config: Config) -> None:
        """Store the configuration that is forwarded to ``verify`` and ``Transport``."""
        self.config: Config = config

    # ------------------------------------------------------------------
    # Private plumbing shared by the public methods
    # ------------------------------------------------------------------

    def _authorize(self, request: Any, option: Optional[RequestOption]) -> RequestOption:
        """Return the option to use for this call after running ``verify`` on it.

        A new ``RequestOption`` is created only when ``option`` is ``None``;
        otherwise the caller's own object is used (and may later be mutated).
        """
        opts = RequestOption() if option is None else option
        # Authentication / token acquisition.
        verify(self.config, request, opts)
        return opts

    @staticmethod
    def _apply_json_content_type(request: Any, option: RequestOption) -> None:
        """Set the JSON ``Content-Type`` header when the request has a body."""
        if request.body is not None:
            option.headers[CONTENT_TYPE] = f"{APPLICATION_JSON}; charset=utf-8"

    @staticmethod
    def _decode(raw: RawResponse, response_type: Type[Any]) -> Any:
        """Deserialize the UTF-8 reply body into ``response_type`` and attach ``raw``."""
        parsed = JSON.unmarshal(str(raw.content, UTF_8), response_type)
        parsed.raw = raw
        return parsed

    @staticmethod
    def _decode_download(raw: RawResponse) -> DownloadFileResponse:
        """Turn a download reply into a ``DownloadFileResponse``.

        * 2xx status: ``code`` is set to 0, the body is wrapped in a
          ``BytesIO`` as ``file`` and ``file_name`` is parsed from the headers.
        * Any other status with a JSON content type: the body is deserialized
          as a regular response model.
        * Otherwise: an otherwise empty response object is returned.

        ``raw`` is attached in every case.
        """
        content_type = raw.headers.get(CONTENT_TYPE)
        result: DownloadFileResponse = DownloadFileResponse()
        if 200 <= raw.status_code < 300:
            # Success: the body is the binary stream itself.
            result.code = 0
            result.file = io.BytesIO(raw.content)
            result.file_name = Files.parse_file_name(raw.headers)
        elif content_type is not None and content_type.startswith(APPLICATION_JSON):
            result = JSON.unmarshal(str(raw.content, UTF_8), DownloadFileResponse)

        result.raw = raw
        return result

    def _send_json(self, request: Any, option: Optional[RequestOption],
                   response_type: Type[Any]) -> Any:
        """Blocking pipeline for operations whose body, if any, is sent as JSON."""
        opts = self._authorize(request, option)
        self._apply_json_content_type(request, opts)
        # Send the request, then deserialize the reply.
        return self._decode(Transport.execute(self.config, request, opts), response_type)

    async def _asend(self, request: Any, option: Optional[RequestOption],
                     response_type: Type[Any]) -> Any:
        """Awaitable pipeline: verify, send via ``Transport.aexecute``, decode."""
        opts = self._authorize(request, option)
        # Send the request, then deserialize the reply.
        return self._decode(await Transport.aexecute(self.config, request, opts), response_type)

    def _send_multipart(self, request: Any, option: Optional[RequestOption],
                        response_type: Type[Any]) -> Any:
        """Blocking pipeline for uploads.

        When the request has a body it is converted to a ``MultipartEncoder``,
        stored back on ``request.body``, and the encoder's content type
        becomes the ``Content-Type`` header.
        """
        opts = self._authorize(request, option)
        if request.body is not None:
            form_data = MultipartEncoder(Files.parse_form_data(request.body))
            request.body = form_data
            opts.headers[CONTENT_TYPE] = form_data.content_type
        return self._decode(Transport.execute(self.config, request, opts), response_type)

    async def _asend_files(self, request: Any, option: Optional[RequestOption],
                           response_type: Type[Any]) -> Any:
        """Awaitable pipeline for uploads.

        The files are extracted from ``request.body`` and assigned to
        ``request.files`` before the request is sent.
        """
        opts = self._authorize(request, option)
        # Parse the files out of the body.
        request.files = Files.extract_files(request.body)
        return self._decode(await Transport.aexecute(self.config, request, opts), response_type)

    # ------------------------------------------------------------------
    # Public operations
    # ------------------------------------------------------------------

    def copy(self, request: CopyFileRequest, option: Optional[RequestOption] = None) -> CopyFileResponse:
        """Send ``request`` and decode the reply as ``CopyFileResponse``."""
        return self._send_json(request, option, CopyFileResponse)

    async def acopy(self, request: CopyFileRequest, option: Optional[RequestOption] = None) -> CopyFileResponse:
        """Awaitable variant of :meth:`copy`."""
        return await self._asend(request, option, CopyFileResponse)

    def create_folder(self, request: CreateFolderFileRequest,
                      option: Optional[RequestOption] = None) -> CreateFolderFileResponse:
        """Send ``request`` and decode the reply as ``CreateFolderFileResponse``."""
        return self._send_json(request, option, CreateFolderFileResponse)

    async def acreate_folder(self, request: CreateFolderFileRequest,
                             option: Optional[RequestOption] = None) -> CreateFolderFileResponse:
        """Awaitable variant of :meth:`create_folder`."""
        return await self._asend(request, option, CreateFolderFileResponse)

    def create_shortcut(self, request: CreateShortcutFileRequest,
                        option: Optional[RequestOption] = None) -> CreateShortcutFileResponse:
        """Send ``request`` and decode the reply as ``CreateShortcutFileResponse``."""
        return self._send_json(request, option, CreateShortcutFileResponse)

    async def acreate_shortcut(self, request: CreateShortcutFileRequest,
                               option: Optional[RequestOption] = None) -> CreateShortcutFileResponse:
        """Awaitable variant of :meth:`create_shortcut`."""
        return await self._asend(request, option, CreateShortcutFileResponse)

    def delete(self, request: DeleteFileRequest, option: Optional[RequestOption] = None) -> DeleteFileResponse:
        """Send ``request`` and decode the reply as ``DeleteFileResponse``."""
        return self._send_json(request, option, DeleteFileResponse)

    async def adelete(self, request: DeleteFileRequest, option: Optional[RequestOption] = None) -> DeleteFileResponse:
        """Awaitable variant of :meth:`delete`."""
        return await self._asend(request, option, DeleteFileResponse)

    def delete_subscribe(self, request: DeleteSubscribeFileRequest,
                         option: Optional[RequestOption] = None) -> DeleteSubscribeFileResponse:
        """Send ``request`` and decode the reply as ``DeleteSubscribeFileResponse``."""
        return self._send_json(request, option, DeleteSubscribeFileResponse)

    async def adelete_subscribe(self, request: DeleteSubscribeFileRequest,
                                option: Optional[RequestOption] = None) -> DeleteSubscribeFileResponse:
        """Awaitable variant of :meth:`delete_subscribe`."""
        return await self._asend(request, option, DeleteSubscribeFileResponse)

    def download(self, request: DownloadFileRequest, option: Optional[RequestOption] = None) -> DownloadFileResponse:
        """Send ``request`` and return the reply as a ``DownloadFileResponse``.

        On a 2xx status the body is exposed as an in-memory binary stream
        (``file``) together with the ``file_name`` parsed from the headers.
        """
        opts = self._authorize(request, option)
        self._apply_json_content_type(request, opts)
        # Send the request, then handle the binary stream.
        return self._decode_download(Transport.execute(self.config, request, opts))

    async def adownload(self, request: DownloadFileRequest,
                        option: Optional[RequestOption] = None) -> DownloadFileResponse:
        """Awaitable variant of :meth:`download`."""
        opts = self._authorize(request, option)
        # Send the request, then handle the binary stream.
        return self._decode_download(await Transport.aexecute(self.config, request, opts))

    def get_subscribe(self, request: GetSubscribeFileRequest,
                      option: Optional[RequestOption] = None) -> GetSubscribeFileResponse:
        """Send ``request`` and decode the reply as ``GetSubscribeFileResponse``."""
        return self._send_json(request, option, GetSubscribeFileResponse)

    async def aget_subscribe(self, request: GetSubscribeFileRequest,
                             option: Optional[RequestOption] = None) -> GetSubscribeFileResponse:
        """Awaitable variant of :meth:`get_subscribe`."""
        return await self._asend(request, option, GetSubscribeFileResponse)

    def list(self, request: ListFileRequest, option: Optional[RequestOption] = None) -> ListFileResponse:
        """Send ``request`` and decode the reply as ``ListFileResponse``."""
        return self._send_json(request, option, ListFileResponse)

    async def alist(self, request: ListFileRequest, option: Optional[RequestOption] = None) -> ListFileResponse:
        """Awaitable variant of :meth:`list`."""
        return await self._asend(request, option, ListFileResponse)

    def move(self, request: MoveFileRequest, option: Optional[RequestOption] = None) -> MoveFileResponse:
        """Send ``request`` and decode the reply as ``MoveFileResponse``."""
        return self._send_json(request, option, MoveFileResponse)

    async def amove(self, request: MoveFileRequest, option: Optional[RequestOption] = None) -> MoveFileResponse:
        """Awaitable variant of :meth:`move`."""
        return await self._asend(request, option, MoveFileResponse)

    def subscribe(self, request: SubscribeFileRequest, option: Optional[RequestOption] = None) -> SubscribeFileResponse:
        """Send ``request`` and decode the reply as ``SubscribeFileResponse``."""
        return self._send_json(request, option, SubscribeFileResponse)

    async def asubscribe(self, request: SubscribeFileRequest,
                         option: Optional[RequestOption] = None) -> SubscribeFileResponse:
        """Awaitable variant of :meth:`subscribe`."""
        return await self._asend(request, option, SubscribeFileResponse)

    def task_check(self, request: TaskCheckFileRequest,
                   option: Optional[RequestOption] = None) -> TaskCheckFileResponse:
        """Send ``request`` and decode the reply as ``TaskCheckFileResponse``."""
        return self._send_json(request, option, TaskCheckFileResponse)

    async def atask_check(self, request: TaskCheckFileRequest,
                          option: Optional[RequestOption] = None) -> TaskCheckFileResponse:
        """Awaitable variant of :meth:`task_check`."""
        return await self._asend(request, option, TaskCheckFileResponse)

    def upload_all(self, request: UploadAllFileRequest,
                   option: Optional[RequestOption] = None) -> UploadAllFileResponse:
        """Send ``request`` as multipart form data; decode as ``UploadAllFileResponse``.

        Replaces ``request.body`` with the multipart encoder when a body is present.
        """
        return self._send_multipart(request, option, UploadAllFileResponse)

    async def aupload_all(self, request: UploadAllFileRequest,
                          option: Optional[RequestOption] = None) -> UploadAllFileResponse:
        """Awaitable variant of :meth:`upload_all`; assigns ``request.files`` from the body."""
        return await self._asend_files(request, option, UploadAllFileResponse)

    def upload_finish(self, request: UploadFinishFileRequest,
                      option: Optional[RequestOption] = None) -> UploadFinishFileResponse:
        """Send ``request`` and decode the reply as ``UploadFinishFileResponse``."""
        return self._send_json(request, option, UploadFinishFileResponse)

    async def aupload_finish(self, request: UploadFinishFileRequest,
                             option: Optional[RequestOption] = None) -> UploadFinishFileResponse:
        """Awaitable variant of :meth:`upload_finish`."""
        return await self._asend(request, option, UploadFinishFileResponse)

    def upload_part(self, request: UploadPartFileRequest,
                    option: Optional[RequestOption] = None) -> UploadPartFileResponse:
        """Send ``request`` as multipart form data; decode as ``UploadPartFileResponse``.

        Replaces ``request.body`` with the multipart encoder when a body is present.
        """
        return self._send_multipart(request, option, UploadPartFileResponse)

    async def aupload_part(self, request: UploadPartFileRequest,
                           option: Optional[RequestOption] = None) -> UploadPartFileResponse:
        """Awaitable variant of :meth:`upload_part`; assigns ``request.files`` from the body."""
        return await self._asend_files(request, option, UploadPartFileResponse)

    def upload_prepare(self, request: UploadPrepareFileRequest,
                       option: Optional[RequestOption] = None) -> UploadPrepareFileResponse:
        """Send ``request`` and decode the reply as ``UploadPrepareFileResponse``."""
        return self._send_json(request, option, UploadPrepareFileResponse)

    async def aupload_prepare(self, request: UploadPrepareFileRequest,
                              option: Optional[RequestOption] = None) -> UploadPrepareFileResponse:
        """Awaitable variant of :meth:`upload_prepare`."""
        return await self._asend(request, option, UploadPrepareFileResponse)
|