sdk/oapiSdk/lark_oapi/card/action_handler.py

133 lines
4.9 KiB
Python

import hashlib
import json
from typing import Optional, Callable, Any
from lark_oapi.core.const import *
from lark_oapi.core.enum import LogLevel
from lark_oapi.core.exception import InvalidArgsException, AccessDeniedException, CardException, \
NoAuthorizationException
from lark_oapi.core.http import HttpHandler
from lark_oapi.core.json import JSON
from lark_oapi.core.log import logger
from lark_oapi.core.model import RawRequest, RawResponse
from lark_oapi.core.utils import Strings, AESCipher
from .model import Card
class CardActionHandler(HttpHandler):
def __init__(self) -> None:
self._encrypt_key: Optional[str] = None
self._verification_token: Optional[str] = None
self._processor: Optional[Callable[[Card], Any]] = None
def do(self, req: RawRequest) -> RawResponse:
logger.debug(f"card access, uri: {req.uri}, "
f"headers: {JSON.marshal(req.headers)}, "
f"body: {str(req.body, UTF_8) if req.body is not None else None}")
resp = RawResponse()
resp.status_code = 200
resp.set_content_type(f"{APPLICATION_JSON}; charset=utf-8")
try:
if req.body is None:
raise InvalidArgsException("request body is null")
# 消息解密
plaintext = self._decrypt(req.body)
# 反序列化
card = JSON.unmarshal(plaintext, Card)
card.raw = req
if URL_VERIFICATION == card.type:
# 验证回调地址
# 校验 token
if self._verification_token != card.token:
raise AccessDeniedException("invalid verification_token")
# 返回 Challenge Code
resp_body = "{\"challenge\":\"%s\"}" % card.challenge
resp.content = resp_body.encode(UTF_8)
return resp
else:
# 否则验签
self._verify_sign(req)
if self._processor is None:
raise CardException("processor not found")
# 处理业务逻辑
result: Any = self._processor(card)
# 返回结果
if result is None:
resp.content = "{\"msg\":\"success\"}".encode(UTF_8)
elif isinstance(result, bytes):
resp.content = result
elif isinstance(result, str):
resp.content = bytes(result, UTF_8)
elif isinstance(result, RawResponse):
resp = result
else:
resp.content = JSON.marshal(result).encode(UTF_8)
return resp
except Exception as e:
logger.exception(
f"handle card failed, uri: {req.uri}, request_id: {req.headers.get(X_REQUEST_ID)}, err: {e}")
resp.status_code = 500
resp_body = "{\"msg\":\"%s\"}" % str(e)
resp.content = resp_body.encode(UTF_8)
return resp
def _decrypt(self, content: bytes) -> str:
plaintext: str
encrypt = json.loads(content).get("encrypt")
if Strings.is_not_empty(encrypt):
if Strings.is_empty(self._encrypt_key):
raise NoAuthorizationException("encrypt_key not found")
plaintext = AESCipher(self._encrypt_key).decrypt_str(encrypt)
else:
plaintext = str(content, UTF_8)
return plaintext
def _verify_sign(self, request: RawRequest) -> None:
if self._verification_token is None or self._verification_token == "":
return
timestamp = request.headers.get(LARK_REQUEST_TIMESTAMP)
nonce = request.headers.get(LARK_REQUEST_NONCE)
signature = request.headers.get(LARK_REQUEST_SIGNATURE)
bs = (timestamp + nonce + self._verification_token).encode(UTF_8) + request.body
h = hashlib.sha1(bs)
if signature != h.hexdigest():
raise AccessDeniedException("signature verification failed")
@staticmethod
def builder(encrypt_key: str, verification_token: str, level: LogLevel = None) -> "CardActionHandlerBuilder":
if level is not None:
logger.setLevel(int(level.value))
return CardActionHandlerBuilder(encrypt_key, verification_token)
class CardActionHandlerBuilder(object):
def __init__(self, encrypt_key: str, verification_token: str) -> None:
self._encrypt_key = encrypt_key
self._verification_token = verification_token
self._processor: Optional[Callable[[Card], Any]] = None
def register(self, f: Callable[[Card], Any]) -> "CardActionHandlerBuilder":
self._processor = f
return self
def build(self) -> CardActionHandler:
card_action_handler = CardActionHandler()
card_action_handler._encrypt_key = self._encrypt_key
card_action_handler._verification_token = self._verification_token
card_action_handler._processor = self._processor
return card_action_handler