hikari.api.event_manager
Core interface for components that manage events in the library.
View Source
# -*- coding: utf-8 -*-
# cython: language_level=3
# Copyright (c) 2020 Nekokatt
# Copyright (c) 2021-present davfsa
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Core interface for components that manage events in the library."""

from __future__ import annotations

__all__: typing.Sequence[str] = ("EventManager", "EventStream")

import abc
import asyncio
import typing

from hikari import iterators
from hikari.events import base_events

if typing.TYPE_CHECKING:
    import types

    from hikari.api import shard as gateway_shard
    from hikari.internal import data_binding

    # These aliases are only referenced from annotations, which are evaluated
    # lazily thanks to `from __future__ import annotations`.
    PredicateT = typing.Callable[[base_events.EventT], bool]
    CallbackT = typing.Callable[[base_events.EventT], typing.Coroutine[typing.Any, typing.Any, None]]
    ConsumerT = typing.Callable[
        [gateway_shard.GatewayShard, data_binding.JSONObject], typing.Coroutine[typing.Any, typing.Any, None]
    ]

    _EventStreamT = typing.TypeVar("_EventStreamT")


class EventStream(iterators.LazyIterator[base_events.EventT], abc.ABC):
    """A base abstract class for all event streamers.

    Unlike `hikari.iterators.LazyIterator` (which this extends), an event
    streamer must be started and closed.

    Examples
    --------
    A streamer may either be started and closed using `with` syntax
    where `EventStream.open` and `EventStream.close` are implicitly called
    based on context.

    ```py
    with EventStream(app, EventType, timeout=50) as stream:
        async for entry in stream:
            ...
    ```

    A streamer may also be directly started and closed using
    `EventStream.open` and `EventStream.close`. Both are plain (non-async)
    methods, so they are called without `await`. Note that if you don't call
    `EventStream.close` after opening a streamer when you're finished with it
    then it may queue events in memory indefinitely.

    ```py
    stream = EventStream(app, EventType, timeout=50)
    stream.open()
    async for event in stream:
        ...

    stream.close()
    ```

    See Also
    --------
    `hikari.iterators.LazyIterator`
    """

    __slots__: typing.Sequence[str] = ()

    @abc.abstractmethod
    def close(self) -> None:
        """Mark this streamer as closed to stop it from queueing and receiving events.

        If called on an already closed streamer then this will do nothing.

        .. note::
            `with streamer` may be used as a short-cut for opening and
            closing a streamer.
        """

    @abc.abstractmethod
    def open(self) -> None:
        """Mark this streamer as opened to let it start receiving and queueing events.

        If called on an already started streamer then this will do nothing.

        .. note::
            `with streamer` may be used as a short-cut for opening and
            closing a stream.
        """

    @abc.abstractmethod
    def filter(
        self: _EventStreamT,
        *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[base_events.EventT], bool]],
        **attrs: typing.Any,
    ) -> _EventStreamT:
        """Filter the items by one or more conditions.

        Each condition is treated as a predicate, being called with each item
        that this iterator would return when it is requested.

        All conditions must evaluate to `True` for the item to be returned.
        If this is not met, then the item is discarded and ignored, the next
        matching item will be returned instead, if there is one.

        Parameters
        ----------
        *predicates : typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]
            Predicates to invoke. These are functions that take a value and
            return `True` if it is of interest, or `False` otherwise. These
            may instead include 2-`tuple` objects consisting of a `str`
            attribute name (nested attributes are referred to using the `.`
            operator), and values to compare for equality. This allows you
            to specify conditions such as `members.filter(("user.bot", True))`.
        **attrs : typing.Any
            Alternative to passing 2-tuples. Cannot specify nested attributes
            using this method.

        Returns
        -------
        EventStream[ValueT]
            The current stream with the new filter applied.
        """

    @abc.abstractmethod
    def __enter__(self: _EventStreamT) -> _EventStreamT:
        # Context-manager entry point; the class docstring states that `with`
        # implicitly calls `open`. Concrete streamers must override this.
        raise NotImplementedError

    @abc.abstractmethod
    def __exit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]],
        exc: typing.Optional[BaseException],
        exc_tb: typing.Optional[types.TracebackType],
    ) -> None:
        # Context-manager exit point; the class docstring states that `with`
        # implicitly calls `close`. Concrete streamers must override this.
        raise NotImplementedError


class EventManager(abc.ABC):
    """Base interface for event manager implementations.

    This is a listener of a `hikari.events.base_events.Event` object and
    consumer of raw event payloads, and is expected to invoke one or more
    corresponding event listeners where appropriate.
    """

    __slots__: typing.Sequence[str] = ()

    @abc.abstractmethod
    def consume_raw_event(
        self, event_name: str, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """Consume a raw event.

        Parameters
        ----------
        event_name : str
            The case-insensitive name of the event being triggered.
        shard : hikari.api.shard.GatewayShard
            Object of the shard that received this event.
        payload : hikari.internal.data_binding.JSONObject
            Payload of the event being triggered.

        Raises
        ------
        LookupError
            If there is no consumer for the event.
        """

    @abc.abstractmethod
    def dispatch(self, event: base_events.Event) -> asyncio.Future[typing.Any]:
        """Dispatch an event.

        Parameters
        ----------
        event : hikari.events.base_events.Event
            The event to dispatch.

        Example
        -------
        We can dispatch custom events by first defining a class that
        derives from `hikari.events.base_events.Event`.

        ```py
        import attr

        from hikari.traits import RESTAware
        from hikari.events.base_events import Event
        from hikari.users import User
        from hikari.snowflakes import Snowflake

        @attr.define()
        class EveryoneMentionedEvent(Event):
            app: RESTAware = attr.field()

            author: User = attr.field()
            '''The user who mentioned everyone.'''

            content: str = attr.field()
            '''The message that was sent.'''

            message_id: Snowflake = attr.field()
            '''The message ID.'''

            channel_id: Snowflake = attr.field()
            '''The channel ID.'''
        ```

        We can then dispatch our event as we see fit.

        ```py
        from hikari.events.messages import MessageCreateEvent

        @bot.listen(MessageCreateEvent)
        async def on_message(event):
            if "@everyone" in event.content or "@here" in event.content:
                event = EveryoneMentionedEvent(
                    author=event.author,
                    content=event.content,
                    message_id=event.id,
                    channel_id=event.channel_id,
                )

                bot.dispatch(event)
        ```

        This event can be listened to elsewhere by subscribing to it with
        `EventManager.subscribe`.

        ```py
        @bot.listen(EveryoneMentionedEvent)
        async def on_everyone_mentioned(event):
            print(event.author, "just pinged everyone in", event.channel_id)
        ```

        Returns
        -------
        asyncio.Future[typing.Any]
            A future that can be optionally awaited. If awaited, the future
            will complete once all corresponding event listeners have been
            invoked. If not awaited, this will schedule the dispatch of the
            events in the background for later.

        See Also
        --------
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    # Yes, this is not generic. The reason for this is MyPy complains about
    # using ABCs that are not concrete in generic types passed to functions.
    # For the sake of UX, I will check this at runtime instead and let the
    # user use a static type checker.
    @abc.abstractmethod
    def subscribe(self, event_type: typing.Type[typing.Any], callback: CallbackT[typing.Any]) -> None:
        """Subscribe a given callback to a given event type.

        Parameters
        ----------
        event_type : typing.Type[T]
            The event type to listen for. This will also listen for any
            subclasses of the given type.
            `T` must be a subclass of `hikari.events.base_events.Event`.
        callback
            Must be a coroutine function to invoke. This should
            consume an instance of the given event, or an instance of a valid
            subclass if one exists. Any result is discarded.

        Example
        -------
        The following demonstrates subscribing a callback to message creation
        events.

        ```py
        from hikari.events.messages import MessageCreateEvent

        async def on_message(event):
            ...

        bot.subscribe(MessageCreateEvent, on_message)
        ```

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    # Yes, this is not generic. The reason for this is MyPy complains about
    # using ABCs that are not concrete in generic types passed to functions.
    # For the sake of UX, I will check this at runtime instead and let the
    # user use a static type checker.
    @abc.abstractmethod
    def unsubscribe(self, event_type: typing.Type[typing.Any], callback: CallbackT[typing.Any]) -> None:
        """Unsubscribe a given callback from a given event type, if present.

        Parameters
        ----------
        event_type : typing.Type[T]
            The event type to unsubscribe from. This must be the same exact
            type as was originally subscribed with to be removed correctly.
            `T` must derive from `hikari.events.base_events.Event`.
        callback
            The callback to unsubscribe.

        Example
        -------
        The following demonstrates unsubscribing a callback from a message
        creation event.

        ```py
        from hikari.events.messages import MessageCreateEvent

        async def on_message(event):
            ...

        bot.unsubscribe(MessageCreateEvent, on_message)
        ```

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    @abc.abstractmethod
    def get_listeners(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        *,
        polymorphic: bool = True,
    ) -> typing.Collection[CallbackT[base_events.EventT]]:
        """Get the listeners for a given event type, if there are any.

        Parameters
        ----------
        event_type : typing.Type[T]
            The event type to look for.
            `T` must be a subclass of `hikari.events.base_events.Event`.
        polymorphic : bool
            If `True`, this will also return the listeners for all the
            event types `event_type` will dispatch. If `False`, then only
            listeners for this class specifically are returned. The default
            is `True`.

        Returns
        -------
        typing.Collection[typing.Callable[[T], typing.Coroutine[typing.Any, typing.Any, None]]]
            A copy of the collection of listeners for the event. Will return
            an empty collection if nothing is registered.
        """

    @abc.abstractmethod
    def listen(
        self,
        *event_types: typing.Type[base_events.EventT],
    ) -> typing.Callable[[CallbackT[base_events.EventT]], CallbackT[base_events.EventT]]:
        """Generate a decorator to subscribe a callback to an event type.

        This is a second-order decorator.

        Parameters
        ----------
        *event_types : typing.Optional[typing.Type[T]]
            The event types to subscribe to. The implementation may allow this
            to be undefined. If this is the case, the event type will be inferred
            instead from the type hints on the function signature.
            `T` must be a subclass of `hikari.events.base_events.Event`.

        Returns
        -------
        typing.Callable[[T], T]
            A decorator for a coroutine function that passes it to
            `EventManager.subscribe` before returning the function
            reference.

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    @abc.abstractmethod
    def stream(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        timeout: typing.Union[float, int, None],
        limit: typing.Optional[int] = None,
    ) -> EventStream[base_events.EventT]:
        """Return a stream iterator for the given event and sub-events.

        .. warning::
            If you use `stream.open()` to start the stream then you must
            also close it with `stream.close()` otherwise it may queue
            events in memory indefinitely.

        Parameters
        ----------
        event_type : typing.Type[hikari.events.base_events.Event]
            The event type to listen for. This will listen for subclasses of
            this type additionally.
        timeout : typing.Union[float, int, None]
            How long this streamer should wait for the next event before
            ending the iteration. If `None` then this will continue
            until explicitly broken from.
        limit : typing.Optional[int]
            The limit for how many events this should queue at one time before
            dropping extra incoming events, leave this as `None` for
            the cache size to be unlimited.

        Returns
        -------
        EventStream[hikari.events.base_events.Event]
            The async iterator to handle streamed events. This must be started
            with `with stream:` or `stream.open()` before
            asynchronously iterating over it.

        Examples
        --------
        ```py
        with bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id)) as stream:
            async for user_id in stream.map("user_id").limit(50):
                ...
        ```

        or using `open()` and `close()`

        ```py
        stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
        stream.open()

        async for user_id in stream.map("user_id").limit(50):
            ...

        stream.close()
        ```

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    @abc.abstractmethod
    async def wait_for(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        timeout: typing.Union[float, int, None],
        predicate: typing.Optional[PredicateT[base_events.EventT]] = None,
    ) -> base_events.EventT:
        """Wait for a given event to occur once, then return the event.

        .. warning::
            Async predicates are not supported.

        Parameters
        ----------
        event_type : typing.Type[hikari.events.base_events.Event]
            The event type to listen for. This will listen for subclasses of
            this type additionally.
        timeout : typing.Union[float, int, None]
            The amount of time to wait before raising an `asyncio.TimeoutError`
            and giving up instead. This is measured in seconds. If
            `None`, then no timeout will be waited for (no timeout can
            result in "leaking" of coroutines that never complete if called in
            an uncontrolled way, so is not recommended).
        predicate
            A function taking the event as the single parameter.
            This should return `True` if the event is one you want to
            return, or `False` if the event should not be returned.
            If left as `None` (the default), then the first matching event type
            that the bot receives (or any subtype) will be the one returned.

        Returns
        -------
        hikari.events.base_events.Event
            The event that was provided.

        Raises
        ------
        asyncio.TimeoutError
            If the timeout is not `None` and is reached before an event is
            received that the predicate returns `True` for.

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        """
View Source
class EventManager(abc.ABC):
    """Base interface for event manager implementations.

    This is a listener of a `hikari.events.base_events.Event` object and
    consumer of raw event payloads, and is expected to invoke one or more
    corresponding event listeners where appropriate.
    """

    __slots__: typing.Sequence[str] = ()

    @abc.abstractmethod
    def consume_raw_event(
        self, event_name: str, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """Consume a raw event.

        Parameters
        ----------
        event_name : str
            The case-insensitive name of the event being triggered.
        shard : hikari.api.shard.GatewayShard
            Object of the shard that received this event.
        payload : hikari.internal.data_binding.JSONObject
            Payload of the event being triggered.

        Raises
        ------
        LookupError
            If there is no consumer for the event.
        """

    @abc.abstractmethod
    def dispatch(self, event: base_events.Event) -> asyncio.Future[typing.Any]:
        """Dispatch an event.

        Parameters
        ----------
        event : hikari.events.base_events.Event
            The event to dispatch.

        Example
        -------
        We can dispatch custom events by first defining a class that
        derives from `hikari.events.base_events.Event`.

        ```py
        import attr

        from hikari.traits import RESTAware
        from hikari.events.base_events import Event
        from hikari.users import User
        from hikari.snowflakes import Snowflake

        @attr.define()
        class EveryoneMentionedEvent(Event):
            app: RESTAware = attr.field()

            author: User = attr.field()
            '''The user who mentioned everyone.'''

            content: str = attr.field()
            '''The message that was sent.'''

            message_id: Snowflake = attr.field()
            '''The message ID.'''

            channel_id: Snowflake = attr.field()
            '''The channel ID.'''
        ```

        We can then dispatch our event as we see fit.

        ```py
        from hikari.events.messages import MessageCreateEvent

        @bot.listen(MessageCreateEvent)
        async def on_message(event):
            if "@everyone" in event.content or "@here" in event.content:
                event = EveryoneMentionedEvent(
                    author=event.author,
                    content=event.content,
                    message_id=event.id,
                    channel_id=event.channel_id,
                )

                bot.dispatch(event)
        ```

        This event can be listened to elsewhere by subscribing to it with
        `EventManager.subscribe`.

        ```py
        @bot.listen(EveryoneMentionedEvent)
        async def on_everyone_mentioned(event):
            print(event.author, "just pinged everyone in", event.channel_id)
        ```

        Returns
        -------
        asyncio.Future[typing.Any]
            A future that can be optionally awaited. If awaited, the future
            will complete once all corresponding event listeners have been
            invoked. If not awaited, this will schedule the dispatch of the
            events in the background for later.

        See Also
        --------
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    # Yes, this is not generic. The reason for this is MyPy complains about
    # using ABCs that are not concrete in generic types passed to functions.
    # For the sake of UX, I will check this at runtime instead and let the
    # user use a static type checker.
    @abc.abstractmethod
    def subscribe(self, event_type: typing.Type[typing.Any], callback: CallbackT[typing.Any]) -> None:
        """Subscribe a given callback to a given event type.

        Parameters
        ----------
        event_type : typing.Type[T]
            The event type to listen for. This will also listen for any
            subclasses of the given type.
            `T` must be a subclass of `hikari.events.base_events.Event`.
        callback
            Must be a coroutine function to invoke. This should
            consume an instance of the given event, or an instance of a valid
            subclass if one exists. Any result is discarded.

        Example
        -------
        The following demonstrates subscribing a callback to message creation
        events.

        ```py
        from hikari.events.messages import MessageCreateEvent

        async def on_message(event):
            ...

        bot.subscribe(MessageCreateEvent, on_message)
        ```

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    # Yes, this is not generic. The reason for this is MyPy complains about
    # using ABCs that are not concrete in generic types passed to functions.
    # For the sake of UX, I will check this at runtime instead and let the
    # user use a static type checker.
    @abc.abstractmethod
    def unsubscribe(self, event_type: typing.Type[typing.Any], callback: CallbackT[typing.Any]) -> None:
        """Unsubscribe a given callback from a given event type, if present.

        Parameters
        ----------
        event_type : typing.Type[T]
            The event type to unsubscribe from. This must be the same exact
            type as was originally subscribed with to be removed correctly.
            `T` must derive from `hikari.events.base_events.Event`.
        callback
            The callback to unsubscribe.

        Example
        -------
        The following demonstrates unsubscribing a callback from a message
        creation event.

        ```py
        from hikari.events.messages import MessageCreateEvent

        async def on_message(event):
            ...

        bot.unsubscribe(MessageCreateEvent, on_message)
        ```

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    @abc.abstractmethod
    def get_listeners(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        *,
        polymorphic: bool = True,
    ) -> typing.Collection[CallbackT[base_events.EventT]]:
        """Get the listeners for a given event type, if there are any.

        Parameters
        ----------
        event_type : typing.Type[T]
            The event type to look for.
            `T` must be a subclass of `hikari.events.base_events.Event`.
        polymorphic : bool
            If `True`, this will also return the listeners for all the
            event types `event_type` will dispatch. If `False`, then only
            listeners for this class specifically are returned. The default
            is `True`.

        Returns
        -------
        typing.Collection[typing.Callable[[T], typing.Coroutine[typing.Any, typing.Any, None]]]
            A copy of the collection of listeners for the event. Will return
            an empty collection if nothing is registered.
        """

    @abc.abstractmethod
    def listen(
        self,
        *event_types: typing.Type[base_events.EventT],
    ) -> typing.Callable[[CallbackT[base_events.EventT]], CallbackT[base_events.EventT]]:
        """Generate a decorator to subscribe a callback to an event type.

        This is a second-order decorator.

        Parameters
        ----------
        *event_types : typing.Optional[typing.Type[T]]
            The event types to subscribe to. The implementation may allow this
            to be undefined. If this is the case, the event type will be inferred
            instead from the type hints on the function signature.
            `T` must be a subclass of `hikari.events.base_events.Event`.

        Returns
        -------
        typing.Callable[[T], T]
            A decorator for a coroutine function that passes it to
            `EventManager.subscribe` before returning the function
            reference.

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    @abc.abstractmethod
    def stream(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        timeout: typing.Union[float, int, None],
        limit: typing.Optional[int] = None,
    ) -> EventStream[base_events.EventT]:
        """Return a stream iterator for the given event and sub-events.

        .. warning::
            If you use `stream.open()` to start the stream then you must
            also close it with `stream.close()` otherwise it may queue
            events in memory indefinitely.

        Parameters
        ----------
        event_type : typing.Type[hikari.events.base_events.Event]
            The event type to listen for. This will listen for subclasses of
            this type additionally.
        timeout : typing.Union[float, int, None]
            How long this streamer should wait for the next event before
            ending the iteration. If `None` then this will continue
            until explicitly broken from.
        limit : typing.Optional[int]
            The limit for how many events this should queue at one time before
            dropping extra incoming events, leave this as `None` for
            the cache size to be unlimited.

        Returns
        -------
        EventStream[hikari.events.base_events.Event]
            The async iterator to handle streamed events. This must be started
            with `with stream:` or `stream.open()` before
            asynchronously iterating over it.

        Examples
        --------
        ```py
        with bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id)) as stream:
            async for user_id in stream.map("user_id").limit(50):
                ...
        ```

        or using `open()` and `close()`

        ```py
        stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
        stream.open()

        async for user_id in stream.map("user_id").limit(50):
            ...

        stream.close()
        ```

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        `hikari.api.event_manager.EventManager.wait_for`
        """

    @abc.abstractmethod
    async def wait_for(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        timeout: typing.Union[float, int, None],
        predicate: typing.Optional[PredicateT[base_events.EventT]] = None,
    ) -> base_events.EventT:
        """Wait for a given event to occur once, then return the event.

        .. warning::
            Async predicates are not supported.

        Parameters
        ----------
        event_type : typing.Type[hikari.events.base_events.Event]
            The event type to listen for. This will listen for subclasses of
            this type additionally.
        timeout : typing.Union[float, int, None]
            The amount of time to wait before raising an `asyncio.TimeoutError`
            and giving up instead. This is measured in seconds. If
            `None`, then no timeout will be waited for (no timeout can
            result in "leaking" of coroutines that never complete if called in
            an uncontrolled way, so is not recommended).
        predicate
            A function taking the event as the single parameter.
            This should return `True` if the event is one you want to
            return, or `False` if the event should not be returned.
            If left as `None` (the default), then the first matching event type
            that the bot receives (or any subtype) will be the one returned.

        Returns
        -------
        hikari.events.base_events.Event
            The event that was provided.

        Raises
        ------
        asyncio.TimeoutError
            If the timeout is not `None` and is reached before an event is
            received that the predicate returns `True` for.

        See Also
        --------
        `hikari.api.event_manager.EventManager.dispatch`
        `hikari.api.event_manager.EventManager.listen`
        `hikari.api.event_manager.EventManager.stream`
        `hikari.api.event_manager.EventManager.subscribe`
        `hikari.api.event_manager.EventManager.unsubscribe`
        """
Base interface for event manager implementations.
This is a listener of a `hikari.events.base_events.Event` object and consumer of raw event payloads, and is expected to invoke one or more corresponding event listeners where appropriate.
Methods
self,
event_name: str,
shard: hikari.api.shard.GatewayShard,
payload: Dict[str, Any]
) -> None:
View Source
@abc.abstractmethod
def consume_raw_event(
    self, event_name: str, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
) -> None:
    """Consume a raw event.

    Parameters
    ----------
    event_name : str
        The case-insensitive name of the event being triggered.
    shard : hikari.api.shard.GatewayShard
        Object of the shard that received this event.
    payload : hikari.internal.data_binding.JSONObject
        Payload of the event being triggered.

    Raises
    ------
    LookupError
        If there is no consumer for the event, i.e. an unknown
        `event_name` is an error rather than a silent no-op.
    """
