XinXiKuaiBaoYuan/django-backend/venv/lib/python3.9/site-packages/openai/_client.py

501 lines
19 KiB
Python
Raw Permalink Normal View History

# File generated from our OpenAPI spec by Stainless.
from __future__ import annotations
import os
import asyncio
from typing import Any, Union, Mapping
from typing_extensions import Self, override
import httpx
from . import resources, _exceptions
from ._qs import Querystring
from ._types import (
NOT_GIVEN,
Omit,
Timeout,
NotGiven,
Transport,
ProxiesTypes,
RequestOptions,
)
from ._utils import is_given, is_mapping, get_async_library
from ._version import __version__
from ._streaming import Stream as Stream
from ._streaming import AsyncStream as AsyncStream
from ._exceptions import OpenAIError, APIStatusError
from ._base_client import DEFAULT_MAX_RETRIES, SyncAPIClient, AsyncAPIClient
# Names exported by `from ._client import *`.
__all__ = [
    # shared option/transport types imported from `._types`
    "Timeout",
    "Transport",
    "ProxiesTypes",
    "RequestOptions",
    # the `resources` subpackage
    "resources",
    # client classes and their short aliases
    "OpenAI",
    "AsyncOpenAI",
    "Client",
    "AsyncClient",
]
class OpenAI(SyncAPIClient):
    """Synchronous OpenAI API client.

    Every resource group is exposed as an attribute (``completions``,
    ``chat``, ...). ``with_raw_response`` exposes the same resources through
    their ``*WithRawResponse`` wrappers.
    """

    completions: resources.Completions
    chat: resources.Chat
    edits: resources.Edits
    embeddings: resources.Embeddings
    files: resources.Files
    images: resources.Images
    audio: resources.Audio
    moderations: resources.Moderations
    models: resources.Models
    fine_tuning: resources.FineTuning
    fine_tunes: resources.FineTunes
    beta: resources.Beta
    with_raw_response: OpenAIWithRawResponse

    # client options
    api_key: str
    organization: str | None

    def __init__(
        self,
        *,
        api_key: str | None = None,
        organization: str | None = None,
        base_url: str | httpx.URL | None = None,
        timeout: Union[float, Timeout, None, NotGiven] = NOT_GIVEN,
        max_retries: int = DEFAULT_MAX_RETRIES,
        default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        # Optional pre-configured httpx client; see the
        # [httpx documentation](https://www.python-httpx.org/api/#client).
        http_client: httpx.Client | None = None,
        # Toggle schema validation of data returned by the API. When
        # enabled, an APIResponseValidationError is raised if the API
        # responds with data that is invalid for the expected schema.
        #
        # This parameter may be removed or changed in the future. If you
        # rely on it, please open a GitHub issue describing your use-case
        # so it can be considered for the public interface.
        _strict_response_validation: bool = False,
    ) -> None:
        """Build a new synchronous openai client.

        Arguments that are left as ``None`` are looked up in the environment:

        - ``api_key`` from ``OPENAI_API_KEY``
        - ``organization`` from ``OPENAI_ORG_ID``
        - ``base_url`` from ``OPENAI_BASE_URL``, falling back to
          ``https://api.openai.com/v1``

        Raises:
            OpenAIError: if no API key is passed and ``OPENAI_API_KEY`` is
                not set.
        """
        # An explicit argument wins; the environment is consulted only when
        # the argument was omitted.
        resolved_key = api_key if api_key is not None else os.environ.get("OPENAI_API_KEY")
        if resolved_key is None:
            raise OpenAIError(
                "The api_key client option must be set either by passing api_key to the client or by setting the OPENAI_API_KEY environment variable"
            )
        self.api_key = resolved_key

        self.organization = (
            organization if organization is not None else os.environ.get("OPENAI_ORG_ID")
        )

        if base_url is None:
            base_url = os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1")

        super().__init__(
            version=__version__,
            base_url=base_url,
            max_retries=max_retries,
            timeout=timeout,
            http_client=http_client,
            custom_headers=default_headers,
            custom_query=default_query,
            _strict_response_validation=_strict_response_validation,
        )

        self._default_stream_cls = Stream

        # One resource object per API group, each bound to this client.
        self.completions = resources.Completions(self)
        self.chat = resources.Chat(self)
        self.edits = resources.Edits(self)
        self.embeddings = resources.Embeddings(self)
        self.files = resources.Files(self)
        self.images = resources.Images(self)
        self.audio = resources.Audio(self)
        self.moderations = resources.Moderations(self)
        self.models = resources.Models(self)
        self.fine_tuning = resources.FineTuning(self)
        self.fine_tunes = resources.FineTunes(self)
        self.beta = resources.Beta(self)
        # Built last: the wrapper reads the resource attributes set above.
        self.with_raw_response = OpenAIWithRawResponse(self)

    @property
    @override
    def qs(self) -> Querystring:
        """Query-string serializer using the ``comma`` array format."""
        return Querystring(array_format="comma")

    @property
    @override
    def auth_headers(self) -> dict[str, str]:
        """``Authorization`` header carrying the API key as a Bearer token."""
        return {"Authorization": f"Bearer {self.api_key}"}

    @property
    @override
    def default_headers(self) -> dict[str, str | Omit]:
        """Headers sent by default.

        Layered in this order, later entries overriding earlier ones: the
        base client's defaults, the headers specific to this client, then
        the custom headers given to the client.
        """
        merged: dict[str, str | Omit] = dict(super().default_headers)
        merged["X-Stainless-Async"] = "false"
        if self.organization is None:
            merged["OpenAI-Organization"] = Omit()
        else:
            merged["OpenAI-Organization"] = self.organization
        merged.update(self._custom_headers)
        return merged

    def copy(
        self,
        *,
        api_key: str | None = None,
        organization: str | None = None,
        base_url: str | httpx.URL | None = None,
        timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
        http_client: httpx.Client | None = None,
        max_retries: int | NotGiven = NOT_GIVEN,
        default_headers: Mapping[str, str] | None = None,
        set_default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        set_default_query: Mapping[str, object] | None = None,
        _extra_kwargs: Mapping[str, Any] = {},
    ) -> Self:
        """
        Return a new client of the same class that reuses this client's
        options, with any explicitly passed option overriding the current one.

        ``default_headers`` / ``default_query`` are merged on top of the
        current custom values, while ``set_default_headers`` /
        ``set_default_query`` replace them outright. Each pair is mutually
        exclusive.

        Raises:
            ValueError: if both members of a mutually exclusive pair are given.
        """
        if default_headers is not None and set_default_headers is not None:
            raise ValueError("The `default_headers` and `set_default_headers` arguments are mutually exclusive")

        if default_query is not None and set_default_query is not None:
            raise ValueError("The `default_query` and `set_default_query` arguments are mutually exclusive")

        # Headers: merge, replace, or keep as-is.
        if default_headers is not None:
            next_headers = {**self._custom_headers, **default_headers}
        elif set_default_headers is not None:
            next_headers = set_default_headers
        else:
            next_headers = self._custom_headers

        # Query parameters: same three-way choice.
        if default_query is not None:
            next_query = {**self._custom_query, **default_query}
        elif set_default_query is not None:
            next_query = set_default_query
        else:
            next_query = self._custom_query

        # Without an override the new client uses this client's HTTP client.
        next_http_client = http_client or self._client
        next_timeout = self.timeout if isinstance(timeout, NotGiven) else timeout
        next_max_retries = max_retries if is_given(max_retries) else self.max_retries

        return self.__class__(
            api_key=api_key or self.api_key,
            organization=organization or self.organization,
            base_url=base_url or str(self.base_url),
            timeout=next_timeout,
            http_client=next_http_client,
            max_retries=next_max_retries,
            default_headers=next_headers,
            default_query=next_query,
            **_extra_kwargs,
        )

    # `with_options` is an alias of `copy` that reads better inline, e.g.
    #   client.with_options(timeout=10).foo.create(...)
    with_options = copy

    def __del__(self) -> None:
        """Close the client on garbage collection unless it uses a custom HTTP client."""
        # Either attribute can be missing if the '__init__' method raised
        # an error, in which case there is nothing to clean up.
        fully_initialised = hasattr(self, "_has_custom_http_client") and hasattr(self, "close")
        if fully_initialised and not self._has_custom_http_client:
            self.close()

    @override
    def _make_status_error(
        self,
        err_msg: str,
        *,
        body: object,
        response: httpx.Response,
    ) -> APIStatusError:
        """Map an error response to the matching ``APIStatusError`` subclass.

        When ``body`` is a mapping, its ``"error"`` entry (or the whole body
        if that key is absent) becomes the error's body; otherwise ``body``
        is passed through unchanged.
        """
        if is_mapping(body):
            data = body.get("error", body)
        else:
            data = body

        exact_matches: dict[int, type[APIStatusError]] = {
            400: _exceptions.BadRequestError,
            401: _exceptions.AuthenticationError,
            403: _exceptions.PermissionDeniedError,
            404: _exceptions.NotFoundError,
            409: _exceptions.ConflictError,
            422: _exceptions.UnprocessableEntityError,
            429: _exceptions.RateLimitError,
        }

        status = response.status_code
        error_cls = exact_matches.get(status)
        if error_cls is None:
            # Any 5xx (or higher) is an internal server error; everything
            # else falls back to the generic status error.
            error_cls = _exceptions.InternalServerError if status >= 500 else APIStatusError
        return error_cls(err_msg, response=response, body=data)
