Back to top

hikari.api.event_manager

Core interface for components that manage events in the library.

View Source
# -*- coding: utf-8 -*-
# cython: language_level=3
# Copyright (c) 2020 Nekokatt
# Copyright (c) 2021-present davfsa
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Core interface for components that manage events in the library."""
from __future__ import annotations

__all__: typing.Sequence[str] = ("EventManager", "EventStream")

import abc
import asyncio
import typing

from hikari import iterators
from hikari.events import base_events

if typing.TYPE_CHECKING:
    # Everything in this block is only evaluated by static type checkers.
    # These names are not defined at runtime, so they may only be used inside
    # annotations (which are never evaluated eagerly thanks to the
    # `from __future__ import annotations` import at the top of the module).
    import types

    from hikari.api import shard as gateway_shard
    from hikari.internal import data_binding

    # Synchronous predicate: takes an event and returns a `bool`.
    PredicateT = typing.Callable[[base_events.EventT], bool]
    # Event listener: takes an event and returns a coroutine resolving to `None`.
    CallbackT = typing.Callable[[base_events.EventT], typing.Coroutine[typing.Any, typing.Any, None]]
    # Raw payload consumer: takes a gateway shard and a JSON object and
    # returns a coroutine resolving to `None`.
    ConsumerT = typing.Callable[
        [gateway_shard.GatewayShard, data_binding.JSONObject], typing.Coroutine[typing.Any, typing.Any, None]
    ]

    # Type variable used to annotate `self` so that fluent methods such as
    # `EventStream.filter` and `EventStream.__enter__` return the caller's own type.
    _EventStreamT = typing.TypeVar("_EventStreamT")


class EventStream(iterators.LazyIterator[base_events.EventT], abc.ABC):
    """A base abstract class for all event streamers.

    Unlike `hikari.iterators.LazyIterator` (which this extends), an event
    streamer must be started and closed.

    Examples
    --------
    A streamer may either be started and closed using `with` syntax
    where `EventStream.open` and `EventStream.close` are implicitly called based on
    context.

    ```py
    with EventStream(app, EventType, timeout=50) as stream:
        async for entry in stream:
            ...
    ```

    A streamer may also be directly started and closed using
    `EventStream.open` and `EventStream.close`. Both are regular
    (non-coroutine) methods, so they must not be awaited. Note that if you
    don't call `EventStream.close` after opening a streamer when you're
    finished with it then it may queue events in memory indefinitely.

    ```py
    stream = EventStream(app, EventType, timeout=50)
    stream.open()
    async for event in stream:
        ...

    stream.close()
    ```

    See Also
    --------
    `hikari.iterators.LazyIterator`
    """

    __slots__: typing.Sequence[str] = ()

    @abc.abstractmethod
    def close(self) -> None:
        """Mark this streamer as closed to stop it from queueing and receiving events.

        If called on an already closed streamer then this will do nothing.

        .. note::
            `with streamer` may be used as a short-cut for opening and
            closing a streamer.
        """

    @abc.abstractmethod
    def open(self) -> None:
        """Mark this streamer as opened to let it start receiving and queueing events.

        If called on an already started streamer then this will do nothing.

        .. note::
            `with streamer` may be used as a short-cut for opening and
            closing a streamer.
        """

    @abc.abstractmethod
    def filter(
        self: _EventStreamT,
        *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[base_events.EventT], bool]],
        **attrs: typing.Any,
    ) -> _EventStreamT:
        """Filter the items by one or more conditions.

        Each condition is treated as a predicate, being called with each item
        that this iterator would return when it is requested.

        All conditions must evaluate to `True` for the item to be
        returned. If this is not met, then the item is discarded and ignored,
        the next matching item will be returned instead, if there is one.

        Parameters
        ----------
        *predicates : typing.Union[typing.Callable[[hikari.events.base_events.EventT], bool], typing.Tuple[str, typing.Any]]
            Predicates to invoke. These are functions that take an event and
            return `True` if it is of interest, or `False`
            otherwise. These may instead include 2-`tuple` objects
            consisting of a `str` attribute name (nested attributes
            are referred to using the `.` operator), and values to compare for
            equality. This allows you to specify conditions such as
            `members.filter(("user.bot", True))`.
        **attrs : typing.Any
            Alternative to passing 2-tuples. Cannot specify nested attributes
            using this method.

        Returns
        -------
        EventStream[hikari.events.base_events.EventT]
            The current stream with the new filter applied.
        """

    @abc.abstractmethod
    def __enter__(self: _EventStreamT) -> _EventStreamT:
        """Enter a `with` block for this streamer.

        As described in the class documentation, `with` syntax is the
        short-cut for opening a streamer. This is abstract: the base
        implementation only raises `NotImplementedError`.

        Returns
        -------
        EventStream
            Annotated as the same type as `self`, so the value bound by
            `with ... as stream` keeps the concrete streamer type.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def __exit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]],
        exc: typing.Optional[BaseException],
        exc_tb: typing.Optional[types.TracebackType],
    ) -> None:
        """Leave a `with` block for this streamer.

        As described in the class documentation, `with` syntax is the
        short-cut for closing a streamer. This is abstract: the base
        implementation only raises `NotImplementedError`.

        Parameters
        ----------
        exc_type : typing.Optional[typing.Type[BaseException]]
            Type of the exception raised inside the block, or `None`.
        exc : typing.Optional[BaseException]
            The exception raised inside the block, or `None`.
        exc_tb : typing.Optional[types.TracebackType]
            Traceback of the exception raised inside the block, or `None`.
        """
        raise NotImplementedError