Consume a raw event.
Parameters
- event_name (str): The case-insensitive name of the event being triggered.
- shard (hikari.api.shard.GatewayShard): Object of the shard that received this event.
- payload (hikari.internal.data_binding.JSONObject): Payload of the event being triggered.
Raises
- LookupError: If there is no consumer for the event.
self,
event: hikari.events.base_events.Event
) -> _asyncio.Future[typing.Any]:
View Source
@abc.abstractmethod
def dispatch(self, event: base_events.Event) -> asyncio.Future[typing.Any]:
    """Dispatch an event.

    Parameters
    ----------
    event : hikari.events.base_events.Event
        The event to dispatch.

    Example
    -------
    We can dispatch custom events by first defining a class that
    derives from `hikari.events.base_events.Event`.

    ```py
    import attr

    from hikari.traits import RESTAware
    from hikari.events.base_events import Event
    from hikari.users import User
    from hikari.snowflakes import Snowflake

    @attr.define()
    class EveryoneMentionedEvent(Event):
        app: RESTAware = attr.field()

        author: User = attr.field()
        '''The user who mentioned everyone.'''

        content: str = attr.field()
        '''The message that was sent.'''

        message_id: Snowflake = attr.field()
        '''The message ID.'''

        channel_id: Snowflake = attr.field()
        '''The channel ID.'''
    ```

    We can then dispatch our event as we see fit.

    ```py
    from hikari.events.messages import MessageCreateEvent

    @bot.listen(MessageCreateEvent)
    async def on_message(event):
        if "@everyone" in event.content or "@here" in event.content:
            event = EveryoneMentionedEvent(
                author=event.author,
                content=event.content,
                message_id=event.id,
                channel_id=event.channel_id,
            )

            bot.dispatch(event)
    ```

    This event can be listened to elsewhere by subscribing to it with
    `EventManager.subscribe`.

    ```py
    @bot.listen(EveryoneMentionedEvent)
    async def on_everyone_mentioned(event):
        print(event.author, "just pinged everyone in", event.channel_id)
    ```

    Returns
    -------
    asyncio.Future[typing.Any]
        A future that can be optionally awaited. If awaited, the future
        will complete once all corresponding event listeners have been
        invoked. If not awaited, this will schedule the dispatch of the
        events in the background for later.

    See Also
    --------
    `hikari.api.event_manager.EventManager.listen`
    `hikari.api.event_manager.EventManager.stream`
    `hikari.api.event_manager.EventManager.subscribe`
    `hikari.api.event_manager.EventManager.unsubscribe`
    `hikari.api.event_manager.EventManager.wait_for`
    """