class AsyncOpenAI(AsyncAPIClient):
    """Asynchronous OpenAI API client.

    Every resource group is exposed as an attribute (``completions``,
    ``chat``, ...) using its ``Async*`` resource class. ``with_raw_response``
    exposes the same resources through their ``Async*WithRawResponse``
    wrappers.
    """

    completions: resources.AsyncCompletions
    chat: resources.AsyncChat
    edits: resources.AsyncEdits
    embeddings: resources.AsyncEmbeddings
    files: resources.AsyncFiles
    images: resources.AsyncImages
    audio: resources.AsyncAudio
    moderations: resources.AsyncModerations
    models: resources.AsyncModels
    fine_tuning: resources.AsyncFineTuning
    fine_tunes: resources.AsyncFineTunes
    beta: resources.AsyncBeta
    with_raw_response: AsyncOpenAIWithRawResponse

    # client options
    api_key: str
    organization: str | None

    def __init__(
        self,
        *,
        api_key: str | None = None,
        organization: str | None = None,
        base_url: str | httpx.URL | None = None,
        timeout: Union[float, Timeout, None, NotGiven] = NOT_GIVEN,
        max_retries: int = DEFAULT_MAX_RETRIES,
        default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        # Optional pre-configured httpx async client; see the
        # [httpx documentation](https://www.python-httpx.org/api/#asyncclient).
        http_client: httpx.AsyncClient | None = None,
        # Toggle schema validation of data returned by the API. When
        # enabled, an APIResponseValidationError is raised if the API
        # responds with data that is invalid for the expected schema.
        #
        # This parameter may be removed or changed in the future. If you
        # rely on it, please open a GitHub issue describing your use-case
        # so it can be considered for the public interface.
        _strict_response_validation: bool = False,
    ) -> None:
        """Build a new async openai client.

        Arguments that are left as ``None`` are looked up in the environment:

        - ``api_key`` from ``OPENAI_API_KEY``
        - ``organization`` from ``OPENAI_ORG_ID``
        - ``base_url`` from ``OPENAI_BASE_URL``, falling back to
          ``https://api.openai.com/v1``

        Raises:
            OpenAIError: if no API key is passed and ``OPENAI_API_KEY`` is
                not set.
        """
        # An explicit argument wins; the environment is consulted only when
        # the argument was omitted.
        resolved_key = api_key if api_key is not None else os.environ.get("OPENAI_API_KEY")
        if resolved_key is None:
            raise OpenAIError(
                "The api_key client option must be set either by passing api_key to the client or by setting the OPENAI_API_KEY environment variable"
            )
        self.api_key = resolved_key

        self.organization = (
            organization if organization is not None else os.environ.get("OPENAI_ORG_ID")
        )

        if base_url is None:
            base_url = os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1")

        super().__init__(
            version=__version__,
            base_url=base_url,
            max_retries=max_retries,
            timeout=timeout,
            http_client=http_client,
            custom_headers=default_headers,
            custom_query=default_query,
            _strict_response_validation=_strict_response_validation,
        )

        self._default_stream_cls = AsyncStream

        # One async resource object per API group, each bound to this client.
        self.completions = resources.AsyncCompletions(self)
        self.chat = resources.AsyncChat(self)
        self.edits = resources.AsyncEdits(self)
        self.embeddings = resources.AsyncEmbeddings(self)
        self.files = resources.AsyncFiles(self)
        self.images = resources.AsyncImages(self)
        self.audio = resources.AsyncAudio(self)
        self.moderations = resources.AsyncModerations(self)
        self.models = resources.AsyncModels(self)
        self.fine_tuning = resources.AsyncFineTuning(self)
        self.fine_tunes = resources.AsyncFineTunes(self)
        self.beta = resources.AsyncBeta(self)
        # Built last: the wrapper reads the resource attributes set above.
        self.with_raw_response = AsyncOpenAIWithRawResponse(self)

    @property
    @override
    def qs(self) -> Querystring:
        """Query-string serializer using the ``comma`` array format."""
        return Querystring(array_format="comma")

    @property
    @override
    def auth_headers(self) -> dict[str, str]:
        """``Authorization`` header carrying the API key as a Bearer token."""
        return {"Authorization": f"Bearer {self.api_key}"}

    @property
    @override
    def default_headers(self) -> dict[str, str | Omit]:
        """Headers sent by default.

        Layered in this order, later entries overriding earlier ones: the
        base client's defaults, the headers specific to this client, then
        the custom headers given to the client.
        """
        merged: dict[str, str | Omit] = dict(super().default_headers)
        merged["X-Stainless-Async"] = f"async:{get_async_library()}"
        if self.organization is None:
            merged["OpenAI-Organization"] = Omit()
        else:
            merged["OpenAI-Organization"] = self.organization
        merged.update(self._custom_headers)
        return merged

    def copy(
        self,
        *,
        api_key: str | None = None,
        organization: str | None = None,
        base_url: str | httpx.URL | None = None,
        timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
        http_client: httpx.AsyncClient | None = None,
        max_retries: int | NotGiven = NOT_GIVEN,
        default_headers: Mapping[str, str] | None = None,
        set_default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        set_default_query: Mapping[str, object] | None = None,
        _extra_kwargs: Mapping[str, Any] = {},
    ) -> Self:
        """
        Return a new client of the same class that reuses this client's
        options, with any explicitly passed option overriding the current one.

        ``default_headers`` / ``default_query`` are merged on top of the
        current custom values, while ``set_default_headers`` /
        ``set_default_query`` replace them outright. Each pair is mutually
        exclusive.

        Raises:
            ValueError: if both members of a mutually exclusive pair are given.
        """
        if default_headers is not None and set_default_headers is not None:
            raise ValueError("The `default_headers` and `set_default_headers` arguments are mutually exclusive")

        if default_query is not None and set_default_query is not None:
            raise ValueError("The `default_query` and `set_default_query` arguments are mutually exclusive")

        # Headers: merge, replace, or keep as-is.
        if default_headers is not None:
            next_headers = {**self._custom_headers, **default_headers}
        elif set_default_headers is not None:
            next_headers = set_default_headers
        else:
            next_headers = self._custom_headers

        # Query parameters: same three-way choice.
        if default_query is not None:
            next_query = {**self._custom_query, **default_query}
        elif set_default_query is not None:
            next_query = set_default_query
        else:
            next_query = self._custom_query

        # Without an override the new client uses this client's HTTP client.
        next_http_client = http_client or self._client
        next_timeout = self.timeout if isinstance(timeout, NotGiven) else timeout
        next_max_retries = max_retries if is_given(max_retries) else self.max_retries

        return self.__class__(
            api_key=api_key or self.api_key,
            organization=organization or self.organization,
            base_url=base_url or str(self.base_url),
            timeout=next_timeout,
            http_client=next_http_client,
            max_retries=next_max_retries,
            default_headers=next_headers,
            default_query=next_query,
            **_extra_kwargs,
        )

    # `with_options` is an alias of `copy` that reads better inline, e.g.
    #   client.with_options(timeout=10).foo.create(...)
    with_options = copy

    def __del__(self) -> None:
        """Best-effort close on garbage collection unless a custom HTTP client is in use."""
        # Either attribute can be missing if the '__init__' method raised
        # an error, in which case there is nothing to clean up.
        fully_initialised = hasattr(self, "_has_custom_http_client") and hasattr(self, "close")
        if not fully_initialised or self._has_custom_http_client:
            return

        # Schedule `close()` on the running event loop. Any failure here
        # (including there being no running loop) is deliberately ignored.
        try:
            running_loop = asyncio.get_running_loop()
            running_loop.create_task(self.close())
        except Exception:
            pass

    @override
    def _make_status_error(
        self,
        err_msg: str,
        *,
        body: object,
        response: httpx.Response,
    ) -> APIStatusError:
        """Map an error response to the matching ``APIStatusError`` subclass.

        When ``body`` is a mapping, its ``"error"`` entry (or the whole body
        if that key is absent) becomes the error's body; otherwise ``body``
        is passed through unchanged.
        """
        if is_mapping(body):
            data = body.get("error", body)
        else:
            data = body

        exact_matches: dict[int, type[APIStatusError]] = {
            400: _exceptions.BadRequestError,
            401: _exceptions.AuthenticationError,
            403: _exceptions.PermissionDeniedError,
            404: _exceptions.NotFoundError,
            409: _exceptions.ConflictError,
            422: _exceptions.UnprocessableEntityError,
            429: _exceptions.RateLimitError,
        }

        status = response.status_code
        error_cls = exact_matches.get(status)
        if error_cls is None:
            # Any 5xx (or higher) is an internal server error; everything
            # else falls back to the generic status error.
            error_cls = _exceptions.InternalServerError if status >= 500 else APIStatusError
        return error_cls(err_msg, response=response, body=data)
