Back to top

hikari.impl.event_manager_base

A base implementation for an event manager.

View Source
# -*- coding: utf-8 -*-
# cython: language_level=3
# Copyright (c) 2020 Nekokatt
# Copyright (c) 2021-present davfsa
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""A base implementation for an event manager."""

from __future__ import annotations

__all__: typing.Sequence[str] = ("filtered", "EventManagerBase", "EventStream")

import asyncio
import inspect
import itertools
import logging
import sys
import types
import typing
import warnings
import weakref

import attr

from hikari import errors
from hikari import iterators
from hikari.api import config
from hikari.api import event_manager as event_manager_
from hikari.events import base_events
from hikari.events import shard_events
from hikari.internal import aio
from hikari.internal import fast_protocol
from hikari.internal import reflect
from hikari.internal import ux

# Everything in this block is only needed by static type checkers. Annotations in
# this module are lazily evaluated (`from __future__ import annotations`), so these
# names are only ever referenced from annotations.
if typing.TYPE_CHECKING:
    from hikari import intents as intents_
    from hikari.api import event_factory as event_factory_
    from hikari.api import shard as gateway_shard
    from hikari.internal import data_binding

    # A bound raw event consumer: called with the shard and the raw JSON payload.
    _ConsumerT = typing.Callable[
        [gateway_shard.GatewayShard, data_binding.JSONObject], typing.Coroutine[typing.Any, typing.Any, None]
    ]
    # Maps an event type to the list of callbacks subscribed to exactly that type.
    _ListenerMapT = typing.Dict[
        typing.Type[base_events.EventT],
        typing.List[event_manager_.CallbackT[base_events.EventT]],
    ]
    # A pending waiter: an optional predicate paired with the future to resolve.
    _WaiterT = typing.Tuple[
        typing.Optional[event_manager_.PredicateT[base_events.EventT]], asyncio.Future[base_events.EventT]
    ]
    # Maps an event type to the set of waiters registered for exactly that type.
    _WaiterMapT = typing.Dict[typing.Type[base_events.EventT], typing.Set[_WaiterT[base_events.EventT]]]

    _EventManagerBaseT = typing.TypeVar("_EventManagerBaseT", bound="EventManagerBase")
    # An unbound raw event consumer: like `_ConsumerT`, but with the manager instance as first argument.
    _UnboundMethodT = typing.Callable[
        [_EventManagerBaseT, gateway_shard.GatewayShard, data_binding.JSONObject],
        typing.Coroutine[typing.Any, typing.Any, None],
    ]

    # Self-type used so that `EventStream` methods can return the concrete subclass.
    _EventStreamT = typing.TypeVar("_EventStreamT", bound="EventStream[typing.Any]")


# Annotation origins (as returned by `typing.get_origin`) which are treated as a
# union of event types when resolving a listener's type hint.
if sys.version_info >= (3, 10):
    # We can use types.UnionType on 3.10+
    _UNIONS = frozenset((typing.Union, types.UnionType))
else:
    _UNIONS = frozenset((typing.Union,))

# Logger used by the event streams and the event manager defined in this module.
_LOGGER: typing.Final[logging.Logger] = logging.getLogger("hikari.event_manager")


# Structural type of a raw event consumer which has been decorated with `filtered`:
# an async callable which additionally carries the `__cache_components__` and
# `__events_bitmask__` metadata attributes. It is runtime checkable so that
# `isinstance` can be used to detect decorated consumers.
@typing.runtime_checkable
class _FilteredMethodT(fast_protocol.FastProtocolChecking, typing.Protocol):
    __slots__: typing.Sequence[str] = ()

    # Consume a raw payload received on the given shard.
    async def __call__(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject, /) -> None:
        raise NotImplementedError

    # Cache components the consumer was registered with through `filtered`.
    @property
    def __cache_components__(self) -> config.CacheComponents:
        raise NotImplementedError

    # Combined bitmask of the event types the consumer was registered with through `filtered`.
    @property
    def __events_bitmask__(self) -> int:
        raise NotImplementedError


def _generate_weak_listener(
    reference: weakref.WeakMethod[typing.Any],
) -> typing.Callable[[base_events.EventT], typing.Coroutine[typing.Any, typing.Any, None]]:
    """Wrap a weakly referenced bound method in an async listener.

    Parameters
    ----------
    reference
        Weak reference to the bound coroutine method which should receive
        the events.

    Returns
    -------
    typing.Callable
        A coroutine function which resolves `reference` on every call and
        awaits the resulting method with the event it was given.

        Calling it after the referenced method has died raises `TypeError`.
    """

    async def call_weak_method(event: base_events.EventT) -> None:
        target = reference()
        if target is not None:
            await target(event)
            return

        # The owner of the method was collected without unsubscribing this listener.
        raise TypeError(
            "dead weak referenced subscriber method cannot be executed, try actually closing your event streamers"
        )

    return call_weak_method


class EventStream(event_manager_.EventStream[base_events.EventT]):
    """An implementation of an event `EventStream` class.

    While the stream is open, events of the configured type which pass the
    stream's filters are buffered in an internal queue and handed out through
    asynchronous iteration. Iteration stops once no event arrives within the
    configured timeout.

    .. note::
        `EventStream.filter` always returns this stream. The given predicates
        are added to the stream's filters and, if the stream is currently
        "opened", events which are already queued and do not match the new
        predicates are dropped from the queue.
    """

    __slots__: typing.Sequence[str] = (
        "__weakref__",
        "_active",
        "_event",
        "_event_manager",
        "_event_type",
        "_filters",
        "_limit",
        "_queue",
        "_registered_listener",
        "_timeout",
    )

    __weakref__: typing.Optional[weakref.ref[EventStream[base_events.EventT]]]

    def __init__(
        self,
        event_manager: event_manager_.EventManager,
        event_type: typing.Type[base_events.EventT],
        *,
        timeout: typing.Union[float, int, None],
        limit: typing.Optional[int] = None,
    ) -> None:
        # event_manager: manager the stream subscribes to while it is open.
        # event_type: type of the events to subscribe to.
        # timeout: seconds `__anext__` waits for a new event before ending iteration
        #     (passed straight to `asyncio.wait_for`).
        # limit: maximum number of events held in the queue at once; `None` means unbounded.
        self._active = False
        # Created lazily by `__anext__`, and set by `_listener` whenever an event is queued.
        self._event: typing.Optional[asyncio.Event] = None
        self._event_manager = event_manager
        self._event_type = event_type
        self._filters: iterators.All[base_events.EventT] = iterators.All(())
        self._limit = limit
        self._queue: typing.List[base_events.EventT] = []
        # The registered wrapping function for the weak ref to this class's _listener method.
        self._registered_listener: typing.Optional[
            typing.Callable[[base_events.EventT], typing.Coroutine[typing.Any, typing.Any, None]]
        ] = None
        self._timeout = timeout

    # These are only included at runtime in-order to avoid the model being typed as an asynchronous context manager.
    if not typing.TYPE_CHECKING:

        async def __aenter__(self: event_manager_.EventStreamT) -> event_manager_.EventStreamT:
            # This is sync only.
            warnings.warn(
                "Using EventStream as an async context manager has been deprecated since 2.0.0.dev104. "
                "Please use it as a synchronous context manager (e.g. with bot.stream(...)) instead.",
                category=DeprecationWarning,
                stacklevel=2,
            )

            self.open()
            return self

        async def __aexit__(
            self,
            exc_type: typing.Optional[typing.Type[BaseException]],
            exc: typing.Optional[BaseException],
            exc_tb: typing.Optional[types.TracebackType],
        ) -> None:
            self.close()

    def __enter__(self: _EventStreamT) -> _EventStreamT:
        self.open()
        return self

    def __exit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]],
        exc_val: typing.Optional[BaseException],
        exc_tb: typing.Optional[types.TracebackType],
    ) -> None:
        self.close()

    async def __anext__(self) -> base_events.EventT:
        if not self._active:
            raise TypeError("stream must be started with before entering it")

        while not self._queue:
            if not self._event:
                self._event = asyncio.Event()

            try:
                await asyncio.wait_for(self._event.wait(), timeout=self._timeout)
            except asyncio.TimeoutError:
                # No event arrived in time: this ends the iteration rather than raising.
                raise StopAsyncIteration from None

            # Reset the flag so the next wait blocks until `_listener` sets it again.
            self._event.clear()

        return self._queue.pop(0)

    def __await__(self) -> typing.Generator[None, None, typing.Sequence[base_events.EventT]]:
        return self._await_all().__await__()

    def __del__(self) -> None:
        # For the sake of protecting highly intelligent people who forget to close this, we register the event listener
        # with a weakref then try to close this on deletion. While this may lead to their consoles being spammed, this
        # is a small price to pay as it'll be way more obvious what's wrong than if we just left them with a vague
        # ominous memory leak.
        if self._active:
            _LOGGER.warning("active %r streamer fell out of scope before being closed", self._event_type.__name__)
            self.close()

    async def _await_all(self) -> typing.Sequence[base_events.EventT]:
        """Open the stream, collect every event until iteration ends, then close it.

        Returns
        -------
        typing.Sequence[hikari.events.base_events.EventT]
            The events yielded by the stream, in the order they were yielded.
        """
        self.open()
        try:
            return [event async for event in self]
        finally:
            # Always unsubscribe, even if collecting was interrupted (e.g. by the
            # awaiting task being cancelled), so the listener doesn't stay registered.
            self.close()

    async def _listener(self, event: base_events.EventT) -> None:
        """Queue `event` if it passes the filters and the queue is below the limit."""
        if not self._filters(event) or (self._limit is not None and len(self._queue) >= self._limit):
            return

        self._queue.append(event)
        if self._event:
            # Wake up a pending `__anext__` call.
            self._event.set()

    def close(self) -> None:
        if self._active and self._registered_listener is not None:
            try:
                self._event_manager.unsubscribe(self._event_type, self._registered_listener)
            except ValueError:
                # Best effort: the listener is already gone from the manager.
                pass

            self._registered_listener = None

        self._active = False

    def filter(
        self: _EventStreamT,
        *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[base_events.EventT], bool]],
        **attrs: typing.Any,
    ) -> _EventStreamT:
        filter_ = self._map_predicates_and_attr_getters("filter", *predicates, **attrs)
        if self._active:
            # Events which were queued before this filter was added must pass it too.
            self._queue = [entry for entry in self._queue if filter_(entry)]

        self._filters |= filter_
        return self

    def open(self) -> None:
        if not self._active:
            # For the sake of protecting highly intelligent people who forget to close this, we register the event
            # listener with a weakref then try to close this on deletion. While this may lead to their consoles being
            # spammed, this is a small price to pay as it'll be way more obvious what's wrong than if we just left them
            # with a vague ominous memory leak.
            reference = weakref.WeakMethod(self._listener)
            listener = _generate_weak_listener(reference)
            self._registered_listener = listener
            self._event_manager.subscribe(self._event_type, listener)
            self._active = True


def _assert_is_listener(parameters: typing.Iterator[inspect.Parameter], /) -> None:
    """Assert that a signature can be invoked with just the event object.

    Parameters
    ----------
    parameters
        Iterator over the parameters of the callback's signature, in
        declaration order. The iterator is consumed by this call.

    Raises
    ------
    TypeError
        If there are no parameters at all, or if any parameter after the
        first one has to be provided by the caller.
    """
    if next(parameters, None) is None:
        raise TypeError("Event listener must have one positional argument for the event object.")

    # `*args` and `**kwargs` never have a default but may always be left empty,
    # so they must not be treated as required arguments.
    variadic_kinds = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    if any(
        param.default is inspect.Parameter.empty
        for param in parameters
        if param.kind not in variadic_kinds
    ):
        raise TypeError("Only the first argument for a listener can be required, the event argument.")


def filtered(
    event_types: typing.Union[typing.Type[base_events.Event], typing.Sequence[typing.Type[base_events.Event]]],
    cache_components: config.CacheComponents = config.CacheComponents.NONE,
    /,
) -> typing.Callable[[_UnboundMethodT[_EventManagerBaseT]], _UnboundMethodT[_EventManagerBaseT]]:
    """Attach dispatch metadata to a raw consumer method.

    The returned decorator stores the metadata on the method as the
    `__cache_components__` and `__events_bitmask__` attributes and returns
    the same method object.

    Parameters
    ----------
    event_types
        Types of the events this raw consumer method may dispatch.
        This may either be a singular type or a sequence of types.

    Other Parameters
    ----------------
    cache_components : hikari.api.config.CacheComponents
        Bitfield of the cache components this event may make altering calls to.
        This defaults to `hikari.api.config.CacheComponents.NONE`.
    """
    if not isinstance(event_types, typing.Sequence):
        dispatched_types = event_types.dispatches()

    else:
        # dict.fromkeys drops duplicate entries while keeping the original order.
        all_dispatched = itertools.chain.from_iterable(entry.dispatches() for entry in event_types)
        dispatched_types = tuple(dict.fromkeys(all_dispatched))

    # Fold the bitmask of every dispatched type into a single integer.
    combined_bitmask = 0
    for dispatched_type in dispatched_types:
        combined_bitmask = combined_bitmask | dispatched_type.bitmask()

    # https://github.com/python/mypy/issues/2087
    def decorator(method: _UnboundMethodT[_EventManagerBaseT], /) -> _UnboundMethodT[_EventManagerBaseT]:
        setattr(method, "__cache_components__", cache_components)
        setattr(method, "__events_bitmask__", combined_bitmask)
        assert isinstance(method, _FilteredMethodT), "Incorrect attribute(s) set for a filtered method"
        return method  # type: ignore[unreachable]

    return decorator


@attr.define(weakref_slot=False)
class _Consumer:
    """Bookkeeping for a single raw event consumer method.

    Tracks the consumer's callback together with the counters which decide
    whether invoking it is currently worthwhile (see `is_enabled`).
    """

    callback: _ConsumerT = attr.field(hash=True)
    """The callback function for this consumer."""

    events_bitmask: int = attr.field()
    """The registered events bitmask."""

    is_caching: bool = attr.field()
    """Cached value of whether or not this consumer is making cache calls in the current env."""

    listener_group_count: int = attr.field(init=False, default=0)
    """The number of listener groups registered to this consumer."""

    waiter_group_count: int = attr.field(init=False, default=0)
    """The number of waiters groups registered to this consumer."""

    @property
    def is_enabled(self) -> bool:
        """Whether the consumer is caching or has any listener or waiter group registered."""
        return self.is_caching or self.listener_group_count > 0 or self.waiter_group_count > 0


class EventManagerBase(event_manager_.EventManager):
    """Provides functionality to consume and dispatch events.

    Specific event handlers should be in functions named `on_xxx` where `xxx`
    is the raw event name being dispatched in lower-case.
    """

    __slots__: typing.Sequence[str] = (
        "_consumers",
        "_event_factory",
        "_intents",
        "_listeners",
        "_waiters",
    )

    def __init__(
        self,
        event_factory: event_factory_.EventFactory,
        intents: intents_.Intents,
        *,
        cache_components: config.CacheComponents = config.CacheComponents.NONE,
    ) -> None:
        # event_factory: factory used to build event objects from raw payloads.
        # intents: the enabled intents, used to warn about events which can never fire.
        # cache_components: the enabled cache components, used to work out which
        #     consumers have to run for caching purposes.
        self._consumers: typing.Dict[str, _Consumer] = {}
        self._event_factory = event_factory
        self._intents = intents
        self._listeners: _ListenerMapT[base_events.Event] = {}
        self._waiters: _WaiterMapT[base_events.Event] = {}

        # Every `on_xxx` member is registered as the consumer for the raw event `xxx`.
        for attribute_name, attribute in inspect.getmembers(self):
            if not attribute_name.startswith("on_"):
                continue

            if isinstance(attribute, _FilteredMethodT):
                is_caching = (attribute.__cache_components__ & cache_components) != 0
                events_bitmask = attribute.__events_bitmask__

            else:
                # Consumers without metadata get an all-ones bitmask (matches any event)
                # and count as caching whenever any cache component is enabled.
                events_bitmask = -1
                is_caching = cache_components != cache_components.NONE

            self._consumers[attribute_name[3:]] = _Consumer(attribute, events_bitmask, is_caching)

    def _increment_listener_group_count(
        self, event_type: typing.Type[base_events.Event], count: typing.Literal[-1, 1]
    ) -> None:
        """Add `count` to the listener group count of every consumer covering `event_type`.

        A consumer covers the event type when all bits of the type's bitmask
        are set in the consumer's events bitmask.
        """
        required_bits = event_type.bitmask()
        for consumer in self._consumers.values():
            if (consumer.events_bitmask & required_bits) != required_bits:
                continue

            consumer.listener_group_count += count

    def _increment_waiter_group_count(
        self, event_type: typing.Type[base_events.Event], count: typing.Literal[-1, 1]
    ) -> None:
        """Add `count` to the waiter group count of every consumer covering `event_type`.

        A consumer covers the event type when all bits of the type's bitmask
        are set in the consumer's events bitmask.
        """
        required_bits = event_type.bitmask()
        for consumer in self._consumers.values():
            if (consumer.events_bitmask & required_bits) != required_bits:
                continue

            consumer.waiter_group_count += count

    def _enabled_for_event(self, event_type: typing.Type[base_events.Event], /) -> bool:
        """Return whether any type `event_type` dispatches as has a listener or waiter registered."""
        return any(cls in self._listeners or cls in self._waiters for cls in event_type.dispatches())

    def _check_event(self, event_type: typing.Type[typing.Any], nested: int) -> None:
        """Validate an event type and warn if the enabled intents can never trigger it.

        Parameters
        ----------
        event_type
            The object which is meant to be an event class.
        nested
            Number of extra stack frames between the caller of this method
            and the user's code; used to point the warning at the latter.

        Raises
        ------
        TypeError
            If `event_type` is not a subclass of `hikari.events.base_events.Event`.
        """
        # issubclass itself raises TypeError when handed something which isn't a class.
        try:
            valid = issubclass(event_type, base_events.Event)
        except TypeError:
            valid = False

        if not valid:
            raise TypeError("'event_type' is a non-Event type")

        # Collection of combined bitfield combinations of intents that
        # could be enabled to receive this event.
        intent_groups = base_events.get_required_intents_for(event_type)
        if not intent_groups:
            return

        # Having every intent of at least one group enabled is sufficient.
        if any((self._intents & group) == group for group in intent_groups):
            return

        expected_intents_str = ", ".join(map(str, intent_groups))

        # This must be issued from this very frame as `stacklevel` is relative to it.
        warnings.warn(
            f"You have tried to listen to {event_type.__name__}, but this will only ever be triggered if "
            f"you enable one of the following intents: {expected_intents_str}.",
            category=errors.MissingIntentWarning,
            stacklevel=nested + 3,
        )

    def consume_raw_event(
        self, event_name: str, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        # The generic payload event is only deserialized when somebody is interested in it.
        if self._enabled_for_event(shard_events.ShardPayloadEvent):
            self.dispatch(self._event_factory.deserialize_shard_payload_event(shard, payload, name=event_name))

        # Consumers are keyed by the lower-cased raw event name; an unknown name raises KeyError here.
        handling = self._handle_dispatch(self._consumers[event_name.lower()], shard, payload)
        # NOTE(review): no reference to the created task is retained - confirm this is intended.
        asyncio.create_task(handling, name=f"dispatch {event_name}")

    # Yes, this is not generic. The reason for this is MyPy complains about
    # using ABCs that are not concrete in generic types passed to functions.
    # For the sake of UX, I will check this at runtime instead and let the
    # user use a static type checker.
    def subscribe(
        self,
        event_type: typing.Type[typing.Any],
        callback: event_manager_.CallbackT[typing.Any],
        *,
        _nested: int = 0,
    ) -> None:
        if not inspect.iscoroutinefunction(callback):
            raise TypeError("Cannot subscribe a non-coroutine function callback")

        # `_nested` is used to show the correct source code snippet if an intent
        # warning is triggered.
        self._check_event(event_type, _nested)

        callback_name = getattr(callback, "__name__", "<anon>")
        _LOGGER.debug(
            "subscribing callback 'async def %s%s' to event-type %s.%s",
            callback_name,
            inspect.signature(callback),
            event_type.__module__,
            event_type.__qualname__,
        )

        registered = self._listeners.get(event_type)
        if registered is not None:
            registered.append(callback)
            return

        # First listener for this exact type: start a new group and let the
        # matching consumers know that somebody is listening.
        self._listeners[event_type] = [callback]
        self._increment_listener_group_count(event_type, 1)

    def get_listeners(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        *,
        polymorphic: bool = True,
    ) -> typing.Collection[event_manager_.CallbackT[base_events.EventT]]:
        if not polymorphic:
            # Only the callbacks registered for exactly this type; copied so that
            # callers cannot mutate the internal list.
            exact = self._listeners.get(event_type)
            return exact.copy() if exact else []

        # Gather the callbacks of every type this event type dispatches as.
        collected: typing.List[event_manager_.CallbackT[base_events.EventT]] = []
        for dispatched_type in event_type.dispatches():
            collected.extend(self._listeners.get(dispatched_type, ()))

        return collected

    # Yes, this is not generic. The reason for this is MyPy complains about
    # using ABCs that are not concrete in generic types passed to functions.
    # For the sake of UX, I will check this at runtime instead and let the
    # user use a static type checker.
    def unsubscribe(
        self,
        event_type: typing.Type[typing.Any],
        callback: event_manager_.CallbackT[typing.Any],
    ) -> None:
        registered = self._listeners.get(event_type)
        if not registered:
            # Nothing is subscribed to this exact type, so there is nothing to remove.
            return

        callback_name = getattr(callback, "__name__", "<anon>")
        _LOGGER.debug(
            "unsubscribing callback %s%s from event-type %s.%s",
            callback_name,
            inspect.signature(callback),
            event_type.__module__,
            event_type.__qualname__,
        )

        # list.remove raises ValueError if the callback isn't subscribed to this type.
        registered.remove(callback)
        if registered:
            return

        # That was the last listener of the group: drop it and update the consumers.
        del self._listeners[event_type]
        self._increment_listener_group_count(event_type, -1)

    def listen(
        self,
        *event_types: typing.Type[base_events.EventT],
    ) -> typing.Callable[[event_manager_.CallbackT[base_events.EventT]], event_manager_.CallbackT[base_events.EventT]]:
        def decorator(
            callback: event_manager_.CallbackT[base_events.EventT],
        ) -> event_manager_.CallbackT[base_events.EventT]:
            if not event_types:
                # No explicit types were given: fall back to the type hint of the
                # callback's first parameter.
                parameters = reflect.resolve_signature(callback).parameters.values()
                _assert_is_listener(iter(parameters))
                first_parameter = next(iter(parameters))
                hint = first_parameter.annotation

                if hint is first_parameter.empty:
                    raise TypeError("Must provide the event type in the @listen decorator or as a type hint!")

                # A union hint subscribes the callback to each of its members.
                targets = typing.get_args(hint) if typing.get_origin(hint) in _UNIONS else (hint,)

            else:
                # Avoid resolving forward references in the function's signature if
                # event_type was explicitly provided as this may lead to errors.
                _assert_is_listener(iter(inspect.signature(callback).parameters.values()))
                targets = event_types

            # Subscribing directly from this frame keeps `_nested=1` pointing at the user's code.
            for target in targets:
                self.subscribe(target, callback, _nested=1)

            return callback

        return decorator

    def dispatch(self, event: base_events.EventT) -> asyncio.Future[typing.Any]:
        # Rejects anything which is not an event instance up front.
        if not isinstance(event, base_events.Event):
            raise TypeError(f"Events must be subclasses of {base_events.Event.__name__}, not {type(event).__name__}")

        # Callback invocations collected across every type this event dispatches as.
        tasks: typing.List[typing.Coroutine[None, typing.Any, None]] = []

        for cls in event.dispatches():
            if listeners := self._listeners.get(cls):
                for callback in listeners:
                    tasks.append(self._invoke_callback(callback, event))

            if cls not in self._waiters:
                continue

            waiter_set = self._waiters[cls]
            # Iterate over a snapshot as waiters are removed from the set within the loop.
            for waiter in tuple(waiter_set):
                predicate, future = waiter
                if not future.done():
                    try:
                        # A waiter whose predicate rejects the event stays registered.
                        if predicate and not predicate(event):
                            continue
                    except Exception as ex:
                        # A failing predicate is reported through the waiter's future.
                        future.set_exception(ex)
                    else:
                        future.set_result(event)

                # Reached for waiters which were just resolved and for ones whose
                # future was already done before this dispatch.
                waiter_set.remove(waiter)

            if not waiter_set:
                # Last waiter of this group is gone: drop the group and update the consumers.
                del self._waiters[cls]
                self._increment_waiter_group_count(cls, -1)

        # Without any callbacks there is nothing to gather, so an already completed future is returned.
        return asyncio.gather(*tasks) if tasks else aio.completed_future()

    def stream(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        timeout: typing.Union[float, int, None],
        limit: typing.Optional[int] = None,
    ) -> event_manager_.EventStream[base_events.EventT]:
        # Validate the type (and warn about missing intents) before building the stream;
        # the `1` accounts for this frame when the warning location is computed.
        self._check_event(event_type, 1)

        event_stream = EventStream(self, event_type, timeout=timeout, limit=limit)
        return event_stream

    async def wait_for(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        timeout: typing.Union[float, int, None],
        predicate: typing.Optional[event_manager_.PredicateT[base_events.EventT]] = None,
    ) -> base_events.EventT:
        if not (inspect.isclass(event_type) and issubclass(event_type, base_events.Event)):
            raise TypeError("Cannot wait for a non-Event type")

        self._check_event(event_type, 1)

        future: asyncio.Future[base_events.EventT] = asyncio.get_running_loop().create_future()

        waiter_set = self._waiters.get(event_type)
        if waiter_set is None:
            # First waiter for this exact type: start a new group and let the
            # matching consumers know that somebody is waiting.
            waiter_set = set()
            self._waiters[event_type] = waiter_set
            self._increment_waiter_group_count(event_type, 1)

        waiter = (predicate, future)
        waiter_set.add(waiter)  # type: ignore[arg-type]

        try:
            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            # The event never arrived in time, so withdraw the waiter before
            # propagating the timeout to the caller.
            waiter_set.remove(waiter)  # type: ignore[arg-type]
            if not waiter_set:
                del self._waiters[event_type]
                self._increment_waiter_group_count(event_type, -1)

            raise

    async def _handle_dispatch(
        self,
        consumer: _Consumer,
        shard: gateway_shard.GatewayShard,
        payload: data_binding.JSONObject,
    ) -> None:
        """Run a raw event consumer, unless nothing currently needs its result.

        Errors raised by the consumer are not propagated: unrecognised entity
        errors are logged at debug level, cancellation is ignored and anything
        else is handed to the running loop's exception handler.
        """
        if not consumer.is_enabled:
            consumer_name = consumer.callback.__name__
            _LOGGER.log(
                ux.TRACE,
                "Skipping raw dispatch for %s due to lack of any registered listeners or cache need",
                consumer_name,
            )
            return

        try:
            await consumer.callback(shard, payload)
        except asyncio.CancelledError:
            # Skip cancelled errors, likely caused by the event loop being shut down.
            pass
        except errors.UnrecognisedEntityError:
            _LOGGER.debug("Event referenced an unrecognised entity, discarding")
        except BaseException as ex:
            context = {
                "message": "Exception occurred in raw event dispatch conduit",
                "exception": ex,
                "task": asyncio.current_task(),
            }
            asyncio.get_running_loop().call_exception_handler(context)

    async def _invoke_callback(
        self, callback: event_manager_.CallbackT[base_events.EventT], event: base_events.EventT
    ) -> None:
        """Await `callback` with `event`, reporting any `Exception` it raises.

        Failures are logged and, unless the event is marked as a no recursive
        throw event, an `ExceptionEvent` describing the failure is dispatched
        and awaited.
        """
        try:
            await callback(event)
        except Exception as ex:
            # Skip the first frame in logs, we don't care for it.
            tb = ex.__traceback__
            exc_info = (type(ex), ex, tb.tb_next if tb is not None else None)
            event_name = type(event).__name__

            if base_events.is_no_recursive_throw_event(event):
                _LOGGER.error(
                    "an exception occurred handling an event (%s), but it has been ignored",
                    event_name,
                    exc_info=exc_info,
                )
                return

            exception_event = base_events.ExceptionEvent(
                exception=ex,
                failed_event=event,
                failed_callback=callback,
            )

            # Only log at debug level when some listener will receive the exception event.
            if self.get_listeners(type(exception_event), polymorphic=True):
                log = _LOGGER.debug
            else:
                log = _LOGGER.error

            log("an exception occurred handling an event (%s)", event_name, exc_info=exc_info)
            await self.dispatch(exception_event)
View Source
class EventManagerBase(event_manager_.EventManager):
    """Provides functionality to consume and dispatch events.

    Specific event handlers should be in functions named `on_xxx` where `xxx`
    is the raw event name being dispatched in lower-case.
    """

    __slots__: typing.Sequence[str] = (
        "_consumers",
        "_event_factory",
        "_intents",
        "_listeners",
        "_waiters",
    )

    def __init__(
        self,
        event_factory: event_factory_.EventFactory,
        intents: intents_.Intents,
        *,
        cache_components: config.CacheComponents = config.CacheComponents.NONE,
    ) -> None:
        """Initialise the manager and index its raw event consumers.

        Every member of this instance whose name begins with `on_` is wrapped
        in a `_Consumer` and stored under that name with the prefix removed.

        Parameters
        ----------
        event_factory
            The event factory stored on this manager.
        intents
            The intents stored on this manager.
        cache_components
            The enabled cache components, used to work out the last argument
            given to each `_Consumer`.
        """
        self._consumers: typing.Dict[str, _Consumer] = {}
        self._event_factory = event_factory
        self._intents = intents
        self._listeners: _ListenerMapT[base_events.Event] = {}
        self._waiters: _WaiterMapT[base_events.Event] = {}

        for name, member in inspect.getmembers(self):
            if not name.startswith("on_"):
                continue

            if isinstance(member, _FilteredMethodT):
                # The member carries its own event bitmask and cache components;
                # it counts as caching when any of its components are enabled.
                is_caching = (member.__cache_components__ & cache_components) != 0
                consumer = _Consumer(member, member.__events_bitmask__, is_caching)
            else:
                # No filter metadata: -1 has every bit set, and the consumer
                # counts as caching whenever any cache component is enabled.
                consumer = _Consumer(member, -1, cache_components != cache_components.NONE)

            self._consumers[name[3:]] = consumer

    def _increment_listener_group_count(
        self, event_type: typing.Type[base_events.Event], count: typing.Literal[-1, 1]
    ) -> None:
        event_bitmask = event_type.bitmask()
        for consumer in self._consumers.values():
            if (consumer.events_bitmask & event_bitmask) == event_bitmask:
                consumer.listener_group_count += count

    def _increment_waiter_group_count(
        self, event_type: typing.Type[base_events.Event], count: typing.Literal[-1, 1]
    ) -> None:
        event_bitmask = event_type.bitmask()
        for consumer in self._consumers.values():
            if (consumer.events_bitmask & event_bitmask) == event_bitmask:
                consumer.waiter_group_count += count

    def _enabled_for_event(self, event_type: typing.Type[base_events.Event], /) -> bool:
        for cls in event_type.dispatches():
            if cls in self._listeners or cls in self._waiters:
                return True

        return False

    def _check_event(self, event_type: typing.Type[typing.Any], nested: int) -> None:
        """Validate an event type and warn if no suitable intents are enabled.

        Parameters
        ----------
        event_type
            The object to validate as a subclass of `base_events.Event`.
        nested
            Extra stack depth added to the warning's `stacklevel`
            (`nested + 3`).

        Raises
        ------
        TypeError
            If `event_type` is not a subclass of `base_events.Event`.
        """
        try:
            valid = issubclass(event_type, base_events.Event)
        except TypeError:
            # `issubclass` rejects a first argument that is not a class.
            valid = False

        if not valid:
            raise TypeError("'event_type' is a non-Event type")

        # Collection of combined bitfield combinations of intents that
        # could be enabled to receive this event.
        expected_intent_groups = base_events.get_required_intents_for(event_type)
        if not expected_intent_groups:
            return

        # A single fully enabled group is enough for the event to be received.
        if any((self._intents & group) == group for group in expected_intent_groups):
            return

        expected_intents_str = ", ".join(map(str, expected_intent_groups))
        warnings.warn(
            f"You have tried to listen to {event_type.__name__}, but this will only ever be triggered if "
            f"you enable one of the following intents: {expected_intents_str}.",
            category=errors.MissingIntentWarning,
            stacklevel=nested + 3,
        )

    def consume_raw_event(
        self, event_name: str, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """Hand a raw event payload to the consumer registered for it.

        If anything listens or waits for `shard_events.ShardPayloadEvent`, a
        payload event is dispatched first. The consumer is then run in a new
        task named `dispatch <event_name>`.

        Raises
        ------
        KeyError
            If no consumer is registered under the lower-cased `event_name`.
        """
        if self._enabled_for_event(shard_events.ShardPayloadEvent):
            self.dispatch(self._event_factory.deserialize_shard_payload_event(shard, payload, name=event_name))

        handler = self._handle_dispatch(self._consumers[event_name.lower()], shard, payload)
        asyncio.create_task(handler, name=f"dispatch {event_name}")

    # Yes, this is not generic. The reason for this is MyPy complains about
    # using ABCs that are not concrete in generic types passed to functions.
    # For the sake of UX, I will check this at runtime instead and let the
    # user use a static type checker.
    def subscribe(
        self,
        event_type: typing.Type[typing.Any],
        callback: event_manager_.CallbackT[typing.Any],
        *,
        _nested: int = 0,
    ) -> None:
        """Register a coroutine function as a listener for an event type.

        Parameters
        ----------
        event_type
            The event type to listen for; validated by `_check_event`.
        callback
            The coroutine function to register.
        _nested
            Extra stack depth forwarded to `_check_event`, so that an intent
            warning points at the correct source line.

        Raises
        ------
        TypeError
            If `callback` is not a coroutine function, or if `event_type` is
            rejected by `_check_event`.
        """
        if not inspect.iscoroutinefunction(callback):
            raise TypeError("Cannot subscribe a non-coroutine function callback")

        # `_nested` is used to show the correct source code snippet if an intent
        # warning is triggered.
        self._check_event(event_type, _nested)

        _LOGGER.debug(
            "subscribing callback 'async def %s%s' to event-type %s.%s",
            getattr(callback, "__name__", "<anon>"),
            inspect.signature(callback),
            event_type.__module__,
            event_type.__qualname__,
        )

        registered = self._listeners.get(event_type)
        if registered is None:
            # First listener for this type: create the entry and bump the
            # listener group count of the matching consumers.
            self._listeners[event_type] = [callback]
            self._increment_listener_group_count(event_type, 1)
        else:
            registered.append(callback)

    def get_listeners(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        *,
        polymorphic: bool = True,
    ) -> typing.Collection[event_manager_.CallbackT[base_events.EventT]]:
        """Return the listeners registered for an event type.

        Parameters
        ----------
        event_type
            The event type to look up.
        polymorphic
            If `True`, collect the listeners of every type returned by
            `event_type.dispatches()`. If `False`, only listeners registered
            under `event_type` itself are returned.

        Returns
        -------
        typing.Collection
            A new list of listeners; empty if nothing is registered.
        """
        if not polymorphic:
            exact = self._listeners.get(event_type)
            return exact.copy() if exact else []

        collected: typing.List[event_manager_.CallbackT[base_events.EventT]] = []
        for dispatched_type in event_type.dispatches():
            collected.extend(self._listeners.get(dispatched_type) or ())

        return collected

    # Yes, this is not generic. The reason for this is MyPy complains about
    # using ABCs that are not concrete in generic types passed to functions.
    # For the sake of UX, I will check this at runtime instead and let the
    # user use a static type checker.
    def unsubscribe(
        self,
        event_type: typing.Type[typing.Any],
        callback: event_manager_.CallbackT[typing.Any],
    ) -> None:
        """Remove a callback from an event type's listeners, if present.

        This is a no-op when nothing is registered for `event_type` or when
        `callback` is not among its listeners.

        Parameters
        ----------
        event_type
            The exact event type the callback was subscribed with.
        callback
            The callback to remove.
        """
        listeners = self._listeners.get(event_type)
        # `list.remove` raises ValueError for a missing item; check first so
        # that an unknown callback is ignored.
        if not listeners or callback not in listeners:
            return

        _LOGGER.debug(
            "unsubscribing callback %s%s from event-type %s.%s",
            getattr(callback, "__name__", "<anon>"),
            inspect.signature(callback),
            event_type.__module__,
            event_type.__qualname__,
        )
        listeners.remove(callback)
        if not listeners:
            # Last listener for this type is gone: drop the entry and lower
            # the listener group count of the matching consumers.
            del self._listeners[event_type]
            self._increment_listener_group_count(event_type, -1)

    def listen(
        self,
        *event_types: typing.Type[base_events.EventT],
    ) -> typing.Callable[[event_manager_.CallbackT[base_events.EventT]], event_manager_.CallbackT[base_events.EventT]]:
        """Build a decorator that subscribes a callback to event types.

        Parameters
        ----------
        *event_types
            The event types to subscribe to. When omitted, the types are taken
            from the annotation of the callback's first parameter; a union
            annotation subscribes the callback to each member type.

        Returns
        -------
        typing.Callable
            A decorator that subscribes the callback and returns it unchanged.
        """

        def decorator(
            callback: event_manager_.CallbackT[base_events.EventT],
        ) -> event_manager_.CallbackT[base_events.EventT]:
            if event_types:
                # Avoid resolving forward references in the function's signature if
                # event_type was explicitly provided as this may lead to errors.
                parameters = inspect.signature(callback).parameters.values()
                _assert_is_listener(iter(parameters))
                targets = event_types
            else:
                parameters = reflect.resolve_signature(callback).parameters.values()
                _assert_is_listener(iter(parameters))
                first_parameter = next(iter(parameters))
                hint = first_parameter.annotation

                if hint is first_parameter.empty:
                    raise TypeError("Must provide the event type in the @listen decorator or as a type hint!")

                # A union annotation is expanded into its member types; any
                # other annotation is used as the single target type.
                is_union = typing.get_origin(hint) in _UNIONS
                targets = typing.get_args(hint) if is_union else (hint,)

            for target in targets:
                self.subscribe(target, callback, _nested=1)

            return callback

        return decorator

    def dispatch(self, event: base_events.EventT) -> asyncio.Future[typing.Any]:
        """Dispatch an event to its listeners and resolve matching waiters.

        Parameters
        ----------
        event
            The event instance to dispatch.

        Returns
        -------
        asyncio.Future
            `asyncio.gather` over the listener invocations, or the result of
            `aio.completed_future()` when no listener was found.

        Raises
        ------
        TypeError
            If `event` is not an instance of `base_events.Event`.
        """
        if not isinstance(event, base_events.Event):
            raise TypeError(f"Events must be subclasses of {base_events.Event.__name__}, not {type(event).__name__}")

        pending: typing.List[typing.Coroutine[None, typing.Any, None]] = []

        for cls in event.dispatches():
            pending.extend(self._invoke_callback(listener, event) for listener in self._listeners.get(cls) or ())

            waiters = self._waiters.get(cls)
            if waiters is None:
                continue

            # Iterate over a snapshot since entries are removed as we go.
            for entry in tuple(waiters):
                predicate, future = entry
                if not future.done():
                    try:
                        # A waiter whose predicate rejects the event stays
                        # registered for later events.
                        if predicate and not predicate(event):
                            continue
                    except Exception as ex:
                        future.set_exception(ex)
                    else:
                        future.set_result(event)

                # Reached for resolved, failed and already-done futures alike.
                waiters.remove(entry)

            if not waiters:
                del self._waiters[cls]
                self._increment_waiter_group_count(cls, -1)

        if pending:
            return asyncio.gather(*pending)

        return aio.completed_future()

    def stream(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        timeout: typing.Union[float, int, None],
        limit: typing.Optional[int] = None,
    ) -> event_manager_.EventStream[base_events.EventT]:
        """Create an `EventStream` for the given event type.

        The event type is validated with `_check_event` first, which raises
        `TypeError` for a non-event type.

        Parameters
        ----------
        event_type
            The event type to stream.
        timeout
            Forwarded to `EventStream` as `timeout`.
        limit
            Forwarded to `EventStream` as `limit`.
        """
        self._check_event(event_type, 1)
        event_stream = EventStream(self, event_type, timeout=timeout, limit=limit)
        return event_stream

    async def wait_for(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        timeout: typing.Union[float, int, None],
        predicate: typing.Optional[event_manager_.PredicateT[base_events.EventT]] = None,
    ) -> base_events.EventT:
        """Wait for the next event of a type that satisfies the predicate.

        Parameters
        ----------
        event_type
            The event type to wait for.
        timeout
            Passed to `asyncio.wait_for` as its timeout.
        predicate
            Optional synchronous callable given each candidate event; a falsy
            result leaves the waiter registered. `None` accepts any event.

        Returns
        -------
        hikari.events.base_events.Event
            The event the waiter's future was resolved with.

        Raises
        ------
        TypeError
            If `event_type` is not a subclass of `base_events.Event`.
        asyncio.TimeoutError
            If the timeout elapses before the future is resolved.
        """
        is_event_class = inspect.isclass(event_type) and issubclass(event_type, base_events.Event)
        if not is_event_class:
            raise TypeError("Cannot wait for a non-Event type")

        self._check_event(event_type, 1)

        future: asyncio.Future[base_events.EventT] = asyncio.get_running_loop().create_future()

        waiter_set = self._waiters.get(event_type)
        if waiter_set is None:
            # First waiter for this type: create the entry and bump the waiter
            # group count of the matching consumers.
            waiter_set = self._waiters[event_type] = set()
            self._increment_waiter_group_count(event_type, 1)

        pair = (predicate, future)
        waiter_set.add(pair)  # type: ignore[arg-type]

        try:
            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            # Undo the registration before letting the timeout propagate.
            waiter_set.remove(pair)  # type: ignore[arg-type]
            if not waiter_set:
                del self._waiters[event_type]
                self._increment_waiter_group_count(event_type, -1)

            raise

    async def _handle_dispatch(
        self,
        consumer: _Consumer,
        shard: gateway_shard.GatewayShard,
        payload: data_binding.JSONObject,
    ) -> None:
        """Run a raw event consumer and contain any error it raises.

        Nothing is run when `consumer.is_enabled` is falsy. Cancellation is
        swallowed, unrecognised entity errors are logged at debug level, and
        every other exception is passed to the running loop's exception
        handler.

        Parameters
        ----------
        consumer
            The consumer whose callback should be awaited.
        shard
            The shard passed to the consumer callback.
        payload
            The payload passed to the consumer callback.
        """
        if not consumer.is_enabled:
            _LOGGER.log(
                ux.TRACE,
                "Skipping raw dispatch for %s due to lack of any registered listeners or cache need",
                consumer.callback.__name__,
            )
            return

        try:
            await consumer.callback(shard, payload)
        except asyncio.CancelledError:
            # Skip cancelled errors, likely caused by the event loop being shut down.
            return
        except errors.UnrecognisedEntityError:
            _LOGGER.debug("Event referenced an unrecognised entity, discarding")
        except BaseException as ex:
            context = {
                "message": "Exception occurred in raw event dispatch conduit",
                "exception": ex,
                "task": asyncio.current_task(),
            }
            asyncio.get_running_loop().call_exception_handler(context)

    async def _invoke_callback(
        self, callback: event_manager_.CallbackT[base_events.EventT], event: base_events.EventT
    ) -> None:
        """Await a listener callback and report any exception it raises.

        Exceptions derived from `Exception` never propagate out of this
        coroutine. They are logged, and unless
        `base_events.is_no_recursive_throw_event` is truthy for `event`, an
        `ExceptionEvent` describing the failure is dispatched and awaited.

        Parameters
        ----------
        callback
            The listener to await with `event`.
        event
            The event object passed to the listener.
        """
        try:
            await callback(event)
        except Exception as ex:
            # Drop the first traceback entry from what gets logged, it is of
            # no interest to the reader.
            traceback = ex.__traceback__
            exc_info = (type(ex), ex, None if traceback is None else traceback.tb_next)

            if base_events.is_no_recursive_throw_event(event):
                _LOGGER.error(
                    "an exception occurred handling an event (%s), but it has been ignored",
                    type(event).__name__,
                    exc_info=exc_info,
                )
                return

            exception_event = base_events.ExceptionEvent(
                exception=ex,
                failed_event=event,
                failed_callback=callback,
            )

            # When something is subscribed to the exception event, only log at
            # debug level; otherwise log the failure as an error.
            if self.get_listeners(type(exception_event), polymorphic=True):
                log = _LOGGER.debug
            else:
                log = _LOGGER.error

            log("an exception occurred handling an event (%s)", type(event).__name__, exc_info=exc_info)
            await self.dispatch(exception_event)

Provides functionality to consume and dispatch events.

Specific event handlers should be in functions named on_xxx where xxx is the raw event name being dispatched in lower-case.

Methods
#  def __init__(
   self,
   event_factory: hikari.api.event_factory.EventFactory,
   intents: hikari.intents.Intents,
   *,
   cache_components: hikari.api.config.CacheComponents = <CacheComponents.NONE: 0>
):
View Source
    def __init__(
        self,
        event_factory: event_factory_.EventFactory,
        intents: intents_.Intents,
        *,
        cache_components: config.CacheComponents = config.CacheComponents.NONE,
    ) -> None:
        """Initialise the manager and index its raw event consumers.

        Every member of this instance whose name begins with `on_` is wrapped
        in a `_Consumer` and stored under that name with the prefix removed.

        Parameters
        ----------
        event_factory
            The event factory stored on this manager.
        intents
            The intents stored on this manager.
        cache_components
            The enabled cache components, used to work out the last argument
            given to each `_Consumer`.
        """
        self._consumers: typing.Dict[str, _Consumer] = {}
        self._event_factory = event_factory
        self._intents = intents
        self._listeners: _ListenerMapT[base_events.Event] = {}
        self._waiters: _WaiterMapT[base_events.Event] = {}

        for name, member in inspect.getmembers(self):
            if not name.startswith("on_"):
                continue

            if isinstance(member, _FilteredMethodT):
                # The member carries its own event bitmask and cache components;
                # it counts as caching when any of its components are enabled.
                is_caching = (member.__cache_components__ & cache_components) != 0
                consumer = _Consumer(member, member.__events_bitmask__, is_caching)
            else:
                # No filter metadata: -1 has every bit set, and the consumer
                # counts as caching whenever any cache component is enabled.
                consumer = _Consumer(member, -1, cache_components != cache_components.NONE)

            self._consumers[name[3:]] = consumer
#  def consume_raw_event(
   self,
   event_name: str,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    def consume_raw_event(
        self, event_name: str, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """Hand a raw event payload to the consumer registered for it.

        If anything listens or waits for `shard_events.ShardPayloadEvent`, a
        payload event is dispatched first. The consumer is then run in a new
        task named `dispatch <event_name>`.

        Raises
        ------
        KeyError
            If no consumer is registered under the lower-cased `event_name`.
        """
        if self._enabled_for_event(shard_events.ShardPayloadEvent):
            self.dispatch(self._event_factory.deserialize_shard_payload_event(shard, payload, name=event_name))

        handler = self._handle_dispatch(self._consumers[event_name.lower()], shard, payload)
        asyncio.create_task(handler, name=f"dispatch {event_name}")

Consume a raw event.

Parameters
Raises
  • LookupError: If there is no consumer for the event.
#  def dispatch(self, event: ~EventT) -> _asyncio.Future[typing.Any]:
View Source
    def dispatch(self, event: base_events.EventT) -> asyncio.Future[typing.Any]:
        """Dispatch an event to its listeners and resolve matching waiters.

        Parameters
        ----------
        event
            The event instance to dispatch.

        Returns
        -------
        asyncio.Future
            `asyncio.gather` over the listener invocations, or the result of
            `aio.completed_future()` when no listener was found.

        Raises
        ------
        TypeError
            If `event` is not an instance of `base_events.Event`.
        """
        if not isinstance(event, base_events.Event):
            raise TypeError(f"Events must be subclasses of {base_events.Event.__name__}, not {type(event).__name__}")

        pending: typing.List[typing.Coroutine[None, typing.Any, None]] = []

        for cls in event.dispatches():
            pending.extend(self._invoke_callback(listener, event) for listener in self._listeners.get(cls) or ())

            waiters = self._waiters.get(cls)
            if waiters is None:
                continue

            # Iterate over a snapshot since entries are removed as we go.
            for entry in tuple(waiters):
                predicate, future = entry
                if not future.done():
                    try:
                        # A waiter whose predicate rejects the event stays
                        # registered for later events.
                        if predicate and not predicate(event):
                            continue
                    except Exception as ex:
                        future.set_exception(ex)
                    else:
                        future.set_result(event)

                # Reached for resolved, failed and already-done futures alike.
                waiters.remove(entry)

            if not waiters:
                del self._waiters[cls]
                self._increment_waiter_group_count(cls, -1)

        if pending:
            return asyncio.gather(*pending)

        return aio.completed_future()

Dispatch an event.

Parameters
Example

We can dispatch custom events by first defining a class that derives from hikari.events.base_events.Event.

import attr

from hikari.traits import RESTAware
from hikari.events.base_events import Event
from hikari.users import User
from hikari.snowflakes import Snowflake

@attr.define()
class EveryoneMentionedEvent(Event):
    app: RESTAware = attr.field()

    author: User = attr.field()
    '''The user who mentioned everyone.'''

    content: str = attr.field()
    '''The message that was sent.'''

    message_id: Snowflake = attr.field()
    '''The message ID.'''

    channel_id: Snowflake = attr.field()
    '''The channel ID.'''

We can then dispatch our event as we see fit.

from hikari.events.messages import MessageCreateEvent

@bot.listen(MessageCreateEvent)
async def on_message(event):
    if "@everyone" in event.content or "@here" in event.content:
        event = EveryoneMentionedEvent(
            author=event.author,
            content=event.content,
            message_id=event.id,
            channel_id=event.channel_id,
        )

        bot.dispatch(event)

This event can be listened to elsewhere by subscribing to it with EventManager.subscribe.

@bot.listen(EveryoneMentionedEvent)
async def on_everyone_mentioned(event):
    print(event.author, "just pinged everyone in", event.channel_id)
Returns
  • asyncio.Future[typing.Any]: A future that can be optionally awaited. If awaited, the future will complete once all corresponding event listeners have been invoked. If not awaited, this will schedule the dispatch of the events in the background for later.
See Also

hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for

#  def get_listeners(
   self,
   event_type: Type[~EventT],
   /,
   *,
   polymorphic: bool = True
) -> Collection[Callable[[~EventT], Coroutine[Any, Any, NoneType]]]:
View Source
    def get_listeners(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        *,
        polymorphic: bool = True,
    ) -> typing.Collection[event_manager_.CallbackT[base_events.EventT]]:
        """Return the listeners registered for an event type.

        Parameters
        ----------
        event_type
            The event type to look up.
        polymorphic
            If `True`, collect the listeners of every type returned by
            `event_type.dispatches()`. If `False`, only listeners registered
            under `event_type` itself are returned.

        Returns
        -------
        typing.Collection
            A new list of listeners; empty if nothing is registered.
        """
        if not polymorphic:
            exact = self._listeners.get(event_type)
            return exact.copy() if exact else []

        collected: typing.List[event_manager_.CallbackT[base_events.EventT]] = []
        for dispatched_type in event_type.dispatches():
            collected.extend(self._listeners.get(dispatched_type) or ())

        return collected

Get the listeners for a given event type, if there are any.

Parameters
  • event_type (typing.Type[T]): The event type to look for. T must be a subclass of hikari.events.base_events.Event.
  • polymorphic (bool): If True, this will also return the listeners for all the event types event_type will dispatch. If False, then only listeners for this class specifically are returned. The default is True.
Returns
  • typing.Collection[typing.Callable[[T], typing.Coroutine[typing.Any, typing.Any, None]]]: A copy of the collection of listeners for the event. Will return an empty collection if nothing is registered.
#  def listen(
   self,
   *event_types: Type[~EventT]
) -> Callable[[Callable[[~EventT], Coroutine[Any, Any, NoneType]]], Callable[[~EventT], Coroutine[Any, Any, NoneType]]]:
View Source
    def listen(
        self,
        *event_types: typing.Type[base_events.EventT],
    ) -> typing.Callable[[event_manager_.CallbackT[base_events.EventT]], event_manager_.CallbackT[base_events.EventT]]:
        """Build a decorator that subscribes a callback to event types.

        Parameters
        ----------
        *event_types
            The event types to subscribe to. When omitted, the types are taken
            from the annotation of the callback's first parameter; a union
            annotation subscribes the callback to each member type.

        Returns
        -------
        typing.Callable
            A decorator that subscribes the callback and returns it unchanged.
        """

        def decorator(
            callback: event_manager_.CallbackT[base_events.EventT],
        ) -> event_manager_.CallbackT[base_events.EventT]:
            if event_types:
                # Avoid resolving forward references in the function's signature if
                # event_type was explicitly provided as this may lead to errors.
                parameters = inspect.signature(callback).parameters.values()
                _assert_is_listener(iter(parameters))
                targets = event_types
            else:
                parameters = reflect.resolve_signature(callback).parameters.values()
                _assert_is_listener(iter(parameters))
                first_parameter = next(iter(parameters))
                hint = first_parameter.annotation

                if hint is first_parameter.empty:
                    raise TypeError("Must provide the event type in the @listen decorator or as a type hint!")

                # A union annotation is expanded into its member types; any
                # other annotation is used as the single target type.
                is_union = typing.get_origin(hint) in _UNIONS
                targets = typing.get_args(hint) if is_union else (hint,)

            for target in targets:
                self.subscribe(target, callback, _nested=1)

            return callback

        return decorator

Generate a decorator to subscribe a callback to an event type.

This is a second-order decorator.

Parameters
  • *event_types (typing.Optional[typing.Type[T]]): The event types to subscribe to. The implementation may allow this to be undefined. If this is the case, the event type will be inferred instead from the type hints on the function signature. T must be a subclass of hikari.events.base_events.Event.
Returns
  • typing.Callable[[T], T]: A decorator for a coroutine function that passes it to EventManager.subscribe before returning the function reference.
See Also

hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for

#  def stream(
   self,
   event_type: Type[~EventT],
   /,
   timeout: Union[float, int, NoneType],
   limit: Optional[int] = None
) -> hikari.api.event_manager.EventStream[~EventT]:
View Source
    def stream(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        timeout: typing.Union[float, int, None],
        limit: typing.Optional[int] = None,
    ) -> event_manager_.EventStream[base_events.EventT]:
        """Create an `EventStream` for the given event type.

        The event type is validated with `_check_event` first, which raises
        `TypeError` for a non-event type.

        Parameters
        ----------
        event_type
            The event type to stream.
        timeout
            Forwarded to `EventStream` as `timeout`.
        limit
            Forwarded to `EventStream` as `limit`.
        """
        self._check_event(event_type, 1)
        event_stream = EventStream(self, event_type, timeout=timeout, limit=limit)
        return event_stream

Return a stream iterator for the given event and sub-events.

Warning: If you use await stream.open() to start the stream then you must also close it with await stream.close() otherwise it may queue events in memory indefinitely.

Parameters
  • event_type (typing.Type[hikari.events.base_events.Event]): The event type to listen for. This will listen for subclasses of this type additionally.
  • timeout (typing.Union[float, int, None]): How long this streamer should wait for the next event before ending the iteration. If None then this will continue until explicitly broken from.
  • limit (typing.Optional[int]): The limit for how many events this should queue at one time before dropping extra incoming events, leave this as None for the cache size to be unlimited.
Returns
  • EventStream[hikari.events.base_events.Event]: The async iterator to handle streamed events. This must be started with `with stream:` or `stream.open()` before asynchronously iterating over it.
Examples
with bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id)) as stream:
    async for user_id in stream.map("user_id").limit(50):
        ...

or using open() and close()

stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
stream.open()

async for user_id in stream.map("user_id").limit(50):
    ...

stream.close()
See Also

hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for

#  def subscribe(
   self,
   event_type: Type[Any],
   callback: Callable[[Any], Coroutine[Any, Any, NoneType]],
   *,
   _nested: int = 0
) -> None:
View Source
    def subscribe(
        self,
        event_type: typing.Type[typing.Any],
        callback: event_manager_.CallbackT[typing.Any],
        *,
        _nested: int = 0,
    ) -> None:
        """Register a coroutine function as a listener for an event type.

        Parameters
        ----------
        event_type
            The event type to listen for; validated by `_check_event`.
        callback
            The coroutine function to register.
        _nested
            Extra stack depth forwarded to `_check_event`, so that an intent
            warning points at the correct source line.

        Raises
        ------
        TypeError
            If `callback` is not a coroutine function, or if `event_type` is
            rejected by `_check_event`.
        """
        if not inspect.iscoroutinefunction(callback):
            raise TypeError("Cannot subscribe a non-coroutine function callback")

        # `_nested` is used to show the correct source code snippet if an intent
        # warning is triggered.
        self._check_event(event_type, _nested)

        _LOGGER.debug(
            "subscribing callback 'async def %s%s' to event-type %s.%s",
            getattr(callback, "__name__", "<anon>"),
            inspect.signature(callback),
            event_type.__module__,
            event_type.__qualname__,
        )

        registered = self._listeners.get(event_type)
        if registered is None:
            # First listener for this type: create the entry and bump the
            # listener group count of the matching consumers.
            self._listeners[event_type] = [callback]
            self._increment_listener_group_count(event_type, 1)
        else:
            registered.append(callback)

Subscribe a given callback to a given event type.

Parameters
  • event_type (typing.Type[T]): The event type to listen for. This will also listen for any subclasses of the given type. T must be a subclass of hikari.events.base_events.Event.
  • callback: Must be a coroutine function to invoke. This should consume an instance of the given event, or an instance of a valid subclass if one exists. Any result is discarded.
Example

The following demonstrates subscribing a callback to message creation events.

from hikari.events.messages import MessageCreateEvent

async def on_message(event):
    ...

bot.subscribe(MessageCreateEvent, on_message)
See Also

hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for

#  def unsubscribe(
   self,
   event_type: Type[Any],
   callback: Callable[[Any], Coroutine[Any, Any, NoneType]]
) -> None:
View Source
    def unsubscribe(
        self,
        event_type: typing.Type[typing.Any],
        callback: event_manager_.CallbackT[typing.Any],
    ) -> None:
        """Remove a callback from an event type's listeners, if present.

        This is a no-op when nothing is registered for `event_type` or when
        `callback` is not among its listeners.

        Parameters
        ----------
        event_type
            The exact event type the callback was subscribed with.
        callback
            The callback to remove.
        """
        listeners = self._listeners.get(event_type)
        # `list.remove` raises ValueError for a missing item; check first so
        # that an unknown callback is ignored.
        if not listeners or callback not in listeners:
            return

        _LOGGER.debug(
            "unsubscribing callback %s%s from event-type %s.%s",
            getattr(callback, "__name__", "<anon>"),
            inspect.signature(callback),
            event_type.__module__,
            event_type.__qualname__,
        )
        listeners.remove(callback)
        if not listeners:
            # Last listener for this type is gone: drop the entry and lower
            # the listener group count of the matching consumers.
            del self._listeners[event_type]
            self._increment_listener_group_count(event_type, -1)

Unsubscribe a given callback from a given event type, if present.

Parameters
  • event_type (typing.Type[T]): The event type to unsubscribe from. This must be the same exact type as was originally subscribed with to be removed correctly. T must derive from hikari.events.base_events.Event.
  • callback: The callback to unsubscribe.
Example

The following demonstrates unsubscribing a callback from a message creation event.

from hikari.events.messages import MessageCreateEvent

async def on_message(event):
    ...

bot.unsubscribe(MessageCreateEvent, on_message)
See Also

hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.wait_for

#  async def wait_for(
   self,
   event_type: Type[~EventT],
   /,
   timeout: Union[float, int, NoneType],
   predicate: Optional[Callable[[~EventT], bool]] = None
) -> ~EventT:
View Source
    async def wait_for(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        timeout: typing.Union[float, int, None],
        predicate: typing.Optional[event_manager_.PredicateT[base_events.EventT]] = None,
    ) -> base_events.EventT:
        """Wait for the next event of a type that satisfies the predicate.

        Parameters
        ----------
        event_type
            The event type to wait for.
        timeout
            Passed to `asyncio.wait_for` as its timeout.
        predicate
            Optional synchronous callable given each candidate event; a falsy
            result leaves the waiter registered. `None` accepts any event.

        Returns
        -------
        hikari.events.base_events.Event
            The event the waiter's future was resolved with.

        Raises
        ------
        TypeError
            If `event_type` is not a subclass of `base_events.Event`.
        asyncio.TimeoutError
            If the timeout elapses before the future is resolved.
        """
        is_event_class = inspect.isclass(event_type) and issubclass(event_type, base_events.Event)
        if not is_event_class:
            raise TypeError("Cannot wait for a non-Event type")

        self._check_event(event_type, 1)

        future: asyncio.Future[base_events.EventT] = asyncio.get_running_loop().create_future()

        waiter_set = self._waiters.get(event_type)
        if waiter_set is None:
            # First waiter for this type: create the entry and bump the waiter
            # group count of the matching consumers.
            waiter_set = self._waiters[event_type] = set()
            self._increment_waiter_group_count(event_type, 1)

        pair = (predicate, future)
        waiter_set.add(pair)  # type: ignore[arg-type]

        try:
            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            # Undo the registration before letting the timeout propagate.
            waiter_set.remove(pair)  # type: ignore[arg-type]
            if not waiter_set:
                del self._waiters[event_type]
                self._increment_waiter_group_count(event_type, -1)

            raise

Wait for a given event to occur once, then return the event.

Warning: Async predicates are not supported.

Parameters
  • event_type (typing.Type[hikari.events.base_events.Event]): The event type to listen for. This will listen for subclasses of this type additionally.
  • predicate: A function taking the event as the single parameter. This should return True if the event is one you want to return, or False if the event should not be returned. If left as None (the default), then the first matching event type that the bot receives (or any subtype) will be the one returned.
  • timeout (typing.Union[float, int, None]): The amount of time to wait before raising an asyncio.TimeoutError and giving up instead. This is measured in seconds. If None, then no timeout will be waited for (no timeout can result in "leaking" of coroutines that never complete if called in an uncontrolled way, so is not recommended).
Returns
Raises
  • asyncio.TimeoutError: If the timeout is not None and is reached before an event is received that the predicate returns True for.
See Also

hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe

View Source
class EventStream(event_manager_.EventStream[base_events.EventT]):
    """An implementation of an event `EventStream` class.

    While the stream is open, events of the requested type which pass the
    registered filters are queued, and are consumed by asynchronously
    iterating over the stream.

    .. note::
        `EventStream.filter` always returns the stream itself with the given
        predicates added to it. When it is called on an active "opened" event
        stream, the events which are already queued are additionally checked
        against the new predicates and dropped if they do not match.
    """

    __slots__: typing.Sequence[str] = (
        "__weakref__",
        "_active",
        "_event",
        "_event_manager",
        "_event_type",
        "_filters",
        "_limit",
        "_queue",
        "_registered_listener",
        "_timeout",
    )

    __weakref__: typing.Optional[weakref.ref[EventStream[base_events.EventT]]]

    def __init__(
        self,
        event_manager: event_manager_.EventManager,
        event_type: typing.Type[base_events.EventT],
        *,
        timeout: typing.Union[float, int, None],
        limit: typing.Optional[int] = None,
    ) -> None:
        """Initialise a closed stream; no listener is registered until `open` is called.

        Parameters
        ----------
        event_manager
            The event manager the stream subscribes to while it is open.
        event_type
            The event type the stream subscribes to.
        timeout
            How long (passed to `asyncio.wait_for`) iteration waits for a new
            event before it stops. `None` waits indefinitely.
        limit
            The maximum number of events to hold in the queue at once; events
            received while the queue is full are dropped. `None` means the
            queue is unbounded.
        """
        self._active = False
        # Created lazily by `__anext__` and set by `_listener` to wake a waiting consumer.
        self._event: typing.Optional[asyncio.Event] = None
        self._event_manager = event_manager
        self._event_type = event_type
        self._filters: iterators.All[base_events.EventT] = iterators.All(())
        self._limit = limit
        self._queue: typing.List[base_events.EventT] = []
        # The registered wrapping function for the weak ref to this class's _listener method.
        self._registered_listener: typing.Optional[
            typing.Callable[[base_events.EventT], typing.Coroutine[typing.Any, typing.Any, None]]
        ] = None
        self._timeout = timeout

    # These are only included at runtime in-order to avoid the model being typed as an asynchronous context manager.
    if not typing.TYPE_CHECKING:

        async def __aenter__(self: event_manager_.EventStreamT) -> event_manager_.EventStreamT:
            # This is sync only.
            warnings.warn(
                "Using EventStream as an async context manager has been deprecated since 2.0.0.dev104. "
                "Please use it as a synchronous context manager (e.g. with bot.stream(...)) instead.",
                category=DeprecationWarning,
                stacklevel=2,
            )

            self.open()
            return self

        async def __aexit__(
            self,
            exc_type: typing.Optional[typing.Type[BaseException]],
            exc: typing.Optional[BaseException],
            exc_tb: typing.Optional[types.TracebackType],
        ) -> None:
            self.close()

    def __enter__(self: _EventStreamT) -> _EventStreamT:
        # Entering the context opens the stream; leaving it closes the stream again.
        self.open()
        return self

    def __exit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]],
        exc_val: typing.Optional[BaseException],
        exc_tb: typing.Optional[types.TracebackType],
    ) -> None:
        self.close()

    async def __anext__(self) -> base_events.EventT:
        """Return the oldest queued event, waiting for one to arrive if the queue is empty.

        Raises
        ------
        TypeError
            If the stream is not currently open.
        StopAsyncIteration
            If no event is queued within the configured timeout.
        """
        if not self._active:
            raise TypeError("stream must be started with before entering it")

        while not self._queue:
            if not self._event:
                self._event = asyncio.Event()

            try:
                await asyncio.wait_for(self._event.wait(), timeout=self._timeout)
            except asyncio.TimeoutError:
                # Nothing arrived in time, so this ends the iteration rather than raising a timeout.
                raise StopAsyncIteration from None

            # Reset the flag so the next wait blocks until `_listener` queues another event.
            self._event.clear()

        return self._queue.pop(0)

    def __await__(self) -> typing.Generator[None, None, typing.Sequence[base_events.EventT]]:
        # Awaiting the stream opens it, collects events until iteration stops, then closes it.
        return self._await_all().__await__()

    def __del__(self) -> None:
        # For the sake of protecting highly intelligent people who forget to close this, we register the event listener
        # with a weakref then try to close this on deletion. While this may lead to their consoles being spammed, this
        # is a small price to pay as it'll be way more obvious what's wrong than if we just left them with a vague
        # ominous memory leak.
        if self._active:
            _LOGGER.warning("active %r streamer fell out of scope before being closed", self._event_type.__name__)
            self.close()

    async def _await_all(self) -> typing.Sequence[base_events.EventT]:
        """Open the stream, gather every event until iteration stops, then close the stream."""
        self.open()
        try:
            return [event async for event in self]
        finally:
            # Always unsubscribe, even if collection is cancelled or fails part-way through;
            # otherwise the listener would stay registered until this object is collected.
            self.close()

    async def _listener(self, event: base_events.EventT) -> None:
        """Queue `event` if it passes the filters and the queue has not reached its limit."""
        if not self._filters(event) or (self._limit is not None and len(self._queue) >= self._limit):
            return

        self._queue.append(event)
        if self._event:
            # Wake up a consumer which may be waiting in `__anext__`.
            self._event.set()

    def close(self) -> None:
        # Unsubscribes the registered listener (if there is one) and marks the stream as inactive.
        if self._active and self._registered_listener is not None:
            try:
                self._event_manager.unsubscribe(self._event_type, self._registered_listener)
            except ValueError:
                # Deliberately ignored; presumably raised when the listener is no longer
                # registered - TODO confirm against the event manager's `unsubscribe`.
                pass

            self._registered_listener = None

        self._active = False

    def filter(
        self: _EventStreamT,
        *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[base_events.EventT], bool]],
        **attrs: typing.Any,
    ) -> _EventStreamT:
        filter_ = self._map_predicates_and_attr_getters("filter", *predicates, **attrs)
        if self._active:
            # Events queued before this filter was added must also satisfy it.
            self._queue = [entry for entry in self._queue if filter_(entry)]

        self._filters |= filter_
        return self

    def open(self) -> None:
        if not self._active:
            # For the sake of protecting highly intelligent people who forget to close this, we register the event
            # listener with a weakref then try to close this on deletion. While this may lead to their consoles being
            # spammed, this is a small price to pay as it'll be way more obvious what's wrong than if we just left them
            # with a vague ominous memory leak.
            reference = weakref.WeakMethod(self._listener)
            listener = _generate_weak_listener(reference)
            self._registered_listener = listener
            self._event_manager.subscribe(self._event_type, listener)
            self._active = True

An implementation of an event EventStream class.

Note: EventStream.filter always returns the event stream itself with the given predicates added to it. When it is called on an active "opened" event stream, the events which are already queued are additionally checked against the new predicates and dropped if they do not match.

Methods
#  def __init__(
   self,
   event_manager: hikari.api.event_manager.EventManager,
   event_type: Type[~EventT],
   *,
   timeout: Union[float, int, NoneType],
   limit: Optional[int] = None
):
View Source
    def __init__(
        self,
        event_manager: event_manager_.EventManager,
        event_type: typing.Type[base_events.EventT],
        *,
        timeout: typing.Union[float, int, None],
        limit: typing.Optional[int] = None,
    ) -> None:
        """Initialise the stream in its inactive state.

        Parameters
        ----------
        event_manager
            The event manager this stream is bound to.
        event_type
            The event type this stream is bound to.
        timeout
            The timeout stored for this stream; `None` is accepted.
        limit
            The optional limit stored for this stream. Defaults to `None`.
        """
        # The stream starts out inactive with nothing queued and no filters.
        self._active = False
        self._queue: typing.List[base_events.EventT] = []
        self._filters: iterators.All[base_events.EventT] = iterators.All(())
        self._event: typing.Optional[asyncio.Event] = None

        # The registered wrapping function for the weak ref to this class's _listener method.
        self._registered_listener: typing.Optional[
            typing.Callable[[base_events.EventT], typing.Coroutine[typing.Any, typing.Any, None]]
        ] = None

        # Configuration supplied by the caller.
        self._event_manager = event_manager
        self._event_type = event_type
        self._limit = limit
        self._timeout = timeout
#  def awaiting(
   self,
   window_size: int = 10
) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
    def awaiting(self, window_size: int = 10) -> LazyIterator[ValueT]:
        """Await each item concurrently in a fixed size window.

        .. warning::
            Setting a large window size, or setting it to 0 to await everything
            is a dangerous thing to do if you are making API calls. Some
            endpoints will get ratelimited and cause a backup of waiting
            tasks, others may begin to spam global rate limits instead
            (the `fetch_user` endpoint seems to be notorious for doing this).

        .. note::
            This call assumes that the iterator contains awaitable values as
            input. MyPy cannot detect this nicely, so a cast is forced
            internally.
            If the item is not awaitable, you will receive a
            `TypeError` instead.
            You have been warned. You cannot escape the ways of the duck type
            young grasshopper.

        Parameters
        ----------
        window_size : int
            The window size of how many tasks to await at once. You can set this
            to `0` to await everything at once, but see the warning above.

        Returns
        -------
        LazyIterator[ValueT]
            The new lazy iterator to return.
        """
        # Not type safe: nothing checks that the items really are awaitable.
        awaitable_items = typing.cast("LazyIterator[typing.Awaitable[ValueT]]", self)
        return _AwaitingLazyIterator(awaitable_items, window_size)

Await each item concurrently in a fixed size window.

Warning: Setting a large window size, or setting it to 0 to await everything is a dangerous thing to do if you are making API calls. Some endpoints will get ratelimited and cause a backup of waiting tasks, others may begin to spam global rate limits instead (the fetch_user endpoint seems to be notorious for doing this).

Note: This call assumes that the iterator contains awaitable values as input. MyPy cannot detect this nicely, so any cast is forced internally. If the item is not awaitable, you will receive a TypeError instead. You have been warned. You cannot escape the ways of the duck type young grasshopper.

Parameters
  • window_size (int): The window size of how many tasks to await at once. You can set this to 0 to await everything at once, but see the warning above.
Returns
  • LazyIterator[ValueT]: The new lazy iterator to return.
#  def chunk(
   self,
   chunk_size: int
) -> hikari.iterators.LazyIterator[typing.Sequence[~ValueT]]:
View Source
    def chunk(self, chunk_size: int) -> LazyIterator[typing.Sequence[ValueT]]:
        """Group the results into sequences of at most `chunk_size` entries.

        Parameters
        ----------
        chunk_size : int
            The maximum number of results to return in each chunk.

        Returns
        -------
        LazyIterator[typing.Sequence[ValueT]]
            `LazyIterator` that emits each chunked sequence.
        """
        chunked = _ChunkedLazyIterator(self, chunk_size)
        return chunked

Return results in chunks of up to chunk_size amount of entries.

Parameters
  • chunk_size (int): The limit for how many results should be returned in each chunk.
Returns
  • LazyIterator[typing.Sequence[ValueT]]: LazyIterator that emits each chunked sequence.
#  def close(self) -> None:
View Source
    def close(self) -> None:
        # Nothing to unsubscribe when the stream is inactive or has no registered
        # listener; just make sure it ends up flagged as inactive.
        if not self._active or self._registered_listener is None:
            self._active = False
            return

        try:
            self._event_manager.unsubscribe(self._event_type, self._registered_listener)
        except ValueError:
            # Deliberately ignored so that closing stays best-effort.
            pass

        self._registered_listener = None
        self._active = False

Mark this streamer as closed to stop it from queueing and receiving events.

If called on an already closed streamer then this will do nothing.

Note: A `with streamer` block may be used as a short-cut for opening and closing a streamer.

#  async def collect(
   self,
   collector: Callable[[Sequence[~ValueT]], Collection[~ValueT]]
) -> Collection[~ValueT]:
View Source
    async def collect(
        self, collector: typing.Callable[[typing.Sequence[ValueT]], typing.Collection[ValueT]]
    ) -> typing.Collection[ValueT]:
        """Gather every result, then hand them to `collector` and return its output.

        Parameters
        ----------
        collector
            A function that consumes a sequence of values and returns a
            collection.
        """
        gathered = await self
        return collector(gathered)

Collect the results into a given type and return it.

Parameters
  • collector: A function that consumes a sequence of values and returns a collection.
#  async def count(self) -> int:
View Source
    async def count(self) -> int:
        """Consume the iterator and report how many results it produced.

        Returns
        -------
        int
            Number of results found.
        """
        total = 0
        async for _ in self:
            # The values themselves are irrelevant; only the tally matters.
            total = total + 1

        return total

Count the number of results.

Returns
  • int: Number of results found.
#  def enumerate(
   self,
   *,
   start: int = 0
) -> hikari.iterators.LazyIterator[typing.Tuple[int, ~ValueT]]:
View Source
    def enumerate(self, *, start: int = 0) -> LazyIterator[typing.Tuple[int, ValueT]]:
        """Enumerate the paginated results lazily.

        This is the asyncio-friendly counterpart of the builtin `enumerate`,
        and uses much less memory than collecting all the results first and
        calling `enumerate` across them.

        Parameters
        ----------
        start : int
            Optional int to start at. If omitted, this is `0`.

        Examples
        --------
        ```py
        >>> async for i, item in paginated_results.enumerate():
        ...    print(i, item)
        (0, foo)
        (1, bar)
        (2, baz)
        (3, bork)
        (4, qux)

        >>> async for i, item in paginated_results.enumerate(start=9):
        ...    print(i, item)
        (9, foo)
        (10, bar)
        (11, baz)
        (12, bork)
        (13, qux)

        >>> async for i, item in paginated_results.enumerate(start=9).limit(3):
        ...    print(i, item)
        (9, foo)
        (10, bar)
        (11, baz)
        ```

        Returns
        -------
        LazyIterator[typing.Tuple[int, ValueT]]
            A paginated results view that asynchronously yields an increasing
            counter in a tuple with each result, lazily.
        """
        numbered = _EnumeratedLazyIterator(self, start=start)
        return numbered

Enumerate the paginated results lazily.

This behaves as an asyncio-friendly version of enumerate which uses much less memory than collecting all the results first and calling enumerate across them.

Parameters
  • start (int): Optional int to start at. If omitted, this is 0.
Examples
>>> async for i, item in paginated_results.enumerate():
...    print(i, item)
(0, foo)
(1, bar)
(2, baz)
(3, bork)
(4, qux)

>>> async for i, item in paginated_results.enumerate(start=9):
...    print(i, item)
(9, foo)
(10, bar)
(11, baz)
(12, bork)
(13, qux)

>>> async for i, item in paginated_results.enumerate(start=9).limit(3):
...    print(i, item)
(9, foo)
(10, bar)
(11, baz)
Returns
  • LazyIterator[typing.Tuple[int, ValueT]]: A paginated results view that asynchronously yields an increasing counter in a tuple with each result, lazily.
#  def filter(
   self: ~_EventStreamT,
   *predicates: Union[Tuple[str, Any], Callable[[~EventT], bool]],
   **attrs: Any
) -> ~_EventStreamT:
View Source
    def filter(
        self: _EventStreamT,
        *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[base_events.EventT], bool]],
        **attrs: typing.Any,
    ) -> _EventStreamT:
        # Build one combined condition from the given predicates and attribute matchers.
        new_conditions = self._map_predicates_and_attr_getters("filter", *predicates, **attrs)

        if self._active:
            # Events queued before this call must satisfy the new conditions too.
            still_matching = []
            for queued in self._queue:
                if new_conditions(queued):
                    still_matching.append(queued)

            self._queue = still_matching

        self._filters |= new_conditions
        return self

Filter the items by one or more conditions.

Each condition is treated as a predicate, being called with each item that this iterator would return when it is requested.

All conditions must evaluate to True for the item to be returned. If this is not met, then the item is discarded and ignored, the next matching item will be returned instead, if there is one.

Parameters
  • *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return True if it is of interest, or False otherwise. These may instead include 2-tuple objects consisting of a str attribute name (nested attributes are referred to using the . operator), and values to compare for equality. This allows you to specify conditions such as members.filter(("user.bot", True)).
  • **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
  • EventStream[ValueT]: The current stream with the new filter applied.
#  def flat_map(
   self,
   flattener: Union[hikari.internal.spel.AttrGetter[~ValueT, Union[AsyncIterator[~AnotherValueT], Iterable[~AnotherValueT]]], Callable[[~ValueT], Union[AsyncIterator[~AnotherValueT], Iterable[~AnotherValueT]]]]
) -> hikari.iterators.LazyIterator[~AnotherValueT]:
View Source
    def flat_map(self, flattener: _FlattenerT[ValueT, AnotherValueT]) -> LazyIterator[AnotherValueT]:
        r"""Perform a flat mapping operation.

        This will pass each item in the iterator to the given `flattener`
        parameter, expecting a new `typing.Iterable` or `typing.AsyncIterator`
        to be returned as the result. This means you can map to a new
        `LazyIterator`, `typing.AsyncIterator`, `typing.Iterable`,
        async generator, or generator.

        Remember that `typing.Iterator` implicitly provides `typing.Iterable`
        compatibility.

        This is used to provide lazy conversions, and can be used to implement
        reactive-like pipelines if desired.

        All results are combined into one large lazy iterator and yielded
        lazily.

        Parameters
        ----------
        flattener
            A function that returns either an async iterator or iterator of
            new values. Could be an attribute name instead.

        Example
        -------

        The following example generates a distinct collection of all mentioned
        users in the given channel from the past 500 messages.

        ```py
        def iter_mentioned_users(message: hikari.Message) -> typing.Iterable[Snowflake]:
            for match in re.findall(r"<@!?(\d+)>", message.content):
                yield Snowflake(match)

        mentioned_users = await (
            channel
            .history()
            .limit(500)
            .flat_map(iter_mentioned_users)
            .distinct()
        )
        ```

        Returns
        -------
        LazyIterator[AnotherValueT]
            The new lazy iterator to return.
        """
        flattened = _FlatMapLazyIterator(self, flattener)
        return flattened

Perform a flat mapping operation.

This will pass each item in the iterator to the given flattener parameter, expecting a new typing.Iterable or typing.AsyncIterator to be returned as the result. This means you can map to a new LazyIterator, typing.AsyncIterator, typing.Iterable, async generator, or generator.

Remember that typing.Iterator implicitly provides typing.Iterable compatibility.

This is used to provide lazy conversions, and can be used to implement reactive-like pipelines if desired.

All results are combined into one large lazy iterator and yielded lazily.

Parameters
  • flattener: A function that returns either an async iterator or iterator of new values. Could be an attribute name instead.
Example

The following example generates a distinct collection of all mentioned users in the given channel from the past 500 messages.

def iter_mentioned_users(message: hikari.Message) -> typing.Iterable[Snowflake]:
    for match in re.findall(r"<@!?(\d+)>", message.content):
        yield Snowflake(match)

mentioned_users = await (
    channel
    .history()
    .limit(500)
    .map(".content")
    .flat_map(iter_mentioned_users)
    .distinct()
)
Returns
  • LazyIterator[AnotherValueT]: The new lazy iterator to return.
#  async def for_each(self, consumer: Callable[[~ValueT], Any]) -> None:
View Source
    async def for_each(self, consumer: typing.Callable[[ValueT], typing.Any]) -> None:
        """Feed every value to `consumer` as soon as it is produced.

        If `consumer` is a coroutine function, each call is awaited before the
        next value is requested; otherwise it is simply called.
        """
        # Decided once up front rather than per item.
        needs_await = asyncio.iscoroutinefunction(consumer)

        async for value in self:
            outcome = consumer(value)
            if needs_await:
                await outcome

Pass each value to a given consumer immediately.

#  async def last(self) -> ~ValueT:
View Source
    async def last(self) -> ValueT:
        """Return only the final element of this iterator.

        .. note::
            This method will consume the whole iterator if run.

        Returns
        -------
        ValueT
            The last result.

        Raises
        ------
        LookupError
            If no result exists.
        """
        # The last element is the first element of the reversed view.
        backwards = self.reversed()
        return await backwards.next()

Return the last element of this iterator only.

Note: This method will consume the whole iterator if run.

Returns
  • ValueT: The last result.
Raises
  • LookupError: If no result exists.
#  def limit(self, limit: int) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
    def limit(self, limit: int) -> LazyIterator[ValueT]:
        """Cap the number of items received from this async iterator.

        Parameters
        ----------
        limit : int
            The number of items to get. This must be greater than zero.

        Examples
        --------
        ```py
        >>> async for item in paginated_results.limit(3):
        ...     print(item)
        ```

        Returns
        -------
        LazyIterator[ValueT]
            A paginated results view that asynchronously yields a maximum
            of the given number of items before completing.
        """
        capped = _LimitedLazyIterator(self, limit)
        return capped

Limit the number of items you receive from this async iterator.

Parameters
  • limit (int): The number of items to get. This must be greater than zero.
Examples
>>> async for item in paginated_results.limit(3):
...     print(item)
Returns
  • LazyIterator[ValueT]: A paginated results view that asynchronously yields a maximum of the given number of items before completing.
#  def map(
   self,
   transformation: Union[Callable[[~ValueT], ~AnotherValueT], str]
) -> hikari.iterators.LazyIterator[~AnotherValueT]:
View Source
    def map(
        self,
        transformation: typing.Union[typing.Callable[[ValueT], AnotherValueT], str],
    ) -> LazyIterator[AnotherValueT]:
        """Map the values to a different value.

        Parameters
        ----------
        transformation : typing.Union[typing.Callable[[ValueT], AnotherValueT], str]
            The function to use to map the attribute. This may alternatively
            be a string attribute name to replace the input value with. You
            can provide nested attributes using the `.` operator.

        Returns
        -------
        LazyIterator[AnotherValueT]
            `LazyIterator` that maps each value to another value.
        """
        if isinstance(transformation, str):
            # A string names an attribute, so wrap it in a getter that is used as the mapper.
            getter = spel.AttrGetter(transformation)
            transformation = typing.cast("spel.AttrGetter[ValueT, AnotherValueT]", getter)

        return _MappingLazyIterator(self, transformation)

Map the values to a different value.

Parameters
  • transformation (typing.Union[typing.Callable[[ValueT], AnotherValueT], str]): The function to use to map the attribute. This may alternatively be a string attribute name to replace the input value with. You can provide nested attributes using the . operator.
Returns
  • LazyIterator[AnotherValueT]: LazyIterator that maps each value to another value.
#  async def next(self) -> ~ValueT:
View Source
    async def next(self) -> ValueT:
        """Return only the next element of this iterator.

        Returns
        -------
        ValueT
            The next result.

        Raises
        ------
        LookupError
            If no more results exist.
        """
        try:
            upcoming = await self.__anext__()
        except StopAsyncIteration:
            # Exhaustion is reported as a lookup failure, without chaining the original exception.
            raise LookupError("No elements were found") from None

        return upcoming

Return the next element of this iterator only.

Returns
  • ValueT: The next result.
Raises
  • LookupError: If no more results exist.
#  def open(self) -> None:
View Source
    def open(self) -> None:
        # Opening a stream which is already active does nothing.
        if self._active:
            return

        # For the sake of protecting highly intelligent people who forget to close this, we register the event
        # listener with a weakref then try to close this on deletion. While this may lead to their consoles being
        # spammed, this is a small price to pay as it'll be way more obvious what's wrong than if we just left them
        # with a vague ominous memory leak.
        weak_listener = _generate_weak_listener(weakref.WeakMethod(self._listener))
        self._registered_listener = weak_listener
        self._event_manager.subscribe(self._event_type, weak_listener)
        self._active = True

Mark this streamer as opened to let it start receiving and queueing events.

If called on an already started streamer then this will do nothing.

Note: A `with streamer` block may be used as a short-cut for opening and closing a stream.

#  def reversed(self) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
    def reversed(self) -> LazyIterator[ValueT]:
        """Return a lazy iterator over this iterator's remaining values, in reverse order.

        Returns
        -------
        LazyIterator[ValueT]
            The lazy iterator of this iterator's remaining values reversed.
        """
        backwards = _ReversedLazyIterator(self)
        return backwards

Return a lazy iterator of the remainder of this iterator's values reversed.

Returns
  • LazyIterator[ValueT]: The lazy iterator of this iterator's remaining values reversed.
#  def skip(self, number: int) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
    def skip(self, number: int) -> LazyIterator[ValueT]:
        """Discard the given number of items, then yield everything after them.

        Parameters
        ----------
        number : int
            The max number of items to drop before any items are yielded.

        Returns
        -------
        LazyIterator[ValueT]
            A paginated results view that asynchronously yields all items
            AFTER the given number of items are discarded first.
        """
        remainder = _DropCountLazyIterator(self, number)
        return remainder

Drop the given number of items, then yield anything after.

Parameters
  • number (int): The max number of items to drop before any items are yielded.
Returns
  • LazyIterator[ValueT]: A paginated results view that asynchronously yields all items AFTER the given number of items are discarded first.
#  def skip_until(
   self,
   *predicates: Union[Tuple[str, Any], Callable[[~ValueT], bool]],
   **attrs: Any
) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
    def skip_until(
        self,
        *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[ValueT], bool]],
        **attrs: typing.Any,
    ) -> LazyIterator[ValueT]:
        """Discard items until the given conditions are met.

        Items after this will be yielded as normal.

        Parameters
        ----------
        *predicates : typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]
            Predicates to invoke. These are functions that take a value and
            return `True` if it is of interest, or `False`
            otherwise. These may instead include 2-`tuple` objects
            consisting of a `str` attribute name (nested attributes are
            referred to using the `.` operator), and values to compare for
            equality. This allows you to specify conditions such as
            `members.skip_until(("user.bot", True))`.
        **attrs : typing.Any
            Alternative to passing 2-tuples. Cannot specify nested attributes
            using this method.

        Returns
        -------
        LazyIterator[ValueT]
            LazyIterator that only emits values once the conditions have been
            met. All items before this are discarded.
        """
        stop_conditions = self._map_predicates_and_attr_getters("skip_until", *predicates, **attrs)
        # Items are dropped for as long as the inverted conditions hold.
        keep_dropping = ~stop_conditions
        return _DropWhileLazyIterator(self, keep_dropping)

Discard items while all conditions are False.

Items after this will be yielded as normal.

Parameters
  • *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return True if it is of interest, or False otherwise. These may instead include 2-tuple objects consisting of a str attribute name (nested attributes are referred to using the . operator), and values to compare for equality. This allows you to specify conditions such as members.skip_until(("user.bot", True)).
  • **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
  • LazyIterator[ValueT]: LazyIterator that only emits values once the conditions have been met. All items before this are discarded.
#  def skip_while(
   self,
   *predicates: Union[Tuple[str, Any], Callable[[~ValueT], bool]],
   **attrs: Any
) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
    def skip_while(
        self,
        *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[ValueT], bool]],
        **attrs: typing.Any,
    ) -> LazyIterator[ValueT]:
        """Discard items for as long as the given conditions are met.

        Items after this will be yielded as normal.

        Parameters
        ----------
        *predicates : typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]
            Predicates to invoke. These are functions that take a value and
            return `True` if it is of interest, or `False`
            otherwise. These may instead include 2-`tuple` objects
            consisting of a `str` attribute name (nested attributes
            are referred to using the `.` operator), and values to compare for
            equality. This allows you to specify conditions such as
            `members.skip_while(("user.bot", True))`.
        **attrs : typing.Any
            Alternative to passing 2-tuples. Cannot specify nested attributes
            using this method.

        Returns
        -------
        LazyIterator[ValueT]
            LazyIterator that only emits values once the conditions are no
            longer met. All items before this are discarded.
        """
        drop_conditions = self._map_predicates_and_attr_getters("skip_while", *predicates, **attrs)
        return _DropWhileLazyIterator(self, drop_conditions)

Discard items while all conditions are True.

Items after this will be yielded as normal.

Parameters
  • *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return True if it is of interest, or False otherwise. These may instead include 2-tuple objects consisting of a str attribute name (nested attributes are referred to using the . operator), and values to compare for equality. This allows you to specify conditions such as members.skip_while(("user.bot", True)).
  • **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
  • LazyIterator[ValueT]: LazyIterator that only emits values once the conditions are no longer met. All items before this are discarded.
#  async def sort(self, *, key: Any = None, reverse: bool = False) -> Sequence[~ValueT]:
View Source
    async def sort(self, *, key: typing.Any = None, reverse: bool = False) -> typing.Sequence[ValueT]:
        """Collect all results, then return them as a new sorted list.

        `key` and `reverse` have the same meaning as for `list.sort`.
        """
        ordered = list(await self)
        ordered.sort(key=key, reverse=reverse)
        return ordered

Collect all results, then sort the collection before returning it.

#  def take_until(
   self,
   *predicates: Union[Tuple[str, Any], Callable[[~ValueT], bool]],
   **attrs: Any
) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
    def take_until(
        self,
        *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[ValueT], bool]],
        **attrs: typing.Any,
    ) -> LazyIterator[ValueT]:
        """Yield each item until any conditions pass or the end is reached.

        Parameters
        ----------
        *predicates : typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]
            Predicates to invoke. These are functions that take a value and
            return `True` if it is of interest, or `False`
            otherwise. These may instead include 2-`tuple` objects
            consisting of a `str` attribute name (nested attributes are
            referred to using the `.` operator), and values to compare for
            equality. This allows you to specify conditions such as
            `members.take_until(("user.bot", True))`.
        **attrs : typing.Any
            Alternative to passing 2-tuples. Cannot specify nested attributes
            using this method.

        Returns
        -------
        LazyIterator[ValueT]
            LazyIterator that only emits values until any conditions are
            matched.
        """
        stop_conditions = self._map_predicates_and_attr_getters("take_until", *predicates, **attrs)
        # Items are taken for as long as the inverted conditions hold.
        keep_taking = ~stop_conditions
        return _TakeWhileLazyIterator(self, keep_taking)

Return each item until any conditions pass or the end is reached.

Parameters
  • *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return True if it is of interest, or False otherwise. These may instead include 2-tuple objects consisting of a str attribute name (nested attributes are referred to using the . operator), and values to compare for equality. This allows you to specify conditions such as members.take_until(("user.bot", True)).
  • **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
  • LazyIterator[ValueT]: LazyIterator that only emits values until any conditions are matched.
#  def take_while(
   self,
   *predicates: Union[Tuple[str, Any], Callable[[~ValueT], bool]],
   **attrs: Any
) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
    def take_while(
        self,
        *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[ValueT], bool]],
        **attrs: typing.Any,
    ) -> LazyIterator[ValueT]:
        """Emit items until any of the conditions fail or the iterator ends.

        Parameters
        ----------
        *predicates : typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]
            Conditions to evaluate against each item. Each one is either a
            callable that accepts an item and returns `True` when the item is
            of interest (`False` otherwise), or a 2-`tuple` made up of a
            `str` attribute name and a value to compare it to for equality.
            Nested attributes are addressed with the `.` operator, which
            allows conditions such as
            `members.take_while(("user.bot", True))`.
        **attrs : typing.Any
            Keyword alternative to the 2-tuple form. Nested attributes cannot
            be expressed this way.

        Returns
        -------
        LazyIterator[ValueT]
            LazyIterator that only emits values until any conditions are not
            matched.
        """
        keep_conditions = self._map_predicates_and_attr_getters("take_while", *predicates, **attrs)
        return _TakeWhileLazyIterator(self, keep_conditions)

Return each item until any conditions fail or the end is reached.

Parameters
  • *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return True if it is of interest, or False otherwise. These may instead include 2-tuple objects consisting of a str attribute name (nested attributes are referred to using the . operator), and values to compare for equality. This allows you to specify conditions such as members.take_while(("user.bot", True)).
  • **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
  • LazyIterator[ValueT]: LazyIterator that only emits values until any conditions are not matched.
#  def filtered(
   event_types: Union[Type[hikari.events.base_events.Event], Sequence[Type[hikari.events.base_events.Event]]],
   cache_components: hikari.api.config.CacheComponents = <CacheComponents.NONE: 0>,
   /
) -> Callable[[Callable[[~_EventManagerBaseT, hikari.api.shard.GatewayShard, Dict[str, Any]], Coroutine[Any, Any, NoneType]]], Callable[[~_EventManagerBaseT, hikari.api.shard.GatewayShard, Dict[str, Any]], Coroutine[Any, Any, NoneType]]]:
View Source
def filtered(
    event_types: typing.Union[typing.Type[base_events.Event], typing.Sequence[typing.Type[base_events.Event]]],
    cache_components: config.CacheComponents = config.CacheComponents.NONE,
    /,
) -> typing.Callable[[_UnboundMethodT[_EventManagerBaseT]], _UnboundMethodT[_EventManagerBaseT]]:
    """Attach dispatch metadata to a raw consumer method.

    The metadata indicates when the consumer should be unmarshalled and
    dispatched.

    Parameters
    ----------
    event_types
        Types of the events this raw consumer method may dispatch.
        This may be either a single type or a sequence of types.

    Other Parameters
    ----------------
    cache_components : hikari.api.config.CacheComponents
        Bitfield of the cache components this event may make altering calls to.
        This defaults to `hikari.api.config.CacheComponents.NONE`.

    Returns
    -------
    typing.Callable
        A decorator that sets `__cache_components__` and `__events_bitmask__`
        on the method it is given and returns that same method.
    """
    if not isinstance(event_types, typing.Sequence):
        dispatched = event_types.dispatches()

    else:
        # Flatten what every given type dispatches; going through dict keys
        # drops duplicate entries while keeping first-seen order.
        dispatched = tuple(dict.fromkeys(cls for given in event_types for cls in given.dispatches()))

    # OR together the bitmask of every event type this consumer may dispatch.
    events_bitmask = 0
    for dispatched_type in dispatched:
        events_bitmask = events_bitmask | dispatched_type.bitmask()

    # https://github.com/python/mypy/issues/2087
    def decorator(method: _UnboundMethodT[_EventManagerBaseT], /) -> _UnboundMethodT[_EventManagerBaseT]:
        method.__cache_components__ = cache_components  # type: ignore[attr-defined]
        method.__events_bitmask__ = events_bitmask  # type: ignore[attr-defined]
        # Check the method against _FilteredMethodT now that both attributes are set.
        assert isinstance(method, _FilteredMethodT), "Incorrect attribute(s) set for a filtered method"
        return method  # type: ignore[unreachable]

    return decorator

Add metadata to a consumer method to indicate when it should be unmarshalled and dispatched.

Parameters
  • event_types: Types of the events this raw consumer method may dispatch. This may be either a single type or a sequence of types.
Other Parameters