hikari.impl.interaction_server
Standard implementation of a REST based interactions server.
View Source
# -*- coding: utf-8 -*- # cython: language_level=3 # Copyright (c) 2020 Nekokatt # Copyright (c) 2021-present davfsa # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. """Standard implementation of a REST based interactions server.""" from __future__ import annotations __all__: typing.Sequence[str] = ("InteractionServer",) import asyncio import logging import threading import typing import aiohttp import aiohttp.web import aiohttp.web_runner from hikari import applications from hikari import errors from hikari.api import interaction_server from hikari.api import special_endpoints from hikari.interactions import base_interactions from hikari.internal import data_binding if typing.TYPE_CHECKING: import concurrent.futures import socket as socket_ import ssl import aiohttp.abc import aiohttp.typedefs # This is kept inline as pynacl is an optional dependency. 
from nacl import signing from hikari import files as files_ from hikari.api import entity_factory as entity_factory_api from hikari.api import rest as rest_api from hikari.interactions import command_interactions from hikari.interactions import component_interactions _InteractionT_co = typing.TypeVar("_InteractionT_co", bound=base_interactions.PartialInteraction, covariant=True) _MessageResponseBuilderT = typing.Union[ special_endpoints.InteractionDeferredBuilder, special_endpoints.InteractionMessageBuilder, ] _LOGGER: typing.Final[logging.Logger] = logging.getLogger("hikari.interaction_server") # Internal interaction and interaction response types. _PING_INTERACTION_TYPE: typing.Final[int] = 1 _PONG_RESPONSE_TYPE: typing.Final[int] = 1 # HTTP status codes. _OK_STATUS: typing.Final[int] = 200 _BAD_REQUEST_STATUS: typing.Final[int] = 400 _PAYLOAD_TOO_LARGE_STATUS: typing.Final[int] = 413 _UNSUPPORTED_MEDIA_TYPE_STATUS: typing.Final[int] = 415 _INTERNAL_SERVER_ERROR_STATUS: typing.Final[int] = 500 _NOT_IMPLEMENTED: typing.Final[int] = 501 _UTF_8_CHARSET: typing.Final[str] = "UTF-8" # Header keys and values _X_SIGNATURE_ED25519_HEADER: typing.Final[str] = "X-Signature-Ed25519" _X_SIGNATURE_TIMESTAMP_HEADER: typing.Final[str] = "X-Signature-Timestamp" _CONTENT_TYPE_KEY: typing.Final[str] = "Content-Type" _USER_AGENT_KEY: typing.Final[str] = "User-Agent" _APPLICATION_OCTET_STREAM: typing.Final[str] = "application/octet-stream" _JSON_CONTENT_TYPE: typing.Final[str] = "application/json" _JSON_TYPE_WITH_CHARSET: typing.Final[str] = f"{_JSON_CONTENT_TYPE}; charset={_UTF_8_CHARSET}" _TEXT_CONTENT_TYPE: typing.Final[str] = "text/plain" _TEXT_TYPE_WITH_CHARSET: typing.Final[str] = f"{_TEXT_CONTENT_TYPE}; charset={_UTF_8_CHARSET}" class _Response: __slots__: typing.Sequence[str] = ("_content_type", "_files", "_payload", "_status_code") def __init__( self, status_code: int, payload: typing.Optional[bytes] = None, *, content_type: typing.Optional[str] = None, files: 
typing.Sequence[files_.Resource[files_.AsyncReader]] = (), ) -> None: if payload and not content_type: content_type = _TEXT_TYPE_WITH_CHARSET self._content_type = content_type self._files = files self._payload = payload self._status_code = status_code @property def content_type(self) -> typing.Optional[str]: return self._content_type @property def files(self) -> typing.Sequence[files_.Resource[files_.AsyncReader]]: return self._files @property def headers(self) -> typing.Optional[typing.MutableMapping[str, str]]: return None @property def payload(self) -> typing.Optional[bytes]: return self._payload @property def status_code(self) -> int: return self._status_code # Constant response _PONG_RESPONSE: typing.Final[_Response] = _Response( _OK_STATUS, data_binding.dump_json({"type": _PONG_RESPONSE_TYPE}).encode(), content_type=_JSON_TYPE_WITH_CHARSET ) class _FilePayload(aiohttp.Payload): _value: files_.Resource[files_.AsyncReader] def __init__( self, value: files_.Resource[files_.AsyncReader], content_type: str, /, *, executor: typing.Optional[concurrent.futures.Executor] = None, headers: typing.Optional[typing.Dict[str, str]] = None, ) -> None: super().__init__(value=value, headers=headers, content_type=content_type) self._executor = executor async def write(self, writer: aiohttp.abc.AbstractStreamWriter) -> None: async with self._value.stream(executor=self._executor) as data: async for chunk in data: await writer.write(chunk) class InteractionServer(interaction_server.InteractionServer): """Standard implementation of `hikari.api.interaction_server.InteractionServer`. Parameters ---------- entity_factory : hikari.api.entity_factory.EntityFactory The entity factory instance this server should use. Other Parameters ---------------- dumps : aiohttp.typedefs.JSONEncoder The JSON encoder this server should use. Defaults to `json.dumps`. loads : aiohttp.typedefs.JSONDecoder The JSON decoder this server should use. Defaults to `json.loads`. 
public_key : bytes The public key this server should use for verifying request payloads from Discord. If left as `None` then the client will try to work this out using `rest_client`. rest_client : hikari.api.rest.RESTClient The client this should use for making REST requests. """ __slots__: typing.Sequence[str] = ( "_application_fetch_lock", "_close_event", "_dumps", "_entity_factory", "_executor", "_is_closing", "_listeners", "_loads", "_nacl", "_public_key", "_rest_client", "_server", ) def __init__( self, *, dumps: aiohttp.typedefs.JSONEncoder = data_binding.dump_json, entity_factory: entity_factory_api.EntityFactory, executor: typing.Optional[concurrent.futures.Executor] = None, loads: aiohttp.typedefs.JSONDecoder = data_binding.load_json, rest_client: rest_api.RESTClient, public_key: typing.Optional[bytes] = None, ) -> None: # This is kept inline as pynacl is an optional dependency. try: import nacl.exceptions import nacl.signing except ModuleNotFoundError as exc: raise RuntimeError( "You must install the optional `hikari[server]` dependencies to use the default interaction server." ) from exc # Building asyncio.Lock when there isn't a running loop may lead to runtime errors. self._application_fetch_lock: typing.Optional[asyncio.Lock] = None # Building asyncio.Event when there isn't a running loop may lead to runtime errors. 
self._close_event: typing.Optional[asyncio.Event] = None self._dumps = dumps self._entity_factory = entity_factory self._executor = executor self._is_closing = False self._listeners: typing.Dict[typing.Type[base_interactions.PartialInteraction], typing.Any] = {} self._loads = loads self._nacl = nacl self._rest_client = rest_client self._server: typing.Optional[aiohttp.web_runner.AppRunner] = None self._public_key = nacl.signing.VerifyKey(public_key) if public_key is not None else None @property def is_alive(self) -> bool: """Whether this interaction server is active.""" return self._server is not None async def _fetch_public_key(self) -> signing.VerifyKey: if self._application_fetch_lock is None: self._application_fetch_lock = asyncio.Lock() application: typing.Union[applications.Application, applications.AuthorizationApplication] async with self._application_fetch_lock: if self._public_key: return self._public_key if self._rest_client.token_type == applications.TokenType.BOT: application = await self._rest_client.fetch_application() else: application = (await self._rest_client.fetch_authorization()).application self._public_key = self._nacl.signing.VerifyKey(application.public_key) return self._public_key async def aiohttp_hook(self, request: aiohttp.web.Request) -> aiohttp.web.Response: """Handle an AIOHTTP interaction request. This method handles aiohttp specific detail before calling `InteractionServer.on_interaction` with the data extracted from the request if it can and handles building an aiohttp response. Parameters ---------- request : aiohttp.web.Request The received request. Returns ------- aiohttp.web.Response The aiohttp response. 
""" if request.content_type.lower() != _JSON_CONTENT_TYPE: _LOGGER.debug("Payload with invalid media type %r received", request.content_type) return aiohttp.web.Response( status=_UNSUPPORTED_MEDIA_TYPE_STATUS, body=b"Unsupported Media Type", content_type=_TEXT_CONTENT_TYPE, charset=_UTF_8_CHARSET, ) try: signature_header = bytes.fromhex(request.headers[_X_SIGNATURE_ED25519_HEADER]) timestamp_header = request.headers[_X_SIGNATURE_TIMESTAMP_HEADER].encode() except (KeyError, ValueError): user_agent = request.headers.get(_USER_AGENT_KEY, "NONE") _LOGGER.debug("Received a request with a missing or invalid signature header (UA %r)", user_agent) return aiohttp.web.Response( status=_BAD_REQUEST_STATUS, body=b"Missing or invalid required request signature header(s)", content_type=_TEXT_CONTENT_TYPE, charset=_UTF_8_CHARSET, ) try: body = await request.read() except aiohttp.web.HTTPRequestEntityTooLarge: _LOGGER.debug("Received a request with a payload that's too large to process") return aiohttp.web.Response( status=_PAYLOAD_TOO_LARGE_STATUS, body=b"Payload too large", content_type=_TEXT_CONTENT_TYPE, charset=_UTF_8_CHARSET, ) if not body: user_agent = request.headers.get(_USER_AGENT_KEY, "NONE") _LOGGER.debug("Received a body-less request (UA %r)", user_agent) return aiohttp.web.Response( status=_BAD_REQUEST_STATUS, body=b"POST request must have a body", content_type=_TEXT_CONTENT_TYPE, charset=_UTF_8_CHARSET, ) response = await self.on_interaction(body=body, signature=signature_header, timestamp=timestamp_header) if response.files: multipart = aiohttp.MultipartWriter(subtype="form-data") if response.payload: body_payload = aiohttp.BytesPayload(response.payload, content_type=response.content_type) body_payload.set_content_disposition("form-data", name="payload_json") multipart.append_payload(body_payload) for index, file_ in enumerate(response.files): async with file_.stream(head_only=True) as stream: mimetype = stream.mimetype or _APPLICATION_OCTET_STREAM payload = 
_FilePayload(file_, mimetype, executor=self._executor) payload.set_content_disposition("form-data", name=f"files[{index}]", filename=file_.filename) multipart.append_payload(payload) return aiohttp.web.Response(status=response.status_code, headers=response.headers, body=multipart) headers = response.headers if response.content_type: headers = headers or {} headers[_CONTENT_TYPE_KEY] = response.content_type return aiohttp.web.Response(status=response.status_code, headers=headers, body=response.payload) async def close(self) -> None: """Gracefully close the server and any open connections.""" if not self._server or not self._close_event: raise errors.ComponentStateConflictError("Cannot close an inactive interaction server") if self._is_closing: await self.join() return self._is_closing = True self._application_fetch_lock = None # This shutdown then cleanup ordering matters. await self._server.shutdown() await self._server.cleanup() self._close_event.set() self._close_event = None self._server = None async def join(self) -> None: """Wait for the process to halt before continuing.""" if not self._close_event: raise errors.ComponentStateConflictError("Cannot wait for an inactive interaction server to join") await self._close_event.wait() async def on_interaction(self, body: bytes, signature: bytes, timestamp: bytes) -> interaction_server.Response: """Handle an interaction received from Discord as a REST server. .. note:: If this server instance is alive then this will be called internally by the server but if the instance isn't alive then this may still be called externally to trigger interaction dispatch. Parameters ---------- body : bytes The interaction payload. signature : bytes Value of the `"X-Signature-Ed25519"` header used to verify the body. timestamp : bytes Value of the `"X-Signature-Timestamp"` header used to verify the body. 
Returns ------- hikari.api.interaction_server.Response Instructions on how the REST server calling this should respond to the interaction request. """ public_key = self._public_key or await self._fetch_public_key() try: public_key.verify(timestamp + body, signature) except (self._nacl.exceptions.BadSignatureError, ValueError): _LOGGER.error("Received a request with an invalid signature") return _Response(_BAD_REQUEST_STATUS, b"Invalid request signature") try: payload = self._loads(body.decode("utf-8")) interaction_type = int(payload["type"]) except (data_binding.JSONDecodeError, ValueError, TypeError) as exc: _LOGGER.error("Received a request with an invalid JSON body", exc_info=exc) return _Response(_BAD_REQUEST_STATUS, b"Invalid JSON body") except KeyError as exc: _LOGGER.error("Missing 'type' field in received JSON payload", exc_info=exc) return _Response(_BAD_REQUEST_STATUS, b"Missing required 'type' field in payload") if interaction_type == _PING_INTERACTION_TYPE: _LOGGER.debug("Responding to ping interaction") return _PONG_RESPONSE try: interaction = self._entity_factory.deserialize_interaction(payload) except errors.UnrecognisedEntityError: _LOGGER.debug("Ignoring unknown interaction type %s", interaction_type) return _Response(_NOT_IMPLEMENTED, b"Interaction type not implemented") except Exception as exc: asyncio.get_running_loop().call_exception_handler( {"message": "Exception occurred during interaction deserialization", "exception": exc} ) return _Response(_INTERNAL_SERVER_ERROR_STATUS, b"Exception occurred during interaction deserialization") if listener := self._listeners.get(type(interaction)): _LOGGER.debug("Dispatching interaction %s", interaction.id) try: result = await listener(interaction) raw_payload, files = result.build(self._entity_factory) payload = self._dumps(raw_payload) except Exception as exc: asyncio.get_running_loop().call_exception_handler( {"message": "Exception occurred during interaction dispatch", "exception": exc} ) return 
_Response(_INTERNAL_SERVER_ERROR_STATUS, b"Exception occurred during interaction dispatch") return _Response(_OK_STATUS, payload.encode(), files=files, content_type=_JSON_TYPE_WITH_CHARSET) _LOGGER.debug( "Ignoring interaction %s of type %s without registered listener", interaction.id, interaction.type ) return _Response(_NOT_IMPLEMENTED, b"Handler not set for this interaction type") async def start( self, backlog: int = 128, enable_signal_handlers: typing.Optional[bool] = None, host: typing.Optional[typing.Union[str, typing.Sequence[str]]] = None, port: typing.Optional[int] = None, path: typing.Optional[str] = None, reuse_address: typing.Optional[bool] = None, reuse_port: typing.Optional[bool] = None, socket: typing.Optional[socket_.socket] = None, shutdown_timeout: float = 60.0, ssl_context: typing.Optional[ssl.SSLContext] = None, ) -> None: """Start the bot and wait for the internal server to startup then return. .. note:: For more information on the other parameters such as defaults see AIOHTTP's documentation. Other Parameters ---------------- backlog : int The number of unaccepted connections that the system will allow before refusing new connections. enable_signal_handlers : typing.Optional[bool] Defaults to `True` if this is started in the main thread. If on a __non-Windows__ OS with builtin support for kernel-level POSIX signals, then setting this to `True` will allow treating keyboard interrupts and other OS signals to safely shut down the application as calls to shut down the application properly rather than just killing the process in a dirty state immediately. You should leave this enabled unless you plan to implement your own signal handling yourself. host : typing.Optional[typing.Union[str, aiohttp.web.HostSequence]] TCP/IP host or a sequence of hosts for the HTTP server. port : typing.Optional[int] TCP/IP port for the HTTP server. path : typing.Optional[str] File system path for HTTP server unix domain socket. 
reuse_address : typing.Optional[bool] Tells the kernel to reuse a local socket in TIME_WAIT state, without waiting for its natural timeout to expire. reuse_port : typing.Optional[bool] Tells the kernel to allow this endpoint to be bound to the same port as other existing endpoints are also bound to. socket : typing.Optional[socket.socket] A pre-existing socket object to accept connections on. shutdown_timeout : float A delay to wait for graceful server shutdown before forcefully disconnecting all open client sockets. This defaults to 60 seconds. ssl_context : typing.Optional[ssl.SSLContext] SSL context for HTTPS servers. """ if self._server: raise errors.ComponentStateConflictError("Cannot start an already active interaction server") if enable_signal_handlers is None: enable_signal_handlers = threading.current_thread() is threading.main_thread() self._close_event = asyncio.Event() self._is_closing = False aio_app = aiohttp.web.Application() aio_app.add_routes([aiohttp.web.post("/", self.aiohttp_hook)]) self._server = aiohttp.web_runner.AppRunner(aio_app, handle_signals=enable_signal_handlers, access_log=_LOGGER) await self._server.setup() sites: typing.List[aiohttp.web.BaseSite] = [] if host is not None: if isinstance(host, str): host = [host] for h in host: sites.append( aiohttp.web.TCPSite( self._server, h, port, shutdown_timeout=shutdown_timeout, ssl_context=ssl_context, backlog=backlog, reuse_address=reuse_address, reuse_port=reuse_port, ) ) elif path is None and socket is None or port is None: sites.append( aiohttp.web.TCPSite( self._server, port=port, shutdown_timeout=shutdown_timeout, ssl_context=ssl_context, backlog=backlog, reuse_address=reuse_address, reuse_port=reuse_port, ) ) if path is not None: sites.append( aiohttp.web.UnixSite( self._server, path, shutdown_timeout=shutdown_timeout, ssl_context=ssl_context, backlog=backlog ) ) if socket is not None: sites.append( aiohttp.web.SockSite( self._server, socket, shutdown_timeout=shutdown_timeout, 
ssl_context=ssl_context, backlog=backlog ) ) for site in sites: _LOGGER.info("Starting site on %s", site.name) await site.start() @typing.overload def get_listener( self, interaction_type: typing.Type[command_interactions.CommandInteraction], / ) -> typing.Optional[ interaction_server.ListenerT[command_interactions.CommandInteraction, _MessageResponseBuilderT] ]: ... @typing.overload def get_listener( self, interaction_type: typing.Type[component_interactions.ComponentInteraction], / ) -> typing.Optional[ interaction_server.ListenerT[component_interactions.ComponentInteraction, _MessageResponseBuilderT] ]: ... @typing.overload def get_listener( self, interaction_type: typing.Type[command_interactions.AutocompleteInteraction], / ) -> typing.Optional[ interaction_server.ListenerT[ command_interactions.AutocompleteInteraction, special_endpoints.InteractionAutocompleteBuilder ] ]: ... @typing.overload def get_listener( self, interaction_type: typing.Type[_InteractionT_co], / ) -> typing.Optional[interaction_server.ListenerT[_InteractionT_co, special_endpoints.InteractionResponseBuilder]]: ... def get_listener( self, interaction_type: typing.Type[_InteractionT_co], / ) -> typing.Optional[interaction_server.ListenerT[_InteractionT_co, special_endpoints.InteractionResponseBuilder]]: return self._listeners.get(interaction_type) @typing.overload def set_listener( self, interaction_type: typing.Type[command_interactions.CommandInteraction], listener: typing.Optional[ interaction_server.ListenerT[command_interactions.CommandInteraction, _MessageResponseBuilderT] ], /, *, replace: bool = False, ) -> None: ... @typing.overload def set_listener( self, interaction_type: typing.Type[component_interactions.ComponentInteraction], listener: typing.Optional[ interaction_server.ListenerT[component_interactions.ComponentInteraction, _MessageResponseBuilderT] ], /, *, replace: bool = False, ) -> None: ... 
@typing.overload def set_listener( self, interaction_type: typing.Type[command_interactions.AutocompleteInteraction], listener: typing.Optional[ interaction_server.ListenerT[ command_interactions.AutocompleteInteraction, special_endpoints.InteractionAutocompleteBuilder ] ], /, *, replace: bool = False, ) -> None: ... def set_listener( self, interaction_type: typing.Type[_InteractionT_co], listener: typing.Optional[ interaction_server.ListenerT[_InteractionT_co, special_endpoints.InteractionResponseBuilder] ], /, *, replace: bool = False, ) -> None: if listener: if not replace and interaction_type in self._listeners: raise TypeError(f"Listener already set for {interaction_type.__name__}") self._listeners[interaction_type] = listener else: self._listeners.pop(interaction_type, None)
View Source
class InteractionServer(interaction_server.InteractionServer): """Standard implementation of `hikari.api.interaction_server.InteractionServer`. Parameters ---------- entity_factory : hikari.api.entity_factory.EntityFactory The entity factory instance this server should use. Other Parameters ---------------- dumps : aiohttp.typedefs.JSONEncoder The JSON encoder this server should use. Defaults to `json.dumps`. loads : aiohttp.typedefs.JSONDecoder The JSON decoder this server should use. Defaults to `json.loads`. public_key : bytes The public key this server should use for verifying request payloads from Discord. If left as `None` then the client will try to work this out using `rest_client`. rest_client : hikari.api.rest.RESTClient The client this should use for making REST requests. """ __slots__: typing.Sequence[str] = ( "_application_fetch_lock", "_close_event", "_dumps", "_entity_factory", "_executor", "_is_closing", "_listeners", "_loads", "_nacl", "_public_key", "_rest_client", "_server", ) def __init__( self, *, dumps: aiohttp.typedefs.JSONEncoder = data_binding.dump_json, entity_factory: entity_factory_api.EntityFactory, executor: typing.Optional[concurrent.futures.Executor] = None, loads: aiohttp.typedefs.JSONDecoder = data_binding.load_json, rest_client: rest_api.RESTClient, public_key: typing.Optional[bytes] = None, ) -> None: # This is kept inline as pynacl is an optional dependency. try: import nacl.exceptions import nacl.signing except ModuleNotFoundError as exc: raise RuntimeError( "You must install the optional `hikari[server]` dependencies to use the default interaction server." ) from exc # Building asyncio.Lock when there isn't a running loop may lead to runtime errors. self._application_fetch_lock: typing.Optional[asyncio.Lock] = None # Building asyncio.Event when there isn't a running loop may lead to runtime errors. 
self._close_event: typing.Optional[asyncio.Event] = None self._dumps = dumps self._entity_factory = entity_factory self._executor = executor self._is_closing = False self._listeners: typing.Dict[typing.Type[base_interactions.PartialInteraction], typing.Any] = {} self._loads = loads self._nacl = nacl self._rest_client = rest_client self._server: typing.Optional[aiohttp.web_runner.AppRunner] = None self._public_key = nacl.signing.VerifyKey(public_key) if public_key is not None else None @property def is_alive(self) -> bool: """Whether this interaction server is active.""" return self._server is not None async def _fetch_public_key(self) -> signing.VerifyKey: if self._application_fetch_lock is None: self._application_fetch_lock = asyncio.Lock() application: typing.Union[applications.Application, applications.AuthorizationApplication] async with self._application_fetch_lock: if self._public_key: return self._public_key if self._rest_client.token_type == applications.TokenType.BOT: application = await self._rest_client.fetch_application() else: application = (await self._rest_client.fetch_authorization()).application self._public_key = self._nacl.signing.VerifyKey(application.public_key) return self._public_key async def aiohttp_hook(self, request: aiohttp.web.Request) -> aiohttp.web.Response: """Handle an AIOHTTP interaction request. This method handles aiohttp specific detail before calling `InteractionServer.on_interaction` with the data extracted from the request if it can and handles building an aiohttp response. Parameters ---------- request : aiohttp.web.Request The received request. Returns ------- aiohttp.web.Response The aiohttp response. 
""" if request.content_type.lower() != _JSON_CONTENT_TYPE: _LOGGER.debug("Payload with invalid media type %r received", request.content_type) return aiohttp.web.Response( status=_UNSUPPORTED_MEDIA_TYPE_STATUS, body=b"Unsupported Media Type", content_type=_TEXT_CONTENT_TYPE, charset=_UTF_8_CHARSET, ) try: signature_header = bytes.fromhex(request.headers[_X_SIGNATURE_ED25519_HEADER]) timestamp_header = request.headers[_X_SIGNATURE_TIMESTAMP_HEADER].encode() except (KeyError, ValueError): user_agent = request.headers.get(_USER_AGENT_KEY, "NONE") _LOGGER.debug("Received a request with a missing or invalid signature header (UA %r)", user_agent) return aiohttp.web.Response( status=_BAD_REQUEST_STATUS, body=b"Missing or invalid required request signature header(s)", content_type=_TEXT_CONTENT_TYPE, charset=_UTF_8_CHARSET, ) try: body = await request.read() except aiohttp.web.HTTPRequestEntityTooLarge: _LOGGER.debug("Received a request with a payload that's too large to process") return aiohttp.web.Response( status=_PAYLOAD_TOO_LARGE_STATUS, body=b"Payload too large", content_type=_TEXT_CONTENT_TYPE, charset=_UTF_8_CHARSET, ) if not body: user_agent = request.headers.get(_USER_AGENT_KEY, "NONE") _LOGGER.debug("Received a body-less request (UA %r)", user_agent) return aiohttp.web.Response( status=_BAD_REQUEST_STATUS, body=b"POST request must have a body", content_type=_TEXT_CONTENT_TYPE, charset=_UTF_8_CHARSET, ) response = await self.on_interaction(body=body, signature=signature_header, timestamp=timestamp_header) if response.files: multipart = aiohttp.MultipartWriter(subtype="form-data") if response.payload: body_payload = aiohttp.BytesPayload(response.payload, content_type=response.content_type) body_payload.set_content_disposition("form-data", name="payload_json") multipart.append_payload(body_payload) for index, file_ in enumerate(response.files): async with file_.stream(head_only=True) as stream: mimetype = stream.mimetype or _APPLICATION_OCTET_STREAM payload = 
_FilePayload(file_, mimetype, executor=self._executor) payload.set_content_disposition("form-data", name=f"files[{index}]", filename=file_.filename) multipart.append_payload(payload) return aiohttp.web.Response(status=response.status_code, headers=response.headers, body=multipart) headers = response.headers if response.content_type: headers = headers or {} headers[_CONTENT_TYPE_KEY] = response.content_type return aiohttp.web.Response(status=response.status_code, headers=headers, body=response.payload) async def close(self) -> None: """Gracefully close the server and any open connections.""" if not self._server or not self._close_event: raise errors.ComponentStateConflictError("Cannot close an inactive interaction server") if self._is_closing: await self.join() return self._is_closing = True self._application_fetch_lock = None # This shutdown then cleanup ordering matters. await self._server.shutdown() await self._server.cleanup() self._close_event.set() self._close_event = None self._server = None async def join(self) -> None: """Wait for the process to halt before continuing.""" if not self._close_event: raise errors.ComponentStateConflictError("Cannot wait for an inactive interaction server to join") await self._close_event.wait() async def on_interaction(self, body: bytes, signature: bytes, timestamp: bytes) -> interaction_server.Response: """Handle an interaction received from Discord as a REST server. .. note:: If this server instance is alive then this will be called internally by the server but if the instance isn't alive then this may still be called externally to trigger interaction dispatch. Parameters ---------- body : bytes The interaction payload. signature : bytes Value of the `"X-Signature-Ed25519"` header used to verify the body. timestamp : bytes Value of the `"X-Signature-Timestamp"` header used to verify the body. 
Returns ------- hikari.api.interaction_server.Response Instructions on how the REST server calling this should respond to the interaction request. """ public_key = self._public_key or await self._fetch_public_key() try: public_key.verify(timestamp + body, signature) except (self._nacl.exceptions.BadSignatureError, ValueError): _LOGGER.error("Received a request with an invalid signature") return _Response(_BAD_REQUEST_STATUS, b"Invalid request signature") try: payload = self._loads(body.decode("utf-8")) interaction_type = int(payload["type"]) except (data_binding.JSONDecodeError, ValueError, TypeError) as exc: _LOGGER.error("Received a request with an invalid JSON body", exc_info=exc) return _Response(_BAD_REQUEST_STATUS, b"Invalid JSON body") except KeyError as exc: _LOGGER.error("Missing 'type' field in received JSON payload", exc_info=exc) return _Response(_BAD_REQUEST_STATUS, b"Missing required 'type' field in payload") if interaction_type == _PING_INTERACTION_TYPE: _LOGGER.debug("Responding to ping interaction") return _PONG_RESPONSE try: interaction = self._entity_factory.deserialize_interaction(payload) except errors.UnrecognisedEntityError: _LOGGER.debug("Ignoring unknown interaction type %s", interaction_type) return _Response(_NOT_IMPLEMENTED, b"Interaction type not implemented") except Exception as exc: asyncio.get_running_loop().call_exception_handler( {"message": "Exception occurred during interaction deserialization", "exception": exc} ) return _Response(_INTERNAL_SERVER_ERROR_STATUS, b"Exception occurred during interaction deserialization") if listener := self._listeners.get(type(interaction)): _LOGGER.debug("Dispatching interaction %s", interaction.id) try: result = await listener(interaction) raw_payload, files = result.build(self._entity_factory) payload = self._dumps(raw_payload) except Exception as exc: asyncio.get_running_loop().call_exception_handler( {"message": "Exception occurred during interaction dispatch", "exception": exc} ) return 
_Response(_INTERNAL_SERVER_ERROR_STATUS, b"Exception occurred during interaction dispatch") return _Response(_OK_STATUS, payload.encode(), files=files, content_type=_JSON_TYPE_WITH_CHARSET) _LOGGER.debug( "Ignoring interaction %s of type %s without registered listener", interaction.id, interaction.type ) return _Response(_NOT_IMPLEMENTED, b"Handler not set for this interaction type") async def start( self, backlog: int = 128, enable_signal_handlers: typing.Optional[bool] = None, host: typing.Optional[typing.Union[str, typing.Sequence[str]]] = None, port: typing.Optional[int] = None, path: typing.Optional[str] = None, reuse_address: typing.Optional[bool] = None, reuse_port: typing.Optional[bool] = None, socket: typing.Optional[socket_.socket] = None, shutdown_timeout: float = 60.0, ssl_context: typing.Optional[ssl.SSLContext] = None, ) -> None: """Start the bot and wait for the internal server to startup then return. .. note:: For more information on the other parameters such as defaults see AIOHTTP's documentation. Other Parameters ---------------- backlog : int The number of unaccepted connections that the system will allow before refusing new connections. enable_signal_handlers : typing.Optional[bool] Defaults to `True` if this is started in the main thread. If on a __non-Windows__ OS with builtin support for kernel-level POSIX signals, then setting this to `True` will allow treating keyboard interrupts and other OS signals to safely shut down the application as calls to shut down the application properly rather than just killing the process in a dirty state immediately. You should leave this enabled unless you plan to implement your own signal handling yourself. host : typing.Optional[typing.Union[str, aiohttp.web.HostSequence]] TCP/IP host or a sequence of hosts for the HTTP server. port : typing.Optional[int] TCP/IP port for the HTTP server. path : typing.Optional[str] File system path for HTTP server unix domain socket. 
reuse_address : typing.Optional[bool] Tells the kernel to reuse a local socket in TIME_WAIT state, without waiting for its natural timeout to expire. reuse_port : typing.Optional[bool] Tells the kernel to allow this endpoint to be bound to the same port as other existing endpoints are also bound to. socket : typing.Optional[socket.socket] A pre-existing socket object to accept connections on. shutdown_timeout : float A delay to wait for graceful server shutdown before forcefully disconnecting all open client sockets. This defaults to 60 seconds. ssl_context : typing.Optional[ssl.SSLContext] SSL context for HTTPS servers. """ if self._server: raise errors.ComponentStateConflictError("Cannot start an already active interaction server") if enable_signal_handlers is None: enable_signal_handlers = threading.current_thread() is threading.main_thread() self._close_event = asyncio.Event() self._is_closing = False aio_app = aiohttp.web.Application() aio_app.add_routes([aiohttp.web.post("/", self.aiohttp_hook)]) self._server = aiohttp.web_runner.AppRunner(aio_app, handle_signals=enable_signal_handlers, access_log=_LOGGER) await self._server.setup() sites: typing.List[aiohttp.web.BaseSite] = [] if host is not None: if isinstance(host, str): host = [host] for h in host: sites.append( aiohttp.web.TCPSite( self._server, h, port, shutdown_timeout=shutdown_timeout, ssl_context=ssl_context, backlog=backlog, reuse_address=reuse_address, reuse_port=reuse_port, ) ) elif path is None and socket is None or port is None: sites.append( aiohttp.web.TCPSite( self._server, port=port, shutdown_timeout=shutdown_timeout, ssl_context=ssl_context, backlog=backlog, reuse_address=reuse_address, reuse_port=reuse_port, ) ) if path is not None: sites.append( aiohttp.web.UnixSite( self._server, path, shutdown_timeout=shutdown_timeout, ssl_context=ssl_context, backlog=backlog ) ) if socket is not None: sites.append( aiohttp.web.SockSite( self._server, socket, shutdown_timeout=shutdown_timeout, 
ssl_context=ssl_context, backlog=backlog ) ) for site in sites: _LOGGER.info("Starting site on %s", site.name) await site.start() @typing.overload def get_listener( self, interaction_type: typing.Type[command_interactions.CommandInteraction], / ) -> typing.Optional[ interaction_server.ListenerT[command_interactions.CommandInteraction, _MessageResponseBuilderT] ]: ... @typing.overload def get_listener( self, interaction_type: typing.Type[component_interactions.ComponentInteraction], / ) -> typing.Optional[ interaction_server.ListenerT[component_interactions.ComponentInteraction, _MessageResponseBuilderT] ]: ... @typing.overload def get_listener( self, interaction_type: typing.Type[command_interactions.AutocompleteInteraction], / ) -> typing.Optional[ interaction_server.ListenerT[ command_interactions.AutocompleteInteraction, special_endpoints.InteractionAutocompleteBuilder ] ]: ... @typing.overload def get_listener( self, interaction_type: typing.Type[_InteractionT_co], / ) -> typing.Optional[interaction_server.ListenerT[_InteractionT_co, special_endpoints.InteractionResponseBuilder]]: ... def get_listener( self, interaction_type: typing.Type[_InteractionT_co], / ) -> typing.Optional[interaction_server.ListenerT[_InteractionT_co, special_endpoints.InteractionResponseBuilder]]: return self._listeners.get(interaction_type) @typing.overload def set_listener( self, interaction_type: typing.Type[command_interactions.CommandInteraction], listener: typing.Optional[ interaction_server.ListenerT[command_interactions.CommandInteraction, _MessageResponseBuilderT] ], /, *, replace: bool = False, ) -> None: ... @typing.overload def set_listener( self, interaction_type: typing.Type[component_interactions.ComponentInteraction], listener: typing.Optional[ interaction_server.ListenerT[component_interactions.ComponentInteraction, _MessageResponseBuilderT] ], /, *, replace: bool = False, ) -> None: ... 
@typing.overload def set_listener( self, interaction_type: typing.Type[command_interactions.AutocompleteInteraction], listener: typing.Optional[ interaction_server.ListenerT[ command_interactions.AutocompleteInteraction, special_endpoints.InteractionAutocompleteBuilder ] ], /, *, replace: bool = False, ) -> None: ... def set_listener( self, interaction_type: typing.Type[_InteractionT_co], listener: typing.Optional[ interaction_server.ListenerT[_InteractionT_co, special_endpoints.InteractionResponseBuilder] ], /, *, replace: bool = False, ) -> None: if listener: if not replace and interaction_type in self._listeners: raise TypeError(f"Listener already set for {interaction_type.__name__}") self._listeners[interaction_type] = listener else: self._listeners.pop(interaction_type, None)
Standard implementation of hikari.api.interaction_server.InteractionServer
.
Parameters
- entity_factory (hikari.api.entity_factory.EntityFactory): The entity factory instance this server should use.
Other Parameters
- dumps (aiohttp.typedefs.JSONEncoder): The JSON encoder this server should use. Defaults to
json.dumps
. - loads (aiohttp.typedefs.JSONDecoder): The JSON decoder this server should use. Defaults to
json.loads
. - public_key (bytes): The public key this server should use for verifying request payloads from Discord. If left as
None
then the client will try to work this out using rest_client
. - rest_client (hikari.api.rest.RESTClient): The client this should use for making REST requests.
Variables and properties
Whether this interaction server is active.
Methods
self,
*,
dumps: Callable[[Any], str] = <function dumps>,
entity_factory: hikari.api.entity_factory.EntityFactory,
executor: Optional[concurrent.futures._base.Executor] = None,
loads: Callable[[str], Any] = <function loads>,
rest_client: hikari.api.rest.RESTClient,
public_key: Optional[bytes] = None
):
View Source
def __init__(
    self,
    *,
    dumps: aiohttp.typedefs.JSONEncoder = data_binding.dump_json,
    entity_factory: entity_factory_api.EntityFactory,
    executor: typing.Optional[concurrent.futures.Executor] = None,
    loads: aiohttp.typedefs.JSONDecoder = data_binding.load_json,
    rest_client: rest_api.RESTClient,
    public_key: typing.Optional[bytes] = None,
) -> None:
    """Initialise the interaction server without starting it.

    Parameters
    ----------
    entity_factory : hikari.api.entity_factory.EntityFactory
        The entity factory instance this server should use.
    rest_client : hikari.api.rest.RESTClient
        The client this should use for making REST requests.

    Other Parameters
    ----------------
    dumps : aiohttp.typedefs.JSONEncoder
        The JSON encoder this server should use. Defaults to
        `hikari.internal.data_binding.dump_json`.
    executor : typing.Optional[concurrent.futures.Executor]
        Executor passed on to the file payloads built for multipart
        responses. Defaults to `None`.
    loads : aiohttp.typedefs.JSONDecoder
        The JSON decoder this server should use. Defaults to
        `hikari.internal.data_binding.load_json`.
    public_key : typing.Optional[bytes]
        The public key used for verifying request signatures. If left as
        `None` no verify key is built here and one is fetched when the
        first interaction is handled.

    Raises
    ------
    RuntimeError
        If the optional `nacl` package cannot be imported.
    """
    # pynacl is an optional dependency, hence the import on construction
    # rather than at module level.
    try:
        import nacl.exceptions
        import nacl.signing

    except ModuleNotFoundError as error:
        raise RuntimeError(
            "You must install the optional `hikari[server]` dependencies to use the default interaction server."
        ) from error

    # Collaborators supplied by the caller.
    self._dumps = dumps
    self._loads = loads
    self._entity_factory = entity_factory
    self._executor = executor
    self._rest_client = rest_client
    self._nacl = nacl

    # asyncio primitives are left unset for now: building an asyncio.Lock or
    # asyncio.Event when there isn't a running loop may lead to runtime errors.
    self._application_fetch_lock: typing.Optional[asyncio.Lock] = None
    self._close_event: typing.Optional[asyncio.Event] = None

    # Runtime state.
    self._is_closing = False
    self._listeners: typing.Dict[typing.Type[base_interactions.PartialInteraction], typing.Any] = {}
    self._server: typing.Optional[aiohttp.web_runner.AppRunner] = None

    verify_key = None if public_key is None else nacl.signing.VerifyKey(public_key)
    self._public_key = verify_key
self,
request: aiohttp.web_request.Request
) -> aiohttp.web_response.Response:
View Source
async def aiohttp_hook(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
    """Handle an AIOHTTP interaction request.

    The request is validated (media type, signature headers, non-empty
    body), passed to `InteractionServer.on_interaction`, and the returned
    response object is converted into an aiohttp response. Responses which
    carry files are sent as `multipart/form-data`.

    Parameters
    ----------
    request : aiohttp.web.Request
        The received request.

    Returns
    -------
    aiohttp.web.Response
        The aiohttp response.
    """

    def _plain_text(status: int, message: bytes) -> aiohttp.web.Response:
        # Every early rejection below shares this UTF-8 plain text shape.
        return aiohttp.web.Response(
            status=status,
            body=message,
            content_type=_TEXT_CONTENT_TYPE,
            charset=_UTF_8_CHARSET,
        )

    if request.content_type.lower() != _JSON_CONTENT_TYPE:
        _LOGGER.debug("Payload with invalid media type %r received", request.content_type)
        return _plain_text(_UNSUPPORTED_MEDIA_TYPE_STATUS, b"Unsupported Media Type")

    try:
        # bytes.fromhex raises ValueError on malformed hex, a missing header
        # raises KeyError; both are reported the same way.
        signature_header = bytes.fromhex(request.headers[_X_SIGNATURE_ED25519_HEADER])
        timestamp_header = request.headers[_X_SIGNATURE_TIMESTAMP_HEADER].encode()

    except (KeyError, ValueError):
        user_agent = request.headers.get(_USER_AGENT_KEY, "NONE")
        _LOGGER.debug("Received a request with a missing or invalid signature header (UA %r)", user_agent)
        return _plain_text(_BAD_REQUEST_STATUS, b"Missing or invalid required request signature header(s)")

    try:
        body = await request.read()

    except aiohttp.web.HTTPRequestEntityTooLarge:
        _LOGGER.debug("Received a request with a payload that's too large to process")
        return _plain_text(_PAYLOAD_TOO_LARGE_STATUS, b"Payload too large")

    if not body:
        user_agent = request.headers.get(_USER_AGENT_KEY, "NONE")
        _LOGGER.debug("Received a body-less request (UA %r)", user_agent)
        return _plain_text(_BAD_REQUEST_STATUS, b"POST request must have a body")

    response = await self.on_interaction(body=body, signature=signature_header, timestamp=timestamp_header)

    if not response.files:
        # Simple case: one body, with the content type carried as a header.
        headers = response.headers
        if response.content_type:
            headers = headers or {}
            headers[_CONTENT_TYPE_KEY] = response.content_type

        return aiohttp.web.Response(status=response.status_code, headers=headers, body=response.payload)

    # Files are present: the JSON payload (if any) becomes the "payload_json"
    # part and each file becomes its own "files[n]" part.
    multipart = aiohttp.MultipartWriter(subtype="form-data")

    if response.payload:
        json_part = aiohttp.BytesPayload(response.payload, content_type=response.content_type)
        json_part.set_content_disposition("form-data", name="payload_json")
        multipart.append_payload(json_part)

    for index, file_ in enumerate(response.files):
        # Only the head of the stream is opened here, to read the mimetype.
        async with file_.stream(head_only=True) as stream:
            mimetype = stream.mimetype or _APPLICATION_OCTET_STREAM

        file_part = _FilePayload(file_, mimetype, executor=self._executor)
        file_part.set_content_disposition("form-data", name=f"files[{index}]", filename=file_.filename)
        multipart.append_payload(file_part)

    return aiohttp.web.Response(status=response.status_code, headers=response.headers, body=multipart)
Handle an AIOHTTP interaction request.
This method handles aiohttp specific detail before calling InteractionServer.on_interaction
with the data extracted from the request if it can and handles building an aiohttp response.
Parameters
- request (aiohttp.web.Request): The received request.
Returns
- aiohttp.web.Response: The aiohttp response.
View Source
async def close(self) -> None:
    """Gracefully close the server and any open connections.

    If another call is already closing the server this waits for that call
    to finish instead of closing twice.

    Raises
    ------
    hikari.errors.ComponentStateConflictError
        If the server is not currently active.
    """
    server = self._server
    close_event = self._close_event
    if not server or not close_event:
        raise errors.ComponentStateConflictError("Cannot close an inactive interaction server")

    if self._is_closing:
        await self.join()
        return

    self._is_closing = True
    self._application_fetch_lock = None
    try:
        # This shutdown then cleanup ordering matters.
        await server.shutdown()
        await server.cleanup()

    finally:
        # Release joiners and reset the state even when teardown raised;
        # otherwise every `join` caller (including any later `close` call,
        # which would see `_is_closing` still set) would wait forever.
        close_event.set()
        self._close_event = None
        self._server = None
Gracefully close the server and any open connections.
self,
interaction_type: Type[+_InteractionT_co],
/
) -> 'typing.Optional[interaction_server.ListenerT[_InteractionT_co, special_endpoints.InteractionResponseBuilder]]':
View Source
def get_listener(
    self, interaction_type: typing.Type[_InteractionT_co], /
) -> typing.Optional[interaction_server.ListenerT[_InteractionT_co, special_endpoints.InteractionResponseBuilder]]:
    """Get the listener registered for an interaction type.

    Parameters
    ----------
    interaction_type : typing.Type[hikari.interactions.base_interactions.PartialInteraction]
        Type of the interaction to get the registered listener for.

    Returns
    -------
    typing.Optional[ListenerT]
        The callback registered for the provided interaction type if found,
        else `None`.
    """
    try:
        return self._listeners[interaction_type]

    except KeyError:
        return None
Get the listener registered for an interaction.
Parameters
- interaction_type (typing.Type[hikari.interactions.base_interactions.PartialInteraction]): Type of the interaction to get the registered listener for.
Returns
- typing.Optional[ListenerT[hikari.interactions.base_interactions.PartialInteraction, hikari.api.special_endpoints.InteractionResponseBuilder]]: The callback registered for the provided interaction type if found, else
None
.
View Source
async def join(self) -> None:
    """Wait for the process to halt before continuing.

    Raises
    ------
    hikari.errors.ComponentStateConflictError
        If the server has no close event, i.e. it is not currently active.
    """
    close_event = self._close_event
    if close_event:
        # Set by `close` once the server has been torn down.
        await close_event.wait()
        return

    raise errors.ComponentStateConflictError("Cannot wait for an inactive interaction server to join")
Wait for the process to halt before continuing.
self,
body: bytes,
signature: bytes,
timestamp: bytes
) -> hikari.api.interaction_server.Response:
View Source
async def on_interaction(self, body: bytes, signature: bytes, timestamp: bytes) -> interaction_server.Response:
    """Handle an interaction received from Discord as a REST server.

    .. note::
        If this server instance is alive then this will be called internally
        by the server but if the instance isn't alive then this may still be
        called externally to trigger interaction dispatch.

    The request is rejected with a 400 response when the signature or JSON
    body is invalid, answered with a pong for ping interactions, answered
    with a 501 response when the interaction type is unrecognised or has no
    listener, and with a 500 response when deserialization or the listener
    raises.

    Parameters
    ----------
    body : bytes
        The interaction payload.
    signature : bytes
        Value of the `"X-Signature-Ed25519"` header used to verify the body.
    timestamp : bytes
        Value of the `"X-Signature-Timestamp"` header used to verify the body.

    Returns
    -------
    hikari.api.interaction_server.Response
        Instructions on how the REST server calling this should respond to
        the interaction request.
    """
    # Fall back to fetching the key when none was provided on construction.
    verify_key = self._public_key or await self._fetch_public_key()

    try:
        # The signed message is the timestamp header followed by the raw body.
        verify_key.verify(timestamp + body, signature)

    except (self._nacl.exceptions.BadSignatureError, ValueError):
        _LOGGER.error("Received a request with an invalid signature")
        return _Response(_BAD_REQUEST_STATUS, b"Invalid request signature")

    try:
        data = self._loads(body.decode("utf-8"))
        interaction_type = int(data["type"])

    except (data_binding.JSONDecodeError, ValueError, TypeError) as exc:
        _LOGGER.error("Received a request with an invalid JSON body", exc_info=exc)
        return _Response(_BAD_REQUEST_STATUS, b"Invalid JSON body")

    except KeyError as exc:
        _LOGGER.error("Missing 'type' field in received JSON payload", exc_info=exc)
        return _Response(_BAD_REQUEST_STATUS, b"Missing required 'type' field in payload")

    if interaction_type == _PING_INTERACTION_TYPE:
        _LOGGER.debug("Responding to ping interaction")
        return _PONG_RESPONSE

    loop_error_message: str
    try:
        interaction = self._entity_factory.deserialize_interaction(data)

    except errors.UnrecognisedEntityError:
        _LOGGER.debug("Ignoring unknown interaction type %s", interaction_type)
        return _Response(_NOT_IMPLEMENTED, b"Interaction type not implemented")

    except Exception as exc:
        # Unexpected failures are reported through the loop's exception handler.
        loop_error_message = "Exception occurred during interaction deserialization"
        asyncio.get_running_loop().call_exception_handler({"message": loop_error_message, "exception": exc})
        return _Response(_INTERNAL_SERVER_ERROR_STATUS, b"Exception occurred during interaction deserialization")

    # Listeners are keyed by the exact class of the deserialized interaction.
    listener = self._listeners.get(type(interaction))
    if not listener:
        _LOGGER.debug(
            "Ignoring interaction %s of type %s without registered listener", interaction.id, interaction.type
        )
        return _Response(_NOT_IMPLEMENTED, b"Handler not set for this interaction type")

    _LOGGER.debug("Dispatching interaction %s", interaction.id)
    try:
        result = await listener(interaction)
        raw_payload, files = result.build(self._entity_factory)
        encoded = self._dumps(raw_payload)

    except Exception as exc:
        loop_error_message = "Exception occurred during interaction dispatch"
        asyncio.get_running_loop().call_exception_handler({"message": loop_error_message, "exception": exc})
        return _Response(_INTERNAL_SERVER_ERROR_STATUS, b"Exception occurred during interaction dispatch")

    return _Response(_OK_STATUS, encoded.encode(), files=files, content_type=_JSON_TYPE_WITH_CHARSET)
Handle an interaction received from Discord as a REST server.
Note: If this server instance is alive then this will be called internally by the server but if the instance isn't alive then this may still be called externally to trigger interaction dispatch.
Parameters
- body (bytes): The interaction payload.
- signature (bytes): Value of the
"X-Signature-Ed25519"
header used to verify the body. - timestamp (bytes): Value of the
"X-Signature-Timestamp"
header used to verify the body.
Returns
- hikari.api.interaction_server.Response: Instructions on how the REST server calling this should respond to the interaction request.
self,
interaction_type: Type[+_InteractionT_co],
listener: 'typing.Optional[interaction_server.ListenerT[_InteractionT_co, special_endpoints.InteractionResponseBuilder]]',
/,
*,
replace: bool = False
) -> None:
View Source
def set_listener(
    self,
    interaction_type: typing.Type[_InteractionT_co],
    listener: typing.Optional[
        interaction_server.ListenerT[_InteractionT_co, special_endpoints.InteractionResponseBuilder]
    ],
    /,
    *,
    replace: bool = False,
) -> None:
    """Set or unset the listener callback for an interaction type.

    Parameters
    ----------
    interaction_type : typing.Type[hikari.interactions.base_interactions.PartialInteraction]
        The type of interaction this listener should be registered for.
    listener : typing.Optional[ListenerT]
        The asynchronous listener callback to set, or `None` to unset the
        previous listener.

    Other Parameters
    ----------------
    replace : bool
        Whether this call should replace the previously set listener or not.

    Raises
    ------
    TypeError
        If `replace` is `False` when a listener is already set.
    """
    registry = self._listeners

    if not listener:
        # Unsetting is a no-op when nothing was registered for the type.
        registry.pop(interaction_type, None)
        return

    already_registered = interaction_type in registry
    if already_registered and not replace:
        raise TypeError(f"Listener already set for {interaction_type.__name__}")

    registry[interaction_type] = listener
Set the listener callback for this interaction server.
Parameters
- interaction_type (typing.Type[hikari.interactions.base_interactions.PartialInteraction]): The type of interaction this listener should be registered for.
- listener (typing.Optional[ListenerT[hikari.interactions.base_interactions.PartialInteraction, hikari.api.special_endpoints.InteractionResponseBuilder]]): The asynchronous listener callback to set or
None
to unset the previous listener.
Other Parameters
- replace (bool): Whether this call should replace the previously set listener or not. This call will raise a
TypeError
if set to False
when a listener is already set.
Raises
- TypeError: If
replace
is False
when a listener is already set.
self,
backlog: int = 128,
enable_signal_handlers: Optional[bool] = None,
host: Union[str, Sequence[str], NoneType] = None,
port: Optional[int] = None,
path: Optional[str] = None,
reuse_address: Optional[bool] = None,
reuse_port: Optional[bool] = None,
socket: Optional[socket.socket] = None,
shutdown_timeout: float = 60.0,
ssl_context: Optional[ssl.SSLContext] = None
) -> None:
View Source
async def start(
    self,
    backlog: int = 128,
    enable_signal_handlers: typing.Optional[bool] = None,
    host: typing.Optional[typing.Union[str, typing.Sequence[str]]] = None,
    port: typing.Optional[int] = None,
    path: typing.Optional[str] = None,
    reuse_address: typing.Optional[bool] = None,
    reuse_port: typing.Optional[bool] = None,
    socket: typing.Optional[socket_.socket] = None,
    shutdown_timeout: float = 60.0,
    ssl_context: typing.Optional[ssl.SSLContext] = None,
) -> None:
    """Start the bot and wait for the internal server to startup then return.

    .. note::
        For more information on the other parameters such as defaults see
        AIOHTTP's documentation.

    Other Parameters
    ----------------
    backlog : int
        The number of unaccepted connections that the system will allow before
        refusing new connections.
    enable_signal_handlers : typing.Optional[bool]
        Defaults to `True` if this is started in the main thread.

        If on a __non-Windows__ OS with builtin support for kernel-level
        POSIX signals, then setting this to `True` will allow treating
        keyboard interrupts and other OS signals to safely shut down the
        application as calls to shut down the application properly rather
        than just killing the process in a dirty state immediately. You
        should leave this enabled unless you plan to implement your own
        signal handling yourself.
    host : typing.Optional[typing.Union[str, aiohttp.web.HostSequence]]
        TCP/IP host or a sequence of hosts for the HTTP server.
    port : typing.Optional[int]
        TCP/IP port for the HTTP server.
    path : typing.Optional[str]
        File system path for HTTP server unix domain socket.
    reuse_address : typing.Optional[bool]
        Tells the kernel to reuse a local socket in TIME_WAIT state, without
        waiting for its natural timeout to expire.
    reuse_port : typing.Optional[bool]
        Tells the kernel to allow this endpoint to be bound to the same port
        as other existing endpoints are also bound to.
    socket : typing.Optional[socket.socket]
        A pre-existing socket object to accept connections on.
    shutdown_timeout : float
        A delay to wait for graceful server shutdown before forcefully
        disconnecting all open client sockets. This defaults to 60 seconds.
    ssl_context : typing.Optional[ssl.SSLContext]
        SSL context for HTTPS servers.

    Raises
    ------
    hikari.errors.ComponentStateConflictError
        If the server is already active.
    """
    if self._server:
        raise errors.ComponentStateConflictError("Cannot start an already active interaction server")

    if enable_signal_handlers is None:
        enable_signal_handlers = threading.current_thread() is threading.main_thread()

    # Created here rather than in `__init__` so that it is built while a
    # loop is running.
    self._close_event = asyncio.Event()
    self._is_closing = False
    aio_app = aiohttp.web.Application()
    aio_app.add_routes([aiohttp.web.post("/", self.aiohttp_hook)])
    self._server = aiohttp.web_runner.AppRunner(aio_app, handle_signals=enable_signal_handlers, access_log=_LOGGER)
    await self._server.setup()
    sites: typing.List[aiohttp.web.BaseSite] = []

    if host is not None:
        if isinstance(host, str):
            host = [host]

        sites.extend(
            aiohttp.web.TCPSite(
                self._server,
                h,
                port,
                shutdown_timeout=shutdown_timeout,
                ssl_context=ssl_context,
                backlog=backlog,
                reuse_address=reuse_address,
                reuse_port=reuse_port,
            )
            for h in host
        )

    # Mirrors aiohttp's `run_app`: without a host, a TCP site is only opened
    # when nothing else was requested (no path and no socket) or when a port
    # was explicitly provided. The previous `port is None` check was inverted:
    # it opened an unrequested TCP site next to a unix/socket site and ignored
    # an explicitly passed port.
    elif (path is None and socket is None) or port is not None:
        sites.append(
            aiohttp.web.TCPSite(
                self._server,
                port=port,
                shutdown_timeout=shutdown_timeout,
                ssl_context=ssl_context,
                backlog=backlog,
                reuse_address=reuse_address,
                reuse_port=reuse_port,
            )
        )

    if path is not None:
        sites.append(
            aiohttp.web.UnixSite(
                self._server, path, shutdown_timeout=shutdown_timeout, ssl_context=ssl_context, backlog=backlog
            )
        )

    if socket is not None:
        sites.append(
            aiohttp.web.SockSite(
                self._server, socket, shutdown_timeout=shutdown_timeout, ssl_context=ssl_context, backlog=backlog
            )
        )

    for site in sites:
        _LOGGER.info("Starting site on %s", site.name)
        await site.start()
Start the bot and wait for the internal server to startup then return.
Note: For more information on the other parameters such as defaults see AIOHTTP's documentation.
Other Parameters
- backlog (int): The number of unaccepted connections that the system will allow before refusing new connections.
- enable_signal_handlers (typing.Optional[bool]): Defaults to
True
if this is started in the main thread. If on a __non-Windows__ OS with builtin support for kernel-level POSIX signals, then setting this to
True
will allow treating keyboard interrupts and other OS signals to safely shut down the application as calls to shut down the application properly rather than just killing the process in a dirty state immediately. You should leave this enabled unless you plan to implement your own signal handling yourself.
- host (typing.Optional[typing.Union[str, aiohttp.web.HostSequence]]): TCP/IP host or a sequence of hosts for the HTTP server.
- port (typing.Optional[int]): TCP/IP port for the HTTP server.
- path (typing.Optional[str]): File system path for HTTP server unix domain socket.
- reuse_address (typing.Optional[bool]): Tells the kernel to reuse a local socket in TIME_WAIT state, without waiting for its natural timeout to expire.
- reuse_port (typing.Optional[bool]): Tells the kernel to allow this endpoint to be bound to the same port as other existing endpoints are also bound to.
- socket (typing.Optional[socket.socket]): A pre-existing socket object to accept connections on.
- shutdown_timeout (float): A delay to wait for graceful server shutdown before forcefully disconnecting all open client sockets. This defaults to 60 seconds.
- ssl_context (typing.Optional[ssl.SSLContext]): SSL context for HTTPS servers.