Dispatch an event.
Parameters
- event (hikari.events.base_events.Event): The event to dispatch.
Example
We can dispatch custom events by first defining a class that derives from `hikari.events.base_events.Event`.
import attr
from hikari.traits import RESTAware
from hikari.events.base_events import Event
from hikari.users import User
from hikari.snowflakes import Snowflake
@attr.define()
class EveryoneMentionedEvent(Event):
app: RESTAware = attr.field()
author: User = attr.field()
'''The user who mentioned everyone.'''
content: str = attr.field()
'''The message that was sent.'''
message_id: Snowflake = attr.field()
'''The message ID.'''
channel_id: Snowflake = attr.field()
'''The channel ID.'''
We can then dispatch our event as we see fit.
from hikari.events.messages import MessageCreateEvent
@bot.listen(MessageCreateEvent)
async def on_message(event):
if "@everyone" in event.content or "@here" in event.content:
event = EveryoneMentionedEvent(
author=event.author,
content=event.content,
message_id=event.id,
channel_id=event.channel_id,
)
bot.dispatch(event)
This event can be listened to elsewhere by subscribing to it with `EventManager.subscribe`.
@bot.listen(EveryoneMentionedEvent)
async def on_everyone_mentioned(event):
print(event.author, "just pinged everyone in", event.channel_id)
Returns
- asyncio.Future[typing.Any]: A future that can be optionally awaited. If awaited, the future will complete once all corresponding event listeners have been invoked. If not awaited, this will schedule the dispatch of the events in the background for later.
See Also
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for
self,
event_type: Type[~EventT],
/,
*,
polymorphic: bool = True
) -> Collection[Callable[[~EventT], Coroutine[Any, Any, NoneType]]]:
View Source
@abc.abstractmethod
def get_listeners(
    self,
    event_type: typing.Type[base_events.EventT],
    /,
    *,
    polymorphic: bool = True,
) -> typing.Collection[CallbackT[base_events.EventT]]:
    """Get the listeners for a given event type, if there are any.

    Parameters
    ----------
    event_type : typing.Type[T]
        The event type to look for.
        `T` must be a subclass of `hikari.events.base_events.Event`.
    polymorphic : bool
        If `True`, this will also return the listeners for all the
        event types `event_type` will dispatch. If `False`, then only
        listeners for this class specifically are returned. The default
        is `True`.

    Returns
    -------
    typing.Collection[typing.Callable[[T], typing.Coroutine[typing.Any, typing.Any, None]]]
        A copy of the collection of listeners for the event. Will return
        an empty collection if nothing is registered.
    """