class OpenAIWithRawResponse:
    """Mirror of the `OpenAI` resource attributes using `*WithRawResponse` wrappers."""

    def __init__(self, client: OpenAI) -> None:
        """Wrap each resource already attached to ``client``."""
        src = client

        self.completions = resources.CompletionsWithRawResponse(src.completions)
        self.chat = resources.ChatWithRawResponse(src.chat)
        self.edits = resources.EditsWithRawResponse(src.edits)
        self.embeddings = resources.EmbeddingsWithRawResponse(src.embeddings)
        self.files = resources.FilesWithRawResponse(src.files)
        self.images = resources.ImagesWithRawResponse(src.images)
        self.audio = resources.AudioWithRawResponse(src.audio)
        self.moderations = resources.ModerationsWithRawResponse(src.moderations)
        self.models = resources.ModelsWithRawResponse(src.models)
        self.fine_tuning = resources.FineTuningWithRawResponse(src.fine_tuning)
        self.fine_tunes = resources.FineTunesWithRawResponse(src.fine_tunes)
        self.beta = resources.BetaWithRawResponse(src.beta)
class AsyncOpenAIWithRawResponse:
    """Mirror of the `AsyncOpenAI` resource attributes using `Async*WithRawResponse` wrappers."""

    def __init__(self, client: AsyncOpenAI) -> None:
        """Wrap each resource already attached to ``client``."""
        src = client

        self.completions = resources.AsyncCompletionsWithRawResponse(src.completions)
        self.chat = resources.AsyncChatWithRawResponse(src.chat)
        self.edits = resources.AsyncEditsWithRawResponse(src.edits)
        self.embeddings = resources.AsyncEmbeddingsWithRawResponse(src.embeddings)
        self.files = resources.AsyncFilesWithRawResponse(src.files)
        self.images = resources.AsyncImagesWithRawResponse(src.images)
        self.audio = resources.AsyncAudioWithRawResponse(src.audio)
        self.moderations = resources.AsyncModerationsWithRawResponse(src.moderations)
        self.models = resources.AsyncModelsWithRawResponse(src.models)
        self.fine_tuning = resources.AsyncFineTuningWithRawResponse(src.fine_tuning)
        self.fine_tunes = resources.AsyncFineTunesWithRawResponse(src.fine_tunes)
        self.beta = resources.AsyncBetaWithRawResponse(src.beta)
# Short aliases for the two client classes; both names are listed in `__all__`.
Client = OpenAI
AsyncClient = AsyncOpenAI