class EventManager(abc.ABC):
    """Base interface for event manager implementations.

    This is a listener of a `hikari.events.base_events.Event` object and
    consumer of raw event payloads, and is expected to invoke one or more
    corresponding event listeners where appropriate.
    """

    __slots__: typing.Sequence[str] = ()

    @abc.abstractmethod
    def consume_raw_event(
        self, event_name: str, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """Consume a raw event.

        Parameters
        ----------
        event_name : str
            The case-insensitive name of the event being triggered.
        shard : hikari.api.shard.GatewayShard
            Object of the shard that received this event.
        payload : hikari.internal.data_binding.JSONObject
            Payload of the event being triggered.

        Raises
        ------
        LookupError
            If there is no consumer for the event.
        """

    @abc.abstractmethod
    def dispatch(self, event: base_events.Event) -> asyncio.Future[typing.Any]:
        """Dispatch an event.

        Parameters
        ----------
        event : hikari.events.base_events.Event
            The event to dispatch.

        Example
        -------
        We can dispatch custom events by first defining a class that
        derives from `hikari.events.base_events.Event`.

        ```py
        import attr

        from hikari.traits import RESTAware
        from hikari.events.base_events import Event
        from hikari.users import User
        from hikari.snowflakes import Snowflake

        @attr.define()
        class EveryoneMentionedEvent(Event):
            app: RESTAware = attr.field()

            author: User = attr.field()
            '''The user who mentioned everyone.'''

            content: str = attr.field()
            '''The message that was sent.'''

            message_id: Snowflake = attr.field()
            '''The message ID.'''

            channel_id: Snowflake = attr.field()
            '''The channel ID.'''
        ```

        We can then dispatch our event as we see fit. Note that every field
        declared on the event class (including `app`) has to be provided.

        ```py
        from hikari.events.messages import MessageCreateEvent

        @bot.listen(MessageCreateEvent)
        async def on_message(event):
            if "@everyone" in event.content or "@here" in event.content:
                event = EveryoneMentionedEvent(
                    app=event.app,
                    author=event.author,
                    content=event.content,
                    message_id=event.id,
                    channel_id=event.channel_id,
                )

                bot.dispatch(event)
        ```

        This event can be listened to elsewhere by subscribing to it with
        `EventManager.subscribe` or with the `EventManager.listen` decorator.

        ```py
        @bot.listen(EveryoneMentionedEvent)
        async def on_everyone_mentioned(event):
            print(event.author, "just pinged everyone in", event.channel_id)
        ```

        Returns
        -------
        asyncio.Future[typing.Any]
            A future that can be optionally awaited. If awaited, the future
            will complete once all corresponding event listeners have been
            invoked. If not awaited, this will schedule the dispatch of the
            events in the background for later.

        See Also
        --------
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    # Yes, this is not generic. The reason for this is MyPy complains about
    # using ABCs that are not concrete in generic types passed to functions.
    # For the sake of UX, I will check this at runtime instead and let the
    # user use a static type checker.
    @abc.abstractmethod
    def subscribe(self, event_type: typing.Type[typing.Any], callback: CallbackT[typing.Any]) -> None:
        """Subscribe a given callback to a given event type.

        Parameters
        ----------
        event_type : typing.Type[T]
            The event type to listen for. This will also listen for any
            subclasses of the given type.
            `T` must be a subclass of `hikari.events.base_events.Event`.
        callback : typing.Callable[[T], typing.Coroutine[typing.Any, typing.Any, None]]
            Must be a coroutine function to invoke. This should
            consume an instance of the given event, or an instance of a valid
            subclass if one exists. Any result is discarded.

        Example
        -------
        The following demonstrates subscribing a callback to message creation
        events.

        ```py
        from hikari.events.messages import MessageCreateEvent

        async def on_message(event):
            ...

        bot.subscribe(MessageCreateEvent, on_message)
        ```

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    # Yes, this is not generic. The reason for this is MyPy complains about
    # using ABCs that are not concrete in generic types passed to functions.
    # For the sake of UX, I will check this at runtime instead and let the
    # user use a static type checker.
    @abc.abstractmethod
    def unsubscribe(self, event_type: typing.Type[typing.Any], callback: CallbackT[typing.Any]) -> None:
        """Unsubscribe a given callback from a given event type, if present.

        Parameters
        ----------
        event_type : typing.Type[T]
            The event type to unsubscribe from. This must be the same exact
            type as was originally subscribed with to be removed correctly.
            `T` must derive from `hikari.events.base_events.Event`.
        callback : typing.Callable[[T], typing.Coroutine[typing.Any, typing.Any, None]]
            The callback to unsubscribe.

        Example
        -------
        The following demonstrates unsubscribing a callback from a message
        creation event.

        ```py
        from hikari.events.messages import MessageCreateEvent

        async def on_message(event):
            ...

        bot.unsubscribe(MessageCreateEvent, on_message)
        ```

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    @abc.abstractmethod
    def get_listeners(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        *,
        polymorphic: bool = True,
    ) -> typing.Collection[CallbackT[base_events.EventT]]:
        """Get the listeners for a given event type, if there are any.

        Parameters
        ----------
        event_type : typing.Type[T]
            The event type to look for. This is a positional-only argument.
            `T` must be a subclass of `hikari.events.base_events.Event`.
        polymorphic : bool
            If `True`, this will also return the listeners for all the
            event types `event_type` will dispatch. If `False`, then
            only listeners for this class specifically are returned. The
            default is `True`. This is a keyword-only argument.

        Returns
        -------
        typing.Collection[typing.Callable[[T], typing.Coroutine[typing.Any, typing.Any, None]]]
            A copy of the collection of listeners for the event. Will return
            an empty collection if nothing is registered.
        """

    @abc.abstractmethod
    def listen(
        self,
        *event_types: typing.Type[base_events.EventT],
    ) -> typing.Callable[[CallbackT[base_events.EventT]], CallbackT[base_events.EventT]]:
        """Generate a decorator to subscribe a callback to an event type.

        This is a second-order decorator.

        Parameters
        ----------
        *event_types : typing.Type[T]
            The event types to subscribe to. The implementation may allow this
            to be undefined. If this is the case, the event type will be inferred
            instead from the type hints on the function signature.
            `T` must be a subclass of `hikari.events.base_events.Event`.

        Returns
        -------
        typing.Callable[[T], T]
            A decorator for a coroutine function that passes it to
            `EventManager.subscribe` before returning the function
            reference.

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    @abc.abstractmethod
    def stream(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        timeout: typing.Union[float, int, None],
        limit: typing.Optional[int] = None,
    ) -> EventStream[base_events.EventT]:
        """Return a stream iterator for the given event and sub-events.

        .. warning::
            If you use `stream.open()` to start the stream then you must
            also close it with `stream.close()` otherwise it may queue
            events in memory indefinitely.

        Parameters
        ----------
        event_type : typing.Type[hikari.events.base_events.Event]
            The event type to listen for. This will listen for subclasses of
            this type additionally.
        timeout : typing.Union[float, int, None]
            How long this streamer should wait for the next event before
            ending the iteration. If `None` then this will continue
            until explicitly broken from. This has no default and must
            always be passed.
        limit : typing.Optional[int]
            The limit for how many events this should queue at one time before
            dropping extra incoming events, leave this as `None` for
            the cache size to be unlimited.

        Returns
        -------
        EventStream[hikari.events.base_events.Event]
            The async iterator to handle streamed events. This must be started
            with `with stream:` or `stream.open()` before
            asynchronously iterating over it.

        Examples
        --------
        ```py
        with bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id)) as stream:
            async for user_id in stream.map("user_id").limit(50):
                ...
        ```

        or using `open()` and `close()`

        ```py
        stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
        stream.open()

        async for user_id in stream.map("user_id").limit(50):
            ...

        stream.close()
        ```

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    @abc.abstractmethod
    async def wait_for(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        timeout: typing.Union[float, int, None],
        predicate: typing.Optional[PredicateT[base_events.EventT]] = None,
    ) -> base_events.EventT:
        """Wait for a given event to occur once, then return the event.

        .. warning::
            Async predicates are not supported.

        Parameters
        ----------
        event_type : typing.Type[hikari.events.base_events.Event]
            The event type to listen for. This will listen for subclasses of
            this type additionally.
        timeout : typing.Union[float, int, None]
            The amount of time to wait before raising an `asyncio.TimeoutError`
            and giving up instead. This is measured in seconds. If
            `None`, then no timeout will be waited for (no timeout can
            result in "leaking" of coroutines that never complete if called in
            an uncontrolled way, so is not recommended). This has no default
            and must always be passed.
        predicate : typing.Optional[typing.Callable[[hikari.events.base_events.Event], bool]]
            A function taking the event as the single parameter.
            This should return `True` if the event is one you want to
            return, or `False` if the event should not be returned.
            If left as `None` (the default), then the first matching event type
            that the bot receives (or any subtype) will be the one returned.

        Returns
        -------
        hikari.events.base_events.Event
            The event that was provided.

        Raises
        ------
        asyncio.TimeoutError
            If the timeout is not `None` and is reached before an
            event is received that the predicate returns `True` for.

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        """
#  class EventManager(abc.ABC):
View Source
class EventManager(abc.ABC):
    """Base interface for event manager implementations.

    This is a listener of a `hikari.events.base_events.Event` object and
    consumer of raw event payloads, and is expected to invoke one or more
    corresponding event listeners where appropriate.
    """

    __slots__: typing.Sequence[str] = ()

    @abc.abstractmethod
    def consume_raw_event(
        self, event_name: str, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """Consume a raw event.

        Parameters
        ----------
        event_name : str
            The case-insensitive name of the event being triggered.
        shard : hikari.api.shard.GatewayShard
            Object of the shard that received this event.
        payload : hikari.internal.data_binding.JSONObject
            Payload of the event being triggered.

        Raises
        ------
        LookupError
            If there is no consumer for the event.
        """

    @abc.abstractmethod
    def dispatch(self, event: base_events.Event) -> asyncio.Future[typing.Any]:
        """Dispatch an event.

        Parameters
        ----------
        event : hikari.events.base_events.Event
            The event to dispatch.

        Example
        -------
        We can dispatch custom events by first defining a class that
        derives from `hikari.events.base_events.Event`.

        ```py
        import attr

        from hikari.traits import RESTAware
        from hikari.events.base_events import Event
        from hikari.users import User
        from hikari.snowflakes import Snowflake

        @attr.define()
        class EveryoneMentionedEvent(Event):
            app: RESTAware = attr.field()

            author: User = attr.field()
            '''The user who mentioned everyone.'''

            content: str = attr.field()
            '''The message that was sent.'''

            message_id: Snowflake = attr.field()
            '''The message ID.'''

            channel_id: Snowflake = attr.field()
            '''The channel ID.'''
        ```

        We can then dispatch our event as we see fit. Note that every field
        declared on the event class (including `app`) has to be provided.

        ```py
        from hikari.events.messages import MessageCreateEvent

        @bot.listen(MessageCreateEvent)
        async def on_message(event):
            if "@everyone" in event.content or "@here" in event.content:
                event = EveryoneMentionedEvent(
                    app=event.app,
                    author=event.author,
                    content=event.content,
                    message_id=event.id,
                    channel_id=event.channel_id,
                )

                bot.dispatch(event)
        ```

        This event can be listened to elsewhere by subscribing to it with
        `EventManager.subscribe` or with the `EventManager.listen` decorator.

        ```py
        @bot.listen(EveryoneMentionedEvent)
        async def on_everyone_mentioned(event):
            print(event.author, "just pinged everyone in", event.channel_id)
        ```

        Returns
        -------
        asyncio.Future[typing.Any]
            A future that can be optionally awaited. If awaited, the future
            will complete once all corresponding event listeners have been
            invoked. If not awaited, this will schedule the dispatch of the
            events in the background for later.

        See Also
        --------
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    # Yes, this is not generic. The reason for this is MyPy complains about
    # using ABCs that are not concrete in generic types passed to functions.
    # For the sake of UX, I will check this at runtime instead and let the
    # user use a static type checker.
    @abc.abstractmethod
    def subscribe(self, event_type: typing.Type[typing.Any], callback: CallbackT[typing.Any]) -> None:
        """Subscribe a given callback to a given event type.

        Parameters
        ----------
        event_type : typing.Type[T]
            The event type to listen for. This will also listen for any
            subclasses of the given type.
            `T` must be a subclass of `hikari.events.base_events.Event`.
        callback : typing.Callable[[T], typing.Coroutine[typing.Any, typing.Any, None]]
            Must be a coroutine function to invoke. This should
            consume an instance of the given event, or an instance of a valid
            subclass if one exists. Any result is discarded.

        Example
        -------
        The following demonstrates subscribing a callback to message creation
        events.

        ```py
        from hikari.events.messages import MessageCreateEvent

        async def on_message(event):
            ...

        bot.subscribe(MessageCreateEvent, on_message)
        ```

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    # Yes, this is not generic. The reason for this is MyPy complains about
    # using ABCs that are not concrete in generic types passed to functions.
    # For the sake of UX, I will check this at runtime instead and let the
    # user use a static type checker.
    @abc.abstractmethod
    def unsubscribe(self, event_type: typing.Type[typing.Any], callback: CallbackT[typing.Any]) -> None:
        """Unsubscribe a given callback from a given event type, if present.

        Parameters
        ----------
        event_type : typing.Type[T]
            The event type to unsubscribe from. This must be the same exact
            type as was originally subscribed with to be removed correctly.
            `T` must derive from `hikari.events.base_events.Event`.
        callback : typing.Callable[[T], typing.Coroutine[typing.Any, typing.Any, None]]
            The callback to unsubscribe.

        Example
        -------
        The following demonstrates unsubscribing a callback from a message
        creation event.

        ```py
        from hikari.events.messages import MessageCreateEvent

        async def on_message(event):
            ...

        bot.unsubscribe(MessageCreateEvent, on_message)
        ```

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    @abc.abstractmethod
    def get_listeners(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        *,
        polymorphic: bool = True,
    ) -> typing.Collection[CallbackT[base_events.EventT]]:
        """Get the listeners for a given event type, if there are any.

        Parameters
        ----------
        event_type : typing.Type[T]
            The event type to look for. This is a positional-only argument.
            `T` must be a subclass of `hikari.events.base_events.Event`.
        polymorphic : bool
            If `True`, this will also return the listeners for all the
            event types `event_type` will dispatch. If `False`, then
            only listeners for this class specifically are returned. The
            default is `True`. This is a keyword-only argument.

        Returns
        -------
        typing.Collection[typing.Callable[[T], typing.Coroutine[typing.Any, typing.Any, None]]]
            A copy of the collection of listeners for the event. Will return
            an empty collection if nothing is registered.
        """

    @abc.abstractmethod
    def listen(
        self,
        *event_types: typing.Type[base_events.EventT],
    ) -> typing.Callable[[CallbackT[base_events.EventT]], CallbackT[base_events.EventT]]:
        """Generate a decorator to subscribe a callback to an event type.

        This is a second-order decorator.

        Parameters
        ----------
        *event_types : typing.Type[T]
            The event types to subscribe to. The implementation may allow this
            to be undefined. If this is the case, the event type will be inferred
            instead from the type hints on the function signature.
            `T` must be a subclass of `hikari.events.base_events.Event`.

        Returns
        -------
        typing.Callable[[T], T]
            A decorator for a coroutine function that passes it to
            `EventManager.subscribe` before returning the function
            reference.

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    @abc.abstractmethod
    def stream(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        timeout: typing.Union[float, int, None],
        limit: typing.Optional[int] = None,
    ) -> EventStream[base_events.EventT]:
        """Return a stream iterator for the given event and sub-events.

        .. warning::
            If you use `stream.open()` to start the stream then you must
            also close it with `stream.close()` otherwise it may queue
            events in memory indefinitely.

        Parameters
        ----------
        event_type : typing.Type[hikari.events.base_events.Event]
            The event type to listen for. This will listen for subclasses of
            this type additionally.
        timeout : typing.Union[float, int, None]
            How long this streamer should wait for the next event before
            ending the iteration. If `None` then this will continue
            until explicitly broken from. This has no default and must
            always be passed.
        limit : typing.Optional[int]
            The limit for how many events this should queue at one time before
            dropping extra incoming events, leave this as `None` for
            the cache size to be unlimited.

        Returns
        -------
        EventStream[hikari.events.base_events.Event]
            The async iterator to handle streamed events. This must be started
            with `with stream:` or `stream.open()` before
            asynchronously iterating over it.

        Examples
        --------
        ```py
        with bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id)) as stream:
            async for user_id in stream.map("user_id").limit(50):
                ...
        ```

        or using `open()` and `close()`

        ```py
        stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
        stream.open()

        async for user_id in stream.map("user_id").limit(50):
            ...

        stream.close()
        ```

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    @abc.abstractmethod
    async def wait_for(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        timeout: typing.Union[float, int, None],
        predicate: typing.Optional[PredicateT[base_events.EventT]] = None,
    ) -> base_events.EventT:
        """Wait for a given event to occur once, then return the event.

        .. warning::
            Async predicates are not supported.

        Parameters
        ----------
        event_type : typing.Type[hikari.events.base_events.Event]
            The event type to listen for. This will listen for subclasses of
            this type additionally.
        timeout : typing.Union[float, int, None]
            The amount of time to wait before raising an `asyncio.TimeoutError`
            and giving up instead. This is measured in seconds. If
            `None`, then no timeout will be waited for (no timeout can
            result in "leaking" of coroutines that never complete if called in
            an uncontrolled way, so is not recommended). This has no default
            and must always be passed.
        predicate : typing.Optional[typing.Callable[[hikari.events.base_events.Event], bool]]
            A function taking the event as the single parameter.
            This should return `True` if the event is one you want to
            return, or `False` if the event should not be returned.
            If left as `None` (the default), then the first matching event type
            that the bot receives (or any subtype) will be the one returned.

        Returns
        -------
        hikari.events.base_events.Event
            The event that was provided.

        Raises
        ------
        asyncio.TimeoutError
            If the timeout is not `None` and is reached before an
            event is received that the predicate returns `True` for.

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        """

Base interface for event manager implementations.

This is a listener of a hikari.events.base_events.Event object and consumer of raw event payloads, and is expected to invoke one or more corresponding event listeners where appropriate.

Methods
#  
@abc.abstractmethod
def consume_raw_event(
   self,
   event_name: str,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @abc.abstractmethod
    def consume_raw_event(
        self, event_name: str, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """Consume a raw event.

        This is the entry point for raw event payloads received by a shard.
        Implementations are expected to hand the payload to the consumer
        that corresponds to `event_name`.

        Parameters
        ----------
        event_name : str
            The case-insensitive name of the event being triggered.
        shard : hikari.api.shard.GatewayShard
            Object of the shard that received this event.
        payload : hikari.internal.data_binding.JSONObject
            Payload of the event being triggered.

        Raises
        ------
        LookupError
            If there is no consumer for the event.
        """

Consume a raw event.

Parameters
Raises
  • LookupError: If there is no consumer for the event.
#  
@abc.abstractmethod
def dispatch(
   self,
   event: hikari.events.base_events.Event
) -> _asyncio.Future[typing.Any]:
View Source
    @abc.abstractmethod
    def dispatch(self, event: base_events.Event) -> asyncio.Future[typing.Any]:
        """Dispatch an event.

        Parameters
        ----------
        event : hikari.events.base_events.Event
            The event to dispatch.

        Example
        -------
        We can dispatch custom events by first defining a class that
        derives from `hikari.events.base_events.Event`.

        ```py
        import attr

        from hikari.traits import RESTAware
        from hikari.events.base_events import Event
        from hikari.users import User
        from hikari.snowflakes import Snowflake

        @attr.define()
        class EveryoneMentionedEvent(Event):
            app: RESTAware = attr.field()

            author: User = attr.field()
            '''The user who mentioned everyone.'''

            content: str = attr.field()
            '''The message that was sent.'''

            message_id: Snowflake = attr.field()
            '''The message ID.'''

            channel_id: Snowflake = attr.field()
            '''The channel ID.'''
        ```

        We can then dispatch our event as we see fit. Every field declared
        on the class has no default, so all of them (including `app`) must be
        passed when constructing the event.

        ```py
        from hikari.events.messages import MessageCreateEvent

        @bot.listen(MessageCreateEvent)
        async def on_message(event):
            if "@everyone" in event.content or "@here" in event.content:
                event = EveryoneMentionedEvent(
                    app=event.app,
                    author=event.author,
                    content=event.content,
                    message_id=event.id,
                    channel_id=event.channel_id,
                )

                bot.dispatch(event)
        ```

        This event can be listened to elsewhere by subscribing to it with
        `EventManager.subscribe`, or with the `EventManager.listen` decorator
        as shown here.

        ```py
        @bot.listen(EveryoneMentionedEvent)
        async def on_everyone_mentioned(event):
            print(event.author, "just pinged everyone in", event.channel_id)
        ```

        Returns
        -------
        asyncio.Future[typing.Any]
            A future that can be optionally awaited. If awaited, the future
            will complete once all corresponding event listeners have been
            invoked. If not awaited, this will schedule the dispatch of the
            events in the background for later.

        See Also
        --------
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

Dispatch an event.

Parameters
Example

We can dispatch custom events by first defining a class that derives from hikari.events.base_events.Event.

import attr

from hikari.traits import RESTAware
from hikari.events.base_events import Event
from hikari.users import User
from hikari.snowflakes import Snowflake

@attr.define()
class EveryoneMentionedEvent(Event):
    app: RESTAware = attr.field()

    author: User = attr.field()
    '''The user who mentioned everyone.'''

    content: str = attr.field()
    '''The message that was sent.'''

    message_id: Snowflake = attr.field()
    '''The message ID.'''

    channel_id: Snowflake = attr.field()
    '''The channel ID.'''

We can then dispatch our event as we see fit.

from hikari.events.messages import MessageCreateEvent

@bot.listen(MessageCreateEvent)
async def on_message(event):
    if "@everyone" in event.content or "@here" in event.content:
        event = EveryoneMentionedEvent(
            app=event.app,
            author=event.author,
            content=event.content,
            message_id=event.id,
            channel_id=event.channel_id,
        )

        bot.dispatch(event)

This event can be listened to elsewhere by subscribing to it with EventManager.subscribe.

@bot.listen(EveryoneMentionedEvent)
async def on_everyone_mentioned(event):
    print(event.author, "just pinged everyone in", event.channel_id)
Returns
  • asyncio.Future[typing.Any]: A future that can be optionally awaited. If awaited, the future will complete once all corresponding event listeners have been invoked. If not awaited, this will schedule the dispatch of the events in the background for later.
See Also

hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for

#  
@abc.abstractmethod
def get_listeners(
   self,
   event_type: Type[~EventT],
   /,
   *,
   polymorphic: bool = True
) -> Collection[Callable[[~EventT], Coroutine[Any, Any, NoneType]]]:
View Source
    @abc.abstractmethod
    def get_listeners(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        *,
        polymorphic: bool = True,
    ) -> typing.Collection[CallbackT[base_events.EventT]]:
        """Get the listeners for a given event type, if there are any.

        Parameters
        ----------
        event_type : typing.Type[T]
            The event type to look for.
            `T` must be a subclass of `hikari.events.base_events.Event`.
        polymorphic : bool
            If `True`, this will also return the listeners for all the
            event types `event_type` will dispatch. If `False`, then
            only listeners for this class specifically are returned. The
            default is `True`.

        Returns
        -------
        typing.Collection[typing.Callable[[T], typing.Coroutine[typing.Any, typing.Any, None]]]
            A copy of the collection of listeners for the event. Will return
            an empty collection if nothing is registered.
        """

Get the listeners for a given event type, if there are any.

Parameters
  • event_type (typing.Type[T]): The event type to look for. T must be a subclass of hikari.events.base_events.Event.
  • polymorphic (bool): If True, this will also return the listeners for all the event types event_type will dispatch. If False, then only listeners for this class specifically are returned. The default is True.
Returns
  • typing.Collection[typing.Callable[[T], typing.Coroutine[typing.Any, typing.Any, None]]]: A copy of the collection of listeners for the event. Will return an empty collection if nothing is registered.
#  
@abc.abstractmethod
def listen(
   self,
   *event_types: Type[~EventT]
) -> Callable[[Callable[[~EventT], Coroutine[Any, Any, NoneType]]], Callable[[~EventT], Coroutine[Any, Any, NoneType]]]:
View Source
    @abc.abstractmethod
    def listen(
        self,
        *event_types: typing.Type[base_events.EventT],
    ) -> typing.Callable[[CallbackT[base_events.EventT]], CallbackT[base_events.EventT]]:
        """Generate a decorator to subscribe a callback to an event type.

        This is a second-order decorator.

        Parameters
        ----------
        *event_types : typing.Type[T]
            The event types to subscribe to. The implementation may allow
            these to be omitted. If this is the case, the event type will be
            inferred instead from the type hints on the function signature.
            `T` must be a subclass of `hikari.events.base_events.Event`.

        Returns
        -------
        typing.Callable[[CallbackT[T]], CallbackT[T]]
            A decorator for a coroutine function that passes it to
            `EventManager.subscribe` before returning the function
            reference. `CallbackT[T]` is a callable that takes a `T` and
            returns a coroutine resolving to `None`.

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

Generate a decorator to subscribe a callback to an event type.

This is a second-order decorator.

Parameters
  • *event_types (typing.Optional[typing.Type[T]]): The event types to subscribe to. The implementation may allow this to be undefined. If this is the case, the event type will be inferred instead from the type hints on the function signature. T must be a subclass of hikari.events.base_events.Event.
Returns
  • typing.Callable[[T], T]: A decorator for a coroutine function that passes it to EventManager.subscribe before returning the function reference.
See Also

hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for

#  
@abc.abstractmethod
def stream(
   self,
   event_type: Type[~EventT],
   /,
   timeout: Union[float, int, NoneType],
   limit: Optional[int] = None
) -> hikari.api.event_manager.EventStream[~EventT]:
View Source
    @abc.abstractmethod
    def stream(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        timeout: typing.Union[float, int, None],
        limit: typing.Optional[int] = None,
    ) -> EventStream[base_events.EventT]:
        """Return a stream iterator for the given event and sub-events.

        .. warning::
            If you use `stream.open()` to start the stream then you must
            also close it with `stream.close()` otherwise it may queue
            events in memory indefinitely.

        Parameters
        ----------
        event_type : typing.Type[hikari.events.base_events.Event]
            The event type to listen for. This will listen for subclasses of
            this type additionally.
        timeout : typing.Union[float, int, None]
            How long this streamer should wait for the next event before
            ending the iteration. If `None` then this will continue
            until explicitly broken from.
        limit : typing.Optional[int]
            The limit for how many events this should queue at one time before
            dropping extra incoming events, leave this as `None` for
            the cache size to be unlimited.

        Returns
        -------
        EventStream[hikari.events.base_events.Event]
            The async iterator to handle streamed events. This must be started
            with `with stream:` or `stream.open()` before
            asynchronously iterating over it.

        Examples
        --------
        ```py
        with bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id)) as stream:
            async for user_id in stream.map("user_id").limit(50):
                ...
        ```

        or using `open()` and `close()`

        ```py
        stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
        stream.open()

        async for user_id in stream.map("user_id").limit(50):
            ...

        stream.close()
        ```

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

Return a stream iterator for the given event and sub-events.

Warning: If you use stream.open() to start the stream then you must also close it with stream.close() otherwise it may queue events in memory indefinitely.

Parameters
  • event_type (typing.Type[hikari.events.base_events.Event]): The event type to listen for. This will listen for subclasses of this type additionally.
  • timeout (typing.Union[float, int, None]): How long this streamer should wait for the next event before ending the iteration. If None then this will continue until explicitly broken from.
  • limit (typing.Optional[int]): The limit for how many events this should queue at one time before dropping extra incoming events, leave this as None for the cache size to be unlimited.
Returns
  • EventStream[hikari.events.base_events.Event]: The async iterator to handle streamed events. This must be started with with stream: or stream.open() before asynchronously iterating over it.
Examples
with bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id)) as stream:
    async for user_id in stream.map("user_id").limit(50):
        ...

or using open() and close()

stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
stream.open()

async for user_id in stream.map("user_id").limit(50):
    ...

stream.close()
See Also

hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for

#  
@abc.abstractmethod
def subscribe(
   self,
   event_type: Type[Any],
   callback: Callable[[Any], Coroutine[Any, Any, NoneType]]
) -> None:
View Source
    @abc.abstractmethod
    def subscribe(self, event_type: typing.Type[typing.Any], callback: CallbackT[typing.Any]) -> None:
        """Subscribe a given callback to a given event type.

        Parameters
        ----------
        event_type : typing.Type[T]
            The event type to listen for. This will also listen for any
            subclasses of the given type.
            `T` must be a subclass of `hikari.events.base_events.Event`.
        callback : typing.Callable[[T], typing.Coroutine[typing.Any, typing.Any, None]]
            Must be a coroutine function to invoke. This should
            consume an instance of the given event, or an instance of a valid
            subclass if one exists. Any result is discarded.

        Example
        -------
        The following demonstrates subscribing a callback to message creation
        events.

        ```py
        from hikari.events.messages import MessageCreateEvent

        async def on_message(event):
            ...

        bot.subscribe(MessageCreateEvent, on_message)
        ```

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

Subscribe a given callback to a given event type.

Parameters
  • event_type (typing.Type[T]): The event type to listen for. This will also listen for any subclasses of the given type. T must be a subclass of hikari.events.base_events.Event.
  • callback: Must be a coroutine function to invoke. This should consume an instance of the given event, or an instance of a valid subclass if one exists. Any result is discarded.
Example

The following demonstrates subscribing a callback to message creation events.

from hikari.events.messages import MessageCreateEvent

async def on_message(event):
    ...

bot.subscribe(MessageCreateEvent, on_message)
See Also

hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for

#  
@abc.abstractmethod
def unsubscribe(
   self,
   event_type: Type[Any],
   callback: Callable[[Any], Coroutine[Any, Any, NoneType]]
) -> None:
View Source
    @abc.abstractmethod
    def unsubscribe(self, event_type: typing.Type[typing.Any], callback: CallbackT[typing.Any]) -> None:
        """Unsubscribe a given callback from a given event type, if present.

        Parameters
        ----------
        event_type : typing.Type[T]
            The event type to unsubscribe from. This must be the same exact
            type as was originally subscribed with to be removed correctly.
            `T` must derive from `hikari.events.base_events.Event`.
        callback : typing.Callable[[T], typing.Coroutine[typing.Any, typing.Any, None]]
            The callback to unsubscribe.

        Example
        -------
        The following demonstrates unsubscribing a callback from a message
        creation event.

        ```py
        from hikari.events.messages import MessageCreateEvent

        async def on_message(event):
            ...

        bot.unsubscribe(MessageCreateEvent, on_message)
        ```

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

Unsubscribe a given callback from a given event type, if present.

Parameters
  • event_type (typing.Type[T]): The event type to unsubscribe from. This must be the same exact type as was originally subscribed with to be removed correctly. T must derive from hikari.events.base_events.Event.
  • callback: The callback to unsubscribe.
Example

The following demonstrates unsubscribing a callback from a message creation event.

from hikari.events.messages import MessageCreateEvent

async def on_message(event):
    ...

bot.unsubscribe(MessageCreateEvent, on_message)
See Also

hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.wait_for

#  
@abc.abstractmethod
async def wait_for(
   self,
   event_type: Type[~EventT],
   /,
   timeout: Union[float, int, NoneType],
   predicate: Optional[Callable[[~EventT], bool]] = None
) -> ~EventT:
View Source
    @abc.abstractmethod
    async def wait_for(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        timeout: typing.Union[float, int, None],
        predicate: typing.Optional[PredicateT[base_events.EventT]] = None,
    ) -> base_events.EventT:
        """Wait for a given event to occur once, then return the event.

        .. warning::
            Async predicates are not supported.

        Parameters
        ----------
        event_type : typing.Type[hikari.events.base_events.Event]
            The event type to listen for. This will listen for subclasses of
            this type additionally.
        timeout : typing.Union[float, int, None]
            The amount of time to wait before raising an `asyncio.TimeoutError`
            and giving up instead. This is measured in seconds. If
            `None`, then no timeout will be waited for (no timeout can
            result in "leaking" of coroutines that never complete if called in
            an uncontrolled way, so is not recommended).
        predicate : typing.Optional[typing.Callable[[hikari.events.base_events.Event], bool]]
            A function taking the event as the single parameter.
            This should return `True` if the event is one you want to
            return, or `False` if the event should not be returned.
            If left as `None` (the default), then the first matching event type
            that the bot receives (or any subtype) will be the one returned.

        Returns
        -------
        hikari.events.base_events.Event
            The event that was provided.

        Raises
        ------
        asyncio.TimeoutError
            If the timeout is not `None` and is reached before an
            event is received that the predicate returns `True` for.

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        """

Wait for a given event to occur once, then return the event.

Warning: Async predicates are not supported.

Parameters
  • event_type (typing.Type[hikari.events.base_events.Event]): The event type to listen for. This will listen for subclasses of this type additionally.
  • predicate: A function taking the event as the single parameter. This should return True if the event is one you want to return, or False if the event should not be returned. If left as None (the default), then the first matching event type that the bot receives (or any subtype) will be the one returned.
  • timeout (typing.Union[float, int, None]): The amount of time to wait before raising an asyncio.TimeoutError and giving up instead. This is measured in seconds. If None, then no timeout will be waited for (no timeout can result in "leaking" of coroutines that never complete if called in an uncontrolled way, so is not recommended).
Returns
Raises
  • asyncio.TimeoutError: If the timeout is not None and is reached before an event is received that the predicate returns True for.
See Also

hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe

#  class EventStream(hikari.iterators.LazyIterator[~EventT], abc.ABC):
View Source
class EventStream(iterators.LazyIterator[base_events.EventT], abc.ABC):
    """A base abstract class for all event streamers.

    Unlike `hikari.iterators.LazyIterator` (which this extends), an event
    streamer must be started and closed.

    Examples
    --------
    A streamer may either be started and closed using `with` syntax
    where `EventStream.open` and `EventStream.close` are implicitly called based on
    context.

    ```py
    with EventStream(app, EventType, timeout=50) as stream:
        async for entry in stream:
            ...
    ```

    A streamer may also be directly started and closed using `EventStream.open`
    and `EventStream.close`. Both are regular (non-async) methods, so they
    must not be awaited. Note that if you don't call `EventStream.close` after
    opening a streamer when you're finished with it then it may queue
    events in memory indefinitely.

    ```py
    stream = EventStream(app, EventType, timeout=50)
    stream.open()
    async for event in stream:
        ...

    stream.close()
    ```

    See Also
    --------
    `hikari.iterators.LazyIterator`
    """

    __slots__: typing.Sequence[str] = ()

    @abc.abstractmethod
    def close(self) -> None:
        """Mark this streamer as closed to stop it from queueing and receiving events.

        If called on an already closed streamer then this will do nothing.

        .. note::
            `with streamer` may be used as a short-cut for opening and
            closing a streamer.
        """

    @abc.abstractmethod
    def open(self) -> None:
        """Mark this streamer as opened to let it start receiving and queueing events.

        If called on an already started streamer then this will do nothing.

        .. note::
            `with streamer` may be used as a short-cut for opening and
            closing a streamer.
        """

    @abc.abstractmethod
    def filter(
        self: _EventStreamT,
        *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[base_events.EventT], bool]],
        **attrs: typing.Any,
    ) -> _EventStreamT:
        """Filter the items by one or more conditions.

        Each condition is treated as a predicate, being called with each item
        that this iterator would return when it is requested.

        All conditions must evaluate to `True` for the item to be
        returned. If this is not met, then the item is discarded and ignored,
        the next matching item will be returned instead, if there is one.

        Parameters
        ----------
        *predicates : typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]
            Predicates to invoke. These are functions that take a value and
            return `True` if it is of interest, or `False`
            otherwise. These may instead include 2-`tuple` objects
            consisting of a `str` attribute name (nested attributes
            are referred to using the `.` operator), and values to compare for
            equality. This allows you to specify conditions such as
            `members.filter(("user.bot", True))`.
        **attrs : typing.Any
            Alternative to passing 2-tuples. Cannot specify nested attributes
            using this method.

        Returns
        -------
        EventStream[ValueT]
            The current stream with the new filter applied.
        """

    # Context-manager entry point; concrete streamers must provide it so that
    # `with stream:` works as described in the class docstring.
    @abc.abstractmethod
    def __enter__(self: _EventStreamT) -> _EventStreamT:
        raise NotImplementedError

    # Context-manager exit point; receives the usual exception triple and
    # must be provided by concrete streamers.
    @abc.abstractmethod
    def __exit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]],
        exc: typing.Optional[BaseException],
        exc_tb: typing.Optional[types.TracebackType],
    ) -> None:
        raise NotImplementedError

A base abstract class for all event streamers.

Unlike hikari.iterators.LazyIterator (which this extends), an event streamer must be started and closed.

Examples

A streamer may either be started and closed using with syntax where EventStream.open and EventStream.close are implicitly called based on context.

with EventStream(app, EventType, timeout=50) as stream:
    async for entry in stream:
        ...

A streamer may also be directly started and closed using EventStream.open and EventStream.close. Note that if you don't call EventStream.close after opening a streamer when you're finished with it then it may queue events in memory indefinitely.

stream = EventStream(app, EventType, timeout=50)
stream.open()
async for event in stream:
    ...

stream.close()
See Also

hikari.iterators.LazyIterator

Methods
#  def awaiting(
   self,
   window_size: int = 10
) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
    def awaiting(self, window_size: int = 10) -> LazyIterator[ValueT]:
        """Wrap this iterator so that its items are awaited concurrently.

        Items are awaited in windows of a fixed size.

        .. warning::
            Setting a large window size, or setting it to 0 to await everything
            is a dangerous thing to do if you are making API calls. Some
            endpoints will get ratelimited and cause a backup of waiting
            tasks, others may begin to spam global rate limits instead
            (the `fetch_user` endpoint seems to be notorious for doing this).

        .. note::
            This call assumes that the iterator contains awaitable values as
            input. MyPy cannot detect this nicely, so a cast is forced
            internally. If an item is not awaitable, you will receive a
            `TypeError` instead.

        Parameters
        ----------
        window_size : int
            The window size of how many tasks to await at once. You can set this
            to `0` to await everything at once, but see the warning above.

        Returns
        -------
        LazyIterator[ValueT]
            The new lazy iterator to return.
        """
        # Unchecked cast: nothing here verifies that the items really are
        # awaitable, the type checker is simply told to treat them as such.
        awaitable_view = typing.cast("LazyIterator[typing.Awaitable[ValueT]]", self)
        return _AwaitingLazyIterator(awaitable_view, window_size)

Await each item concurrently in a fixed size window.

Warning: Setting a large window size, or setting it to 0 to await everything is a dangerous thing to do if you are making API calls. Some endpoints will get ratelimited and cause a backup of waiting tasks, others may begin to spam global rate limits instead (the fetch_user endpoint seems to be notorious for doing this).

Note: This call assumes that the iterator contains awaitable values as input. MyPy cannot detect this nicely, so any cast is forced internally. If the item is not awaitable, you will receive a TypeError instead. You have been warned. You cannot escape the ways of the duck type young grasshopper.

Parameters
  • window_size (int): The window size of how many tasks to await at once. You can set this to 0 to await everything at once, but see the warning above.
Returns
  • LazyIterator[ValueT]: The new lazy iterator to return.
#  def chunk(
   self,
   chunk_size: int
) -> hikari.iterators.LazyIterator[typing.Sequence[~ValueT]]:
View Source
    def chunk(self, chunk_size: int) -> LazyIterator[typing.Sequence[ValueT]]:
        """Group the results into sequences of at most `chunk_size` entries.

        Parameters
        ----------
        chunk_size : int
            The limit for how many results should be returned in each chunk.

        Returns
        -------
        LazyIterator[typing.Sequence[ValueT]]
            `LazyIterator` that emits each chunked sequence.
        """
        chunked = _ChunkedLazyIterator(self, chunk_size)
        return chunked

Return results in chunks of up to chunk_size amount of entries.

Parameters
  • chunk_size (int): The limit for how many results should be returned in each chunk.
Returns
  • LazyIterator[typing.Sequence[ValueT]]: LazyIterator that emits each chunked sequence.
#  
@abc.abstractmethod
def close(self) -> None:
View Source
    @abc.abstractmethod
    def close(self) -> None:
        """Mark this streamer as closed to stop it from queueing and receiving events.

        If called on an already closed streamer then this will do nothing.

        This method is abstract: it declares the contract only, and concrete
        stream implementations must provide the behaviour.

        .. note::
            `with streamer` may be used as a short-cut for opening and
            closing a streamer.
        """

Mark this streamer as closed to stop it from queueing and receiving events.

If called on an already closed streamer then this will do nothing.

Note: with streamer may be used as a short-cut for opening and closing a streamer.

#  async def collect(
   self,
   collector: Callable[[Sequence[~ValueT]], Collection[~ValueT]]
) -> Collection[~ValueT]:
View Source
    async def collect(
        self, collector: typing.Callable[[typing.Sequence[ValueT]], typing.Collection[ValueT]]
    ) -> typing.Collection[ValueT]:
        """Gather every result and pass them to `collector` to build a collection.

        Parameters
        ----------
        collector
            A callable that consumes the sequence of results and returns a
            collection.

        Returns
        -------
        typing.Collection[ValueT]
            Whatever `collector` returned for the awaited results.
        """
        # Awaiting the iterator produces the sequence that is handed to the collector.
        results = await self
        return collector(results)

Collect the results into a given type and return it.

Parameters
  • collector: A function that consumes a sequence of values and returns a collection.
#  async def count(self) -> int:
View Source
    async def count(self) -> int:
        """Consume the iterator and report how many results it produced.

        Returns
        -------
        int
            Number of results found.
        """
        total = 0
        # The items themselves are irrelevant; only the number of iterations matters.
        async for _item in self:
            total = total + 1
        return total

Count the number of results.

Returns
  • int: Number of results found.
#  def enumerate(
   self,
   *,
   start: int = 0
) -> hikari.iterators.LazyIterator[typing.Tuple[int, ~ValueT]]:
View Source
    def enumerate(self, *, start: int = 0) -> LazyIterator[typing.Tuple[int, ValueT]]:
        """Enumerate the paginated results lazily.

        This behaves as an asyncio-friendly version of `enumerate`
        which uses much less memory than collecting all the results first and
        calling `enumerate` across them.

        Parameters
        ----------
        start : int
            Optional int to start at. If omitted, this is `0`.

        Examples
        --------
        ```py
        >>> async for i, item in paginated_results.enumerate():
        ...    print(i, item)
        0 foo
        1 bar
        2 baz
        3 bork
        4 qux

        >>> async for i, item in paginated_results.enumerate(start=9):
        ...    print(i, item)
        9 foo
        10 bar
        11 baz
        12 bork
        13 qux

        >>> async for i, item in paginated_results.enumerate(start=9).limit(3):
        ...    print(i, item)
        9 foo
        10 bar
        11 baz
        ```

        Returns
        -------
        LazyIterator[typing.Tuple[int, ValueT]]
            A paginated results view that asynchronously yields an increasing
            counter in a tuple with each result, lazily.
        """
        return _EnumeratedLazyIterator(self, start=start)

Enumerate the paginated results lazily.

This behaves as an asyncio-friendly version of enumerate which uses much less memory than collecting all the results first and calling enumerate across them.

Parameters
  • start (int): Optional int to start at. If omitted, this is 0.
Examples
>>> async for i, item in paginated_results.enumerate():
...    print(i, item)
(0, foo)
(1, bar)
(2, baz)
(3, bork)
(4, qux)

>>> async for i, item in paginated_results.enumerate(start=9):
...    print(i, item)
(9, foo)
(10, bar)
(11, baz)
(12, bork)
(13, qux)

>>> async for i, item in paginated_results.enumerate(start=9).limit(3):
...    print(i, item)
(9, foo)
(10, bar)
(11, baz)
Returns
  • LazyIterator[typing.Tuple[int, ValueT]]: A paginated results view that asynchronously yields an increasing counter in a tuple with each result, lazily.
#  
@abc.abstractmethod
def filter(
   self: ~_EventStreamT,
   *predicates: Union[Tuple[str, Any], Callable[[~EventT], bool]],
   **attrs: Any
) -> ~_EventStreamT:
View Source
    @abc.abstractmethod
    def filter(
        self: _EventStreamT,
        *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[base_events.EventT], bool]],
        **attrs: typing.Any,
    ) -> _EventStreamT:
        """Filter the items by one or more conditions.

        Each condition is treated as a predicate, being called with each item
        that this iterator would return when it is requested.

        All conditions must evaluate to `True` for the item to be
        returned. If this is not met, then the item is discarded and ignored,
        the next matching item will be returned instead, if there is one.

        This method is abstract: it declares the contract only, and concrete
        stream implementations must provide the behaviour.

        Parameters
        ----------
        *predicates : typing.Union[typing.Callable[[EventT], bool], typing.Tuple[str, typing.Any]]
            Predicates to invoke. These are functions that take an event and
            return `True` if it is of interest, or `False`
            otherwise. These may instead include 2-`tuple` objects
            consisting of a `str` attribute name (nested attributes
            are referred to using the `.` operator), and values to compare for
            equality. This allows you to specify conditions such as
            `members.filter(("user.bot", True))`.
        **attrs : typing.Any
            Alternative to passing 2-tuples. Cannot specify nested attributes
            using this method.

        Returns
        -------
        EventStream
            The current stream with the new filter applied.
        """

Filter the items by one or more conditions.

Each condition is treated as a predicate, being called with each item that this iterator would return when it is requested.

All conditions must evaluate to True for the item to be returned. If this is not met, then the item is discarded and ignored, the next matching item will be returned instead, if there is one.

Parameters
  • *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return True if it is of interest, or False otherwise. These may instead include 2-tuple objects consisting of a str attribute name (nested attributes are referred to using the . operator), and values to compare for equality. This allows you to specify conditions such as members.filter(("user.bot", True)).
  • **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
  • EventStream[ValueT]: The current stream with the new filter applied.
#  def flat_map(
   self,
   flattener: Union[hikari.internal.spel.AttrGetter[~ValueT, Union[AsyncIterator[~AnotherValueT], Iterable[~AnotherValueT]]], Callable[[~ValueT], Union[AsyncIterator[~AnotherValueT], Iterable[~AnotherValueT]]]]
) -> hikari.iterators.LazyIterator[~AnotherValueT]:
View Source
    def flat_map(self, flattener: _FlattenerT[ValueT, AnotherValueT]) -> LazyIterator[AnotherValueT]:
        r"""Perform a flat mapping operation.

        This will pass each item in the iterator to the given `flattener`
        parameter, expecting a new `typing.Iterable` or `typing.AsyncIterator`
        to be returned as the result. This means you can map to a new
        `LazyIterator`, `typing.AsyncIterator`, `typing.Iterable`,
        async generator, or generator.

        Remember that `typing.Iterator` implicitly provides `typing.Iterable`
        compatibility.

        This is used to provide lazy conversions, and can be used to implement
        reactive-like pipelines if desired.

        All results are combined into one large lazy iterator and yielded
        lazily.

        Parameters
        ----------
        flattener
            A function that returns either an async iterator or iterator of
            new values. Could be an attribute name instead.

        Example
        -------

        The following example generates a distinct collection of all mentioned
        users in the given channel from the past 500 messages. The flattener
        receives each message itself, so no prior `map` step is applied.

        ```py
        def iter_mentioned_users(message: hikari.Message) -> typing.Iterable[Snowflake]:
            for match in re.findall(r"<@!?(\d+)>", message.content):
                yield Snowflake(match)

        mentioned_users = await (
            channel
            .history()
            .limit(500)
            .flat_map(iter_mentioned_users)
            .distinct()
        )
        ```

        Returns
        -------
        LazyIterator[AnotherValueT]
            The new lazy iterator to return.
        """
        return _FlatMapLazyIterator(self, flattener)

Perform a flat mapping operation.

This will pass each item in the iterator to the given flattener parameter, expecting a new typing.Iterable or typing.AsyncIterator to be returned as the result. This means you can map to a new LazyIterator, typing.AsyncIterator, typing.Iterable, async generator, or generator.

Remember that typing.Iterator implicitly provides typing.Iterable compatibility.

This is used to provide lazy conversions, and can be used to implement reactive-like pipelines if desired.

All results are combined into one large lazy iterator and yielded lazily.

Parameters
  • flattener: A function that returns either an async iterator or iterator of new values. Could be an attribute name instead.
Example

The following example generates a distinct collection of all mentioned users in the given channel from the past 500 messages.

def iter_mentioned_users(message: hikari.Message) -> typing.Iterable[Snowflake]:
    for match in re.findall(r"<@!?(\d+)>", message.content):
        yield Snowflake(match)

mentioned_users = await (
    channel
    .history()
    .limit(500)
    .map(".content")
    .flat_map(iter_mentioned_users)
    .distinct()
)
Returns
  • LazyIterator[AnotherValueT]: The new lazy iterator to return.
#  async def for_each(self, consumer: Callable[[~ValueT], Any]) -> None:
View Source
    async def for_each(self, consumer: typing.Callable[[ValueT], typing.Any]) -> None:
        """Pass each value to a given consumer immediately.

        Parameters
        ----------
        consumer
            Callable invoked once per item, in iteration order. If a call
            returns an awaitable (for example when `consumer` is a coroutine
            function, a callable object with an async `__call__`, or a
            synchronous wrapper that returns a coroutine), that awaitable is
            awaited before the next item is requested.
        """
        # Function-scope import so this method stays self-contained.
        import inspect

        async for item in self:
            outcome = consumer(item)
            # Inspect the result rather than the callable: checking the
            # callable up front misses async `__call__` objects and sync
            # wrappers around coroutines, whose coroutines would then be
            # created but never awaited.
            if inspect.isawaitable(outcome):
                await outcome

Pass each value to a given consumer immediately.

#  async def last(self) -> ~ValueT:
View Source
    async def last(self) -> ValueT:
        """Fetch only the final element of this iterator.

        .. note::
            This method will consume the whole iterator if run.

        Returns
        -------
        ValueT
            The last result.

        Raises
        ------
        LookupError
            If no result exists.
        """
        # The last item is simply the first item of the reversed view.
        backwards = self.reversed()
        return await backwards.next()

Return the last element of this iterator only.

Note: This method will consume the whole iterator if run.

Returns
  • ValueT: The last result.
Raises
  • LookupError: If no result exists.
#  def limit(self, limit: int) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
    def limit(self, limit: int) -> LazyIterator[ValueT]:
        """Cap how many items this async iterator will hand out.

        Parameters
        ----------
        limit : int
            The number of items to get. This must be greater than zero.

        Examples
        --------
        ```py
        >>> async for item in paginated_results.limit(3):
        ...     print(item)
        ```

        Returns
        -------
        LazyIterator[ValueT]
            A paginated results view that asynchronously yields a maximum
            of the given number of items before completing.
        """
        # Wrap this iterator; the cap is applied by the wrapper.
        capped = _LimitedLazyIterator(self, limit)
        return capped

Limit the number of items you receive from this async iterator.

Parameters
  • limit (int): The number of items to get. This must be greater than zero.
Examples
>>> async for item in paginated_results.limit(3):
...     print(item)
Returns
  • LazyIterator[ValueT]: A paginated results view that asynchronously yields a maximum of the given number of items before completing.
#  def map(
   self,
   transformation: Union[Callable[[~ValueT], ~AnotherValueT], str]
) -> hikari.iterators.LazyIterator[~AnotherValueT]:
View Source
    def map(
        self,
        transformation: typing.Union[typing.Callable[[ValueT], AnotherValueT], str],
    ) -> LazyIterator[AnotherValueT]:
        """Map the values to a different value.

        Parameters
        ----------
        transformation : typing.Union[typing.Callable[[ValueT], AnotherValueT], str]
            The function to use to map the attribute. This may alternatively
            be a string attribute name to replace the input value with. You
            can provide nested attributes using the `.` operator.

        Returns
        -------
        LazyIterator[AnotherValueT]
            `LazyIterator` that maps each value to another value.
        """
        if isinstance(transformation, str):
            # A string names an attribute: wrap it in an attribute getter so
            # the mapping iterator always receives a callable.
            transformation = typing.cast("spel.AttrGetter[ValueT, AnotherValueT]", spel.AttrGetter(transformation))

        return _MappingLazyIterator(self, transformation)

Map the values to a different value.

Parameters
  • transformation (typing.Union[typing.Callable[[ValueT], AnotherValueT], str]): The function to use to map the attribute. This may alternatively be a string attribute name to replace the input value with. You can provide nested attributes using the . operator.
Returns
  • LazyIterator[AnotherValueT]: LazyIterator that maps each value to another value.
#  async def next(self) -> ~ValueT:
View Source
    async def next(self) -> ValueT:
        """Fetch only the next element of this iterator.

        Returns
        -------
        ValueT
            The next result.

        Raises
        ------
        LookupError
            If no more results exist.
        """
        try:
            item = await self.__anext__()
        except StopAsyncIteration:
            # Translate exhaustion into a lookup failure, hiding the
            # iteration-protocol exception from the traceback.
            raise LookupError("No elements were found") from None
        return item

Return the next element of this iterator only.

Returns
  • ValueT: The next result.
Raises
  • LookupError: If no more results exist.
#  
@abc.abstractmethod
def open(self) -> None:
View Source
    @abc.abstractmethod
    def open(self) -> None:
        """Mark this streamer as opened to let it start receiving and queueing events.

        If called on an already started streamer then this will do nothing.

        This method is abstract: it declares the contract only, and concrete
        stream implementations must provide the behaviour.

        .. note::
            `with streamer` may be used as a short-cut for opening and
            closing a stream.
        """

Mark this streamer as opened to let it start receiving and queueing events.

If called on an already started streamer then this will do nothing.

Note: with streamer may be used as a short-cut for opening and closing a stream.

#  def reversed(self) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
    def reversed(self) -> LazyIterator[ValueT]:
        """Build a lazy iterator over this iterator's remaining values in reverse order.

        Returns
        -------
        LazyIterator[ValueT]
            The lazy iterator of this iterator's remaining values reversed.
        """
        # Wrap this iterator; the reversal is done by the wrapper.
        flipped = _ReversedLazyIterator(self)
        return flipped

Return a lazy iterator of the remainder of this iterator's values reversed.

Returns
  • LazyIterator[ValueT]: The lazy iterator of this iterator's remaining values reversed.
#  def skip(self, number: int) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
    def skip(self, number: int) -> LazyIterator[ValueT]:
        """Discard the given number of items, then yield everything after them.

        Parameters
        ----------
        number : int
            The max number of items to drop before any items are yielded.

        Returns
        -------
        LazyIterator[ValueT]
            A paginated results view that asynchronously yields all items
            AFTER the given number of items are discarded first.
        """
        # Wrap this iterator; the dropping is done by the wrapper.
        remainder = _DropCountLazyIterator(self, number)
        return remainder

Drop the given number of items, then yield anything after.

Parameters
  • number (int): The max number of items to drop before any items are yielded.
Returns
  • LazyIterator[ValueT]: A paginated results view that asynchronously yields all items AFTER the given number of items are discarded first.
#  def skip_until(
   self,
   *predicates: Union[Tuple[str, Any], Callable[[~ValueT], bool]],
   **attrs: Any
) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
    def skip_until(
        self,
        *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[ValueT], bool]],
        **attrs: typing.Any,
    ) -> LazyIterator[ValueT]:
        """Discard items until the conditions are met.

        Items after this will be yielded as normal.

        Parameters
        ----------
        *predicates : typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]
            Predicates to invoke. These are functions that take a value and
            return `True` if it is of interest, or `False`
            otherwise. These may instead include 2-`tuple` objects
            consisting of a `str` attribute name (nested attributes are
            referred to using the `.` operator), and values to compare for
            equality. This allows you to specify conditions such as
            `members.skip_until(("user.bot", True))`.
        **attrs : typing.Any
            Alternative to passing 2-tuples. Cannot specify nested attributes
            using this method.

        Returns
        -------
        LazyIterator[ValueT]
            LazyIterator that only emits values once the conditions have been
            met. All items before this are discarded.
        """
        conditions = self._map_predicates_and_attr_getters("skip_until", *predicates, **attrs)
        # "Skip until X" is expressed as "drop while not X", hence the inversion.
        return _DropWhileLazyIterator(self, ~conditions)

Discard items while all conditions are False.

Items after this will be yielded as normal.

Parameters
  • *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return True if it is of interest, or False otherwise. These may instead include 2-tuple objects consisting of a str attribute name (nested attributes are referred to using the . operator), and values to compare for equality. This allows you to specify conditions such as members.skip_until(("user.bot", True)).
  • **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
  • LazyIterator[ValueT]: LazyIterator that only emits values once the conditions have been met. All items before this are discarded.
#  def skip_while(
   self,
   *predicates: Union[Tuple[str, Any], Callable[[~ValueT], bool]],
   **attrs: Any
) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
    def skip_while(
        self,
        *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[ValueT], bool]],
        **attrs: typing.Any,
    ) -> LazyIterator[ValueT]:
        """Discard items while all conditions are True.

        Items after this will be yielded as normal.

        Parameters
        ----------
        *predicates : typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]
            Predicates to invoke. These are functions that take a value and
            return `True` if it is of interest, or `False`
            otherwise. These may instead include 2-`tuple` objects
            consisting of a `str` attribute name (nested attributes
            are referred to using the `.` operator), and values to compare for
            equality. This allows you to specify conditions such as
            `members.skip_while(("user.bot", True))`.
        **attrs : typing.Any
            Alternative to passing 2-tuples. Cannot specify nested attributes
            using this method.

        Returns
        -------
        LazyIterator[ValueT]
            LazyIterator that only emits values once the conditions are no
            longer met. All items before this are discarded.
        """
        conditions = self._map_predicates_and_attr_getters("skip_while", *predicates, **attrs)
        # The conditions are passed as-is: items are dropped while they match.
        return _DropWhileLazyIterator(self, conditions)

Discard items while all conditions are True.

Items after this will be yielded as normal.

Parameters
  • *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return True if it is of interest, or False otherwise. These may instead include 2-tuple objects consisting of a str attribute name (nested attributes are referred to using the . operator), and values to compare for equality. This allows you to specify conditions such as members.skip_while(("user.bot", True)).
  • **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
  • LazyIterator[ValueT]: LazyIterator that only emits values once the conditions are no longer met. All items before this are discarded.
#  async def sort(self, *, key: Any = None, reverse: bool = False) -> Sequence[~ValueT]:
View Source
    async def sort(self, *, key: typing.Any = None, reverse: bool = False) -> typing.Sequence[ValueT]:
        """Gather every result, then return them as a sorted list.

        Parameters
        ----------
        key : typing.Any
            Optional key function forwarded to the sort. Defaults to `None`.
        reverse : bool
            Whether to sort in descending order. Defaults to `False`.

        Returns
        -------
        typing.Sequence[ValueT]
            A new list holding the awaited results in sorted order.
        """
        # Copy into a fresh list first so the awaited sequence is left untouched.
        ordered = list(await self)
        ordered.sort(key=key, reverse=reverse)
        return ordered

Collect all results, then sort the collection before returning it.

#  def take_until(
   self,
   *predicates: Union[Tuple[str, Any], Callable[[~ValueT], bool]],
   **attrs: Any
) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
    def take_until(
        self,
        *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[ValueT], bool]],
        **attrs: typing.Any,
    ) -> LazyIterator[ValueT]:
        """Yield items until the conditions match or the end is reached.

        Parameters
        ----------
        *predicates : typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]
            Predicates to invoke. Each is either a function that takes a
            value and returns `True` if it is of interest or `False`
            otherwise, or a 2-`tuple` of a `str` attribute name (nested
            attributes are referred to using the `.` operator) and a value
            to compare for equality. This allows you to specify conditions
            such as `members.take_until(("user.bot", True))`.
        **attrs : typing.Any
            Alternative to passing 2-tuples. Cannot specify nested attributes
            using this method.

        Returns
        -------
        LazyIterator[ValueT]
            LazyIterator that only emits values until the conditions are
            matched.
        """
        # "Take until X" is expressed as "take while not X", hence the inversion.
        return _TakeWhileLazyIterator(
            self,
            ~self._map_predicates_and_attr_getters("take_until", *predicates, **attrs),
        )

Return each item until any conditions pass or the end is reached.

Parameters
  • *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return True if it is of interest, or False otherwise. These may instead include 2-tuple objects consisting of a str attribute name (nested attributes are referred to using the . operator), and values to compare for equality. This allows you to specify conditions such as members.take_until(("user.bot", True)).
  • **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
  • LazyIterator[ValueT]: LazyIterator that only emits values until any conditions are matched.
#  def take_while(
   self,
   *predicates: Union[Tuple[str, Any], Callable[[~ValueT], bool]],
   **attrs: Any
) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
    def take_while(
        self,
        *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[ValueT], bool]],
        **attrs: typing.Any,
    ) -> LazyIterator[ValueT]:
        """Yield items until the conditions stop matching or the end is reached.

        Parameters
        ----------
        *predicates : typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]
            Predicates to invoke. Each is either a function that takes a
            value and returns `True` if it is of interest or `False`
            otherwise, or a 2-`tuple` of a `str` attribute name (nested
            attributes are referred to using the `.` operator) and a value
            to compare for equality. This allows you to specify conditions
            such as `members.take_while(("user.bot", True))`.
        **attrs : typing.Any
            Alternative to passing 2-tuples. Cannot specify nested attributes
            using this method.

        Returns
        -------
        LazyIterator[ValueT]
            LazyIterator that only emits values until the conditions are no
            longer matched.
        """
        # The conditions are passed as-is: items are emitted while they match.
        return _TakeWhileLazyIterator(
            self,
            self._map_predicates_and_attr_getters("take_while", *predicates, **attrs),
        )

Return each item until any conditions fail or the end is reached.

Parameters
  • *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return True if it is of interest, or False otherwise. These may instead include 2-tuple objects consisting of a str attribute name (nested attributes are referred to using the . operator), and values to compare for equality. This allows you to specify conditions such as members.take_while(("user.bot", True)).
  • **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
  • LazyIterator[ValueT]: LazyIterator that only emits values until any conditions are not matched.