Get the listeners for a given event type, if there are any.
Parameters
- event_type (typing.Type[T]): The event type to look for. `T` must be a subclass of `hikari.events.base_events.Event`.
- polymorphic (bool): If `True`, this will also return the listeners for all the event types `event_type` will dispatch. If `False`, then only listeners for this class specifically are returned. The default is `True`.
Returns
- typing.Collection[typing.Callable[[T], typing.Coroutine[typing.Any, typing.Any, None]]]: A copy of the collection of listeners for the event. Will return an empty collection if nothing is registered.
self,
*event_types: Type[~EventT]
) -> Callable[[Callable[[~EventT], Coroutine[Any, Any, NoneType]]], Callable[[~EventT], Coroutine[Any, Any, NoneType]]]:
View Source
@abc.abstractmethod
def listen(
    self,
    *event_types: typing.Type[base_events.EventT],
) -> typing.Callable[[CallbackT[base_events.EventT]], CallbackT[base_events.EventT]]:
    """Generate a decorator to subscribe a callback to an event type.

    This is a second-order decorator.

    Parameters
    ----------
    *event_types : typing.Type[T]
        The event types to subscribe to. The implementation may allow
        this to be left empty. If this is the case, the event type will
        be inferred instead from the type hints on the function
        signature.

        `T` must be a subclass of `hikari.events.base_events.Event`.

    Returns
    -------
    typing.Callable[[CallbackT[T]], CallbackT[T]]
        A decorator for a coroutine function that passes it to
        `EventManager.subscribe` before returning the function
        reference unchanged.

    See Also
    --------
    `hikari.api.event_manager.EventManager.dispatch`
    `hikari.api.event_manager.EventManager.stream`
    `hikari.api.event_manager.EventManager.subscribe`
    `hikari.api.event_manager.EventManager.unsubscribe`
    `hikari.api.event_manager.EventManager.wait_for`
    """
Generate a decorator to subscribe a callback to an event type.
This is a second-order decorator.
Parameters
- *event_types (typing.Type[T]): The event types to subscribe to. The implementation may allow this to be left empty. If this is the case, the event type will be inferred instead from the type hints on the function signature. `T` must be a subclass of `hikari.events.base_events.Event`.
Returns
- typing.Callable[[T], T]: A decorator for a coroutine function that passes it to
EventManager.subscribe
before returning the function reference.
See Also
hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for
self,
event_type: Type[~EventT],
/,
timeout: Union[float, int, NoneType],
limit: Optional[int] = None
) -> hikari.api.event_manager.EventStream[~EventT]:
View Source
@abc.abstractmethod
def stream(
    self,
    event_type: typing.Type[base_events.EventT],
    /,
    timeout: typing.Union[float, int, None],
    limit: typing.Optional[int] = None,
) -> EventStream[base_events.EventT]:
    """Return a stream iterator for the given event and sub-events.

    .. warning::
        If you use `stream.open()` to start the stream then you must
        also close it with `stream.close()` otherwise it may queue
        events in memory indefinitely.

    Parameters
    ----------
    event_type : typing.Type[hikari.events.base_events.Event]
        The event type to listen for. This will listen for subclasses of
        this type additionally.
    timeout : typing.Union[float, int, None]
        How long this streamer should wait for the next event before
        ending the iteration. If `None` then this will continue until
        explicitly broken from.
    limit : typing.Optional[int]
        The limit for how many events this should queue at one time before
        dropping extra incoming events, leave this as `None` for the
        queue size to be unlimited.

    Returns
    -------
    EventStream[hikari.events.base_events.Event]
        The async iterator to handle streamed events. This must be started
        with `with stream:` or `stream.open()` before
        asynchronously iterating over it.

    Examples
    --------

    ```py
    with bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id)) as stream:
        async for user_id in stream.map("user_id").limit(50):
            ...
    ```

    or using `open()` and `close()`

    ```py
    stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
    stream.open()

    async for user_id in stream.map("user_id").limit(50):
        ...

    stream.close()
    ```

    See Also
    --------
    `hikari.api.event_manager.EventManager.dispatch`
    `hikari.api.event_manager.EventManager.listen`
    `hikari.api.event_manager.EventManager.subscribe`
    `hikari.api.event_manager.EventManager.unsubscribe`
    `hikari.api.event_manager.EventManager.wait_for`
    """
Return a stream iterator for the given event and sub-events.
Warning: If you use `stream.open()` to start the stream then you must also close it with `stream.close()` otherwise it may queue events in memory indefinitely.
Parameters
- event_type (typing.Type[hikari.events.base_events.Event]): The event type to listen for. This will listen for subclasses of this type additionally.
- timeout (typing.Union[float, int, None]): How long this streamer should wait for the next event before ending the iteration. If `None` then this will continue until explicitly broken from.
- limit (typing.Optional[int]): The limit for how many events this should queue at one time before dropping extra incoming events, leave this as `None` for the queue size to be unlimited.
Returns
- EventStream[hikari.events.base_events.Event]: The async iterator to handle streamed events. This must be started with `with stream:` or `stream.open()` before asynchronously iterating over it.
Examples
with bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id)) as stream:
async for user_id in stream.map("user_id").limit(50):
...
or using open()
and close()
stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
stream.open()
async for user_id in stream.map("user_id").limit(50):
...
stream.close()
See Also
hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for
self,
event_type: Type[Any],
callback: Callable[[Any], Coroutine[Any, Any, NoneType]]
) -> None:
View Source
@abc.abstractmethod
def subscribe(
    self,
    event_type: typing.Type[typing.Any],
    callback: CallbackT[typing.Any],
) -> None:
    """Register `callback` as a listener for `event_type`.

    Parameters
    ----------
    event_type : typing.Type[T]
        The event type to listen for. Subclasses of the given type are
        listened for as well.

        `T` must be a subclass of `hikari.events.base_events.Event`.
    callback
        A coroutine function to invoke. It should accept an instance of
        the given event (or of a valid subclass, if one exists). Whatever
        it returns is discarded.

    Example
    -------
    Subscribing a callback to message creation events:

    ```py
    from hikari.events.messages import MessageCreateEvent

    async def on_message(event):
        ...

    bot.subscribe(MessageCreateEvent, on_message)
    ```

    See Also
    --------
    `hikari.api.event_manager.EventManager.dispatch`
    `hikari.api.event_manager.EventManager.listen`
    `hikari.api.event_manager.EventManager.stream`
    `hikari.api.event_manager.EventManager.unsubscribe`
    `hikari.api.event_manager.EventManager.wait_for`
    """
