hikari.internal.net
General bits and pieces that are reused between components.
View Source
# -*- coding: utf-8 -*-
# cython: language_level=3
# Copyright (c) 2020 Nekokatt
# Copyright (c) 2021-present davfsa
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""General bits and pieces that are reused between components."""
from __future__ import annotations

__all__: typing.Sequence[str] = ("generate_error_response", "create_client_session")

import http
import typing

import aiohttp

from hikari import errors

if typing.TYPE_CHECKING:
    from hikari.impl import config
    from hikari.internal import data_binding


async def generate_error_response(response: aiohttp.ClientResponse) -> errors.HTTPError:
    """Given an erroneous HTTP response, return a corresponding exception.

    Parameters
    ----------
    response : aiohttp.ClientResponse
        The erroneous response to build an exception for.

    Returns
    -------
    hikari.errors.HTTPError
        The exception that matches the response status. It is returned to
        the caller, not raised.
    """
    real_url = str(response.real_url)
    raw_body = await response.read()

    # Little hack to stop mypy from complaining when using `*args`
    args: typing.List[typing.Any] = [real_url, response.headers, raw_body]
    raw_error_array: typing.Optional[data_binding.JSONObject] = None

    try:
        json_body = await response.json()
    except (aiohttp.ContentTypeError, ValueError):
        # Either the content type is not JSON, or the body claims to be JSON
        # but cannot be decoded. Fall back to the raw body only, rather than
        # letting a decoding error hide the HTTP failure being reported.
        json_body = None

    # Only a JSON object has `message`/`code`/`errors` fields to read; any
    # other payload (`null`, an array, a bare string) is ignored.
    if isinstance(json_body, dict):
        args.append(json_body.get("message", ""))
        args.append(json_body.get("code", 0))
        raw_error_array = json_body.get("errors")

    if response.status == http.HTTPStatus.BAD_REQUEST:
        return errors.BadRequestError(*args, errors=raw_error_array)
    if response.status == http.HTTPStatus.UNAUTHORIZED:
        return errors.UnauthorizedError(*args)
    if response.status == http.HTTPStatus.FORBIDDEN:
        return errors.ForbiddenError(*args)
    if response.status == http.HTTPStatus.NOT_FOUND:
        return errors.NotFoundError(*args)

    try:
        status: typing.Union[int, http.HTTPStatus] = http.HTTPStatus(response.status)
    except ValueError:
        # Status codes missing from `http.HTTPStatus` (non-standard ones sent
        # by proxies, for example) must not raise here.
        # NOTE(review): assumes the error classes accept a plain `int`
        # status - confirm against `hikari.errors`.
        status = response.status

    if 400 <= status < 500:
        return errors.ClientHTTPResponseError(real_url, status, response.headers, raw_body)
    elif 500 <= status < 600:
        return errors.InternalServerError(real_url, status, response.headers, raw_body)
    else:
        return errors.HTTPResponseError(real_url, status, response.headers, raw_body)


def create_tcp_connector(
    http_settings: config.HTTPSettings,
    *,
    dns_cache: typing.Union[None, bool, int] = True,
    limit: int = 100,
) -> aiohttp.TCPConnector:
    """Create a TCP connector and return it.

    Parameters
    ----------
    http_settings : config.HTTPSettings
        HTTP settings to use for the connector.

    Optional Parameters
    -------------------
    dns_cache : typing.Union[None, bool, int]
        If `True`, DNS caching is used with a default TTL of 10 seconds.
        If `False`, DNS caching is disabled. If an `int` is given, then
        DNS caching is enabled with an explicit TTL set. If `None`, the
        cache will be enabled and never invalidate.
    limit : int
        Number of connections to allow in the pool at a maximum.

    Returns
    -------
    aiohttp.TCPConnector
        TCP connector to use.
    """
    return aiohttp.TCPConnector(
        enable_cleanup_closed=http_settings.enable_cleanup_closed,
        force_close=http_settings.force_close_transports,
        limit=limit,
        ssl=http_settings.ssl,
        # `bool` is checked explicitly because it is a subclass of `int`;
        # both `True` and `False` map to the default TTL of 10 seconds.
        ttl_dns_cache=dns_cache if not isinstance(dns_cache, bool) else 10,
        use_dns_cache=dns_cache is not False,
    )


def create_client_session(
    connector: aiohttp.BaseConnector,
    connector_owner: bool,
    http_settings: config.HTTPSettings,
    raise_for_status: bool,
    trust_env: bool,
    ws_response_cls: typing.Type[aiohttp.ClientWebSocketResponse] = aiohttp.ClientWebSocketResponse,
) -> aiohttp.ClientSession:
    """Generate a client session using the given settings.

    .. warning::
        You must invoke this from within a running event loop.

    .. note::
        If you pass an explicit connector, then the connection
        that is created will not own the connector. You will be
        expected to manually close it __after__ the returned
        client session is closed to prevent leaking resources.

    Parameters
    ----------
    connector : aiohttp.BaseConnector
        The connector to use.
    connector_owner : bool
        If `True`, then the client session will close the
        connector on shutdown. Otherwise, you must do it manually.
    http_settings : hikari.impl.config.HTTPSettings
        HTTP settings to use.
    raise_for_status : bool
        `True` to default to throwing exceptions if a request
        fails, or `False` to default to not.
    trust_env : bool
        `True` to trust anything in environment variables
        and the `netrc` file, `False` to ignore it.
    ws_response_cls : typing.Type[aiohttp.ClientWebSocketResponse]
        The websocket response class to use.
        Defaults to `aiohttp.ClientWebSocketResponse`.

    Returns
    -------
    aiohttp.ClientSession
        The client session to use.
    """
    return aiohttp.ClientSession(
        connector=connector,
        connector_owner=connector_owner,
        raise_for_status=raise_for_status,
        timeout=aiohttp.ClientTimeout(
            connect=http_settings.timeouts.acquire_and_connect,
            sock_connect=http_settings.timeouts.request_socket_connect,
            sock_read=http_settings.timeouts.request_socket_read,
            total=http_settings.timeouts.total,
        ),
        trust_env=trust_env,
        version=aiohttp.HttpVersion11,
        ws_response_class=ws_response_cls,
    )
# def create_client_session(
connector: aiohttp.connector.BaseConnector,
connector_owner: bool,
http_settings: hikari.impl.config.HTTPSettings,
raise_for_status: bool,
trust_env: bool,
ws_response_cls: Type[aiohttp.client_ws.ClientWebSocketResponse] = <class 'aiohttp.client_ws.ClientWebSocketResponse'>
) -> aiohttp.client.ClientSession:
View Source
def create_client_session(
    connector: aiohttp.BaseConnector,
    connector_owner: bool,
    http_settings: config.HTTPSettings,
    raise_for_status: bool,
    trust_env: bool,
    ws_response_cls: typing.Type[aiohttp.ClientWebSocketResponse] = aiohttp.ClientWebSocketResponse,
) -> aiohttp.ClientSession:
    """Build an `aiohttp.ClientSession` configured from the given settings.

    .. warning::
        This must be called from within a running event loop.

    .. note::
        When `connector_owner` is `False`, the returned session does not
        own the connector. You are then expected to close the connector
        yourself __after__ the returned client session has been closed,
        otherwise resources will leak.

    Parameters
    ----------
    connector : aiohttp.BaseConnector
        The connector the session should use.
    connector_owner : bool
        `True` if the client session should close the connector when it
        shuts down, `False` if you will close it manually.
    http_settings : hikari.impl.config.HTTPSettings
        HTTP settings; the session timeouts are read from its `timeouts`.
    raise_for_status : bool
        `True` to raise exceptions for failed requests by default,
        `False` to not do so by default.
    trust_env : bool
        `True` to trust environment variables and the `netrc` file,
        `False` to ignore them.
    ws_response_cls : typing.Type[aiohttp.ClientWebSocketResponse]
        The websocket response class to use.
        Defaults to `aiohttp.ClientWebSocketResponse`.

    Returns
    -------
    aiohttp.ClientSession
        The configured client session.
    """
    timeout_settings = http_settings.timeouts
    session_timeout = aiohttp.ClientTimeout(
        connect=timeout_settings.acquire_and_connect,
        sock_connect=timeout_settings.request_socket_connect,
        sock_read=timeout_settings.request_socket_read,
        total=timeout_settings.total,
    )

    # The session is pinned to HTTP/1.1.
    session = aiohttp.ClientSession(
        connector=connector,
        connector_owner=connector_owner,
        raise_for_status=raise_for_status,
        timeout=session_timeout,
        trust_env=trust_env,
        version=aiohttp.HttpVersion11,
        ws_response_class=ws_response_cls,
    )
    return session
Generate a client session using the given settings.
Warning: You must invoke this from within a running event loop.
Note: If you pass an explicit connector, then the connection that is created will not own the connector. You will be expected to manually close it __after__ the returned client session is closed to prevent leaking resources.
Parameters
- connector (aiohttp.BaseConnector): The connector to use.
- connector_owner (bool): If `True`, then the client session will close the connector on shutdown. Otherwise, you must do it manually.
- http_settings (hikari.impl.config.HTTPSettings): HTTP settings to use.
- raise_for_status (bool): `True` to default to throwing exceptions if a request fails, or `False` to default to not.
- trust_env (bool): `True` to trust anything in environment variables and the `netrc` file, `False` to ignore it.
- ws_response_cls (typing.Type[aiohttp.ClientWebSocketResponse]): The websocket response class to use. Defaults to `aiohttp.ClientWebSocketResponse`.
Returns
- aiohttp.ClientSession: The client session to use.
# async def generate_error_response(
response: aiohttp.client_reqrep.ClientResponse
) -> hikari.errors.HTTPError:
View Source
async def generate_error_response(response: aiohttp.ClientResponse) -> errors.HTTPError:
    """Given an erroneous HTTP response, return a corresponding exception.

    Parameters
    ----------
    response : aiohttp.ClientResponse
        The erroneous response to build an exception for.

    Returns
    -------
    hikari.errors.HTTPError
        The exception that matches the response status. It is returned to
        the caller, not raised.
    """
    real_url = str(response.real_url)
    raw_body = await response.read()

    # Little hack to stop mypy from complaining when using `*args`
    args: typing.List[typing.Any] = [real_url, response.headers, raw_body]
    raw_error_array: typing.Optional[data_binding.JSONObject] = None

    try:
        json_body = await response.json()
    except (aiohttp.ContentTypeError, ValueError):
        # Either the content type is not JSON, or the body claims to be JSON
        # but cannot be decoded. Fall back to the raw body only, rather than
        # letting a decoding error hide the HTTP failure being reported.
        json_body = None

    # Only a JSON object has `message`/`code`/`errors` fields to read; any
    # other payload (`null`, an array, a bare string) is ignored.
    if isinstance(json_body, dict):
        args.append(json_body.get("message", ""))
        args.append(json_body.get("code", 0))
        raw_error_array = json_body.get("errors")

    if response.status == http.HTTPStatus.BAD_REQUEST:
        return errors.BadRequestError(*args, errors=raw_error_array)
    if response.status == http.HTTPStatus.UNAUTHORIZED:
        return errors.UnauthorizedError(*args)
    if response.status == http.HTTPStatus.FORBIDDEN:
        return errors.ForbiddenError(*args)
    if response.status == http.HTTPStatus.NOT_FOUND:
        return errors.NotFoundError(*args)

    try:
        status: typing.Union[int, http.HTTPStatus] = http.HTTPStatus(response.status)
    except ValueError:
        # Status codes missing from `http.HTTPStatus` (non-standard ones sent
        # by proxies, for example) must not raise here.
        # NOTE(review): assumes the error classes accept a plain `int`
        # status - confirm against `hikari.errors`.
        status = response.status

    if 400 <= status < 500:
        return errors.ClientHTTPResponseError(real_url, status, response.headers, raw_body)
    elif 500 <= status < 600:
        return errors.InternalServerError(real_url, status, response.headers, raw_body)
    else:
        return errors.HTTPResponseError(real_url, status, response.headers, raw_body)
Given an erroneous HTTP response, return a corresponding exception.