hikari.impl.event_manager_base
A base implementation for an event manager.
View Source
# -*- coding: utf-8 -*- # cython: language_level=3 # Copyright (c) 2020 Nekokatt # Copyright (c) 2021-present davfsa # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. 
"""A base implementation for an event manager.""" from __future__ import annotations __all__: typing.Sequence[str] = ("filtered", "EventManagerBase", "EventStream") import asyncio import inspect import itertools import logging import sys import types import typing import warnings import weakref import attr from hikari import errors from hikari import iterators from hikari.api import config from hikari.api import event_manager as event_manager_ from hikari.events import base_events from hikari.events import shard_events from hikari.internal import aio from hikari.internal import fast_protocol from hikari.internal import reflect from hikari.internal import ux if typing.TYPE_CHECKING: from hikari import intents as intents_ from hikari.api import event_factory as event_factory_ from hikari.api import shard as gateway_shard from hikari.internal import data_binding _ConsumerT = typing.Callable[ [gateway_shard.GatewayShard, data_binding.JSONObject], typing.Coroutine[typing.Any, typing.Any, None] ] _ListenerMapT = typing.Dict[ typing.Type[base_events.EventT], typing.List[event_manager_.CallbackT[base_events.EventT]], ] _WaiterT = typing.Tuple[ typing.Optional[event_manager_.PredicateT[base_events.EventT]], asyncio.Future[base_events.EventT] ] _WaiterMapT = typing.Dict[typing.Type[base_events.EventT], typing.Set[_WaiterT[base_events.EventT]]] _EventManagerBaseT = typing.TypeVar("_EventManagerBaseT", bound="EventManagerBase") _UnboundMethodT = typing.Callable[ [_EventManagerBaseT, gateway_shard.GatewayShard, data_binding.JSONObject], typing.Coroutine[typing.Any, typing.Any, None], ] _EventStreamT = typing.TypeVar("_EventStreamT", bound="EventStream[typing.Any]") if sys.version_info >= (3, 10): # We can use types.UnionType on 3.10+ _UNIONS = frozenset((typing.Union, types.UnionType)) else: _UNIONS = frozenset((typing.Union,)) _LOGGER: typing.Final[logging.Logger] = logging.getLogger("hikari.event_manager") @typing.runtime_checkable class 
_FilteredMethodT(fast_protocol.FastProtocolChecking, typing.Protocol): __slots__: typing.Sequence[str] = () async def __call__(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject, /) -> None: raise NotImplementedError @property def __cache_components__(self) -> config.CacheComponents: raise NotImplementedError @property def __events_bitmask__(self) -> int: raise NotImplementedError def _generate_weak_listener( reference: weakref.WeakMethod[typing.Any], ) -> typing.Callable[[base_events.EventT], typing.Coroutine[typing.Any, typing.Any, None]]: async def call_weak_method(event: base_events.EventT) -> None: method = reference() if method is None: raise TypeError( "dead weak referenced subscriber method cannot be executed, try actually closing your event streamers" ) await method(event) return call_weak_method class EventStream(event_manager_.EventStream[base_events.EventT]): """An implementation of an event `EventStream` class. .. note:: While calling `EventStream.filter` on an active "opened" event stream will return a wrapping lazy iterator, calling it on an inactive "closed" event stream will return the event stream and add the given predicates to the streamer. 
""" __slots__: typing.Sequence[str] = ( "__weakref__", "_active", "_event", "_event_manager", "_event_type", "_filters", "_limit", "_queue", "_registered_listener", "_timeout", ) __weakref__: typing.Optional[weakref.ref[EventStream[base_events.EventT]]] def __init__( self, event_manager: event_manager_.EventManager, event_type: typing.Type[base_events.EventT], *, timeout: typing.Union[float, int, None], limit: typing.Optional[int] = None, ) -> None: self._active = False self._event: typing.Optional[asyncio.Event] = None self._event_manager = event_manager self._event_type = event_type self._filters: iterators.All[base_events.EventT] = iterators.All(()) self._limit = limit self._queue: typing.List[base_events.EventT] = [] self._registered_listener: typing.Optional[ typing.Callable[[base_events.EventT], typing.Coroutine[typing.Any, typing.Any, None]] ] = None # The registered wrapping function for the weak ref to this class's _listener method. self._timeout = timeout # These are only included at runtime in-order to avoid the model being typed as an asynchronous context manager. if not typing.TYPE_CHECKING: async def __aenter__(self: event_manager_.EventStreamT) -> event_manager_.EventStreamT: # This is sync only. warnings.warn( "Using EventStream as an async context manager has been deprecated since 2.0.0.dev104. " "Please use it as a synchronous context manager (e.g. 
with bot.stream(...)) instead.", category=DeprecationWarning, stacklevel=2, ) self.open() return self async def __aexit__( self, exc_type: typing.Optional[typing.Type[BaseException]], exc: typing.Optional[BaseException], exc_tb: typing.Optional[types.TracebackType], ) -> None: self.close() def __enter__(self: _EventStreamT) -> _EventStreamT: self.open() return self def __exit__( self, exc_type: typing.Optional[typing.Type[BaseException]], exc_val: typing.Optional[BaseException], exc_tb: typing.Optional[types.TracebackType], ) -> None: self.close() async def __anext__(self) -> base_events.EventT: if not self._active: raise TypeError("stream must be started with before entering it") while not self._queue: if not self._event: self._event = asyncio.Event() try: await asyncio.wait_for(self._event.wait(), timeout=self._timeout) except asyncio.TimeoutError: raise StopAsyncIteration from None self._event.clear() return self._queue.pop(0) def __await__(self) -> typing.Generator[None, None, typing.Sequence[base_events.EventT]]: return self._await_all().__await__() def __del__(self) -> None: # For the sake of protecting highly intelligent people who forget to close this, we register the event listener # with a weakref then try to close this on deletion. While this may lead to their consoles being spammed, this # is a small price to pay as it'll be way more obvious what's wrong than if we just left them with a vague # ominous memory leak. 
if self._active: _LOGGER.warning("active %r streamer fell out of scope before being closed", self._event_type.__name__) self.close() async def _await_all(self) -> typing.Sequence[base_events.EventT]: self.open() result = [event async for event in self] self.close() return result async def _listener(self, event: base_events.EventT) -> None: if not self._filters(event) or (self._limit is not None and len(self._queue) >= self._limit): return self._queue.append(event) if self._event: self._event.set() def close(self) -> None: if self._active and self._registered_listener is not None: try: self._event_manager.unsubscribe(self._event_type, self._registered_listener) except ValueError: pass self._registered_listener = None self._active = False def filter( self: _EventStreamT, *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[base_events.EventT], bool]], **attrs: typing.Any, ) -> _EventStreamT: filter_ = self._map_predicates_and_attr_getters("filter", *predicates, **attrs) if self._active: self._queue = [entry for entry in self._queue if filter_(entry)] self._filters |= filter_ return self def open(self) -> None: if not self._active: # For the sake of protecting highly intelligent people who forget to close this, we register the event # listener with a weakref then try to close this on deletion. While this may lead to their consoles being # spammed, this is a small price to pay as it'll be way more obvious what's wrong than if we just left them # with a vague ominous memory leak. 
reference = weakref.WeakMethod(self._listener) listener = _generate_weak_listener(reference) self._registered_listener = listener self._event_manager.subscribe(self._event_type, listener) self._active = True def _assert_is_listener(parameters: typing.Iterator[inspect.Parameter], /) -> None: if next(parameters, None) is None: raise TypeError("Event listener must have one positional argument for the event object.") if any(param.default is inspect.Parameter.empty for param in parameters): raise TypeError("Only the first argument for a listener can be required, the event argument.") def filtered( event_types: typing.Union[typing.Type[base_events.Event], typing.Sequence[typing.Type[base_events.Event]]], cache_components: config.CacheComponents = config.CacheComponents.NONE, /, ) -> typing.Callable[[_UnboundMethodT[_EventManagerBaseT]], _UnboundMethodT[_EventManagerBaseT]]: """Add metadata to a consumer method to indicate when it should be unmarshalled and dispatched. Parameters ---------- event_types Types of the events this raw consumer method may dispatch. This may either be a singular type of a sequence of types. Other Parameters ---------------- cache_components : hikari.api.config.CacheComponents Bitfield of the cache components this event may make altering calls to. This defaults to `hikari.api.config.CacheComponents.NONE`. 
""" if isinstance(event_types, typing.Sequence): # dict.fromkeys is used to remove any duplicate entries here event_types = tuple(dict.fromkeys(itertools.chain.from_iterable(e.dispatches() for e in event_types))) else: event_types = event_types.dispatches() bitmask = 0 for event_type in event_types: bitmask |= event_type.bitmask() # https://github.com/python/mypy/issues/2087 def decorator(method: _UnboundMethodT[_EventManagerBaseT], /) -> _UnboundMethodT[_EventManagerBaseT]: method.__cache_components__ = cache_components # type: ignore[attr-defined] method.__events_bitmask__ = bitmask # type: ignore[attr-defined] assert isinstance(method, _FilteredMethodT), "Incorrect attribute(s) set for a filtered method" return method # type: ignore[unreachable] return decorator @attr.define(weakref_slot=False) class _Consumer: callback: _ConsumerT = attr.field(hash=True) """The callback function for this consumer.""" events_bitmask: int = attr.field() """The registered events bitmask.""" is_caching: bool = attr.field() """Cached value of whether or not this consumer is making cache calls in the current env.""" listener_group_count: int = attr.field(init=False, default=0) """The number of listener groups registered to this consumer.""" waiter_group_count: int = attr.field(init=False, default=0) """The number of waiters groups registered to this consumer.""" @property def is_enabled(self) -> bool: return self.is_caching or self.listener_group_count > 0 or self.waiter_group_count > 0 class EventManagerBase(event_manager_.EventManager): """Provides functionality to consume and dispatch events. Specific event handlers should be in functions named `on_xxx` where `xxx` is the raw event name being dispatched in lower-case. 
""" __slots__: typing.Sequence[str] = ( "_consumers", "_event_factory", "_intents", "_listeners", "_waiters", ) def __init__( self, event_factory: event_factory_.EventFactory, intents: intents_.Intents, *, cache_components: config.CacheComponents = config.CacheComponents.NONE, ) -> None: self._consumers: typing.Dict[str, _Consumer] = {} self._event_factory = event_factory self._intents = intents self._listeners: _ListenerMapT[base_events.Event] = {} self._waiters: _WaiterMapT[base_events.Event] = {} for name, member in inspect.getmembers(self): if name.startswith("on_"): event_name = name[3:] if isinstance(member, _FilteredMethodT): caching = (member.__cache_components__ & cache_components) != 0 self._consumers[event_name] = _Consumer(member, member.__events_bitmask__, caching) else: self._consumers[event_name] = _Consumer(member, -1, cache_components != cache_components.NONE) def _increment_listener_group_count( self, event_type: typing.Type[base_events.Event], count: typing.Literal[-1, 1] ) -> None: event_bitmask = event_type.bitmask() for consumer in self._consumers.values(): if (consumer.events_bitmask & event_bitmask) == event_bitmask: consumer.listener_group_count += count def _increment_waiter_group_count( self, event_type: typing.Type[base_events.Event], count: typing.Literal[-1, 1] ) -> None: event_bitmask = event_type.bitmask() for consumer in self._consumers.values(): if (consumer.events_bitmask & event_bitmask) == event_bitmask: consumer.waiter_group_count += count def _enabled_for_event(self, event_type: typing.Type[base_events.Event], /) -> bool: for cls in event_type.dispatches(): if cls in self._listeners or cls in self._waiters: return True return False def _check_event(self, event_type: typing.Type[typing.Any], nested: int) -> None: try: is_event = issubclass(event_type, base_events.Event) except TypeError: is_event = False if not is_event: raise TypeError("'event_type' is a non-Event type") # Collection of combined bitfield combinations of 
intents that # could be enabled to receive this event. expected_intent_groups = base_events.get_required_intents_for(event_type) if expected_intent_groups: for expected_intent_group in expected_intent_groups: if (self._intents & expected_intent_group) == expected_intent_group: break else: expected_intents_str = ", ".join(map(str, expected_intent_groups)) warnings.warn( f"You have tried to listen to {event_type.__name__}, but this will only ever be triggered if " f"you enable one of the following intents: {expected_intents_str}.", category=errors.MissingIntentWarning, stacklevel=nested + 3, ) def consume_raw_event( self, event_name: str, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject ) -> None: if self._enabled_for_event(shard_events.ShardPayloadEvent): payload_event = self._event_factory.deserialize_shard_payload_event(shard, payload, name=event_name) self.dispatch(payload_event) consumer = self._consumers[event_name.lower()] asyncio.create_task(self._handle_dispatch(consumer, shard, payload), name=f"dispatch {event_name}") # Yes, this is not generic. The reason for this is MyPy complains about # using ABCs that are not concrete in generic types passed to functions. # For the sake of UX, I will check this at runtime instead and let the # user use a static type checker. def subscribe( self, event_type: typing.Type[typing.Any], callback: event_manager_.CallbackT[typing.Any], *, _nested: int = 0, ) -> None: if not inspect.iscoroutinefunction(callback): raise TypeError("Cannot subscribe a non-coroutine function callback") # `_nested` is used to show the correct source code snippet if an intent # warning is triggered. 
self._check_event(event_type, _nested) _LOGGER.debug( "subscribing callback 'async def %s%s' to event-type %s.%s", getattr(callback, "__name__", "<anon>"), inspect.signature(callback), event_type.__module__, event_type.__qualname__, ) try: self._listeners[event_type].append(callback) except KeyError: self._listeners[event_type] = [callback] self._increment_listener_group_count(event_type, 1) def get_listeners( self, event_type: typing.Type[base_events.EventT], /, *, polymorphic: bool = True, ) -> typing.Collection[event_manager_.CallbackT[base_events.EventT]]: if polymorphic: listeners: typing.List[event_manager_.CallbackT[base_events.EventT]] = [] for event in event_type.dispatches(): if subscribed_listeners := self._listeners.get(event): listeners.extend(subscribed_listeners) return listeners if items := self._listeners.get(event_type): return items.copy() return [] # Yes, this is not generic. The reason for this is MyPy complains about # using ABCs that are not concrete in generic types passed to functions. # For the sake of UX, I will check this at runtime instead and let the # user use a static type checker. 
def unsubscribe( self, event_type: typing.Type[typing.Any], callback: event_manager_.CallbackT[typing.Any], ) -> None: if listeners := self._listeners.get(event_type): _LOGGER.debug( "unsubscribing callback %s%s from event-type %s.%s", getattr(callback, "__name__", "<anon>"), inspect.signature(callback), event_type.__module__, event_type.__qualname__, ) listeners.remove(callback) if not listeners: del self._listeners[event_type] self._increment_listener_group_count(event_type, -1) def listen( self, *event_types: typing.Type[base_events.EventT], ) -> typing.Callable[[event_manager_.CallbackT[base_events.EventT]], event_manager_.CallbackT[base_events.EventT]]: def decorator( callback: event_manager_.CallbackT[base_events.EventT], ) -> event_manager_.CallbackT[base_events.EventT]: # Avoid resolving forward references in the function's signature if # event_type was explicitly provided as this may lead to errors. if event_types: _assert_is_listener(iter(inspect.signature(callback).parameters.values())) resolved_types = event_types else: signature = reflect.resolve_signature(callback) params = signature.parameters.values() _assert_is_listener(iter(params)) event_param = next(iter(params)) annotation = event_param.annotation if annotation is event_param.empty: raise TypeError("Must provide the event type in the @listen decorator or as a type hint!") if typing.get_origin(annotation) in _UNIONS: # Resolve the types inside the union resolved_types = typing.get_args(annotation) else: # Just pass back the annotation resolved_types = (annotation,) for resolved_type in resolved_types: self.subscribe(resolved_type, callback, _nested=1) return callback return decorator def dispatch(self, event: base_events.EventT) -> asyncio.Future[typing.Any]: if not isinstance(event, base_events.Event): raise TypeError(f"Events must be subclasses of {base_events.Event.__name__}, not {type(event).__name__}") tasks: typing.List[typing.Coroutine[None, typing.Any, None]] = [] for cls in 
event.dispatches(): if listeners := self._listeners.get(cls): for callback in listeners: tasks.append(self._invoke_callback(callback, event)) if cls not in self._waiters: continue waiter_set = self._waiters[cls] for waiter in tuple(waiter_set): predicate, future = waiter if not future.done(): try: if predicate and not predicate(event): continue except Exception as ex: future.set_exception(ex) else: future.set_result(event) waiter_set.remove(waiter) if not waiter_set: del self._waiters[cls] self._increment_waiter_group_count(cls, -1) return asyncio.gather(*tasks) if tasks else aio.completed_future() def stream( self, event_type: typing.Type[base_events.EventT], /, timeout: typing.Union[float, int, None], limit: typing.Optional[int] = None, ) -> event_manager_.EventStream[base_events.EventT]: self._check_event(event_type, 1) return EventStream(self, event_type, timeout=timeout, limit=limit) async def wait_for( self, event_type: typing.Type[base_events.EventT], /, timeout: typing.Union[float, int, None], predicate: typing.Optional[event_manager_.PredicateT[base_events.EventT]] = None, ) -> base_events.EventT: if not inspect.isclass(event_type) or not issubclass(event_type, base_events.Event): raise TypeError("Cannot wait for a non-Event type") self._check_event(event_type, 1) future: asyncio.Future[base_events.EventT] = asyncio.get_running_loop().create_future() try: waiter_set = self._waiters[event_type] except KeyError: waiter_set = set() self._waiters[event_type] = waiter_set self._increment_waiter_group_count(event_type, 1) pair = (predicate, future) waiter_set.add(pair) # type: ignore[arg-type] try: return await asyncio.wait_for(future, timeout=timeout) except asyncio.TimeoutError: waiter_set.remove(pair) # type: ignore[arg-type] if not waiter_set: del self._waiters[event_type] self._increment_waiter_group_count(event_type, -1) raise async def _handle_dispatch( self, consumer: _Consumer, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject, ) -> 
None: if not consumer.is_enabled: name = consumer.callback.__name__ _LOGGER.log( ux.TRACE, "Skipping raw dispatch for %s due to lack of any registered listeners or cache need", name ) return try: await consumer.callback(shard, payload) except asyncio.CancelledError: # Skip cancelled errors, likely caused by the event loop being shut down. pass except errors.UnrecognisedEntityError: _LOGGER.debug("Event referenced an unrecognised entity, discarding") except BaseException as ex: asyncio.get_running_loop().call_exception_handler( { "message": "Exception occurred in raw event dispatch conduit", "exception": ex, "task": asyncio.current_task(), } ) async def _invoke_callback( self, callback: event_manager_.CallbackT[base_events.EventT], event: base_events.EventT ) -> None: try: await callback(event) except Exception as ex: # Skip the first frame in logs, we don't care for it. trio = type(ex), ex, ex.__traceback__.tb_next if ex.__traceback__ is not None else None if base_events.is_no_recursive_throw_event(event): _LOGGER.error( "an exception occurred handling an event (%s), but it has been ignored", type(event).__name__, exc_info=trio, ) else: exception_event = base_events.ExceptionEvent( exception=ex, failed_event=event, failed_callback=callback, ) log = _LOGGER.debug if self.get_listeners(type(exception_event), polymorphic=True) else _LOGGER.error log("an exception occurred handling an event (%s)", type(event).__name__, exc_info=trio) await self.dispatch(exception_event)
View Source
class EventManagerBase(event_manager_.EventManager): """Provides functionality to consume and dispatch events. Specific event handlers should be in functions named `on_xxx` where `xxx` is the raw event name being dispatched in lower-case. """ __slots__: typing.Sequence[str] = ( "_consumers", "_event_factory", "_intents", "_listeners", "_waiters", ) def __init__( self, event_factory: event_factory_.EventFactory, intents: intents_.Intents, *, cache_components: config.CacheComponents = config.CacheComponents.NONE, ) -> None: self._consumers: typing.Dict[str, _Consumer] = {} self._event_factory = event_factory self._intents = intents self._listeners: _ListenerMapT[base_events.Event] = {} self._waiters: _WaiterMapT[base_events.Event] = {} for name, member in inspect.getmembers(self): if name.startswith("on_"): event_name = name[3:] if isinstance(member, _FilteredMethodT): caching = (member.__cache_components__ & cache_components) != 0 self._consumers[event_name] = _Consumer(member, member.__events_bitmask__, caching) else: self._consumers[event_name] = _Consumer(member, -1, cache_components != cache_components.NONE) def _increment_listener_group_count( self, event_type: typing.Type[base_events.Event], count: typing.Literal[-1, 1] ) -> None: event_bitmask = event_type.bitmask() for consumer in self._consumers.values(): if (consumer.events_bitmask & event_bitmask) == event_bitmask: consumer.listener_group_count += count def _increment_waiter_group_count( self, event_type: typing.Type[base_events.Event], count: typing.Literal[-1, 1] ) -> None: event_bitmask = event_type.bitmask() for consumer in self._consumers.values(): if (consumer.events_bitmask & event_bitmask) == event_bitmask: consumer.waiter_group_count += count def _enabled_for_event(self, event_type: typing.Type[base_events.Event], /) -> bool: for cls in event_type.dispatches(): if cls in self._listeners or cls in self._waiters: return True return False def _check_event(self, event_type: typing.Type[typing.Any], 
nested: int) -> None: try: is_event = issubclass(event_type, base_events.Event) except TypeError: is_event = False if not is_event: raise TypeError("'event_type' is a non-Event type") # Collection of combined bitfield combinations of intents that # could be enabled to receive this event. expected_intent_groups = base_events.get_required_intents_for(event_type) if expected_intent_groups: for expected_intent_group in expected_intent_groups: if (self._intents & expected_intent_group) == expected_intent_group: break else: expected_intents_str = ", ".join(map(str, expected_intent_groups)) warnings.warn( f"You have tried to listen to {event_type.__name__}, but this will only ever be triggered if " f"you enable one of the following intents: {expected_intents_str}.", category=errors.MissingIntentWarning, stacklevel=nested + 3, ) def consume_raw_event( self, event_name: str, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject ) -> None: if self._enabled_for_event(shard_events.ShardPayloadEvent): payload_event = self._event_factory.deserialize_shard_payload_event(shard, payload, name=event_name) self.dispatch(payload_event) consumer = self._consumers[event_name.lower()] asyncio.create_task(self._handle_dispatch(consumer, shard, payload), name=f"dispatch {event_name}") # Yes, this is not generic. The reason for this is MyPy complains about # using ABCs that are not concrete in generic types passed to functions. # For the sake of UX, I will check this at runtime instead and let the # user use a static type checker. def subscribe( self, event_type: typing.Type[typing.Any], callback: event_manager_.CallbackT[typing.Any], *, _nested: int = 0, ) -> None: if not inspect.iscoroutinefunction(callback): raise TypeError("Cannot subscribe a non-coroutine function callback") # `_nested` is used to show the correct source code snippet if an intent # warning is triggered. 
self._check_event(event_type, _nested) _LOGGER.debug( "subscribing callback 'async def %s%s' to event-type %s.%s", getattr(callback, "__name__", "<anon>"), inspect.signature(callback), event_type.__module__, event_type.__qualname__, ) try: self._listeners[event_type].append(callback) except KeyError: self._listeners[event_type] = [callback] self._increment_listener_group_count(event_type, 1) def get_listeners( self, event_type: typing.Type[base_events.EventT], /, *, polymorphic: bool = True, ) -> typing.Collection[event_manager_.CallbackT[base_events.EventT]]: if polymorphic: listeners: typing.List[event_manager_.CallbackT[base_events.EventT]] = [] for event in event_type.dispatches(): if subscribed_listeners := self._listeners.get(event): listeners.extend(subscribed_listeners) return listeners if items := self._listeners.get(event_type): return items.copy() return [] # Yes, this is not generic. The reason for this is MyPy complains about # using ABCs that are not concrete in generic types passed to functions. # For the sake of UX, I will check this at runtime instead and let the # user use a static type checker. 
def unsubscribe( self, event_type: typing.Type[typing.Any], callback: event_manager_.CallbackT[typing.Any], ) -> None: if listeners := self._listeners.get(event_type): _LOGGER.debug( "unsubscribing callback %s%s from event-type %s.%s", getattr(callback, "__name__", "<anon>"), inspect.signature(callback), event_type.__module__, event_type.__qualname__, ) listeners.remove(callback) if not listeners: del self._listeners[event_type] self._increment_listener_group_count(event_type, -1) def listen( self, *event_types: typing.Type[base_events.EventT], ) -> typing.Callable[[event_manager_.CallbackT[base_events.EventT]], event_manager_.CallbackT[base_events.EventT]]: def decorator( callback: event_manager_.CallbackT[base_events.EventT], ) -> event_manager_.CallbackT[base_events.EventT]: # Avoid resolving forward references in the function's signature if # event_type was explicitly provided as this may lead to errors. if event_types: _assert_is_listener(iter(inspect.signature(callback).parameters.values())) resolved_types = event_types else: signature = reflect.resolve_signature(callback) params = signature.parameters.values() _assert_is_listener(iter(params)) event_param = next(iter(params)) annotation = event_param.annotation if annotation is event_param.empty: raise TypeError("Must provide the event type in the @listen decorator or as a type hint!") if typing.get_origin(annotation) in _UNIONS: # Resolve the types inside the union resolved_types = typing.get_args(annotation) else: # Just pass back the annotation resolved_types = (annotation,) for resolved_type in resolved_types: self.subscribe(resolved_type, callback, _nested=1) return callback return decorator def dispatch(self, event: base_events.EventT) -> asyncio.Future[typing.Any]: if not isinstance(event, base_events.Event): raise TypeError(f"Events must be subclasses of {base_events.Event.__name__}, not {type(event).__name__}") tasks: typing.List[typing.Coroutine[None, typing.Any, None]] = [] for cls in 
event.dispatches(): if listeners := self._listeners.get(cls): for callback in listeners: tasks.append(self._invoke_callback(callback, event)) if cls not in self._waiters: continue waiter_set = self._waiters[cls] for waiter in tuple(waiter_set): predicate, future = waiter if not future.done(): try: if predicate and not predicate(event): continue except Exception as ex: future.set_exception(ex) else: future.set_result(event) waiter_set.remove(waiter) if not waiter_set: del self._waiters[cls] self._increment_waiter_group_count(cls, -1) return asyncio.gather(*tasks) if tasks else aio.completed_future() def stream( self, event_type: typing.Type[base_events.EventT], /, timeout: typing.Union[float, int, None], limit: typing.Optional[int] = None, ) -> event_manager_.EventStream[base_events.EventT]: self._check_event(event_type, 1) return EventStream(self, event_type, timeout=timeout, limit=limit) async def wait_for( self, event_type: typing.Type[base_events.EventT], /, timeout: typing.Union[float, int, None], predicate: typing.Optional[event_manager_.PredicateT[base_events.EventT]] = None, ) -> base_events.EventT: if not inspect.isclass(event_type) or not issubclass(event_type, base_events.Event): raise TypeError("Cannot wait for a non-Event type") self._check_event(event_type, 1) future: asyncio.Future[base_events.EventT] = asyncio.get_running_loop().create_future() try: waiter_set = self._waiters[event_type] except KeyError: waiter_set = set() self._waiters[event_type] = waiter_set self._increment_waiter_group_count(event_type, 1) pair = (predicate, future) waiter_set.add(pair) # type: ignore[arg-type] try: return await asyncio.wait_for(future, timeout=timeout) except asyncio.TimeoutError: waiter_set.remove(pair) # type: ignore[arg-type] if not waiter_set: del self._waiters[event_type] self._increment_waiter_group_count(event_type, -1) raise async def _handle_dispatch( self, consumer: _Consumer, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject, ) -> 
None: if not consumer.is_enabled: name = consumer.callback.__name__ _LOGGER.log( ux.TRACE, "Skipping raw dispatch for %s due to lack of any registered listeners or cache need", name ) return try: await consumer.callback(shard, payload) except asyncio.CancelledError: # Skip cancelled errors, likely caused by the event loop being shut down. pass except errors.UnrecognisedEntityError: _LOGGER.debug("Event referenced an unrecognised entity, discarding") except BaseException as ex: asyncio.get_running_loop().call_exception_handler( { "message": "Exception occurred in raw event dispatch conduit", "exception": ex, "task": asyncio.current_task(), } ) async def _invoke_callback( self, callback: event_manager_.CallbackT[base_events.EventT], event: base_events.EventT ) -> None: try: await callback(event) except Exception as ex: # Skip the first frame in logs, we don't care for it. trio = type(ex), ex, ex.__traceback__.tb_next if ex.__traceback__ is not None else None if base_events.is_no_recursive_throw_event(event): _LOGGER.error( "an exception occurred handling an event (%s), but it has been ignored", type(event).__name__, exc_info=trio, ) else: exception_event = base_events.ExceptionEvent( exception=ex, failed_event=event, failed_callback=callback, ) log = _LOGGER.debug if self.get_listeners(type(exception_event), polymorphic=True) else _LOGGER.error log("an exception occurred handling an event (%s)", type(event).__name__, exc_info=trio) await self.dispatch(exception_event)
Provides functionality to consume and dispatch events.
Specific event handlers should be in functions named on_xxx
where xxx
is the raw event name being dispatched in lower-case.
Methods
self,
event_factory: hikari.api.event_factory.EventFactory,
intents: hikari.intents.Intents,
*,
cache_components: hikari.api.config.CacheComponents = <CacheComponents.NONE: 0>
):
View Source
def __init__(
    self,
    event_factory: event_factory_.EventFactory,
    intents: intents_.Intents,
    *,
    cache_components: config.CacheComponents = config.CacheComponents.NONE,
) -> None:
    """Initialise the manager and index every `on_xxx` consumer method.

    Parameters
    ----------
    event_factory : hikari.api.event_factory.EventFactory
        The factory stored on the manager for deserializing events.
    intents : hikari.intents.Intents
        The intents stored on the manager.
    cache_components : hikari.api.config.CacheComponents
        The enabled cache components; used to decide the caching flag
        that is handed to each consumer.
    """
    self._event_factory = event_factory
    self._intents = intents
    self._consumers: typing.Dict[str, _Consumer] = {}
    self._listeners: _ListenerMapT[base_events.Event] = {}
    self._waiters: _WaiterMapT[base_events.Event] = {}

    prefix = "on_"
    for name, member in inspect.getmembers(self):
        if not name.startswith(prefix):
            continue

        # The consumer key is the member name without the "on_" prefix.
        event_name = name[len(prefix) :]

        if isinstance(member, _FilteredMethodT):
            # Filtered methods carry their own cache-component mask and
            # event bitmask as attributes.
            caching = (member.__cache_components__ & cache_components) != 0
            consumer = _Consumer(member, member.__events_bitmask__, caching)
        else:
            # Unfiltered methods get -1 as their event bitmask and are
            # flagged for caching whenever any cache component is enabled.
            consumer = _Consumer(member, -1, cache_components != cache_components.NONE)

        self._consumers[event_name] = consumer
self,
event_name: str,
shard: hikari.api.shard.GatewayShard,
payload: Dict[str, Any]
) -> None:
View Source
def consume_raw_event(
    self, event_name: str, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
) -> None:
    """Consume a raw event received from a shard.

    Parameters
    ----------
    event_name : str
        Name of the event; it is lower-cased before the consumer lookup.
    shard : hikari.api.shard.GatewayShard
        The shard that received the event.
    payload : hikari.internal.data_binding.JSONObject
        The payload of the event.

    Raises
    ------
    KeyError
        If no consumer is registered under the lower-cased event name
        (`KeyError` is a `LookupError`).
    """
    # Emit the generic payload event first, if anything is enabled for it.
    if self._enabled_for_event(shard_events.ShardPayloadEvent):
        self.dispatch(self._event_factory.deserialize_shard_payload_event(shard, payload, name=event_name))

    consumer = self._consumers[event_name.lower()]
    handler = self._handle_dispatch(consumer, shard, payload)
    # The consumer runs in its own task; this method does not wait for it.
    asyncio.create_task(handler, name=f"dispatch {event_name}")
Consume a raw event.
Parameters
- event_name (str): The case-insensitive name of the event being triggered.
- shard (hikari.api.shard.GatewayShard): Object of the shard that received this event.
- payload (hikari.internal.data_binding.JSONObject): Payload of the event being triggered.
Raises
- LookupError: If there is no consumer for the event.
View Source
def dispatch(self, event: base_events.EventT) -> asyncio.Future[typing.Any]:
    """Dispatch an event to its listeners and resolve any matching waiters.

    Parameters
    ----------
    event : hikari.events.base_events.Event
        The event to dispatch.

    Returns
    -------
    asyncio.Future[typing.Any]
        The result of `asyncio.gather` over every listener invocation, or
        an already completed future when no listener was found.

    Raises
    ------
    TypeError
        If `event` is not an instance of `hikari.events.base_events.Event`.
    """
    if not isinstance(event, base_events.Event):
        raise TypeError(f"Events must be subclasses of {base_events.Event.__name__}, not {type(event).__name__}")

    pending: typing.List[typing.Coroutine[None, typing.Any, None]] = []

    for event_cls in event.dispatches():
        callbacks = self._listeners.get(event_cls)
        if callbacks:
            pending.extend(self._invoke_callback(callback, event) for callback in callbacks)

        waiters = self._waiters.get(event_cls)
        if waiters is None:
            continue

        # Iterate over a snapshot since entries are removed while looping.
        for entry in tuple(waiters):
            predicate, future = entry

            if not future.done():
                try:
                    rejected = bool(predicate) and not predicate(event)
                except Exception as ex:
                    # A failing predicate is reported to whoever is waiting.
                    future.set_exception(ex)
                else:
                    if rejected:
                        # Not the event this waiter wants; keep it registered.
                        continue
                    future.set_result(event)

            # Reached for resolved waiters and for ones that were already done.
            waiters.remove(entry)

        if not waiters:
            del self._waiters[event_cls]
            self._increment_waiter_group_count(event_cls, -1)

    if pending:
        return asyncio.gather(*pending)
    return aio.completed_future()
Dispatch an event.
Parameters
- event (hikari.events.base_events.Event): The event to dispatch.
Example
We can dispatch custom events by first defining a class that derives from hikari.events.base_events.Event
.
import attr
from hikari.traits import RESTAware
from hikari.events.base_events import Event
from hikari.users import User
from hikari.snowflakes import Snowflake
@attr.define()
class EveryoneMentionedEvent(Event):
app: RESTAware = attr.field()
author: User = attr.field()
'''The user who mentioned everyone.'''
content: str = attr.field()
'''The message that was sent.'''
message_id: Snowflake = attr.field()
'''The message ID.'''
channel_id: Snowflake = attr.field()
'''The channel ID.'''
We can then dispatch our event as we see fit.
from hikari.events.messages import MessageCreateEvent
@bot.listen(MessageCreateEvent)
async def on_message(event):
if "@everyone" in event.content or "@here" in event.content:
event = EveryoneMentionedEvent(
app=event.app,
author=event.author,
content=event.content,
message_id=event.id,
channel_id=event.channel_id,
)
bot.dispatch(event)
This event can be listened to elsewhere by subscribing to it with EventManager.subscribe
.
@bot.listen(EveryoneMentionedEvent)
async def on_everyone_mentioned(event):
print(event.author, "just pinged everyone in", event.channel_id)
Returns
- asyncio.Future[typing.Any]: A future that can be optionally awaited. If awaited, the future will complete once all corresponding event listeners have been invoked. If not awaited, this will schedule the dispatch of the events in the background for later.
See Also
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for
self,
event_type: Type[~EventT],
/,
*,
polymorphic: bool = True
) -> Collection[Callable[[~EventT], Coroutine[Any, Any, NoneType]]]:
View Source
def get_listeners(
    self,
    event_type: typing.Type[base_events.EventT],
    /,
    *,
    polymorphic: bool = True,
) -> typing.Collection[event_manager_.CallbackT[base_events.EventT]]:
    """Get the listeners registered for a given event type.

    Parameters
    ----------
    event_type : typing.Type[T]
        The event type to look up.
    polymorphic : bool
        If `True` (the default), listeners of every type returned by
        `event_type.dispatches()` are included. If `False`, only the
        listeners registered for `event_type` itself are returned.

    Returns
    -------
    typing.Collection[CallbackT]
        A new list of listeners; empty if nothing is registered.
    """
    if not polymorphic:
        registered = self._listeners.get(event_type)
        # Hand back a copy so callers cannot mutate the internal list.
        return registered.copy() if registered else []

    collected: typing.List[event_manager_.CallbackT[base_events.EventT]] = []
    for dispatched_type in event_type.dispatches():
        collected.extend(self._listeners.get(dispatched_type) or ())

    return collected
Get the listeners for a given event type, if there are any.
Parameters
- event_type (typing.Type[T]): The event type to look for. T must be a subclass of hikari.events.base_events.Event.
- polymorphic (bool): If True, this will also return the listeners for all the event types event_type will dispatch. If False, then only listeners for this class specifically are returned. The default is True.
Returns
- typing.Collection[typing.Callable[[T], typing.Coroutine[typing.Any, typing.Any, None]]]: A copy of the collection of listeners for the event. Will return an empty collection if nothing is registered.
self,
*event_types: Type[~EventT]
) -> Callable[[Callable[[~EventT], Coroutine[Any, Any, NoneType]]], Callable[[~EventT], Coroutine[Any, Any, NoneType]]]:
View Source
def listen(
    self,
    *event_types: typing.Type[base_events.EventT],
) -> typing.Callable[[event_manager_.CallbackT[base_events.EventT]], event_manager_.CallbackT[base_events.EventT]]:
    """Generate a decorator that subscribes a callback to event types.

    This is a second-order decorator.

    Parameters
    ----------
    *event_types : typing.Type[T]
        The event types to subscribe to. If none are given, the types are
        taken from the annotation of the callback's first parameter; a
        union annotation subscribes the callback to each member.

    Returns
    -------
    typing.Callable[[T], T]
        A decorator that subscribes the callback through `subscribe` and
        then returns the callback unchanged.

    Raises
    ------
    TypeError
        Raised by the decorator if no event type was given and the first
        parameter of the callback has no annotation.
    """

    def decorator(
        callback: event_manager_.CallbackT[base_events.EventT],
    ) -> event_manager_.CallbackT[base_events.EventT]:
        if event_types:
            # Explicit types were given, so the plain signature is enough.
            # Forward references are deliberately left unresolved here as
            # resolving them may lead to errors.
            parameters = inspect.signature(callback).parameters.values()
            _assert_is_listener(iter(parameters))
            targets = event_types
        else:
            parameters = reflect.resolve_signature(callback).parameters.values()
            _assert_is_listener(iter(parameters))

            first_param = next(iter(parameters))
            hint = first_param.annotation
            if hint is first_param.empty:
                raise TypeError("Must provide the event type in the @listen decorator or as a type hint!")

            # A union subscribes to every member, anything else to itself.
            targets = typing.get_args(hint) if typing.get_origin(hint) in _UNIONS else (hint,)

        for target in targets:
            self.subscribe(target, callback, _nested=1)

        return callback

    return decorator
Generate a decorator to subscribe a callback to an event type.
This is a second-order decorator.
Parameters
- *event_types (typing.Optional[typing.Type[T]]): The event types to subscribe to. The implementation may allow this to be undefined. If this is the case, the event type will be inferred instead from the type hints on the function signature. T must be a subclass of hikari.events.base_events.Event.
Returns
- typing.Callable[[T], T]: A decorator for a coroutine function that passes it to
EventManager.subscribe
before returning the function reference.
See Also
hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for
self,
event_type: Type[~EventT],
/,
timeout: Union[float, int, NoneType],
limit: Optional[int] = None
) -> hikari.api.event_manager.EventStream[~EventT]:
View Source
def stream(
    self,
    event_type: typing.Type[base_events.EventT],
    /,
    timeout: typing.Union[float, int, None],
    limit: typing.Optional[int] = None,
) -> event_manager_.EventStream[base_events.EventT]:
    """Return a stream iterator for the given event type.

    Parameters
    ----------
    event_type : typing.Type[hikari.events.base_events.Event]
        The event type to stream.
    timeout : typing.Union[float, int, None]
        Passed to the created `EventStream` as its timeout.
    limit : typing.Optional[int]
        Passed to the created `EventStream` as its queue limit.

    Returns
    -------
    EventStream[hikari.events.base_events.Event]
        A new, not yet opened, event stream bound to this manager.
    """
    # The 1 is the nesting depth forwarded to the event check.
    self._check_event(event_type, 1)

    event_stream = EventStream(self, event_type, timeout=timeout, limit=limit)
    return event_stream
Return a stream iterator for the given event and sub-events.
Warning: If you use stream.open() to start the stream then you must also close it with stream.close(), otherwise it may queue events in memory indefinitely.
Parameters
- event_type (typing.Type[hikari.events.base_events.Event]): The event type to listen for. This will listen for subclasses of this type additionally.
- timeout (typing.Union[float, int, None]): How long this streamer should wait for the next event before ending the iteration. If None then this will continue until explicitly broken from.
- limit (typing.Optional[int]): The limit for how many events this should queue at one time before dropping extra incoming events, leave this as None for the cache size to be unlimited.
Returns
- EventStream[hikari.events.base_events.Event]: The async iterator to handle streamed events. This must be started with `with stream:` or `stream.open()` before asynchronously iterating over it.
Examples
with bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id)) as stream:
async for user_id in stream.map("user_id").limit(50):
...
or using open()
and close()
stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
stream.open()
async for user_id in stream.map("user_id").limit(50):
...
stream.close()
See Also
hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for
self,
event_type: Type[Any],
callback: Callable[[Any], Coroutine[Any, Any, NoneType]],
*,
_nested: int = 0
) -> None:
View Source
# This is intentionally not generic: MyPy complains about non-concrete ABCs
# being used in generic types passed to functions, so the event type is
# validated at runtime instead.
def subscribe(
    self,
    event_type: typing.Type[typing.Any],
    callback: event_manager_.CallbackT[typing.Any],
    *,
    _nested: int = 0,
) -> None:
    """Subscribe a given callback to a given event type.

    Parameters
    ----------
    event_type : typing.Type[T]
        The event type to listen for.
    callback
        The coroutine function to register for `event_type`.
    _nested : int
        Internal nesting depth forwarded to the event check so that an
        intent warning points at the correct source line.

    Raises
    ------
    TypeError
        If `callback` is not a coroutine function.
    """
    if not inspect.iscoroutinefunction(callback):
        raise TypeError("Cannot subscribe a non-coroutine function callback")

    self._check_event(event_type, _nested)

    _LOGGER.debug(
        "subscribing callback 'async def %s%s' to event-type %s.%s",
        getattr(callback, "__name__", "<anon>"),
        inspect.signature(callback),
        event_type.__module__,
        event_type.__qualname__,
    )

    existing = self._listeners.get(event_type)
    if existing is not None:
        existing.append(callback)
        return

    # First listener for this type: create the list and bump the group count.
    self._listeners[event_type] = [callback]
    self._increment_listener_group_count(event_type, 1)
Subscribe a given callback to a given event type.
Parameters
- event_type (typing.Type[T]): The event type to listen for. This will also listen for any subclasses of the given type. T must be a subclass of hikari.events.base_events.Event.
- callback: Must be a coroutine function to invoke. This should consume an instance of the given event, or an instance of a valid subclass if one exists. Any result is discarded.
Example
The following demonstrates subscribing a callback to message creation events.
from hikari.events.messages import MessageCreateEvent
async def on_message(event):
...
bot.subscribe(MessageCreateEvent, on_message)
See Also
hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for
self,
event_type: Type[Any],
callback: Callable[[Any], Coroutine[Any, Any, NoneType]]
) -> None:
View Source
# This is intentionally not generic: MyPy complains about non-concrete ABCs
# being used in generic types passed to functions, so this is left to runtime.
def unsubscribe(
    self,
    event_type: typing.Type[typing.Any],
    callback: event_manager_.CallbackT[typing.Any],
) -> None:
    """Unsubscribe a given callback from a given event type.

    Nothing happens if no listeners are registered for `event_type`.

    Parameters
    ----------
    event_type : typing.Type[T]
        The exact event type the callback was subscribed with.
    callback
        The callback to unsubscribe.

    Raises
    ------
    ValueError
        If listeners exist for `event_type` but `callback` is not one of
        them (raised by `list.remove`).
    """
    registered = self._listeners.get(event_type)
    if not registered:
        return

    _LOGGER.debug(
        "unsubscribing callback %s%s from event-type %s.%s",
        getattr(callback, "__name__", "<anon>"),
        inspect.signature(callback),
        event_type.__module__,
        event_type.__qualname__,
    )
    registered.remove(callback)

    if registered:
        return

    # That was the last listener for this type: drop the entry and the count.
    del self._listeners[event_type]
    self._increment_listener_group_count(event_type, -1)
Unsubscribe a given callback from a given event type, if present.
Parameters
- event_type (typing.Type[T]): The event type to unsubscribe from. This must be the same exact type as was originally subscribed with to be removed correctly. T must derive from hikari.events.base_events.Event.
- callback: The callback to unsubscribe.
Example
The following demonstrates unsubscribing a callback from a message creation event.
from hikari.events.messages import MessageCreateEvent
async def on_message(event):
...
bot.unsubscribe(MessageCreateEvent, on_message)
See Also
hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.wait_for
self,
event_type: Type[~EventT],
/,
timeout: Union[float, int, NoneType],
predicate: Optional[Callable[[~EventT], bool]] = None
) -> ~EventT:
View Source
async def wait_for(
    self,
    event_type: typing.Type[base_events.EventT],
    /,
    timeout: typing.Union[float, int, None],
    predicate: typing.Optional[event_manager_.PredicateT[base_events.EventT]] = None,
) -> base_events.EventT:
    """Wait for a given event to occur once, then return the event.

    Parameters
    ----------
    event_type : typing.Type[hikari.events.base_events.Event]
        The event type to wait for.
    timeout : typing.Union[float, int, None]
        Seconds to wait before giving up, or `None` to wait indefinitely.
    predicate
        Optional synchronous function that takes the event and returns
        `True` if it is the one to return. If `None`, the first event
        dispatched for `event_type` is returned.

    Returns
    -------
    hikari.events.base_events.Event
        The event that resolved the wait.

    Raises
    ------
    TypeError
        If `event_type` is not a subclass of
        `hikari.events.base_events.Event`.
    asyncio.TimeoutError
        If `timeout` is reached before a matching event is dispatched.
    """
    if not inspect.isclass(event_type) or not issubclass(event_type, base_events.Event):
        raise TypeError("Cannot wait for a non-Event type")

    self._check_event(event_type, 1)

    future: asyncio.Future[base_events.EventT] = asyncio.get_running_loop().create_future()

    try:
        waiter_set = self._waiters[event_type]
    except KeyError:
        # First waiter for this type: create the set and bump the group count.
        waiter_set = set()
        self._waiters[event_type] = waiter_set
        self._increment_waiter_group_count(event_type, 1)

    pair = (predicate, future)

    waiter_set.add(pair)  # type: ignore[arg-type]
    try:
        return await asyncio.wait_for(future, timeout=timeout)
    except asyncio.TimeoutError:
        # `asyncio.wait_for` cancels the future before the timeout error gets
        # here, and `dispatch` discards waiters whose future is already done
        # (dropping the whole set once it is empty). If a dispatch slipped in
        # between, the pair and/or the set may already be gone, and a newer
        # set may even be registered under the same key. So only remove what
        # is still there and only unregister the set this call was using.
        waiter_set.discard(pair)  # type: ignore[arg-type]
        if not waiter_set and self._waiters.get(event_type) is waiter_set:
            del self._waiters[event_type]
            self._increment_waiter_group_count(event_type, -1)

        raise
Wait for a given event to occur once, then return the event.
Warning: Async predicates are not supported.
Parameters
- event_type (typing.Type[hikari.events.base_events.Event]): The event type to listen for. This will listen for subclasses of this type additionally.
- predicate: A function taking the event as the single parameter. This should return True if the event is one you want to return, or False if the event should not be returned. If left as None (the default), then the first matching event type that the bot receives (or any subtype) will be the one returned.
- timeout (typing.Union[float, int, None]): The amount of time to wait before raising an asyncio.TimeoutError and giving up instead. This is measured in seconds. If None, then no timeout will be waited for (no timeout can result in "leaking" of coroutines that never complete if called in an uncontrolled way, so is not recommended).
Returns
- hikari.events.base_events.Event: The event that was provided.
Raises
- asyncio.TimeoutError: If the timeout is not None and is reached before an event is received that the predicate returns True for.
See Also
hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe
View Source
class EventStream(event_manager_.EventStream[base_events.EventT]):
    """An implementation of an event `EventStream` class.

    .. note::
        `EventStream.filter` always returns this event stream with the given
        predicates added to it. If it is called while the stream is active
        ("opened"), events that are already queued are additionally checked
        against the new predicates and dropped if they do not match.
    """

    __slots__: typing.Sequence[str] = (
        "__weakref__",
        "_active",
        "_event",
        "_event_manager",
        "_event_type",
        "_filters",
        "_limit",
        "_queue",
        "_registered_listener",
        "_timeout",
    )

    __weakref__: typing.Optional[weakref.ref[EventStream[base_events.EventT]]]

    def __init__(
        self,
        event_manager: event_manager_.EventManager,
        event_type: typing.Type[base_events.EventT],
        *,
        timeout: typing.Union[float, int, None],
        limit: typing.Optional[int] = None,
    ) -> None:
        self._active = False
        # Created lazily by `__anext__`; set by `_listener` when an event is queued.
        self._event: typing.Optional[asyncio.Event] = None
        self._event_manager = event_manager
        self._event_type = event_type
        self._filters: iterators.All[base_events.EventT] = iterators.All(())
        self._limit = limit
        self._queue: typing.List[base_events.EventT] = []
        self._registered_listener: typing.Optional[
            typing.Callable[[base_events.EventT], typing.Coroutine[typing.Any, typing.Any, None]]
        ] = None
        # The registered wrapping function for the weak ref to this class's _listener method.
        self._timeout = timeout

    # These are only included at runtime in-order to avoid the model being typed as an asynchronous context manager.
    if not typing.TYPE_CHECKING:

        async def __aenter__(self: event_manager_.EventStreamT) -> event_manager_.EventStreamT:
            # This is sync only.
            warnings.warn(
                "Using EventStream as an async context manager has been deprecated since 2.0.0.dev104. "
                "Please use it as a synchronous context manager (e.g. with bot.stream(...)) instead.",
                category=DeprecationWarning,
                stacklevel=2,
            )

            self.open()
            return self

        async def __aexit__(
            self,
            exc_type: typing.Optional[typing.Type[BaseException]],
            exc: typing.Optional[BaseException],
            exc_tb: typing.Optional[types.TracebackType],
        ) -> None:
            self.close()

    def __enter__(self: _EventStreamT) -> _EventStreamT:
        self.open()
        return self

    def __exit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]],
        exc_val: typing.Optional[BaseException],
        exc_tb: typing.Optional[types.TracebackType],
    ) -> None:
        self.close()

    async def __anext__(self) -> base_events.EventT:
        if not self._active:
            raise TypeError("stream must be started with before entering it")

        # Wait until the listener queues something, or end the iteration
        # once nothing arrived within the timeout.
        while not self._queue:
            if not self._event:
                self._event = asyncio.Event()

            try:
                await asyncio.wait_for(self._event.wait(), timeout=self._timeout)
            except asyncio.TimeoutError:
                raise StopAsyncIteration from None

            self._event.clear()

        return self._queue.pop(0)

    def __await__(self) -> typing.Generator[None, None, typing.Sequence[base_events.EventT]]:
        return self._await_all().__await__()

    def __del__(self) -> None:
        # For the sake of protecting highly intelligent people who forget to close this, we register the event listener
        # with a weakref then try to close this on deletion. While this may lead to their consoles being spammed, this
        # is a small price to pay as it'll be way more obvious what's wrong than if we just left them with a vague
        # ominous memory leak.
        if self._active:
            _LOGGER.warning("active %r streamer fell out of scope before being closed", self._event_type.__name__)
            self.close()

    async def _await_all(self) -> typing.Sequence[base_events.EventT]:
        """Open the stream, collect every event until it ends, then close it."""
        self.open()
        try:
            return [event async for event in self]
        finally:
            # Always unsubscribe, even if the iteration raises or is cancelled,
            # so that the listener does not keep queueing events.
            self.close()

    async def _listener(self, event: base_events.EventT) -> None:
        """Queue an event unless it is filtered out or the queue is full."""
        if not self._filters(event) or (self._limit is not None and len(self._queue) >= self._limit):
            return

        self._queue.append(event)
        if self._event:
            # Wake up a pending `__anext__`.
            self._event.set()

    def close(self) -> None:
        """Unsubscribe the listener and mark this stream as inactive.

        Does nothing if the stream is not active.
        """
        if self._active and self._registered_listener is not None:
            try:
                self._event_manager.unsubscribe(self._event_type, self._registered_listener)
            except ValueError:
                # The listener was not registered with the manager any more.
                pass

            self._registered_listener = None

        self._active = False

    def filter(
        self: _EventStreamT,
        *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[base_events.EventT], bool]],
        **attrs: typing.Any,
    ) -> _EventStreamT:
        """Add filters to this stream and return the stream itself."""
        filter_ = self._map_predicates_and_attr_getters("filter", *predicates, **attrs)
        if self._active:
            # Drop already queued events which do not match the new filter.
            self._queue = [entry for entry in self._queue if filter_(entry)]

        self._filters |= filter_
        return self

    def open(self) -> None:
        """Subscribe the listener and mark this stream as active.

        Does nothing if the stream is already active.
        """
        if not self._active:
            # For the sake of protecting highly intelligent people who forget to close this, we register the event
            # listener with a weakref then try to close this on deletion. While this may lead to their consoles being
            # spammed, this is a small price to pay as it'll be way more obvious what's wrong than if we just left them
            # with a vague ominous memory leak.
            reference = weakref.WeakMethod(self._listener)
            listener = _generate_weak_listener(reference)
            self._registered_listener = listener
            self._event_manager.subscribe(self._event_type, listener)
            self._active = True
An implementation of an event EventStream
class.
Note: Calling EventStream.filter always returns the event stream itself with the given predicates added to it. If it is called on an active "opened" event stream, events that are already queued are additionally checked against the new predicates and dropped if they do not match.
Methods
self,
event_manager: hikari.api.event_manager.EventManager,
event_type: Type[~EventT],
*,
timeout: Union[float, int, NoneType],
limit: Optional[int] = None
):
View Source
def __init__(
    self,
    event_manager: event_manager_.EventManager,
    event_type: typing.Type[base_events.EventT],
    *,
    timeout: typing.Union[float, int, None],
    limit: typing.Optional[int] = None,
) -> None:
    """Create a closed event stream.

    Parameters
    ----------
    event_manager : hikari.api.event_manager.EventManager
        The event manager stored on the stream.
    event_type : typing.Type[hikari.events.base_events.Event]
        The event type stored on the stream.
    timeout : typing.Union[float, int, None]
        The timeout stored on the stream.
    limit : typing.Optional[int]
        The queue limit stored on the stream.
    """
    # Configuration handed in by the caller.
    self._event_manager = event_manager
    self._event_type = event_type
    self._timeout = timeout
    self._limit = limit

    # Runtime state: the stream starts inactive with nothing queued.
    self._active = False
    self._event: typing.Optional[asyncio.Event] = None
    self._queue: typing.List[base_events.EventT] = []
    self._filters: iterators.All[base_events.EventT] = iterators.All(())

    # The registered wrapping function for the weak ref to this class's _listener method.
    self._registered_listener: typing.Optional[
        typing.Callable[[base_events.EventT], typing.Coroutine[typing.Any, typing.Any, None]]
    ] = None
View Source
def awaiting(self, window_size: int = 10) -> LazyIterator[ValueT]:
    """Await each item concurrently in a fixed size window.

    .. warning::
        Setting a large window size, or setting it to 0 to await everything
        is a dangerous thing to do if you are making API calls. Some
        endpoints will get ratelimited and cause a backup of waiting tasks,
        others may begin to spam global rate limits instead (the
        `fetch_user` endpoint seems to be notorious for doing this).

    .. note::
        This call assumes that the iterator yields awaitable values. MyPy
        cannot detect this, so a cast is forced internally. If an item is
        not awaitable, you will receive a `TypeError` instead.

    Parameters
    ----------
    window_size : int
        How many tasks to await at once. `0` awaits everything at once,
        but see the warning above.

    Returns
    -------
    LazyIterator[ValueT]
        The new lazy iterator to return.
    """
    # Not type safe: the cast only tells the type checker that the items
    # are awaitable, nothing is verified at runtime here.
    awaitable_view = typing.cast("LazyIterator[typing.Awaitable[ValueT]]", self)
    return _AwaitingLazyIterator(awaitable_view, window_size)
Await each item concurrently in a fixed size window.
Warning: Setting a large window size, or setting it to 0 to await everything is a dangerous thing to do if you are making API calls. Some endpoints will get ratelimited and cause a backup of waiting tasks, others may begin to spam global rate limits instead (the fetch_user
endpoint seems to be notorious for doing this).
Note: This call assumes that the iterator contains awaitable values as input. MyPy cannot detect this nicely, so any cast is forced internally. If the item is not awaitable, you will receive a TypeError
instead. You have been warned. You cannot escape the ways of the duck type young grasshopper.
Parameters
- window_size (int): The window size of how many tasks to await at once. You can set this to
0
to await everything at once, but see the below warning.
Returns
- LazyIterator[ValueT]: The new lazy iterator to return.
View Source
def chunk(self, chunk_size: int) -> LazyIterator[typing.Sequence[ValueT]]:
    """Return results in chunks of up to `chunk_size` entries.

    Parameters
    ----------
    chunk_size : int
        The limit for how many results should be returned in each chunk.

    Returns
    -------
    LazyIterator[typing.Sequence[ValueT]]
        `LazyIterator` that emits each chunked sequence.
    """
    chunked = _ChunkedLazyIterator(self, chunk_size)
    return chunked
Return results in chunks of up to chunk_size
amount of entries.
Parameters
- chunk_size (int): The limit for how many results should be returned in each chunk.
Returns
- LazyIterator[typing.Sequence[ValueT]]:
LazyIterator
that emits each chunked sequence.
View Source
def close(self) -> None:
    """Mark this streamer as closed and unsubscribe its listener.

    If the streamer is not active, the registered listener is left
    untouched and only the inactive flag is (re)set.
    """
    if not self._active:
        self._active = False
        return

    listener = self._registered_listener
    if listener is not None:
        try:
            self._event_manager.unsubscribe(self._event_type, listener)
        except ValueError:
            # The listener was not registered with the manager any more.
            pass

        self._registered_listener = None

    self._active = False
Mark this streamer as closed to stop it from queueing and receiving events.
If called on an already closed streamer then this will do nothing.
Note: with streamer
may be used as a short-cut for opening and closing a streamer.
self,
collector: Callable[[Sequence[~ValueT]], Collection[~ValueT]]
) -> Collection[~ValueT]:
View Source
async def collect(
    self, collector: typing.Callable[[typing.Sequence[ValueT]], typing.Collection[ValueT]]
) -> typing.Collection[ValueT]:
    """Collect the results into a given type and return it.

    Parameters
    ----------
    collector
        A function that consumes a sequence of values and returns a
        collection. It is called once with the result of awaiting this
        object.

    Returns
    -------
    typing.Collection[ValueT]
        Whatever `collector` returns.
    """
    results = await self
    return collector(results)
Collect the results into a given type and return it.
Parameters
- collector: A function that consumes a sequence of values and returns a collection.
View Source
async def count(self) -> int:
    """Count the number of results.

    This iterates over (and therefore consumes) the iterator.

    Returns
    -------
    int
        Number of results found.
    """
    total = 0
    async for _ in self:
        total += 1

    return total
Count the number of results.
Returns
- int: Number of results found.
self,
*,
start: int = 0
) -> hikari.iterators.LazyIterator[typing.Tuple[int, ~ValueT]]:
View Source
def enumerate(self, *, start: int = 0) -> LazyIterator[typing.Tuple[int, ValueT]]:
    """Enumerate the paginated results lazily.

    This behaves as an asyncio-friendly version of `enumerate` which uses
    much less memory than collecting all the results first and calling
    `enumerate` across them.

    Parameters
    ----------
    start : int
        Optional int to start at. If omitted, this is `0`.

    Examples
    --------
    ```py
    >>> async for i, item in paginated_results.enumerate():
    ...    print(i, item)
    0 foo
    1 bar
    2 baz
    3 bork
    4 qux

    >>> async for i, item in paginated_results.enumerate(start=9):
    ...    print(i, item)
    9 foo
    10 bar
    11 baz
    12 bork
    13 qux

    >>> async for i, item in paginated_results.enumerate(start=9).limit(3):
    ...    print(i, item)
    9 foo
    10 bar
    11 baz
    ```

    Returns
    -------
    LazyIterator[typing.Tuple[int, ValueT]]
        A paginated results view that asynchronously yields an increasing
        counter in a tuple with each result, lazily.
    """
    return _EnumeratedLazyIterator(self, start=start)
Enumerate the paginated results lazily.
This behaves as an asyncio-friendly version of enumerate
which uses much less memory than collecting all the results first and calling enumerate
across them.
Parameters
- start (int): Optional int to start at. If omitted, this is `0`.
Examples
>>> async for i, item in paginated_results.enumerate():
... print(i, item)
(0, foo)
(1, bar)
(2, baz)
(3, bork)
(4, qux)
>>> async for i, item in paginated_results.enumerate(start=9):
... print(i, item)
(9, foo)
(10, bar)
(11, baz)
(12, bork)
(13, qux)
>>> async for i, item in paginated_results.enumerate(start=9).limit(3):
... print(i, item)
(9, foo)
(10, bar)
(11, baz)
Returns
- LazyIterator[typing.Tuple[int, T]]: A paginated results view that asynchronously yields an increasing counter in a tuple with each result, lazily.
self: ~_EventStreamT,
*predicates: Union[Tuple[str, Any], Callable[[~EventT], bool]],
**attrs: Any
) -> ~_EventStreamT:
View Source
def filter(
    self: _EventStreamT,
    *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[base_events.EventT], bool]],
    **attrs: typing.Any,
) -> _EventStreamT:
    """Filter the items by one or more conditions.

    Parameters
    ----------
    *predicates
        Predicates to invoke, or 2-tuples of an attribute name and the
        value to compare it with for equality.
    **attrs : typing.Any
        Alternative to passing 2-tuples. Cannot specify nested attributes
        using this method.

    Returns
    -------
    EventStream[ValueT]
        The current stream with the new filter applied.
    """
    condition = self._map_predicates_and_attr_getters("filter", *predicates, **attrs)

    if self._active:
        # Entries that are already queued are re-checked against the new
        # condition; only the ones that pass are kept.
        kept = []
        for queued in self._queue:
            if condition(queued):
                kept.append(queued)

        self._queue = kept

    self._filters |= condition
    return self
Filter the items by one or more conditions.
Each condition is treated as a predicate, being called with each item that this iterator would return when it is requested.
All conditions must evaluate to True
for the item to be returned. If this is not met, then the item is discarded and ignored, the next matching item will be returned instead, if there is one.
Parameters
- *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return `True` if it is of interest, or `False` otherwise. These may instead include 2-`tuple` objects consisting of a `str` attribute name (nested attributes are referred to using the `.` operator), and values to compare for equality. This allows you to specify conditions such as `members.filter(("user.bot", True))`.
- **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
- EventStream[ValueT]: The current stream with the new filter applied.
self,
flattener: Union[hikari.internal.spel.AttrGetter[~ValueT, Union[AsyncIterator[~AnotherValueT], Iterable[~AnotherValueT]]], Callable[[~ValueT], Union[AsyncIterator[~AnotherValueT], Iterable[~AnotherValueT]]]]
) -> hikari.iterators.LazyIterator[~AnotherValueT]:
View Source
def flat_map(self, flattener: _FlattenerT[ValueT, AnotherValueT]) -> LazyIterator[AnotherValueT]:
    r"""Perform a flat mapping operation.

    This will pass each item in the iterator to the given `flattener`,
    expecting a new `typing.Iterable` or `typing.AsyncIterator` to be
    returned as the result. This means you can map to a new `LazyIterator`,
    `typing.AsyncIterator`, `typing.Iterable`, async generator, or
    generator.

    Remember that `typing.Iterator` implicitly provides `typing.Iterable`
    compatibility.

    This is used to provide lazy conversions, and can be used to implement
    reactive-like pipelines if desired.

    All results are combined into one large lazy iterator and yielded
    lazily.

    Parameters
    ----------
    flattener
        A function that returns either an async iterator or iterator of
        new values. Could be an attribute name instead.

    Example
    -------
    The following example generates a distinct collection of all mentioned
    users in the given channel from the past 500 messages. The messages are
    first mapped to their content, so the flattener receives that content
    rather than the message object.

    ```py
    def iter_mentioned_users(content: str) -> typing.Iterable[Snowflake]:
        for match in re.findall(r"<@!?(\d+)>", content):
            yield Snowflake(match)

    mentioned_users = await (
        channel
        .history()
        .limit(500)
        .map(".content")
        .flat_map(iter_mentioned_users)
        .distinct()
    )
    ```

    Returns
    -------
    LazyIterator[AnotherValueT]
        The new lazy iterator to return.
    """
    return _FlatMapLazyIterator(self, flattener)
Perform a flat mapping operation.
This will pass each item in the iterator to the given `flattener`, expecting a new `typing.Iterable` or `typing.AsyncIterator` to be returned as the result. This means you can map to a new `LazyIterator`, `typing.AsyncIterator`, `typing.Iterable`, async generator, or generator.
Remember that `typing.Iterator` implicitly provides `typing.Iterable` compatibility.
This is used to provide lazy conversions, and can be used to implement reactive-like pipelines if desired.
All results are combined into one large lazy iterator and yielded lazily.
Parameters
- flattener: A function that returns either an async iterator or iterator of new values. Could be an attribute name instead.
Example
The following example generates a distinct collection of all mentioned users in the given channel from the past 500 messages.
def iter_mentioned_users(message: hikari.Message) -> typing.Iterable[Snowflake]:
for match in re.findall(r"<@!?(\d+)>", message.content):
yield Snowflake(match)
mentioned_users = await (
channel
.history()
.limit(500)
.map(".content")
.flat_map(iter_mentioned_users)
.distinct()
)
Returns
- LazyIterator[AnotherValueT]: The new lazy iterator to return.
View Source
async def for_each(self, consumer: typing.Callable[[ValueT], typing.Any]) -> None:
    """Feed every value to `consumer` as soon as it is produced.

    Parameters
    ----------
    consumer
        Callable invoked with each value. When it is a coroutine function
        its result is awaited before the next value is requested.
    """
    # Decided once, before iteration starts, rather than per item.
    needs_await = asyncio.iscoroutinefunction(consumer)

    async for value in self:
        outcome = consumer(value)
        if needs_await:
            await outcome
Pass each value to a given consumer immediately.
View Source
async def last(self) -> ValueT:
    """Return the last element of this iterator only.

    .. note::
        This method will consume the whole iterator if run.

    Returns
    -------
    ValueT
        The last result.

    Raises
    ------
    LookupError
        If no result exists.
    """
    # The last element is the first one of the reversed view.
    backwards = self.reversed()
    return await backwards.next()
Return the last element of this iterator only.
Note: This method will consume the whole iterator if run.
Returns
- ValueT: The last result.
Raises
- LookupError: If no result exists.
View Source
def limit(self, limit: int) -> LazyIterator[ValueT]:
    """Cap the number of items received from this async iterator.

    Parameters
    ----------
    limit : int
        The number of items to get. This must be greater than zero.

    Examples
    --------
    ```py
    >>> async for item in paginated_results.limit(3):
    ...     print(item)
    ```

    Returns
    -------
    LazyIterator[ValueT]
        A paginated results view that asynchronously yields a maximum of
        the given number of items before completing.
    """
    limited = _LimitedLazyIterator(self, limit)
    return limited
Limit the number of items you receive from this async iterator.
Parameters
- limit (int): The number of items to get. This must be greater than zero.
Examples
>>> async for item in paginated_results.limit(3):
... print(item)
Returns
- LazyIterator[ValueT]: A paginated results view that asynchronously yields a maximum of the given number of items before completing.
self,
transformation: Union[Callable[[~ValueT], ~AnotherValueT], str]
) -> hikari.iterators.LazyIterator[~AnotherValueT]:
View Source
def map(
    self,
    transformation: typing.Union[typing.Callable[[ValueT], AnotherValueT], str],
) -> LazyIterator[AnotherValueT]:
    """Map the values to a different value.

    Parameters
    ----------
    transformation : typing.Union[typing.Callable[[ValueT], AnotherValueT], str]
        The function to use to map the attribute. This may alternatively
        be a string attribute name to replace the input value with. You
        can provide nested attributes using the `.` operator.

    Returns
    -------
    LazyIterator[AnotherValueT]
        `LazyIterator` that maps each value to another value.
    """
    if isinstance(transformation, str):
        # A string is wrapped in an attribute getter so that both accepted
        # forms reach the mapping iterator as a callable.
        transformation = typing.cast("spel.AttrGetter[ValueT, AnotherValueT]", spel.AttrGetter(transformation))

    return _MappingLazyIterator(self, transformation)
Map the values to a different value.
Parameters
- transformation (typing.Union[typing.Callable[[ValueT], AnotherValueT], str]): The function to use to map the attribute. This may alternatively be a string attribute name to replace the input value with. You can provide nested attributes using the `.` operator.
Returns
- LazyIterator[AnotherValueT]:
LazyIterator
that maps each value to another value.
View Source
async def next(self) -> ValueT:
    """Fetch a single element: the next one this iterator yields.

    Returns
    -------
    ValueT
        The next result.

    Raises
    ------
    LookupError
        If no more results exist.
    """
    try:
        element = await self.__anext__()
    except StopAsyncIteration:
        # `from None` hides the internal StopAsyncIteration from callers.
        raise LookupError("No elements were found") from None

    return element
Return the next element of this iterator only.
Returns
- ValueT: The next result.
Raises
- LookupError: If no more results exist.
View Source
def open(self) -> None:
    """Mark this streamer as opened to let it start receiving and queueing events.

    If called on an already started streamer then this will do nothing.
    """
    if self._active:
        return

    # The listener is registered through a weak reference so that a stream
    # somebody forgot to close can still be cleaned up on deletion. That
    # may spam their console, but it is far more obvious than leaving them
    # with a vague memory leak.
    weak_listener = _generate_weak_listener(weakref.WeakMethod(self._listener))
    self._registered_listener = weak_listener
    self._event_manager.subscribe(self._event_type, weak_listener)
    self._active = True
Mark this streamer as opened to let it start receiving and queueing events.
If called on an already started streamer then this will do nothing.
Note: `with streamer` may be used as a short-cut for opening and closing a stream.
View Source
def reversed(self) -> LazyIterator[ValueT]:
    """Build a lazy iterator over this iterator's remaining values, reversed.

    Returns
    -------
    LazyIterator[ValueT]
        The lazy iterator of this iterator's remaining values reversed.
    """
    backwards = _ReversedLazyIterator(self)
    return backwards
Return a lazy iterator of the remainder of this iterator's values reversed.
Returns
- LazyIterator[ValueT]: The lazy iterator of this iterator's remaining values reversed.
View Source
def skip(self, number: int) -> LazyIterator[ValueT]:
    """Discard the first `number` items, then yield everything after them.

    Parameters
    ----------
    number : int
        The max number of items to drop before any items are yielded.

    Returns
    -------
    LazyIterator[ValueT]
        A paginated results view that asynchronously yields all items
        AFTER the given number of items are discarded first.
    """
    remainder = _DropCountLazyIterator(self, number)
    return remainder
Drop the given number of items, then yield anything after.
Parameters
- number (int): The max number of items to drop before any items are yielded.
Returns
- LazyIterator[ValueT]: A paginated results view that asynchronously yields all items AFTER the given number of items are discarded first.
self,
*predicates: Union[Tuple[str, Any], Callable[[~ValueT], bool]],
**attrs: Any
) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
def skip_until(
    self,
    *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[ValueT], bool]],
    **attrs: typing.Any,
) -> LazyIterator[ValueT]:
    """Discard items while all conditions are False.

    Items after this will be yielded as normal.

    Parameters
    ----------
    *predicates : typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]
        Predicates to invoke. These are functions that take a value and
        return `True` if it is of interest, or `False` otherwise. These
        may instead include 2-`tuple` objects consisting of a `str`
        attribute name (nested attributes are referred to using the `.`
        operator), and values to compare for equality. This allows you
        to specify conditions such as
        `members.skip_until(("user.bot", True))`.
    **attrs : typing.Any
        Alternative to passing 2-tuples. Cannot specify nested attributes
        using this method.

    Returns
    -------
    LazyIterator[ValueT]
        LazyIterator that only emits values once the conditions have been
        met. All items before this are discarded.
    """
    conditions = self._map_predicates_and_attr_getters("skip_until", *predicates, **attrs)
    # The combined condition is inverted before being handed to the
    # drop-while iterator, which is what turns "while" into "until".
    return _DropWhileLazyIterator(self, ~conditions)
Discard items while all conditions are False.
Items after this will be yielded as normal.
Parameters
- *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return `True` if it is of interest, or `False` otherwise. These may instead include 2-`tuple` objects consisting of a `str` attribute name (nested attributes are referred to using the `.` operator), and values to compare for equality. This allows you to specify conditions such as `members.skip_until(("user.bot", True))`.
- **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
- LazyIterator[ValueT]: LazyIterator that only emits values once the conditions have been met. All items before this are discarded.
self,
*predicates: Union[Tuple[str, Any], Callable[[~ValueT], bool]],
**attrs: Any
) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
def skip_while(
    self,
    *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[ValueT], bool]],
    **attrs: typing.Any,
) -> LazyIterator[ValueT]:
    """Discard items while all conditions are True.

    Items after this will be yielded as normal.

    Parameters
    ----------
    *predicates : typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]
        Predicates to invoke. These are functions that take a value and
        return `True` if it is of interest, or `False` otherwise. These
        may instead include 2-`tuple` objects consisting of a `str`
        attribute name (nested attributes are referred to using the `.`
        operator), and values to compare for equality. This allows you
        to specify conditions such as
        `members.skip_while(("user.bot", True))`.
    **attrs : typing.Any
        Alternative to passing 2-tuples. Cannot specify nested attributes
        using this method.

    Returns
    -------
    LazyIterator[ValueT]
        LazyIterator that only emits values once a condition has failed.
        All items before this are discarded.
    """
    conditions = self._map_predicates_and_attr_getters("skip_while", *predicates, **attrs)
    return _DropWhileLazyIterator(self, conditions)
Discard items while all conditions are True.
Items after this will be yielded as normal.
Parameters
- *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return `True` if it is of interest, or `False` otherwise. These may instead include 2-`tuple` objects consisting of a `str` attribute name (nested attributes are referred to using the `.` operator), and values to compare for equality. This allows you to specify conditions such as `members.skip_while(("user.bot", True))`.
- **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
- LazyIterator[ValueT]: LazyIterator that only emits values once a condition has failed. All items before this are discarded.
View Source
async def sort(self, *, key: typing.Any = None, reverse: bool = False) -> typing.Sequence[ValueT]:
    """Collect all results, then return them as a sorted sequence.

    Parameters
    ----------
    key : typing.Any
        Optional key function used for ordering; defaults to `None`.
    reverse : bool
        Whether to sort in descending order; defaults to `False`.

    Returns
    -------
    typing.Sequence[ValueT]
        A new list holding the sorted results.
    """
    ordered = list(await self)
    ordered.sort(key=key, reverse=reverse)
    return ordered
Collect all results, then sort the collection before returning it.
self,
*predicates: Union[Tuple[str, Any], Callable[[~ValueT], bool]],
**attrs: Any
) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
def take_until(
    self,
    *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[ValueT], bool]],
    **attrs: typing.Any,
) -> LazyIterator[ValueT]:
    """Return each item until any conditions pass or the end is reached.

    Parameters
    ----------
    *predicates : typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]
        Predicates to invoke. These are functions that take a value and
        return `True` if it is of interest, or `False` otherwise. These
        may instead include 2-`tuple` objects consisting of a `str`
        attribute name (nested attributes are referred to using the `.`
        operator), and values to compare for equality. This allows you
        to specify conditions such as
        `members.take_until(("user.bot", True))`.
    **attrs : typing.Any
        Alternative to passing 2-tuples. Cannot specify nested attributes
        using this method.

    Returns
    -------
    LazyIterator[ValueT]
        LazyIterator that only emits values until any conditions are
        matched.
    """
    stop_condition = self._map_predicates_and_attr_getters("take_until", *predicates, **attrs)
    # Inverted so the take-while iterator keeps going until it matches.
    keep_going = ~stop_condition
    return _TakeWhileLazyIterator(self, keep_going)
Return each item until any conditions pass or the end is reached.
Parameters
- *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return `True` if it is of interest, or `False` otherwise. These may instead include 2-`tuple` objects consisting of a `str` attribute name (nested attributes are referred to using the `.` operator), and values to compare for equality. This allows you to specify conditions such as `members.take_until(("user.bot", True))`.
- **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
- LazyIterator[ValueT]: LazyIterator that only emits values until any conditions are matched.
self,
*predicates: Union[Tuple[str, Any], Callable[[~ValueT], bool]],
**attrs: Any
) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
def take_while(
    self,
    *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[ValueT], bool]],
    **attrs: typing.Any,
) -> LazyIterator[ValueT]:
    """Return each item until any conditions fail or the end is reached.

    Parameters
    ----------
    *predicates : typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]
        Predicates to invoke. These are functions that take a value and
        return `True` if it is of interest, or `False` otherwise. These
        may instead include 2-`tuple` objects consisting of a `str`
        attribute name (nested attributes are referred to using the `.`
        operator), and values to compare for equality. This allows you
        to specify conditions such as
        `members.take_while(("user.bot", True))`.
    **attrs : typing.Any
        Alternative to passing 2-tuples. Cannot specify nested attributes
        using this method.

    Returns
    -------
    LazyIterator[ValueT]
        LazyIterator that only emits values until any conditions are not
        matched.
    """
    keep_going = self._map_predicates_and_attr_getters("take_while", *predicates, **attrs)
    return _TakeWhileLazyIterator(self, keep_going)
Return each item until any conditions fail or the end is reached.
Parameters
- *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return `True` if it is of interest, or `False` otherwise. These may instead include 2-`tuple` objects consisting of a `str` attribute name (nested attributes are referred to using the `.` operator), and values to compare for equality. This allows you to specify conditions such as `members.take_while(("user.bot", True))`.
- **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
- LazyIterator[ValueT]: LazyIterator that only emits values until any conditions are not matched.
event_types: Union[Type[hikari.events.base_events.Event], Sequence[Type[hikari.events.base_events.Event]]],
cache_components: hikari.api.config.CacheComponents = <CacheComponents.NONE: 0>,
/
) -> Callable[[Callable[[~_EventManagerBaseT, hikari.api.shard.GatewayShard, Dict[str, Any]], Coroutine[Any, Any, NoneType]]], Callable[[~_EventManagerBaseT, hikari.api.shard.GatewayShard, Dict[str, Any]], Coroutine[Any, Any, NoneType]]]:
View Source
def filtered(
    event_types: typing.Union[typing.Type[base_events.Event], typing.Sequence[typing.Type[base_events.Event]]],
    cache_components: config.CacheComponents = config.CacheComponents.NONE,
    /,
) -> typing.Callable[[_UnboundMethodT[_EventManagerBaseT]], _UnboundMethodT[_EventManagerBaseT]]:
    """Add metadata to a consumer method to indicate when it should be unmarshalled and dispatched.

    Parameters
    ----------
    event_types
        Types of the events this raw consumer method may dispatch.
        This may either be a singular type or a sequence of types.

    Other Parameters
    ----------------
    cache_components : hikari.api.config.CacheComponents
        Bitfield of the cache components this event may make altering
        calls to. This defaults to `hikari.api.config.CacheComponents.NONE`.

    Returns
    -------
    typing.Callable
        A decorator that stores `__cache_components__` and
        `__events_bitmask__` on the method and returns that same method.
    """
    if not isinstance(event_types, typing.Sequence):
        dispatched = event_types.dispatches()
    else:
        expanded = []
        for requested in event_types:
            expanded.extend(requested.dispatches())

        # dict.fromkeys drops duplicate entries while keeping their order.
        dispatched = tuple(dict.fromkeys(expanded))

    events_bitmask = 0
    for dispatched_type in dispatched:
        events_bitmask |= dispatched_type.bitmask()

    # https://github.com/python/mypy/issues/2087
    def decorator(method: _UnboundMethodT[_EventManagerBaseT], /) -> _UnboundMethodT[_EventManagerBaseT]:
        method.__cache_components__ = cache_components  # type: ignore[attr-defined]
        method.__events_bitmask__ = events_bitmask  # type: ignore[attr-defined]
        # Sanity check that both attributes above were actually set.
        assert isinstance(method, _FilteredMethodT), "Incorrect attribute(s) set for a filtered method"
        return method  # type: ignore[unreachable]

    return decorator
Add metadata to a consumer method to indicate when it should be unmarshalled and dispatched.
Parameters
- event_types: Types of the events this raw consumer method may dispatch. This may either be a singular type or a sequence of types.
Other Parameters
- cache_components (hikari.api.config.CacheComponents): Bitfield of the cache components this event may make altering calls to. This defaults to
hikari.api.config.CacheComponents.NONE
.