Subscribe a given callback to a given event type.
Parameters
- event_type (typing.Type[T]): The event type to listen for. This will also listen for any subclasses of the given type. `T` must be a subclass of `hikari.events.base_events.Event`.
- callback: Must be a coroutine function to invoke. This should consume an instance of the given event, or an instance of a valid subclass if one exists. Any result is discarded.
Example
The following demonstrates subscribing a callback to message creation events.
from hikari.events.messages import MessageCreateEvent
async def on_message(event):
...
bot.subscribe(MessageCreateEvent, on_message)
See Also
hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for
self,
event_type: Type[Any],
callback: Callable[[Any], Coroutine[Any, Any, NoneType]]
) -> None:
View Source
@abc.abstractmethod
def unsubscribe(
    self,
    event_type: typing.Type[typing.Any],
    callback: CallbackT[typing.Any],
) -> None:
    """Remove `callback` from the listeners of `event_type`, if present.

    Parameters
    ----------
    event_type : typing.Type[T]
        The event type to unsubscribe from. For the removal to work this
        must be exactly the type the callback was originally subscribed
        with.

        `T` must derive from `hikari.events.base_events.Event`.
    callback
        The callback to unsubscribe.

    Example
    -------
    Unsubscribing a callback from a message creation event:

    ```py
    from hikari.events.messages import MessageCreateEvent

    async def on_message(event):
        ...

    bot.unsubscribe(MessageCreateEvent, on_message)
    ```

    See Also
    --------
    `hikari.api.event_manager.EventManager.dispatch`
    `hikari.api.event_manager.EventManager.listen`
    `hikari.api.event_manager.EventManager.stream`
    `hikari.api.event_manager.EventManager.subscribe`
    `hikari.api.event_manager.EventManager.wait_for`
    """
Unsubscribe a given callback from a given event type, if present.
Parameters
- event_type (typing.Type[T]): The event type to unsubscribe from. This must be the same exact type as was originally subscribed with to be removed correctly. `T` must derive from `hikari.events.base_events.Event`.
- callback: The callback to unsubscribe.
Example
The following demonstrates unsubscribing a callback from a message creation event.
from hikari.events.messages import MessageCreateEvent
async def on_message(event):
...
bot.unsubscribe(MessageCreateEvent, on_message)
See Also
hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.wait_for
self,
event_type: Type[~EventT],
/,
timeout: Union[float, int, NoneType],
predicate: Optional[Callable[[~EventT], bool]] = None
) -> ~EventT:
View Source
@abc.abstractmethod
async def wait_for(
    self,
    event_type: typing.Type[base_events.EventT],
    /,
    timeout: typing.Union[float, int, None],
    predicate: typing.Optional[PredicateT[base_events.EventT]] = None,
) -> base_events.EventT:
    """Wait for a single occurrence of an event and return it.

    .. warning::
        Async predicates are not supported.

    Parameters
    ----------
    event_type : typing.Type[hikari.events.base_events.Event]
        The event type to listen for. Subclasses of this type are
        listened for as well.
    timeout : typing.Union[float, int, None]
        How long to wait, in seconds, before giving up and raising
        `asyncio.TimeoutError`. If `None`, no timeout is applied; this
        is not recommended, since it can "leak" coroutines that never
        complete when called in an uncontrolled way.
    predicate
        A function taking the event as its single parameter. It should
        return `True` if the event is the one you want returned and
        `False` otherwise. If left as `None` (the default), the first
        event of a matching type (or any subtype) that the bot receives
        is returned.

    Returns
    -------
    hikari.events.base_events.Event
        The event that was provided.

    Raises
    ------
    asyncio.TimeoutError
        If the timeout is not `None` and is reached before an event is
        received that the predicate returns `True` for.

    See Also
    --------
    `hikari.api.event_manager.EventManager.dispatch`
    `hikari.api.event_manager.EventManager.listen`
    `hikari.api.event_manager.EventManager.stream`
    `hikari.api.event_manager.EventManager.subscribe`
    `hikari.api.event_manager.EventManager.unsubscribe`
    """
Wait for a given event to occur once, then return the event.
Warning: Async predicates are not supported.
Parameters
- event_type (typing.Type[hikari.events.base_events.Event]): The event type to listen for. This will listen for subclasses of this type additionally.
- predicate: A function taking the event as the single parameter. This should return `True` if the event is one you want to return, or `False` if the event should not be returned. If left as `None` (the default), then the first matching event type that the bot receives (or any subtype) will be the one returned.
- timeout (typing.Union[float, int, None]): The amount of time to wait before raising an `asyncio.TimeoutError` and giving up instead. This is measured in seconds. If `None`, then no timeout will be waited for (no timeout can result in "leaking" of coroutines that never complete if called in an uncontrolled way, so is not recommended).
Returns
- hikari.events.base_events.Event: The event that was provided.
Raises
- asyncio.TimeoutError: If the timeout is not `None` and is reached before an event is received that the predicate returns `True` for.
See Also
hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe
View Source
class EventStream(iterators.LazyIterator[base_events.EventT], abc.ABC):
    """A base abstract class for all event streamers.

    Unlike `hikari.iterators.LazyIterator` (which this extends), an event
    streamer must be started and closed.

    Examples
    --------
    A streamer may either be started and closed using `with` syntax where
    `EventStream.open` and `EventStream.close` are implicitly called based on
    context.

    ```py
    with EventStream(app, EventType, timeout=50) as stream:
        async for entry in stream:
            ...
    ```

    A streamer may also be directly started and closed using
    `EventStream.open` and `EventStream.close`. Both are regular
    (non-async) methods, so they are called without `await`. Note that if
    you don't call `EventStream.close` after opening a streamer when
    you're finished with it then it may queue events in memory
    indefinitely.

    ```py
    stream = EventStream(app, EventType, timeout=50)
    stream.open()
    async for event in stream:
        ...

    stream.close()
    ```

    See Also
    --------
    `hikari.iterators.LazyIterator`
    """

    __slots__: typing.Sequence[str] = ()

    @abc.abstractmethod
    def close(self) -> None:
        """Mark this streamer as closed to stop it from queueing and receiving events.

        If called on an already closed streamer then this will do nothing.

        .. note::
            `with streamer` may be used as a short-cut for opening and
            closing a streamer.
        """

    @abc.abstractmethod
    def open(self) -> None:
        """Mark this streamer as opened to let it start receiving and queueing events.

        If called on an already started streamer then this will do nothing.

        .. note::
            `with streamer` may be used as a short-cut for opening and
            closing a stream.
        """

    @abc.abstractmethod
    def filter(
        self: _EventStreamT,
        *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[base_events.EventT], bool]],
        **attrs: typing.Any,
    ) -> _EventStreamT:
        """Filter the items by one or more conditions.

        Each condition is treated as a predicate, being called with each item
        that this iterator would return when it is requested.

        All conditions must evaluate to `True` for the item to be
        returned. If this is not met, then the item is discarded and ignored,
        the next matching item will be returned instead, if there is one.

        Parameters
        ----------
        *predicates : typing.Union[typing.Callable[[EventT], bool], typing.Tuple[str, typing.Any]]
            Predicates to invoke. These are functions that take a value and
            return `True` if it is of interest, or `False`
            otherwise. These may instead include 2-`tuple` objects
            consisting of a `str` attribute name (nested attributes
            are referred to using the `.` operator), and values to compare for
            equality. This allows you to specify conditions such as
            `members.filter(("user.bot", True))`.
        **attrs : typing.Any
            Alternative to passing 2-tuples. Cannot specify nested attributes
            using this method.

        Returns
        -------
        EventStream[EventT]
            The current stream with the new filter applied.
        """

    @abc.abstractmethod
    def __enter__(self: _EventStreamT) -> _EventStreamT:
        # Abstract: implementations are expected to provide the `with` entry hook.
        raise NotImplementedError

    @abc.abstractmethod
    def __exit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]],
        exc: typing.Optional[BaseException],
        exc_tb: typing.Optional[types.TracebackType],
    ) -> None:
        # Abstract: implementations are expected to provide the `with` exit hook.
        raise NotImplementedError
A base abstract class for all event streamers.
Unlike hikari.iterators.LazyIterator
(which this extends), an event streamer must be started and closed.
Examples
A streamer may either be started and closed using with
syntax where EventStream.open
and EventStream.close
are implicitly called based on context.
with EventStream(app, EventType, timeout=50) as stream:
async for entry in stream:
...
A streamer may also be directly started and closed using `EventStream.open` and `EventStream.close`. Note that if you don't call `EventStream.close` after opening a streamer when you're finished with it then it may queue events in memory indefinitely.
stream = EventStream(app, EventType, timeout=50)
stream.open()
async for event in stream:
...
stream.close()
See Also
Methods
View Source
def awaiting(self, window_size: int = 10) -> LazyIterator[ValueT]:
    """Await each item concurrently in a fixed size window.

    .. warning::
        Setting a large window size, or setting it to 0 to await everything
        is a dangerous thing to do if you are making API calls. Some
        endpoints will get ratelimited and cause a backup of waiting
        tasks, others may begin to spam global rate limits instead
        (the `fetch_user` endpoint seems to be notorious for doing this).

    .. note::
        This call assumes that the iterator contains awaitable values as
        input. MyPy cannot detect this nicely, so any cast is forced
        internally.
        If the item is not awaitable, you will receive a
        `TypeError` instead.
        You have been warned. You cannot escape the ways of the duck type
        young grasshopper.

    Parameters
    ----------
    window_size : int
        The window size of how many tasks to await at once. You can set this
        to `0` to await everything at once, but see the warning above.

    Returns
    -------
    LazyIterator[ValueT]
        The new lazy iterator to return.
    """
    # Not type safe: the cast asserts the items are awaitable, which the type
    # checker cannot verify from `ValueT` alone.
    return _AwaitingLazyIterator(typing.cast("LazyIterator[typing.Awaitable[ValueT]]", self), window_size)
Await each item concurrently in a fixed size window.
Warning: Setting a large window size, or setting it to 0 to await everything is a dangerous thing to do if you are making API calls. Some endpoints will get ratelimited and cause a backup of waiting tasks, others may begin to spam global rate limits instead (the fetch_user
endpoint seems to be notorious for doing this).
Note: This call assumes that the iterator contains awaitable values as input. MyPy cannot detect this nicely, so any cast is forced internally. If the item is not awaitable, you will receive a TypeError
instead. You have been warned. You cannot escape the ways of the duck type young grasshopper.
Parameters
- window_size (int): The window size of how many tasks to await at once. You can set this to `0` to await everything at once, but see the warning above.
Returns
- LazyIterator[ValueT]: The new lazy iterator to return.
View Source
def chunk(self, chunk_size: int) -> LazyIterator[typing.Sequence[ValueT]]:
    """Group the results into sequences of at most `chunk_size` entries.

    Parameters
    ----------
    chunk_size : int
        The maximum number of results each emitted chunk may hold.

    Returns
    -------
    LazyIterator[typing.Sequence[ValueT]]
        `LazyIterator` that emits each chunked sequence.
    """
    chunked = _ChunkedLazyIterator(self, chunk_size)
    return chunked
Return results in chunks of up to chunk_size
amount of entries.
Parameters
- chunk_size (int): The limit for how many results should be returned in each chunk.
Returns
- LazyIterator[typing.Sequence[ValueT]]:
LazyIterator
that emits each chunked sequence.
View Source
@abc.abstractmethod
def close(self) -> None:
    """Mark this streamer as closed so it stops queueing and receiving events.

    Calling this on a streamer that is already closed does nothing.

    .. note::
        `with streamer` may be used as a short-cut for opening and
        closing a streamer.
    """
Mark this streamer as closed to stop it from queueing and receiving events.
If called on an already closed streamer then this will do nothing.
Note: with streamer
may be used as a short-cut for opening and closing a streamer.
self,
collector: Callable[[Sequence[~ValueT]], Collection[~ValueT]]
) -> Collection[~ValueT]:
View Source
async def collect(
    self,
    collector: typing.Callable[[typing.Sequence[ValueT]], typing.Collection[ValueT]],
) -> typing.Collection[ValueT]:
    """Gather every result and convert it with `collector`.

    Parameters
    ----------
    collector
        A function that consumes a sequence of values and returns a
        collection.

    Returns
    -------
    typing.Collection[ValueT]
        Whatever `collector` returns for the awaited results.
    """
    # Awaiting the iterator itself yields its results; they are handed to
    # the collector as-is.
    gathered = await self
    return collector(gathered)
Collect the results into a given type and return it.
Parameters
- collector: A function that consumes a sequence of values and returns a collection.
View Source
async def count(self) -> int:
    """Consume the iterator and report how many results it produced.

    Returns
    -------
    int
        Number of results found.
    """
    total = 0
    # The items themselves are irrelevant; only the number of iterations matters.
    async for _item in self:
        total = total + 1
    return total
Count the number of results.
Returns
- int: Number of results found.
self,
*,
start: int = 0
) -> hikari.iterators.LazyIterator[typing.Tuple[int, ~ValueT]]:
View Source
def enumerate(self, *, start: int = 0) -> LazyIterator[typing.Tuple[int, ValueT]]:
    """Lazily pair each paginated result with an increasing counter.

    This is an asyncio-friendly counterpart of the builtin `enumerate`
    that uses much less memory than collecting all the results first and
    calling `enumerate` across them.

    Parameters
    ----------
    start : int
        Optional int to start counting at. If omitted, this is `0`.

    Examples
    --------
    ```py
    >>> async for i, item in paginated_results.enumerate():
    ...    print(i, item)
    (0, foo)
    (1, bar)
    (2, baz)
    (3, bork)
    (4, qux)

    >>> async for i, item in paginated_results.enumerate(start=9):
    ...    print(i, item)
    (9, foo)
    (10, bar)
    (11, baz)
    (12, bork)
    (13, qux)

    >>> async for i, item in paginated_results.enumerate(start=9).limit(3):
    ...    print(i, item)
    (9, foo)
    (10, bar)
    (11, baz)
    ```

    Returns
    -------
    LazyIterator[typing.Tuple[int, T]]
        A paginated results view that asynchronously yields an increasing
        counter in a tuple with each result, lazily.
    """
    numbered = _EnumeratedLazyIterator(self, start=start)
    return numbered
Enumerate the paginated results lazily.
This behaves as an asyncio-friendly version of enumerate
which uses much less memory than collecting all the results first and calling enumerate
across them.
Parameters
- start (int): Optional int to start at. If omitted, this is
0
.
Examples
>>> async for i, item in paginated_results.enumerate():
... print(i, item)
(0, foo)
(1, bar)
(2, baz)
(3, bork)
(4, qux)
>>> async for i, item in paginated_results.enumerate(start=9):
... print(i, item)
(9, foo)
(10, bar)
(11, baz)
(12, bork)
(13, qux)
>>> async for i, item in paginated_results.enumerate(start=9).limit(3):
... print(i, item)
(9, foo)
(10, bar)
(11, baz)
Returns
- LazyIterator[typing.Tuple[int, T]]: A paginated results view that asynchronously yields an increasing counter in a tuple with each result, lazily.
self: ~_EventStreamT,
*predicates: Union[Tuple[str, Any], Callable[[~EventT], bool]],
**attrs: Any
) -> ~_EventStreamT:
View Source
@abc.abstractmethod
def filter(
    self: _EventStreamT,
    *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[base_events.EventT], bool]],
    **attrs: typing.Any,
) -> _EventStreamT:
    """Restrict the stream to items matching one or more conditions.

    Every condition acts as a predicate that is called with each item this
    iterator would return when it is requested.

    An item is only returned when all conditions evaluate to `True`.
    Otherwise it is discarded and ignored, and the next matching item is
    returned instead, if there is one.

    Parameters
    ----------
    *predicates : typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]
        Predicates to invoke. These are functions that take a value and
        return `True` if it is of interest, or `False` otherwise. A
        predicate may instead be a 2-`tuple` of a `str` attribute name
        (nested attributes are referred to using the `.` operator) and a
        value to compare for equality. This allows conditions such as
        `members.filter(("user.bot", True))`.
    **attrs : typing.Any
        Alternative to passing 2-tuples. Nested attributes cannot be
        specified this way.

    Returns
    -------
    EventStream[ValueT]
        The current stream with the new filter applied.
    """
Filter the items by one or more conditions.
Each condition is treated as a predicate, being called with each item that this iterator would return when it is requested.
All conditions must evaluate to True
for the item to be returned. If this is not met, then the item is discarded and ignored, the next matching item will be returned instead, if there is one.
Parameters
- *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return `True` if it is of interest, or `False` otherwise. These may instead include 2-`tuple` objects consisting of a `str` attribute name (nested attributes are referred to using the `.` operator), and values to compare for equality. This allows you to specify conditions such as `members.filter(("user.bot", True))`.
- **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
- EventStream[ValueT]: The current stream with the new filter applied.
self,
flattener: Union[hikari.internal.spel.AttrGetter[~ValueT, Union[AsyncIterator[~AnotherValueT], Iterable[~AnotherValueT]]], Callable[[~ValueT], Union[AsyncIterator[~AnotherValueT], Iterable[~AnotherValueT]]]]
) -> hikari.iterators.LazyIterator[~AnotherValueT]:
View Source
def flat_map(self, flattener: _FlattenerT[ValueT, AnotherValueT]) -> LazyIterator[AnotherValueT]:
    r"""Perform a flat mapping operation.

    This will pass each item in the iterator to the given `flattener`
    parameter, expecting a new `typing.Iterable` or `typing.AsyncIterator`
    to be returned as the result. This means you can map to a new
    `LazyIterator`, `typing.AsyncIterator`, `typing.Iterable`,
    async generator, or generator.

    Remember that `typing.Iterator` implicitly provides `typing.Iterable`
    compatibility.

    This is used to provide lazy conversions, and can be used to implement
    reactive-like pipelines if desired.

    All results are combined into one large lazy iterator and yielded
    lazily.

    Parameters
    ----------
    flattener
        A function that returns either an async iterator or iterator of
        new values. Could be an attribute name instead.

    Example
    -------

    The following example generates a distinct collection of all mentioned
    users in the given channel from the past 500 messages. The flattener
    receives each message object directly, and `collect(set)` removes the
    duplicates.

    ```py
    def iter_mentioned_users(message: hikari.Message) -> typing.Iterable[Snowflake]:
        for match in re.findall(r"<@!?(\d+)>", message.content):
            yield Snowflake(match)

    mentioned_users = await (
        channel
        .history()
        .limit(500)
        .flat_map(iter_mentioned_users)
        .collect(set)
    )
    ```

    Returns
    -------
    LazyIterator[AnotherValueT]
        The new lazy iterator to return.
    """
    return _FlatMapLazyIterator(self, flattener)
Perform a flat mapping operation.
This will pass each item in the iterator to the given `flattener`
parameter, expecting a new typing.Iterable
or typing.AsyncIterator
to be returned as the result. This means you can map to a new LazyIterator
, typing.AsyncIterator
, typing.Iterable
, async generator, or generator.
Remember that typing.Iterator
implicitly provides typing.Iterable
compatibility.
This is used to provide lazy conversions, and can be used to implement reactive-like pipelines if desired.
All results are combined into one large lazy iterator and yielded lazily.
Parameters
- flattener: A function that returns either an async iterator or iterator of new values. Could be an attribute name instead.
Example
The following example generates a distinct collection of all mentioned users in the given channel from the past 500 messages.
def iter_mentioned_users(message: hikari.Message) -> typing.Iterable[Snowflake]:
for match in re.findall(r"<@!?(\d+)>", message.content):
yield Snowflake(match)
mentioned_users = await (
channel
.history()
.limit(500)
.flat_map(iter_mentioned_users)
.collect(set)
)
Returns
- LazyIterator[AnotherValueT]: The new lazy iterator to return.
View Source
async def for_each(self, consumer: typing.Callable[[ValueT], typing.Any]) -> None:
    """Feed every value to `consumer` as soon as it is produced.

    Parameters
    ----------
    consumer
        Callable invoked once per value. If it is a coroutine function,
        each call is awaited before the next value is requested.
    """
    # Decide once, up front, whether results of the consumer must be awaited.
    needs_await = asyncio.iscoroutinefunction(consumer)

    async for value in self:
        outcome = consumer(value)
        if needs_await:
            await outcome
Pass each value to a given consumer immediately.
View Source
async def last(self) -> ValueT:
    """Fetch only the final element of this iterator.

    .. note::
        This method will consume the whole iterator if run.

    Returns
    -------
    ValueT
        The last result.

    Raises
    ------
    LookupError
        If no result exists.
    """
    # The last element is the first element of the reversed view.
    backwards = self.reversed()
    return await backwards.next()
Return the last element of this iterator only.
Note: This method will consume the whole iterator if run.
Returns
- ValueT: The last result.
Raises
- LookupError: If no result exists.
View Source
def limit(self, limit: int) -> LazyIterator[ValueT]:
    """Cap how many items this async iterator will hand out.

    Parameters
    ----------
    limit : int
        The number of items to get. This must be greater than zero.

    Examples
    --------
    ```py
    >>> async for item in paginated_results.limit(3):
    ...     print(item)
    ```

    Returns
    -------
    LazyIterator[ValueT]
        A paginated results view that asynchronously yields a maximum
        of the given number of items before completing.
    """
    capped = _LimitedLazyIterator(self, limit)
    return capped
Limit the number of items you receive from this async iterator.
Parameters
- limit (int): The number of items to get. This must be greater than zero.
Examples
>>> async for item in paginated_results.limit(3):
... print(item)
Returns
- LazyIterator[ValueT]: A paginated results view that asynchronously yields a maximum of the given number of items before completing.
self,
transformation: Union[Callable[[~ValueT], ~AnotherValueT], str]
) -> hikari.iterators.LazyIterator[~AnotherValueT]:
View Source
def map(
    self,
    transformation: typing.Union[typing.Callable[[ValueT], AnotherValueT], str],
) -> LazyIterator[AnotherValueT]:
    """Map the values to a different value.

    Parameters
    ----------
    transformation : typing.Union[typing.Callable[[ValueT], AnotherValueT], str]
        The function to use to map each value. This may alternatively
        be a string attribute name to replace the input value with. You
        can provide nested attributes using the `.` operator.

    Returns
    -------
    LazyIterator[AnotherValueT]
        `LazyIterator` that maps each value to another value.
    """
    if isinstance(transformation, str):
        # A string is shorthand for an attribute path; wrap it in a getter so
        # the mapping iterator always receives a callable. The cast only
        # informs the type checker and has no runtime effect.
        getter = spel.AttrGetter(transformation)
        transformation = typing.cast("spel.AttrGetter[ValueT, AnotherValueT]", getter)

    return _MappingLazyIterator(self, transformation)
Map the values to a different value.
Parameters
- transformation (typing.Union[typing.Callable[[ValueT], AnotherValueT], str]): The function to use to map the attribute. This may alternatively be a string attribute name to replace the input value with. You can provide nested attributes using the `.` operator.
Returns
- LazyIterator[AnotherValueT]: `LazyIterator` that maps each value to another value.
View Source
async def next(self) -> ValueT:
    """Fetch a single element: the next one this iterator would yield.

    Returns
    -------
    ValueT
        The next result.

    Raises
    ------
    LookupError
        If no more results exist.
    """
    try:
        item = await self.__anext__()
    except StopAsyncIteration:
        # Exhaustion is reported as a lookup failure; `from None` hides the
        # internal StopAsyncIteration from the traceback.
        raise LookupError("No elements were found") from None
    else:
        return item
Return the next element of this iterator only.
Returns
- ValueT: The next result.
Raises
- LookupError: If no more results exist.
View Source
@abc.abstractmethod
def open(self) -> None:
    """Mark this streamer as opened so it starts receiving and queueing events.

    Calling this on a streamer that is already started does nothing.

    .. note::
        `with streamer` may be used as a short-cut for opening and
        closing a stream.
    """
Mark this streamer as opened to let it start receiving and queueing events.
If called on an already started streamer then this will do nothing.
Note: `with streamer` may be used as a short-cut for opening and closing a stream.
View Source
def reversed(self) -> LazyIterator[ValueT]:
    """Return a lazy iterator over this iterator's remaining values, reversed.

    Returns
    -------
    LazyIterator[ValueT]
        The lazy iterator of this iterator's remaining values reversed.
    """
    flipped = _ReversedLazyIterator(self)
    return flipped
Return a lazy iterator of the remainder of this iterator's values reversed.
Returns
- LazyIterator[ValueT]: The lazy iterator of this iterator's remaining values reversed.
View Source
def skip(self, number: int) -> LazyIterator[ValueT]:
    """Discard the given number of items, then yield everything after them.

    Parameters
    ----------
    number : int
        The max number of items to drop before any items are yielded.

    Returns
    -------
    LazyIterator[ValueT]
        A paginated results view that asynchronously yields all items
        AFTER the given number of items are discarded first.
    """
    remainder = _DropCountLazyIterator(self, number)
    return remainder
Drop the given number of items, then yield anything after.
Parameters
- number (int): The max number of items to drop before any items are yielded.
Returns
- LazyIterator[ValueT]: A paginated results view that asynchronously yields all items AFTER the given number of items are discarded first.
self,
*predicates: Union[Tuple[str, Any], Callable[[~ValueT], bool]],
**attrs: Any
) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
def skip_until(
    self,
    *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[ValueT], bool]],
    **attrs: typing.Any,
) -> LazyIterator[ValueT]:
    """Discard items while all conditions are False.

    Items after this will be yielded as normal.

    Parameters
    ----------
    *predicates : typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]
        Predicates to invoke. These are functions that take a value and
        return `True` if it is of interest, or `False` otherwise. These
        may instead include 2-`tuple` objects consisting of a `str`
        attribute name (nested attributes are referred to using the `.`
        operator), and values to compare for equality. This allows you
        to specify conditions such as
        `members.skip_until(("user.bot", True))`.
    **attrs : typing.Any
        Alternative to passing 2-tuples. Cannot specify nested
        attributes using this method.

    Returns
    -------
    LazyIterator[ValueT]
        LazyIterator that only emits values once a condition has been
        met. All items before this are discarded.
    """
    conditions = self._map_predicates_and_attr_getters("skip_until", *predicates, **attrs)
    # "Skip until met" is expressed as "drop while the inverted condition
    # holds". NOTE(review): this relies on `~` negating the combined
    # predicate object returned by the helper - confirm against its type.
    return _DropWhileLazyIterator(self, ~conditions)
Discard items while all conditions are False.
Items after this will be yielded as normal.
Parameters
- *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return `True` if it is of interest, or `False` otherwise. These may instead include 2-`tuple` objects consisting of a `str` attribute name (nested attributes are referred to using the `.` operator), and values to compare for equality. This allows you to specify conditions such as `members.skip_until(("user.bot", True))`.
- **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
- LazyIterator[ValueT]: LazyIterator that only emits values once a condition has been met. All items before this are discarded.
self,
*predicates: Union[Tuple[str, Any], Callable[[~ValueT], bool]],
**attrs: Any
) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
def skip_while(
    self,
    *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[ValueT], bool]],
    **attrs: typing.Any,
) -> LazyIterator[ValueT]:
    """Discard items while all conditions are True.

    Items after this will be yielded as normal.

    Parameters
    ----------
    *predicates : typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]
        Predicates to invoke. These are functions that take a value and
        return `True` if it is of interest, or `False` otherwise. These
        may instead include 2-`tuple` objects consisting of a `str`
        attribute name (nested attributes are referred to using the `.`
        operator), and values to compare for equality. This allows you
        to specify conditions such as
        `members.skip_while(("user.bot", True))`.
    **attrs : typing.Any
        Alternative to passing 2-tuples. Cannot specify nested
        attributes using this method.

    Returns
    -------
    LazyIterator[ValueT]
        LazyIterator that only emits values once a condition has
        failed. All items before this are discarded.
    """
    conditions = self._map_predicates_and_attr_getters("skip_while", *predicates, **attrs)
    # Items are dropped for as long as the combined condition holds, so the
    # first emitted item is the first one that fails it.
    return _DropWhileLazyIterator(self, conditions)
Discard items while all conditions are True.
Items after this will be yielded as normal.
Parameters
- *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return `True` if it is of interest, or `False` otherwise. These may instead include 2-`tuple` objects consisting of a `str` attribute name (nested attributes are referred to using the `.` operator), and values to compare for equality. This allows you to specify conditions such as `members.skip_while(("user.bot", True))`.
- **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
- LazyIterator[ValueT]: LazyIterator that only emits values once a condition has failed. All items before this are discarded.
View Source
async def sort(self, *, key: typing.Any = None, reverse: bool = False) -> typing.Sequence[ValueT]:
    """Gather every result, then return them as a sorted collection.

    Parameters
    ----------
    key : typing.Any
        Forwarded unchanged as the `key` argument of `sorted`.
    reverse : bool
        Forwarded unchanged as the `reverse` argument of `sorted`.

    Returns
    -------
    typing.Sequence[ValueT]
        The sorted results.
    """
    # Awaiting the iterator yields the collected results.
    collected = await self
    return sorted(collected, key=key, reverse=reverse)
Collect all results, then sort the collection before returning it.
self,
*predicates: Union[Tuple[str, Any], Callable[[~ValueT], bool]],
**attrs: Any
) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
def take_until(
    self,
    *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[ValueT], bool]],
    **attrs: typing.Any,
) -> LazyIterator[ValueT]:
    """Yield each item until any conditions pass or the end is reached.

    Parameters
    ----------
    *predicates : typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]
        Predicates to invoke. These are functions that take a value and
        return `True` if it is of interest, or `False` otherwise. These
        may instead include 2-`tuple` objects consisting of a `str`
        attribute name (nested attributes are referred to using the `.`
        operator), and values to compare for equality. This allows you
        to specify conditions such as
        `members.take_until(("user.bot", True))`.
    **attrs : typing.Any
        Alternative to passing 2-tuples. Cannot specify nested
        attributes using this method.

    Returns
    -------
    LazyIterator[ValueT]
        LazyIterator that only emits values until any conditions are
        matched.
    """
    matcher = self._map_predicates_and_attr_getters("take_until", *predicates, **attrs)
    # "Take until" is built as "take while the inverted condition holds".
    inverted = ~matcher
    return _TakeWhileLazyIterator(self, inverted)
Return each item until any conditions pass or the end is reached.
Parameters
- *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return `True` if it is of interest, or `False` otherwise. These may instead include 2-`tuple` objects consisting of a `str` attribute name (nested attributes are referred to using the `.` operator), and values to compare for equality. This allows you to specify conditions such as `members.take_until(("user.bot", True))`.
- **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
- LazyIterator[ValueT]: LazyIterator that only emits values until any conditions are matched.
self,
*predicates: Union[Tuple[str, Any], Callable[[~ValueT], bool]],
**attrs: Any
) -> hikari.iterators.LazyIterator[~ValueT]:
View Source
def take_while(
    self,
    *predicates: typing.Union[typing.Tuple[str, typing.Any], typing.Callable[[ValueT], bool]],
    **attrs: typing.Any,
) -> LazyIterator[ValueT]:
    """Yield each item until any conditions fail or the end is reached.

    Parameters
    ----------
    *predicates : typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]
        Predicates to invoke. These are functions that take a value and
        return `True` if it is of interest, or `False` otherwise. These
        may instead include 2-`tuple` objects consisting of a `str`
        attribute name (nested attributes are referred to using the `.`
        operator), and values to compare for equality. This allows you
        to specify conditions such as
        `members.take_while(("user.bot", True))`.
    **attrs : typing.Any
        Alternative to passing 2-tuples. Cannot specify nested
        attributes using this method.

    Returns
    -------
    LazyIterator[ValueT]
        LazyIterator that only emits values until any conditions are
        not matched.
    """
    matcher = self._map_predicates_and_attr_getters("take_while", *predicates, **attrs)
    return _TakeWhileLazyIterator(self, matcher)
Return each item until any conditions fail or the end is reached.
Parameters
- *predicates (typing.Union[typing.Callable[[ValueT], bool], typing.Tuple[str, typing.Any]]): Predicates to invoke. These are functions that take a value and return `True` if it is of interest, or `False` otherwise. These may instead include 2-`tuple` objects consisting of a `str` attribute name (nested attributes are referred to using the `.` operator), and values to compare for equality. This allows you to specify conditions such as `members.take_while(("user.bot", True))`.
- **attrs (typing.Any): Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
Returns
- LazyIterator[ValueT]: LazyIterator that only emits values until any conditions are not matched.