Back to top

hikari.impl.event_manager

Event handling logic for more info.

View Source
# -*- coding: utf-8 -*-
# cython: language_level=3
# Copyright (c) 2020 Nekokatt
# Copyright (c) 2021-present davfsa
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Event handling logic for more info."""

from __future__ import annotations

__all__: typing.Sequence[str] = ("EventManagerImpl",)

import asyncio
import base64
import logging
import random
import typing

from hikari import errors
from hikari import intents as intents_
from hikari import presences as presences_
from hikari import snowflakes
from hikari.api import config
from hikari.events import channel_events
from hikari.events import guild_events
from hikari.events import interaction_events
from hikari.events import member_events
from hikari.events import message_events
from hikari.events import reaction_events
from hikari.events import role_events
from hikari.events import scheduled_events
from hikari.events import shard_events
from hikari.events import typing_events
from hikari.events import user_events
from hikari.events import voice_events
from hikari.impl import event_manager_base
from hikari.internal import time
from hikari.internal import ux

if typing.TYPE_CHECKING:
    from hikari import guilds
    from hikari import invites
    from hikari import voices
    from hikari.api import cache as cache_
    from hikari.api import entity_factory as entity_factory_
    from hikari.api import event_factory as event_factory_
    from hikari.api import shard as gateway_shard
    from hikari.internal import data_binding


_LOGGER: typing.Final[logging.Logger] = logging.getLogger("hikari.event_manager")


def _fixed_size_nonce() -> str:
    # This generates nonces of length 28 for use in member chunking.
    head = time.monotonic_ns().to_bytes(8, "big")
    tail = random.getrandbits(92).to_bytes(12, "big")
    return base64.b64encode(head + tail).decode("ascii")


async def _request_guild_members(
    shard: gateway_shard.GatewayShard,
    guild: snowflakes.SnowflakeishOr[guilds.PartialGuild],
    *,
    include_presences: bool,
    nonce: str,
) -> None:
    try:
        await shard.request_guild_members(guild, include_presences=include_presences, nonce=nonce)

    # Ignore errors raised by a shard shutting down
    except errors.ComponentStateConflictError:
        pass


class EventManagerImpl(event_manager_base.EventManagerBase):
    """Provides event handling logic for Discord events."""

    __slots__: typing.Sequence[str] = ("_cache", "_entity_factory")

    def __init__(
        self,
        entity_factory: entity_factory_.EntityFactory,
        event_factory: event_factory_.EventFactory,
        intents: intents_.Intents,
        /,
        *,
        cache: typing.Optional[cache_.MutableCache] = None,
    ) -> None:
        self._cache = cache
        self._entity_factory = entity_factory
        components = cache.settings.components if cache else config.CacheComponents.NONE
        super().__init__(event_factory=event_factory, intents=intents, cache_components=components)

    def _cache_enabled_for(self, components: config.CacheComponents, /) -> bool:
        return self._cache is not None and (self._cache.settings.components & components) == components

    @event_manager_base.filtered(shard_events.ShardReadyEvent, config.CacheComponents.ME)
    async def on_ready(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#ready> for more info."""
        # TODO: cache unavailable guilds on startup, I didn't bother for the time being.
        event = self._event_factory.deserialize_ready_event(shard, payload)

        if self._cache:
            self._cache.update_me(event.my_user)

        await self.dispatch(event)

    @event_manager_base.filtered(shard_events.ShardResumedEvent)
    async def on_resumed(self, shard: gateway_shard.GatewayShard, _: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#resumed> for more info."""
        await self.dispatch(self._event_factory.deserialize_resumed_event(shard))

    @event_manager_base.filtered(channel_events.GuildChannelCreateEvent, config.CacheComponents.GUILD_CHANNELS)
    async def on_channel_create(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#channel-create> for more info."""
        event = self._event_factory.deserialize_guild_channel_create_event(shard, payload)

        if self._cache:
            self._cache.set_guild_channel(event.channel)

        await self.dispatch(event)

    @event_manager_base.filtered(channel_events.GuildChannelUpdateEvent, config.CacheComponents.GUILD_CHANNELS)
    async def on_channel_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#channel-update> for more info."""
        old = self._cache.get_guild_channel(snowflakes.Snowflake(payload["id"])) if self._cache else None
        event = self._event_factory.deserialize_guild_channel_update_event(shard, payload, old_channel=old)

        if self._cache:
            self._cache.update_guild_channel(event.channel)

        await self.dispatch(event)

    @event_manager_base.filtered(channel_events.GuildChannelDeleteEvent, config.CacheComponents.GUILD_CHANNELS)
    async def on_channel_delete(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#channel-delete> for more info."""
        event = self._event_factory.deserialize_guild_channel_delete_event(shard, payload)

        if self._cache:
            self._cache.delete_guild_channel(event.channel.id)

        await self.dispatch(event)

    @event_manager_base.filtered((channel_events.GuildPinsUpdateEvent, channel_events.DMPinsUpdateEvent))
    async def on_channel_pins_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#channel-pins-update> for more info."""
        # TODO: we need a method for this specifically
        await self.dispatch(self._event_factory.deserialize_channel_pins_update_event(shard, payload))

    # Internal granularity is preferred for GUILD_CREATE over decorator based filtering due to its large scope.
    async def on_guild_create(  # noqa: C901 - Function too complex
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-create> for more info."""
        event: typing.Union[guild_events.GuildAvailableEvent, guild_events.GuildJoinEvent, None]

        if "unavailable" in payload and self._enabled_for_event(guild_events.GuildAvailableEvent):
            event = self._event_factory.deserialize_guild_available_event(shard, payload)
        elif "unavailable" not in payload and self._enabled_for_event(guild_events.GuildJoinEvent):
            event = self._event_factory.deserialize_guild_join_event(shard, payload)
        else:
            event = None

        if event:
            # We also filter here to prevent iterating over them and calling a function that won't do anything
            channels = event.channels if self._cache_enabled_for(config.CacheComponents.GUILD_CHANNELS) else None
            emojis = event.emojis if self._cache_enabled_for(config.CacheComponents.EMOJIS) else None
            guild = event.guild if self._cache_enabled_for(config.CacheComponents.GUILDS) else None
            guild_id = event.guild.id
            members = event.members if self._cache_enabled_for(config.CacheComponents.MEMBERS) else None
            presences = event.presences if self._cache_enabled_for(config.CacheComponents.PRESENCES) else None
            roles = event.roles if self._cache_enabled_for(config.CacheComponents.ROLES) else None
            voice_states = event.voice_states if self._cache_enabled_for(config.CacheComponents.VOICE_STATES) else None

        elif self._cache:
            _LOGGER.log(ux.TRACE, "Skipping on_guild_create dispatch due to lack of any registered listeners")
            gd = self._entity_factory.deserialize_gateway_guild(payload)

            channels = gd.channels() if self._cache_enabled_for(config.CacheComponents.GUILD_CHANNELS) else None
            emojis = gd.emojis() if self._cache_enabled_for(config.CacheComponents.EMOJIS) else None
            guild = gd.guild() if self._cache_enabled_for(config.CacheComponents.GUILDS) else None
            guild_id = gd.id
            members = gd.members() if self._cache_enabled_for(config.CacheComponents.MEMBERS) else None
            presences = gd.presences() if self._cache_enabled_for(config.CacheComponents.PRESENCES) else None
            roles = gd.roles() if self._cache_enabled_for(config.CacheComponents.ROLES) else None
            voice_states = gd.voice_states() if self._cache_enabled_for(config.CacheComponents.VOICE_STATES) else None

        else:
            _LOGGER.log(
                ux.TRACE, "Skipping on_guild_create raw dispatch due to lack of any registered listeners or cache need"
            )

            channels = None
            emojis = None
            guild = None
            guild_id = snowflakes.Snowflake(payload["id"])
            members = None
            presences = None
            roles = None
            voice_states = None

        if self._cache:
            if guild:
                self._cache.update_guild(guild)

            if channels:
                self._cache.clear_guild_channels_for_guild(guild_id)
                for channel in channels.values():
                    self._cache.set_guild_channel(channel)

            if emojis:
                self._cache.clear_emojis_for_guild(guild_id)
                for emoji in emojis.values():
                    self._cache.set_emoji(emoji)

            if roles:
                self._cache.clear_roles_for_guild(guild_id)
                for role in roles.values():
                    self._cache.set_role(role)

            if members:
                # TODO: do we really want to invalidate these all after an outage.
                self._cache.clear_members_for_guild(guild_id)
                for member in members.values():
                    self._cache.set_member(member)

            if presences:
                self._cache.clear_presences_for_guild(guild_id)
                for presence in presences.values():
                    self._cache.set_presence(presence)

            if voice_states:
                self._cache.clear_voice_states_for_guild(guild_id)
                for voice_state in voice_states.values():
                    self._cache.set_voice_state(voice_state)

        presences_declared = self._intents & intents_.Intents.GUILD_PRESENCES

        # When intents are enabled Discord will only send other member objects on the guild create
        # payload if presence intents are also declared, so if this isn't the case then we also want
        # to chunk small guilds.
        if (
            self._intents & intents_.Intents.GUILD_MEMBERS
            and (payload.get("large") or not presences_declared)
            and (
                self._cache_enabled_for(config.CacheComponents.MEMBERS)
                or self._enabled_for_event(shard_events.MemberChunkEvent)
            )
        ):
            # We create a task here instead of awaiting the result to avoid any rate-limits from delaying dispatch.
            nonce = f"{shard.id}.{_fixed_size_nonce()}"

            if event:
                event.chunk_nonce = nonce

            coroutine = _request_guild_members(shard, guild_id, include_presences=bool(presences_declared), nonce=nonce)
            asyncio.create_task(coroutine, name=f"{shard.id}:{guild_id} guild create members request")

        if event:
            await self.dispatch(event)

    # Internal granularity is preferred for GUILD_UPDATE over decorator based filtering due to its large scope.
    async def on_guild_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-update> for more info."""
        event: typing.Optional[guild_events.GuildUpdateEvent]
        if self._enabled_for_event(guild_events.GuildUpdateEvent):
            guild_id = snowflakes.Snowflake(payload["id"])
            old = self._cache.get_guild(guild_id) if self._cache else None
            event = self._event_factory.deserialize_guild_update_event(shard, payload, old_guild=old)

            # We also filter here to prevent iterating over them and calling a function that won't do anything
            emojis = event.emojis if self._cache_enabled_for(config.CacheComponents.EMOJIS) else None
            guild = event.guild if self._cache_enabled_for(config.CacheComponents.GUILDS) else None
            roles = event.roles if self._cache_enabled_for(config.CacheComponents.ROLES) else None

        elif self._cache:
            _LOGGER.log(ux.TRACE, "Skipping on_guild_update raw dispatch due to lack of any registered listeners")
            event = None

            gd = self._entity_factory.deserialize_gateway_guild(payload)
            emojis = gd.emojis() if self._cache_enabled_for(config.CacheComponents.EMOJIS) else None
            guild = gd.guild() if self._cache_enabled_for(config.CacheComponents.GUILDS) else None
            guild_id = gd.id
            roles = gd.roles() if self._cache_enabled_for(config.CacheComponents.ROLES) else None

        else:
            _LOGGER.log(
                ux.TRACE, "Skipping on_guild_update raw dispatch due to lack of any registered listeners or cache need"
            )
            return

        if self._cache:
            if guild:
                self._cache.update_guild(guild)

            if emojis:
                self._cache.clear_emojis_for_guild(guild_id)
                for emoji in emojis.values():
                    self._cache.set_emoji(emoji)

            if roles:
                self._cache.clear_roles_for_guild(guild_id)
                for role in roles.values():
                    self._cache.set_role(role)

        if event:
            await self.dispatch(event)

    @event_manager_base.filtered(
        (guild_events.GuildLeaveEvent, guild_events.GuildUnavailableEvent),
        config.CacheComponents.GUILDS
        | config.CacheComponents.GUILD_CHANNELS
        | config.CacheComponents.EMOJIS
        | config.CacheComponents.ROLES
        | config.CacheComponents.PRESENCES
        | config.CacheComponents.VOICE_STATES
        | config.CacheComponents.MEMBERS,
    )
    async def on_guild_delete(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-delete> for more info."""
        event: typing.Union[guild_events.GuildUnavailableEvent, guild_events.GuildLeaveEvent]
        if payload.get("unavailable"):
            event = self._event_factory.deserialize_guild_unavailable_event(shard, payload)

            if self._cache:
                self._cache.set_guild_availability(event.guild_id, False)

        else:
            old: typing.Optional[guilds.GatewayGuild] = None
            if self._cache:
                guild_id = snowflakes.Snowflake(payload["id"])
                #  TODO: this doesn't work in all intent scenarios
                old = self._cache.delete_guild(guild_id)
                self._cache.clear_voice_states_for_guild(guild_id)
                self._cache.clear_invites_for_guild(guild_id)
                self._cache.clear_members_for_guild(guild_id)
                self._cache.clear_presences_for_guild(guild_id)
                self._cache.clear_guild_channels_for_guild(guild_id)
                self._cache.clear_emojis_for_guild(guild_id)
                self._cache.clear_roles_for_guild(guild_id)

            event = self._event_factory.deserialize_guild_leave_event(shard, payload, old_guild=old)

        await self.dispatch(event)

    @event_manager_base.filtered(guild_events.BanCreateEvent)
    async def on_guild_ban_add(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-ban-add> for more info."""
        await self.dispatch(self._event_factory.deserialize_guild_ban_add_event(shard, payload))

    @event_manager_base.filtered(guild_events.BanDeleteEvent)
    async def on_guild_ban_remove(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-ban-remove> for more info."""
        await self.dispatch(self._event_factory.deserialize_guild_ban_remove_event(shard, payload))

    @event_manager_base.filtered(guild_events.EmojisUpdateEvent, config.CacheComponents.EMOJIS)
    async def on_guild_emojis_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-emojis-update> for more info."""
        guild_id = snowflakes.Snowflake(payload["guild_id"])
        old = list(self._cache.clear_emojis_for_guild(guild_id).values()) if self._cache else None

        event = self._event_factory.deserialize_guild_emojis_update_event(shard, payload, old_emojis=old)

        if self._cache:
            for emoji in event.emojis:
                self._cache.set_emoji(emoji)

        await self.dispatch(event)

    @event_manager_base.filtered(())  # An empty sequence here means that this method will always be skipped.
    async def on_guild_integrations_update(self, _: gateway_shard.GatewayShard, __: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-integrations-update> for more info."""
        # This is only here to stop this being logged or dispatched as an "unknown event".
        # This event is made redundant by INTEGRATION_CREATE/DELETE/UPDATE and is thus not parsed or dispatched.
        raise NotImplementedError

    @event_manager_base.filtered(guild_events.IntegrationCreateEvent)
    async def on_integration_create(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        event = self._event_factory.deserialize_integration_create_event(shard, payload)
        await self.dispatch(event)

    @event_manager_base.filtered(guild_events.IntegrationDeleteEvent)
    async def on_integration_delete(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        event = self._event_factory.deserialize_integration_delete_event(shard, payload)
        await self.dispatch(event)

    @event_manager_base.filtered(guild_events.IntegrationUpdateEvent)
    async def on_integration_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        event = self._event_factory.deserialize_integration_update_event(shard, payload)
        await self.dispatch(event)

    @event_manager_base.filtered(member_events.MemberCreateEvent, config.CacheComponents.MEMBERS)
    async def on_guild_member_add(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-member-add> for more info."""
        event = self._event_factory.deserialize_guild_member_add_event(shard, payload)

        if self._cache:
            self._cache.update_member(event.member)

        await self.dispatch(event)

    @event_manager_base.filtered(member_events.MemberDeleteEvent, config.CacheComponents.MEMBERS)
    async def on_guild_member_remove(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-member-remove> for more info."""
        old: typing.Optional[guilds.Member] = None
        if self._cache:
            old = self._cache.delete_member(
                snowflakes.Snowflake(payload["guild_id"]), snowflakes.Snowflake(payload["user"]["id"])
            )

        event = self._event_factory.deserialize_guild_member_remove_event(shard, payload, old_member=old)
        await self.dispatch(event)

    @event_manager_base.filtered(member_events.MemberUpdateEvent, config.CacheComponents.MEMBERS)
    async def on_guild_member_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-member-update> for more info."""
        old: typing.Optional[guilds.Member] = None
        if self._cache:
            old = self._cache.get_member(
                snowflakes.Snowflake(payload["guild_id"]), snowflakes.Snowflake(payload["user"]["id"])
            )

        event = self._event_factory.deserialize_guild_member_update_event(shard, payload, old_member=old)

        if self._cache:
            self._cache.update_member(event.member)

        await self.dispatch(event)

    @event_manager_base.filtered(shard_events.MemberChunkEvent, config.CacheComponents.MEMBERS)
    async def on_guild_members_chunk(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-members-chunk> for more info."""
        event = self._event_factory.deserialize_guild_member_chunk_event(shard, payload)

        if self._cache:
            for member in event.members.values():
                self._cache.set_member(member)

            for presence in event.presences.values():
                self._cache.set_presence(presence)

        await self.dispatch(event)

    @event_manager_base.filtered(role_events.RoleCreateEvent, config.CacheComponents.ROLES)
    async def on_guild_role_create(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-role-create> for more info."""
        event = self._event_factory.deserialize_guild_role_create_event(shard, payload)

        if self._cache:
            self._cache.set_role(event.role)

        await self.dispatch(event)

    @event_manager_base.filtered(role_events.RoleUpdateEvent, config.CacheComponents.ROLES)
    async def on_guild_role_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-role-update> for more info."""
        old = self._cache.get_role(snowflakes.Snowflake(payload["role"]["id"])) if self._cache else None
        event = self._event_factory.deserialize_guild_role_update_event(shard, payload, old_role=old)

        if self._cache:
            self._cache.update_role(event.role)

        await self.dispatch(event)

    @event_manager_base.filtered(role_events.RoleDeleteEvent, config.CacheComponents.ROLES)
    async def on_guild_role_delete(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-role-delete> for more info."""
        old: typing.Optional[guilds.Role] = None
        if self._cache:
            old = self._cache.delete_role(snowflakes.Snowflake(payload["role_id"]))

        event = self._event_factory.deserialize_guild_role_delete_event(shard, payload, old_role=old)

        await self.dispatch(event)

    @event_manager_base.filtered(channel_events.InviteCreateEvent, config.CacheComponents.INVITES)
    async def on_invite_create(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#invite-create> for more info."""
        event = self._event_factory.deserialize_invite_create_event(shard, payload)

        if self._cache:
            self._cache.set_invite(event.invite)

        await self.dispatch(event)

    @event_manager_base.filtered(channel_events.InviteDeleteEvent, config.CacheComponents.INVITES)
    async def on_invite_delete(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#invite-delete> for more info."""
        old: typing.Optional[invites.InviteWithMetadata] = None
        if self._cache:
            old = self._cache.delete_invite(payload["code"])

        event = self._event_factory.deserialize_invite_delete_event(shard, payload, old_invite=old)

        await self.dispatch(event)

    @event_manager_base.filtered(
        (message_events.GuildMessageCreateEvent, message_events.DMMessageCreateEvent), config.CacheComponents.MESSAGES
    )
    async def on_message_create(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-create> for more info."""
        event = self._event_factory.deserialize_message_create_event(shard, payload)

        if self._cache:
            self._cache.set_message(event.message)

        await self.dispatch(event)

    @event_manager_base.filtered(
        (message_events.GuildMessageUpdateEvent, message_events.DMMessageUpdateEvent), config.CacheComponents.MESSAGES
    )
    async def on_message_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-update> for more info."""
        old = self._cache.get_message(snowflakes.Snowflake(payload["id"])) if self._cache else None
        event = self._event_factory.deserialize_message_update_event(shard, payload, old_message=old)

        if self._cache:
            self._cache.update_message(event.message)

        await self.dispatch(event)

    @event_manager_base.filtered(
        (message_events.GuildMessageDeleteEvent, message_events.DMMessageDeleteEvent), config.CacheComponents.MESSAGES
    )
    async def on_message_delete(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-delete> for more info."""
        if self._cache:
            message_id = snowflakes.Snowflake(payload["id"])
            old_message = self._cache.delete_message(message_id)
        else:
            old_message = None

        event = self._event_factory.deserialize_message_delete_event(shard, payload, old_message=old_message)

        await self.dispatch(event)

    @event_manager_base.filtered(
        (message_events.GuildMessageDeleteEvent, message_events.DMMessageDeleteEvent), config.CacheComponents.MESSAGES
    )
    async def on_message_delete_bulk(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-delete-bulk> for more info."""
        old_messages = {}

        if self._cache:
            for message_id in payload["ids"]:
                message_id = snowflakes.Snowflake(message_id)

                if message := self._cache.delete_message(message_id):
                    old_messages[message_id] = message

        await self.dispatch(
            self._event_factory.deserialize_guild_message_delete_bulk_event(shard, payload, old_messages=old_messages)
        )

    @event_manager_base.filtered((reaction_events.GuildReactionAddEvent, reaction_events.DMReactionAddEvent))
    async def on_message_reaction_add(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-reaction-add> for more info."""
        await self.dispatch(self._event_factory.deserialize_message_reaction_add_event(shard, payload))

    # TODO: this is unlikely but reaction cache?

    @event_manager_base.filtered((reaction_events.GuildReactionDeleteEvent, reaction_events.DMReactionDeleteEvent))
    async def on_message_reaction_remove(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-reaction-remove> for more info."""
        await self.dispatch(self._event_factory.deserialize_message_reaction_remove_event(shard, payload))

    @event_manager_base.filtered(
        (reaction_events.GuildReactionDeleteAllEvent, reaction_events.DMReactionDeleteAllEvent)
    )
    async def on_message_reaction_remove_all(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-reaction-remove-all> for more info."""
        await self.dispatch(self._event_factory.deserialize_message_reaction_remove_all_event(shard, payload))

    @event_manager_base.filtered(
        (reaction_events.GuildReactionDeleteEmojiEvent, reaction_events.DMReactionDeleteEmojiEvent)
    )
    async def on_message_reaction_remove_emoji(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-reaction-remove-emoji> for more info."""
        await self.dispatch(self._event_factory.deserialize_message_reaction_remove_emoji_event(shard, payload))

    @event_manager_base.filtered(guild_events.PresenceUpdateEvent, config.CacheComponents.PRESENCES)
    async def on_presence_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#presence-update> for more info."""
        old: typing.Optional[presences_.MemberPresence] = None

        if self._cache:
            old = self._cache.get_presence(
                snowflakes.Snowflake(payload["guild_id"]), snowflakes.Snowflake(payload["user"]["id"])
            )

        event = self._event_factory.deserialize_presence_update_event(shard, payload, old_presence=old)

        if self._cache and event.presence.visible_status is presences_.Status.OFFLINE:
            self._cache.delete_presence(event.presence.guild_id, event.presence.user_id)
        elif self._cache:
            self._cache.update_presence(event.presence)

        # TODO: update user here when partial_user is set self._cache.update_user(event.partial_user)
        await self.dispatch(event)

    @event_manager_base.filtered((typing_events.GuildTypingEvent, typing_events.DMTypingEvent))
    async def on_typing_start(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#typing-start> for more info."""
        await self.dispatch(self._event_factory.deserialize_typing_start_event(shard, payload))

    @event_manager_base.filtered(user_events.OwnUserUpdateEvent, config.CacheComponents.ME)
    async def on_user_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#user-update> for more info."""
        old = self._cache.get_me() if self._cache else None
        event = self._event_factory.deserialize_own_user_update_event(shard, payload, old_user=old)

        if self._cache:
            self._cache.update_me(event.user)

        await self.dispatch(event)

    @event_manager_base.filtered(voice_events.VoiceStateUpdateEvent, config.CacheComponents.VOICE_STATES)
    async def on_voice_state_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#voice-state-update> for more info."""
        old: typing.Optional[voices.VoiceState] = None
        if self._cache:
            old = self._cache.get_voice_state(
                snowflakes.Snowflake(payload["guild_id"]), snowflakes.Snowflake(payload["user_id"])
            )

        event = self._event_factory.deserialize_voice_state_update_event(shard, payload, old_state=old)

        if self._cache and event.state.channel_id is None:
            self._cache.delete_voice_state(event.state.guild_id, event.state.user_id)
        elif self._cache:
            self._cache.update_voice_state(event.state)

        await self.dispatch(event)

    @event_manager_base.filtered(voice_events.VoiceServerUpdateEvent)
    async def on_voice_server_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#voice-server-update> for more info."""
        await self.dispatch(self._event_factory.deserialize_voice_server_update_event(shard, payload))

    @event_manager_base.filtered(channel_events.WebhookUpdateEvent)
    async def on_webhooks_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#webhooks-update> for more info."""
        await self.dispatch(self._event_factory.deserialize_webhook_update_event(shard, payload))

    @event_manager_base.filtered(interaction_events.InteractionCreateEvent)
    async def on_interaction_create(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#interaction-create> for more info."""
        await self.dispatch(self._event_factory.deserialize_interaction_create_event(shard, payload))

    @event_manager_base.filtered(scheduled_events.ScheduledEventCreateEvent)
    async def on_guild_scheduled_event_create(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-create for more info."""
        await self.dispatch(self._event_factory.deserialize_scheduled_event_create_event(shard, payload))

    @event_manager_base.filtered(scheduled_events.ScheduledEventDeleteEvent)
    async def on_guild_scheduled_event_delete(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-delete for more info."""
        await self.dispatch(self._event_factory.deserialize_scheduled_event_delete_event(shard, payload))

    @event_manager_base.filtered(scheduled_events.ScheduledEventUpdateEvent)
    async def on_guild_scheduled_event_update(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-update for more info."""
        await self.dispatch(self._event_factory.deserialize_scheduled_event_update_event(shard, payload))

    @event_manager_base.filtered(scheduled_events.ScheduledEventUserAddEvent)
    async def on_guild_scheduled_event_user_add(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-user-add for more info."""
        await self.dispatch(self._event_factory.deserialize_scheduled_event_user_add_event(shard, payload))

    @event_manager_base.filtered(scheduled_events.ScheduledEventUserRemoveEvent)
    async def on_guild_scheduled_event_user_remove(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-user-remove for more info."""
        await self.dispatch(self._event_factory.deserialize_scheduled_event_user_remove_event(shard, payload))
View Source
class EventManagerImpl(event_manager_base.EventManagerBase):
    """Provides event handling logic for Discord events."""

    __slots__: typing.Sequence[str] = ("_cache", "_entity_factory")

    def __init__(
        self,
        entity_factory: entity_factory_.EntityFactory,
        event_factory: event_factory_.EventFactory,
        intents: intents_.Intents,
        /,
        *,
        cache: typing.Optional[cache_.MutableCache] = None,
    ) -> None:
        self._cache = cache
        self._entity_factory = entity_factory
        components = cache.settings.components if cache else config.CacheComponents.NONE
        super().__init__(event_factory=event_factory, intents=intents, cache_components=components)

    def _cache_enabled_for(self, components: config.CacheComponents, /) -> bool:
        return self._cache is not None and (self._cache.settings.components & components) == components

    @event_manager_base.filtered(shard_events.ShardReadyEvent, config.CacheComponents.ME)
    async def on_ready(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#ready> for more info."""
        # TODO: cache unavailable guilds on startup, I didn't bother for the time being.
        event = self._event_factory.deserialize_ready_event(shard, payload)

        if self._cache:
            self._cache.update_me(event.my_user)

        await self.dispatch(event)

    @event_manager_base.filtered(shard_events.ShardResumedEvent)
    async def on_resumed(self, shard: gateway_shard.GatewayShard, _: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#resumed> for more info."""
        await self.dispatch(self._event_factory.deserialize_resumed_event(shard))

    @event_manager_base.filtered(channel_events.GuildChannelCreateEvent, config.CacheComponents.GUILD_CHANNELS)
    async def on_channel_create(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#channel-create> for more info."""
        event = self._event_factory.deserialize_guild_channel_create_event(shard, payload)

        if self._cache:
            self._cache.set_guild_channel(event.channel)

        await self.dispatch(event)

    @event_manager_base.filtered(channel_events.GuildChannelUpdateEvent, config.CacheComponents.GUILD_CHANNELS)
    async def on_channel_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#channel-update> for more info."""
        old = self._cache.get_guild_channel(snowflakes.Snowflake(payload["id"])) if self._cache else None
        event = self._event_factory.deserialize_guild_channel_update_event(shard, payload, old_channel=old)

        if self._cache:
            self._cache.update_guild_channel(event.channel)

        await self.dispatch(event)

    @event_manager_base.filtered(channel_events.GuildChannelDeleteEvent, config.CacheComponents.GUILD_CHANNELS)
    async def on_channel_delete(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#channel-delete> for more info."""
        event = self._event_factory.deserialize_guild_channel_delete_event(shard, payload)

        if self._cache:
            self._cache.delete_guild_channel(event.channel.id)

        await self.dispatch(event)

    @event_manager_base.filtered((channel_events.GuildPinsUpdateEvent, channel_events.DMPinsUpdateEvent))
    async def on_channel_pins_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#channel-pins-update> for more info."""
        # TODO: we need a method for this specifically
        await self.dispatch(self._event_factory.deserialize_channel_pins_update_event(shard, payload))

    # Internal granularity is preferred for GUILD_CREATE over decorator based filtering due to its large scope.
    async def on_guild_create(  # noqa: C901 - Function too complex
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-create> for more info."""
        event: typing.Union[guild_events.GuildAvailableEvent, guild_events.GuildJoinEvent, None]

        if "unavailable" in payload and self._enabled_for_event(guild_events.GuildAvailableEvent):
            event = self._event_factory.deserialize_guild_available_event(shard, payload)
        elif "unavailable" not in payload and self._enabled_for_event(guild_events.GuildJoinEvent):
            event = self._event_factory.deserialize_guild_join_event(shard, payload)
        else:
            event = None

        if event:
            # We also filter here to prevent iterating over them and calling a function that won't do anything
            channels = event.channels if self._cache_enabled_for(config.CacheComponents.GUILD_CHANNELS) else None
            emojis = event.emojis if self._cache_enabled_for(config.CacheComponents.EMOJIS) else None
            guild = event.guild if self._cache_enabled_for(config.CacheComponents.GUILDS) else None
            guild_id = event.guild.id
            members = event.members if self._cache_enabled_for(config.CacheComponents.MEMBERS) else None
            presences = event.presences if self._cache_enabled_for(config.CacheComponents.PRESENCES) else None
            roles = event.roles if self._cache_enabled_for(config.CacheComponents.ROLES) else None
            voice_states = event.voice_states if self._cache_enabled_for(config.CacheComponents.VOICE_STATES) else None

        elif self._cache:
            _LOGGER.log(ux.TRACE, "Skipping on_guild_create dispatch due to lack of any registered listeners")
            gd = self._entity_factory.deserialize_gateway_guild(payload)

            channels = gd.channels() if self._cache_enabled_for(config.CacheComponents.GUILD_CHANNELS) else None
            emojis = gd.emojis() if self._cache_enabled_for(config.CacheComponents.EMOJIS) else None
            guild = gd.guild() if self._cache_enabled_for(config.CacheComponents.GUILDS) else None
            guild_id = gd.id
            members = gd.members() if self._cache_enabled_for(config.CacheComponents.MEMBERS) else None
            presences = gd.presences() if self._cache_enabled_for(config.CacheComponents.PRESENCES) else None
            roles = gd.roles() if self._cache_enabled_for(config.CacheComponents.ROLES) else None
            voice_states = gd.voice_states() if self._cache_enabled_for(config.CacheComponents.VOICE_STATES) else None

        else:
            _LOGGER.log(
                ux.TRACE, "Skipping on_guild_create raw dispatch due to lack of any registered listeners or cache need"
            )

            channels = None
            emojis = None
            guild = None
            guild_id = snowflakes.Snowflake(payload["id"])
            members = None
            presences = None
            roles = None
            voice_states = None

        if self._cache:
            if guild:
                self._cache.update_guild(guild)

            if channels:
                self._cache.clear_guild_channels_for_guild(guild_id)
                for channel in channels.values():
                    self._cache.set_guild_channel(channel)

            if emojis:
                self._cache.clear_emojis_for_guild(guild_id)
                for emoji in emojis.values():
                    self._cache.set_emoji(emoji)

            if roles:
                self._cache.clear_roles_for_guild(guild_id)
                for role in roles.values():
                    self._cache.set_role(role)

            if members:
                # TODO: do we really want to invalidate these all after an outage.
                self._cache.clear_members_for_guild(guild_id)
                for member in members.values():
                    self._cache.set_member(member)

            if presences:
                self._cache.clear_presences_for_guild(guild_id)
                for presence in presences.values():
                    self._cache.set_presence(presence)

            if voice_states:
                self._cache.clear_voice_states_for_guild(guild_id)
                for voice_state in voice_states.values():
                    self._cache.set_voice_state(voice_state)

        presences_declared = self._intents & intents_.Intents.GUILD_PRESENCES

        # When intents are enabled Discord will only send other member objects on the guild create
        # payload if presence intents are also declared, so if this isn't the case then we also want
        # to chunk small guilds.
        if (
            self._intents & intents_.Intents.GUILD_MEMBERS
            and (payload.get("large") or not presences_declared)
            and (
                self._cache_enabled_for(config.CacheComponents.MEMBERS)
                or self._enabled_for_event(shard_events.MemberChunkEvent)
            )
        ):
            # We create a task here instead of awaiting the result to avoid any rate-limits from delaying dispatch.
            nonce = f"{shard.id}.{_fixed_size_nonce()}"

            if event:
                event.chunk_nonce = nonce

            coroutine = _request_guild_members(shard, guild_id, include_presences=bool(presences_declared), nonce=nonce)
            asyncio.create_task(coroutine, name=f"{shard.id}:{guild_id} guild create members request")

        if event:
            await self.dispatch(event)

    # Internal granularity is preferred for GUILD_UPDATE over decorator based filtering due to its large scope.
    async def on_guild_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-update> for more info."""
        event: typing.Optional[guild_events.GuildUpdateEvent]
        if self._enabled_for_event(guild_events.GuildUpdateEvent):
            guild_id = snowflakes.Snowflake(payload["id"])
            old = self._cache.get_guild(guild_id) if self._cache else None
            event = self._event_factory.deserialize_guild_update_event(shard, payload, old_guild=old)

            # We also filter here to prevent iterating over them and calling a function that won't do anything
            emojis = event.emojis if self._cache_enabled_for(config.CacheComponents.EMOJIS) else None
            guild = event.guild if self._cache_enabled_for(config.CacheComponents.GUILDS) else None
            roles = event.roles if self._cache_enabled_for(config.CacheComponents.ROLES) else None

        elif self._cache:
            _LOGGER.log(ux.TRACE, "Skipping on_guild_update raw dispatch due to lack of any registered listeners")
            event = None

            gd = self._entity_factory.deserialize_gateway_guild(payload)
            emojis = gd.emojis() if self._cache_enabled_for(config.CacheComponents.EMOJIS) else None
            guild = gd.guild() if self._cache_enabled_for(config.CacheComponents.GUILDS) else None
            guild_id = gd.id
            roles = gd.roles() if self._cache_enabled_for(config.CacheComponents.ROLES) else None

        else:
            _LOGGER.log(
                ux.TRACE, "Skipping on_guild_update raw dispatch due to lack of any registered listeners or cache need"
            )
            return

        if self._cache:
            if guild:
                self._cache.update_guild(guild)

            if emojis:
                self._cache.clear_emojis_for_guild(guild_id)
                for emoji in emojis.values():
                    self._cache.set_emoji(emoji)

            if roles:
                self._cache.clear_roles_for_guild(guild_id)
                for role in roles.values():
                    self._cache.set_role(role)

        if event:
            await self.dispatch(event)

    @event_manager_base.filtered(
        (guild_events.GuildLeaveEvent, guild_events.GuildUnavailableEvent),
        config.CacheComponents.GUILDS
        | config.CacheComponents.GUILD_CHANNELS
        | config.CacheComponents.EMOJIS
        | config.CacheComponents.ROLES
        | config.CacheComponents.PRESENCES
        | config.CacheComponents.VOICE_STATES
        | config.CacheComponents.MEMBERS,
    )
    async def on_guild_delete(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-delete> for more info."""
        event: typing.Union[guild_events.GuildUnavailableEvent, guild_events.GuildLeaveEvent]
        if payload.get("unavailable"):
            event = self._event_factory.deserialize_guild_unavailable_event(shard, payload)

            if self._cache:
                self._cache.set_guild_availability(event.guild_id, False)

        else:
            old: typing.Optional[guilds.GatewayGuild] = None
            if self._cache:
                guild_id = snowflakes.Snowflake(payload["id"])
                #  TODO: this doesn't work in all intent scenarios
                old = self._cache.delete_guild(guild_id)
                self._cache.clear_voice_states_for_guild(guild_id)
                self._cache.clear_invites_for_guild(guild_id)
                self._cache.clear_members_for_guild(guild_id)
                self._cache.clear_presences_for_guild(guild_id)
                self._cache.clear_guild_channels_for_guild(guild_id)
                self._cache.clear_emojis_for_guild(guild_id)
                self._cache.clear_roles_for_guild(guild_id)

            event = self._event_factory.deserialize_guild_leave_event(shard, payload, old_guild=old)

        await self.dispatch(event)

    @event_manager_base.filtered(guild_events.BanCreateEvent)
    async def on_guild_ban_add(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-ban-add> for more info."""
        await self.dispatch(self._event_factory.deserialize_guild_ban_add_event(shard, payload))

    @event_manager_base.filtered(guild_events.BanDeleteEvent)
    async def on_guild_ban_remove(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-ban-remove> for more info."""
        await self.dispatch(self._event_factory.deserialize_guild_ban_remove_event(shard, payload))

    @event_manager_base.filtered(guild_events.EmojisUpdateEvent, config.CacheComponents.EMOJIS)
    async def on_guild_emojis_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-emojis-update> for more info."""
        guild_id = snowflakes.Snowflake(payload["guild_id"])
        old = list(self._cache.clear_emojis_for_guild(guild_id).values()) if self._cache else None

        event = self._event_factory.deserialize_guild_emojis_update_event(shard, payload, old_emojis=old)

        if self._cache:
            for emoji in event.emojis:
                self._cache.set_emoji(emoji)

        await self.dispatch(event)

    @event_manager_base.filtered(())  # An empty sequence here means that this method will always be skipped.
    async def on_guild_integrations_update(self, _: gateway_shard.GatewayShard, __: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-integrations-update> for more info."""
        # This is only here to stop this being logged or dispatched as an "unknown event".
        # This event is made redundant by INTEGRATION_CREATE/DELETE/UPDATE and is thus not parsed or dispatched.
        raise NotImplementedError

    @event_manager_base.filtered(guild_events.IntegrationCreateEvent)
    async def on_integration_create(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        event = self._event_factory.deserialize_integration_create_event(shard, payload)
        await self.dispatch(event)

    @event_manager_base.filtered(guild_events.IntegrationDeleteEvent)
    async def on_integration_delete(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        event = self._event_factory.deserialize_integration_delete_event(shard, payload)
        await self.dispatch(event)

    @event_manager_base.filtered(guild_events.IntegrationUpdateEvent)
    async def on_integration_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        event = self._event_factory.deserialize_integration_update_event(shard, payload)
        await self.dispatch(event)

    @event_manager_base.filtered(member_events.MemberCreateEvent, config.CacheComponents.MEMBERS)
    async def on_guild_member_add(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-member-add> for more info."""
        event = self._event_factory.deserialize_guild_member_add_event(shard, payload)

        if self._cache:
            self._cache.update_member(event.member)

        await self.dispatch(event)

    @event_manager_base.filtered(member_events.MemberDeleteEvent, config.CacheComponents.MEMBERS)
    async def on_guild_member_remove(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-member-remove> for more info."""
        old: typing.Optional[guilds.Member] = None
        if self._cache:
            old = self._cache.delete_member(
                snowflakes.Snowflake(payload["guild_id"]), snowflakes.Snowflake(payload["user"]["id"])
            )

        event = self._event_factory.deserialize_guild_member_remove_event(shard, payload, old_member=old)
        await self.dispatch(event)

    @event_manager_base.filtered(member_events.MemberUpdateEvent, config.CacheComponents.MEMBERS)
    async def on_guild_member_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-member-update> for more info."""
        old: typing.Optional[guilds.Member] = None
        if self._cache:
            old = self._cache.get_member(
                snowflakes.Snowflake(payload["guild_id"]), snowflakes.Snowflake(payload["user"]["id"])
            )

        event = self._event_factory.deserialize_guild_member_update_event(shard, payload, old_member=old)

        if self._cache:
            self._cache.update_member(event.member)

        await self.dispatch(event)

    @event_manager_base.filtered(shard_events.MemberChunkEvent, config.CacheComponents.MEMBERS)
    async def on_guild_members_chunk(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-members-chunk> for more info."""
        event = self._event_factory.deserialize_guild_member_chunk_event(shard, payload)

        if self._cache:
            for member in event.members.values():
                self._cache.set_member(member)

            for presence in event.presences.values():
                self._cache.set_presence(presence)

        await self.dispatch(event)

    @event_manager_base.filtered(role_events.RoleCreateEvent, config.CacheComponents.ROLES)
    async def on_guild_role_create(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-role-create> for more info."""
        event = self._event_factory.deserialize_guild_role_create_event(shard, payload)

        if self._cache:
            self._cache.set_role(event.role)

        await self.dispatch(event)

    @event_manager_base.filtered(role_events.RoleUpdateEvent, config.CacheComponents.ROLES)
    async def on_guild_role_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-role-update> for more info."""
        old = self._cache.get_role(snowflakes.Snowflake(payload["role"]["id"])) if self._cache else None
        event = self._event_factory.deserialize_guild_role_update_event(shard, payload, old_role=old)

        if self._cache:
            self._cache.update_role(event.role)

        await self.dispatch(event)

    @event_manager_base.filtered(role_events.RoleDeleteEvent, config.CacheComponents.ROLES)
    async def on_guild_role_delete(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-role-delete> for more info."""
        old: typing.Optional[guilds.Role] = None
        if self._cache:
            old = self._cache.delete_role(snowflakes.Snowflake(payload["role_id"]))

        event = self._event_factory.deserialize_guild_role_delete_event(shard, payload, old_role=old)

        await self.dispatch(event)

    @event_manager_base.filtered(channel_events.InviteCreateEvent, config.CacheComponents.INVITES)
    async def on_invite_create(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#invite-create> for more info."""
        event = self._event_factory.deserialize_invite_create_event(shard, payload)

        if self._cache:
            self._cache.set_invite(event.invite)

        await self.dispatch(event)

    @event_manager_base.filtered(channel_events.InviteDeleteEvent, config.CacheComponents.INVITES)
    async def on_invite_delete(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#invite-delete> for more info."""
        old: typing.Optional[invites.InviteWithMetadata] = None
        if self._cache:
            old = self._cache.delete_invite(payload["code"])

        event = self._event_factory.deserialize_invite_delete_event(shard, payload, old_invite=old)

        await self.dispatch(event)

    @event_manager_base.filtered(
        (message_events.GuildMessageCreateEvent, message_events.DMMessageCreateEvent), config.CacheComponents.MESSAGES
    )
    async def on_message_create(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-create> for more info."""
        event = self._event_factory.deserialize_message_create_event(shard, payload)

        if self._cache:
            self._cache.set_message(event.message)

        await self.dispatch(event)

    @event_manager_base.filtered(
        (message_events.GuildMessageUpdateEvent, message_events.DMMessageUpdateEvent), config.CacheComponents.MESSAGES
    )
    async def on_message_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-update> for more info."""
        old = self._cache.get_message(snowflakes.Snowflake(payload["id"])) if self._cache else None
        event = self._event_factory.deserialize_message_update_event(shard, payload, old_message=old)

        if self._cache:
            self._cache.update_message(event.message)

        await self.dispatch(event)

    @event_manager_base.filtered(
        (message_events.GuildMessageDeleteEvent, message_events.DMMessageDeleteEvent), config.CacheComponents.MESSAGES
    )
    async def on_message_delete(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-delete> for more info."""
        if self._cache:
            message_id = snowflakes.Snowflake(payload["id"])
            old_message = self._cache.delete_message(message_id)
        else:
            old_message = None

        event = self._event_factory.deserialize_message_delete_event(shard, payload, old_message=old_message)

        await self.dispatch(event)

    @event_manager_base.filtered(
        (message_events.GuildMessageDeleteEvent, message_events.DMMessageDeleteEvent), config.CacheComponents.MESSAGES
    )
    async def on_message_delete_bulk(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-delete-bulk> for more info."""
        old_messages = {}

        if self._cache:
            for message_id in payload["ids"]:
                message_id = snowflakes.Snowflake(message_id)

                if message := self._cache.delete_message(message_id):
                    old_messages[message_id] = message

        await self.dispatch(
            self._event_factory.deserialize_guild_message_delete_bulk_event(shard, payload, old_messages=old_messages)
        )

    @event_manager_base.filtered((reaction_events.GuildReactionAddEvent, reaction_events.DMReactionAddEvent))
    async def on_message_reaction_add(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-reaction-add> for more info."""
        await self.dispatch(self._event_factory.deserialize_message_reaction_add_event(shard, payload))

    # TODO: this is unlikely but reaction cache?

    @event_manager_base.filtered((reaction_events.GuildReactionDeleteEvent, reaction_events.DMReactionDeleteEvent))
    async def on_message_reaction_remove(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-reaction-remove> for more info."""
        await self.dispatch(self._event_factory.deserialize_message_reaction_remove_event(shard, payload))

    @event_manager_base.filtered(
        (reaction_events.GuildReactionDeleteAllEvent, reaction_events.DMReactionDeleteAllEvent)
    )
    async def on_message_reaction_remove_all(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-reaction-remove-all> for more info."""
        await self.dispatch(self._event_factory.deserialize_message_reaction_remove_all_event(shard, payload))

    @event_manager_base.filtered(
        (reaction_events.GuildReactionDeleteEmojiEvent, reaction_events.DMReactionDeleteEmojiEvent)
    )
    async def on_message_reaction_remove_emoji(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-reaction-remove-emoji> for more info."""
        await self.dispatch(self._event_factory.deserialize_message_reaction_remove_emoji_event(shard, payload))

    @event_manager_base.filtered(guild_events.PresenceUpdateEvent, config.CacheComponents.PRESENCES)
    async def on_presence_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#presence-update> for more info."""
        old: typing.Optional[presences_.MemberPresence] = None

        if self._cache:
            old = self._cache.get_presence(
                snowflakes.Snowflake(payload["guild_id"]), snowflakes.Snowflake(payload["user"]["id"])
            )

        event = self._event_factory.deserialize_presence_update_event(shard, payload, old_presence=old)

        if self._cache and event.presence.visible_status is presences_.Status.OFFLINE:
            self._cache.delete_presence(event.presence.guild_id, event.presence.user_id)
        elif self._cache:
            self._cache.update_presence(event.presence)

        # TODO: update user here when partial_user is set self._cache.update_user(event.partial_user)
        await self.dispatch(event)

    @event_manager_base.filtered((typing_events.GuildTypingEvent, typing_events.DMTypingEvent))
    async def on_typing_start(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#typing-start> for more info."""
        await self.dispatch(self._event_factory.deserialize_typing_start_event(shard, payload))

    @event_manager_base.filtered(user_events.OwnUserUpdateEvent, config.CacheComponents.ME)
    async def on_user_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#user-update> for more info."""
        old = self._cache.get_me() if self._cache else None
        event = self._event_factory.deserialize_own_user_update_event(shard, payload, old_user=old)

        if self._cache:
            self._cache.update_me(event.user)

        await self.dispatch(event)

    @event_manager_base.filtered(voice_events.VoiceStateUpdateEvent, config.CacheComponents.VOICE_STATES)
    async def on_voice_state_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#voice-state-update> for more info."""
        old: typing.Optional[voices.VoiceState] = None
        if self._cache:
            old = self._cache.get_voice_state(
                snowflakes.Snowflake(payload["guild_id"]), snowflakes.Snowflake(payload["user_id"])
            )

        event = self._event_factory.deserialize_voice_state_update_event(shard, payload, old_state=old)

        if self._cache and event.state.channel_id is None:
            self._cache.delete_voice_state(event.state.guild_id, event.state.user_id)
        elif self._cache:
            self._cache.update_voice_state(event.state)

        await self.dispatch(event)

    @event_manager_base.filtered(voice_events.VoiceServerUpdateEvent)
    async def on_voice_server_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#voice-server-update> for more info."""
        await self.dispatch(self._event_factory.deserialize_voice_server_update_event(shard, payload))

    @event_manager_base.filtered(channel_events.WebhookUpdateEvent)
    async def on_webhooks_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#webhooks-update> for more info."""
        await self.dispatch(self._event_factory.deserialize_webhook_update_event(shard, payload))

    @event_manager_base.filtered(interaction_events.InteractionCreateEvent)
    async def on_interaction_create(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#interaction-create> for more info."""
        await self.dispatch(self._event_factory.deserialize_interaction_create_event(shard, payload))

    @event_manager_base.filtered(scheduled_events.ScheduledEventCreateEvent)
    async def on_guild_scheduled_event_create(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-create for more info."""
        await self.dispatch(self._event_factory.deserialize_scheduled_event_create_event(shard, payload))

    @event_manager_base.filtered(scheduled_events.ScheduledEventDeleteEvent)
    async def on_guild_scheduled_event_delete(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-delete for more info."""
        await self.dispatch(self._event_factory.deserialize_scheduled_event_delete_event(shard, payload))

    @event_manager_base.filtered(scheduled_events.ScheduledEventUpdateEvent)
    async def on_guild_scheduled_event_update(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-update for more info."""
        await self.dispatch(self._event_factory.deserialize_scheduled_event_update_event(shard, payload))

    @event_manager_base.filtered(scheduled_events.ScheduledEventUserAddEvent)
    async def on_guild_scheduled_event_user_add(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-user-add for more info."""
        await self.dispatch(self._event_factory.deserialize_scheduled_event_user_add_event(shard, payload))

    @event_manager_base.filtered(scheduled_events.ScheduledEventUserRemoveEvent)
    async def on_guild_scheduled_event_user_remove(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-user-remove for more info."""
        await self.dispatch(self._event_factory.deserialize_scheduled_event_user_remove_event(shard, payload))

Provides event handling logic for Discord events.

Methods
#  def __init__(
   self,
   entity_factory: hikari.api.entity_factory.EntityFactory,
   event_factory: hikari.api.event_factory.EventFactory,
   intents: hikari.intents.Intents,
   /,
   *,
   cache: Optional[hikari.api.cache.MutableCache] = None
):
View Source
    def __init__(
        self,
        entity_factory: entity_factory_.EntityFactory,
        event_factory: event_factory_.EventFactory,
        intents: intents_.Intents,
        /,
        *,
        cache: typing.Optional[cache_.MutableCache] = None,
    ) -> None:
        self._cache = cache
        self._entity_factory = entity_factory
        components = cache.settings.components if cache else config.CacheComponents.NONE
        super().__init__(event_factory=event_factory, intents=intents, cache_components=components)
#  def consume_raw_event(
   self,
   event_name: str,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    def consume_raw_event(
        self, event_name: str, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        if self._enabled_for_event(shard_events.ShardPayloadEvent):
            payload_event = self._event_factory.deserialize_shard_payload_event(shard, payload, name=event_name)
            self.dispatch(payload_event)
        consumer = self._consumers[event_name.lower()]
        asyncio.create_task(self._handle_dispatch(consumer, shard, payload), name=f"dispatch {event_name}")

Consume a raw event.

Parameters
Raises
  • LookupError: If there is no consumer for the event.
#  def dispatch(self, event: ~EventT) -> _asyncio.Future[typing.Any]:
View Source
    def dispatch(self, event: base_events.EventT) -> asyncio.Future[typing.Any]:
        if not isinstance(event, base_events.Event):
            raise TypeError(f"Events must be subclasses of {base_events.Event.__name__}, not {type(event).__name__}")

        tasks: typing.List[typing.Coroutine[None, typing.Any, None]] = []

        for cls in event.dispatches():
            if listeners := self._listeners.get(cls):
                for callback in listeners:
                    tasks.append(self._invoke_callback(callback, event))

            if cls not in self._waiters:
                continue

            waiter_set = self._waiters[cls]
            for waiter in tuple(waiter_set):
                predicate, future = waiter
                if not future.done():
                    try:
                        if predicate and not predicate(event):
                            continue
                    except Exception as ex:
                        future.set_exception(ex)
                    else:
                        future.set_result(event)

                waiter_set.remove(waiter)

            if not waiter_set:
                del self._waiters[cls]
                self._increment_waiter_group_count(cls, -1)

        return asyncio.gather(*tasks) if tasks else aio.completed_future()

Dispatch an event.

Parameters
Example

We can dispatch custom events by first defining a class that derives from hikari.events.base_events.Event.

import attr

from hikari.traits import RESTAware
from hikari.events.base_events import Event
from hikari.users import User
from hikari.snowflakes import Snowflake

@attr.define()
class EveryoneMentionedEvent(Event):
    app: RESTAware = attr.field()

    author: User = attr.field()
    '''The user who mentioned everyone.'''

    content: str = attr.field()
    '''The message that was sent.'''

    message_id: Snowflake = attr.field()
    '''The message ID.'''

    channel_id: Snowflake = attr.field()
    '''The channel ID.'''

We can then dispatch our event as we see fit.

from hikari.events.messages import MessageCreateEvent

@bot.listen(MessageCreateEvent)
async def on_message(event):
    if "@everyone" in event.content or "@here" in event.content:
        event = EveryoneMentionedEvent(
            author=event.author,
            content=event.content,
            message_id=event.id,
            channel_id=event.channel_id,
        )

        bot.dispatch(event)

This event can be listened to elsewhere by subscribing to it with EventManager.subscribe.

@bot.listen(EveryoneMentionedEvent)
async def on_everyone_mentioned(event):
    print(event.user, "just pinged everyone in", event.channel_id)
Returns
  • asyncio.Future[typing.Any]: A future that can be optionally awaited. If awaited, the future will complete once all corresponding event listeners have been invoked. If not awaited, this will schedule the dispatch of the events in the background for later.
See Also

hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for

#  def get_listeners(
   self,
   event_type: Type[~EventT],
   /,
   *,
   polymorphic: bool = True
) -> Collection[Callable[[~EventT], Coroutine[Any, Any, NoneType]]]:
View Source
    def get_listeners(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        *,
        polymorphic: bool = True,
    ) -> typing.Collection[event_manager_.CallbackT[base_events.EventT]]:
        if polymorphic:
            listeners: typing.List[event_manager_.CallbackT[base_events.EventT]] = []
            for event in event_type.dispatches():
                if subscribed_listeners := self._listeners.get(event):
                    listeners.extend(subscribed_listeners)

            return listeners

        if items := self._listeners.get(event_type):
            return items.copy()

        return []

Get the listeners for a given event type, if there are any.

Parameters
  • event_type (typing.Type[T]): The event type to look for. T must be a subclass of hikari.events.base_events.Event.
  • polymorphic (bool): If True, this will also return the listeners for all the event types event_type will dispatch. If False, then only listeners for this class specifically are returned. The default is True.
Returns
  • typing.Collection[typing.Callable[[T], typing.Coroutine[typing.Any, typing.Any, None]]: A copy of the collection of listeners for the event. Will return an empty collection if nothing is registered.
#  def listen(
   self,
   *event_types: Type[~EventT]
) -> Callable[[Callable[[~EventT], Coroutine[Any, Any, NoneType]]], Callable[[~EventT], Coroutine[Any, Any, NoneType]]]:
View Source
    def listen(
        self,
        *event_types: typing.Type[base_events.EventT],
    ) -> typing.Callable[[event_manager_.CallbackT[base_events.EventT]], event_manager_.CallbackT[base_events.EventT]]:
        def decorator(
            callback: event_manager_.CallbackT[base_events.EventT],
        ) -> event_manager_.CallbackT[base_events.EventT]:
            # Avoid resolving forward references in the function's signature if
            # event_type was explicitly provided as this may lead to errors.
            if event_types:
                _assert_is_listener(iter(inspect.signature(callback).parameters.values()))
                resolved_types = event_types

            else:
                signature = reflect.resolve_signature(callback)
                params = signature.parameters.values()
                _assert_is_listener(iter(params))
                event_param = next(iter(params))
                annotation = event_param.annotation

                if annotation is event_param.empty:
                    raise TypeError("Must provide the event type in the @listen decorator or as a type hint!")

                if typing.get_origin(annotation) in _UNIONS:
                    # Resolve the types inside the union
                    resolved_types = typing.get_args(annotation)
                else:
                    # Just pass back the annotation
                    resolved_types = (annotation,)

            for resolved_type in resolved_types:
                self.subscribe(resolved_type, callback, _nested=1)

            return callback

        return decorator

Generate a decorator to subscribe a callback to an event type.

This is a second-order decorator.

Parameters
  • *event_types (typing.Optional[typing.Type[T]]): The event types to subscribe to. The implementation may allow this to be undefined. If this is the case, the event type will be inferred instead from the type hints on the function signature. T must be a subclass of hikari.events.base_events.Event.
Returns
  • typing.Callable[[T], T]: A decorator for a coroutine function that passes it to EventManager.subscribe before returning the function reference.
See Also

hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for

#  
@event_manager_base.filtered(channel_events.GuildChannelCreateEvent, config.CacheComponents.GUILD_CHANNELS)
async def on_channel_create(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(channel_events.GuildChannelCreateEvent, config.CacheComponents.GUILD_CHANNELS)
    async def on_channel_create(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#channel-create> for more info."""
        event = self._event_factory.deserialize_guild_channel_create_event(shard, payload)

        if self._cache:
            self._cache.set_guild_channel(event.channel)

        await self.dispatch(event)
#  
@event_manager_base.filtered(channel_events.GuildChannelDeleteEvent, config.CacheComponents.GUILD_CHANNELS)
async def on_channel_delete(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(channel_events.GuildChannelDeleteEvent, config.CacheComponents.GUILD_CHANNELS)
    async def on_channel_delete(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#channel-delete> for more info."""
        event = self._event_factory.deserialize_guild_channel_delete_event(shard, payload)

        if self._cache:
            self._cache.delete_guild_channel(event.channel.id)

        await self.dispatch(event)
#  
@event_manager_base.filtered((channel_events.GuildPinsUpdateEvent, channel_events.DMPinsUpdateEvent))
async def on_channel_pins_update(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered((channel_events.GuildPinsUpdateEvent, channel_events.DMPinsUpdateEvent))
    async def on_channel_pins_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#channel-pins-update> for more info."""
        # TODO: we need a method for this specifically
        await self.dispatch(self._event_factory.deserialize_channel_pins_update_event(shard, payload))
#  
@event_manager_base.filtered(channel_events.GuildChannelUpdateEvent, config.CacheComponents.GUILD_CHANNELS)
async def on_channel_update(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(channel_events.GuildChannelUpdateEvent, config.CacheComponents.GUILD_CHANNELS)
    async def on_channel_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#channel-update> for more info."""
        old = self._cache.get_guild_channel(snowflakes.Snowflake(payload["id"])) if self._cache else None
        event = self._event_factory.deserialize_guild_channel_update_event(shard, payload, old_channel=old)

        if self._cache:
            self._cache.update_guild_channel(event.channel)

        await self.dispatch(event)
#  
@event_manager_base.filtered(guild_events.BanCreateEvent)
async def on_guild_ban_add(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(guild_events.BanCreateEvent)
    async def on_guild_ban_add(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-ban-add> for more info."""
        await self.dispatch(self._event_factory.deserialize_guild_ban_add_event(shard, payload))
#  
@event_manager_base.filtered(guild_events.BanDeleteEvent)
async def on_guild_ban_remove(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(guild_events.BanDeleteEvent)
    async def on_guild_ban_remove(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-ban-remove> for more info."""
        await self.dispatch(self._event_factory.deserialize_guild_ban_remove_event(shard, payload))
#  async def on_guild_create(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    async def on_guild_create(  # noqa: C901 - Function too complex
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-create> for more info."""
        event: typing.Union[guild_events.GuildAvailableEvent, guild_events.GuildJoinEvent, None]

        if "unavailable" in payload and self._enabled_for_event(guild_events.GuildAvailableEvent):
            event = self._event_factory.deserialize_guild_available_event(shard, payload)
        elif "unavailable" not in payload and self._enabled_for_event(guild_events.GuildJoinEvent):
            event = self._event_factory.deserialize_guild_join_event(shard, payload)
        else:
            event = None

        if event:
            # We also filter here to prevent iterating over them and calling a function that won't do anything
            channels = event.channels if self._cache_enabled_for(config.CacheComponents.GUILD_CHANNELS) else None
            emojis = event.emojis if self._cache_enabled_for(config.CacheComponents.EMOJIS) else None
            guild = event.guild if self._cache_enabled_for(config.CacheComponents.GUILDS) else None
            guild_id = event.guild.id
            members = event.members if self._cache_enabled_for(config.CacheComponents.MEMBERS) else None
            presences = event.presences if self._cache_enabled_for(config.CacheComponents.PRESENCES) else None
            roles = event.roles if self._cache_enabled_for(config.CacheComponents.ROLES) else None
            voice_states = event.voice_states if self._cache_enabled_for(config.CacheComponents.VOICE_STATES) else None

        elif self._cache:
            _LOGGER.log(ux.TRACE, "Skipping on_guild_create dispatch due to lack of any registered listeners")
            gd = self._entity_factory.deserialize_gateway_guild(payload)

            channels = gd.channels() if self._cache_enabled_for(config.CacheComponents.GUILD_CHANNELS) else None
            emojis = gd.emojis() if self._cache_enabled_for(config.CacheComponents.EMOJIS) else None
            guild = gd.guild() if self._cache_enabled_for(config.CacheComponents.GUILDS) else None
            guild_id = gd.id
            members = gd.members() if self._cache_enabled_for(config.CacheComponents.MEMBERS) else None
            presences = gd.presences() if self._cache_enabled_for(config.CacheComponents.PRESENCES) else None
            roles = gd.roles() if self._cache_enabled_for(config.CacheComponents.ROLES) else None
            voice_states = gd.voice_states() if self._cache_enabled_for(config.CacheComponents.VOICE_STATES) else None

        else:
            _LOGGER.log(
                ux.TRACE, "Skipping on_guild_create raw dispatch due to lack of any registered listeners or cache need"
            )

            channels = None
            emojis = None
            guild = None
            guild_id = snowflakes.Snowflake(payload["id"])
            members = None
            presences = None
            roles = None
            voice_states = None

        if self._cache:
            if guild:
                self._cache.update_guild(guild)

            if channels:
                self._cache.clear_guild_channels_for_guild(guild_id)
                for channel in channels.values():
                    self._cache.set_guild_channel(channel)

            if emojis:
                self._cache.clear_emojis_for_guild(guild_id)
                for emoji in emojis.values():
                    self._cache.set_emoji(emoji)

            if roles:
                self._cache.clear_roles_for_guild(guild_id)
                for role in roles.values():
                    self._cache.set_role(role)

            if members:
                # TODO: do we really want to invalidate these all after an outage.
                self._cache.clear_members_for_guild(guild_id)
                for member in members.values():
                    self._cache.set_member(member)

            if presences:
                self._cache.clear_presences_for_guild(guild_id)
                for presence in presences.values():
                    self._cache.set_presence(presence)

            if voice_states:
                self._cache.clear_voice_states_for_guild(guild_id)
                for voice_state in voice_states.values():
                    self._cache.set_voice_state(voice_state)

        presences_declared = self._intents & intents_.Intents.GUILD_PRESENCES

        # When intents are enabled Discord will only send other member objects on the guild create
        # payload if presence intents are also declared, so if this isn't the case then we also want
        # to chunk small guilds.
        if (
            self._intents & intents_.Intents.GUILD_MEMBERS
            and (payload.get("large") or not presences_declared)
            and (
                self._cache_enabled_for(config.CacheComponents.MEMBERS)
                or self._enabled_for_event(shard_events.MemberChunkEvent)
            )
        ):
            # We create a task here instead of awaiting the result to avoid any rate-limits from delaying dispatch.
            nonce = f"{shard.id}.{_fixed_size_nonce()}"

            if event:
                event.chunk_nonce = nonce

            coroutine = _request_guild_members(shard, guild_id, include_presences=bool(presences_declared), nonce=nonce)
            asyncio.create_task(coroutine, name=f"{shard.id}:{guild_id} guild create members request")

        if event:
            await self.dispatch(event)
#  
@event_manager_base.filtered((guild_events.GuildLeaveEvent, guild_events.GuildUnavailableEvent), config.CacheComponents.GUILDS | config.CacheComponents.GUILD_CHANNELS | config.CacheComponents.EMOJIS | config.CacheComponents.ROLES | config.CacheComponents.PRESENCES | config.CacheComponents.VOICE_STATES | config.CacheComponents.MEMBERS)
async def on_guild_delete(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(
        (guild_events.GuildLeaveEvent, guild_events.GuildUnavailableEvent),
        config.CacheComponents.GUILDS
        | config.CacheComponents.GUILD_CHANNELS
        | config.CacheComponents.EMOJIS
        | config.CacheComponents.ROLES
        | config.CacheComponents.PRESENCES
        | config.CacheComponents.VOICE_STATES
        | config.CacheComponents.MEMBERS,
    )
    async def on_guild_delete(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-delete> for more info."""
        event: typing.Union[guild_events.GuildUnavailableEvent, guild_events.GuildLeaveEvent]
        if payload.get("unavailable"):
            event = self._event_factory.deserialize_guild_unavailable_event(shard, payload)

            if self._cache:
                self._cache.set_guild_availability(event.guild_id, False)

        else:
            old: typing.Optional[guilds.GatewayGuild] = None
            if self._cache:
                guild_id = snowflakes.Snowflake(payload["id"])
                #  TODO: this doesn't work in all intent scenarios
                old = self._cache.delete_guild(guild_id)
                self._cache.clear_voice_states_for_guild(guild_id)
                self._cache.clear_invites_for_guild(guild_id)
                self._cache.clear_members_for_guild(guild_id)
                self._cache.clear_presences_for_guild(guild_id)
                self._cache.clear_guild_channels_for_guild(guild_id)
                self._cache.clear_emojis_for_guild(guild_id)
                self._cache.clear_roles_for_guild(guild_id)

            event = self._event_factory.deserialize_guild_leave_event(shard, payload, old_guild=old)

        await self.dispatch(event)
#  
@event_manager_base.filtered(guild_events.EmojisUpdateEvent, config.CacheComponents.EMOJIS)
async def on_guild_emojis_update(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(guild_events.EmojisUpdateEvent, config.CacheComponents.EMOJIS)
    async def on_guild_emojis_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-emojis-update> for more info."""
        guild_id = snowflakes.Snowflake(payload["guild_id"])
        old = list(self._cache.clear_emojis_for_guild(guild_id).values()) if self._cache else None

        event = self._event_factory.deserialize_guild_emojis_update_event(shard, payload, old_emojis=old)

        if self._cache:
            for emoji in event.emojis:
                self._cache.set_emoji(emoji)

        await self.dispatch(event)
#  
@event_manager_base.filtered(())
async def on_guild_integrations_update(self, _: hikari.api.shard.GatewayShard, __: Dict[str, Any]) -> None:
View Source
    @event_manager_base.filtered(())  # An empty sequence here means that this method will always be skipped.
    async def on_guild_integrations_update(self, _: gateway_shard.GatewayShard, __: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-integrations-update> for more info."""
        # This is only here to stop this being logged or dispatched as an "unknown event".
        # This event is made redundant by INTEGRATION_CREATE/DELETE/UPDATE and is thus not parsed or dispatched.
        raise NotImplementedError
#  
@event_manager_base.filtered(member_events.MemberCreateEvent, config.CacheComponents.MEMBERS)
async def on_guild_member_add(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(member_events.MemberCreateEvent, config.CacheComponents.MEMBERS)
    async def on_guild_member_add(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-member-add> for more info."""
        event = self._event_factory.deserialize_guild_member_add_event(shard, payload)

        if self._cache:
            self._cache.update_member(event.member)

        await self.dispatch(event)
#  
@event_manager_base.filtered(member_events.MemberDeleteEvent, config.CacheComponents.MEMBERS)
async def on_guild_member_remove(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(member_events.MemberDeleteEvent, config.CacheComponents.MEMBERS)
    async def on_guild_member_remove(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-member-remove> for more info."""
        old: typing.Optional[guilds.Member] = None
        if self._cache:
            old = self._cache.delete_member(
                snowflakes.Snowflake(payload["guild_id"]), snowflakes.Snowflake(payload["user"]["id"])
            )

        event = self._event_factory.deserialize_guild_member_remove_event(shard, payload, old_member=old)
        await self.dispatch(event)
#  
@event_manager_base.filtered(member_events.MemberUpdateEvent, config.CacheComponents.MEMBERS)
async def on_guild_member_update(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(member_events.MemberUpdateEvent, config.CacheComponents.MEMBERS)
    async def on_guild_member_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-member-update> for more info."""
        old: typing.Optional[guilds.Member] = None
        if self._cache:
            old = self._cache.get_member(
                snowflakes.Snowflake(payload["guild_id"]), snowflakes.Snowflake(payload["user"]["id"])
            )

        event = self._event_factory.deserialize_guild_member_update_event(shard, payload, old_member=old)

        if self._cache:
            self._cache.update_member(event.member)

        await self.dispatch(event)
#  
@event_manager_base.filtered(shard_events.MemberChunkEvent, config.CacheComponents.MEMBERS)
async def on_guild_members_chunk(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(shard_events.MemberChunkEvent, config.CacheComponents.MEMBERS)
    async def on_guild_members_chunk(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-members-chunk> for more info."""
        event = self._event_factory.deserialize_guild_member_chunk_event(shard, payload)

        if self._cache:
            for member in event.members.values():
                self._cache.set_member(member)

            for presence in event.presences.values():
                self._cache.set_presence(presence)

        await self.dispatch(event)
#  
@event_manager_base.filtered(role_events.RoleCreateEvent, config.CacheComponents.ROLES)
async def on_guild_role_create(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(role_events.RoleCreateEvent, config.CacheComponents.ROLES)
    async def on_guild_role_create(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-role-create> for more info."""
        event = self._event_factory.deserialize_guild_role_create_event(shard, payload)

        if self._cache:
            self._cache.set_role(event.role)

        await self.dispatch(event)
#  
@event_manager_base.filtered(role_events.RoleDeleteEvent, config.CacheComponents.ROLES)
async def on_guild_role_delete(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(role_events.RoleDeleteEvent, config.CacheComponents.ROLES)
    async def on_guild_role_delete(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-role-delete> for more info."""
        old: typing.Optional[guilds.Role] = None
        if self._cache:
            old = self._cache.delete_role(snowflakes.Snowflake(payload["role_id"]))

        event = self._event_factory.deserialize_guild_role_delete_event(shard, payload, old_role=old)

        await self.dispatch(event)
#  
@event_manager_base.filtered(role_events.RoleUpdateEvent, config.CacheComponents.ROLES)
async def on_guild_role_update(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(role_events.RoleUpdateEvent, config.CacheComponents.ROLES)
    async def on_guild_role_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-role-update> for more info."""
        old = self._cache.get_role(snowflakes.Snowflake(payload["role"]["id"])) if self._cache else None
        event = self._event_factory.deserialize_guild_role_update_event(shard, payload, old_role=old)

        if self._cache:
            self._cache.update_role(event.role)

        await self.dispatch(event)
#  
@event_manager_base.filtered(scheduled_events.ScheduledEventCreateEvent)
async def on_guild_scheduled_event_create(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(scheduled_events.ScheduledEventCreateEvent)
    async def on_guild_scheduled_event_create(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-create for more info."""
        await self.dispatch(self._event_factory.deserialize_scheduled_event_create_event(shard, payload))

See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-create for more info.

#  
@event_manager_base.filtered(scheduled_events.ScheduledEventDeleteEvent)
async def on_guild_scheduled_event_delete(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(scheduled_events.ScheduledEventDeleteEvent)
    async def on_guild_scheduled_event_delete(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-delete for more info."""
        await self.dispatch(self._event_factory.deserialize_scheduled_event_delete_event(shard, payload))

See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-delete for more info.

#  
@event_manager_base.filtered(scheduled_events.ScheduledEventUpdateEvent)
async def on_guild_scheduled_event_update(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(scheduled_events.ScheduledEventUpdateEvent)
    async def on_guild_scheduled_event_update(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-update for more info."""
        await self.dispatch(self._event_factory.deserialize_scheduled_event_update_event(shard, payload))

See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-update for more info.

#  
@event_manager_base.filtered(scheduled_events.ScheduledEventUserAddEvent)
async def on_guild_scheduled_event_user_add(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(scheduled_events.ScheduledEventUserAddEvent)
    async def on_guild_scheduled_event_user_add(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-user-add for more info."""
        await self.dispatch(self._event_factory.deserialize_scheduled_event_user_add_event(shard, payload))

See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-user-add for more info.

#  
@event_manager_base.filtered(scheduled_events.ScheduledEventUserRemoveEvent)
async def on_guild_scheduled_event_user_remove(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(scheduled_events.ScheduledEventUserRemoveEvent)
    async def on_guild_scheduled_event_user_remove(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-user-remove for more info."""
        await self.dispatch(self._event_factory.deserialize_scheduled_event_user_remove_event(shard, payload))

See https://discord.com/developers/docs/topics/gateway#guild-scheduled-event-user-remove for more info.

#  async def on_guild_update(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    async def on_guild_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#guild-update> for more info."""
        event: typing.Optional[guild_events.GuildUpdateEvent]
        if self._enabled_for_event(guild_events.GuildUpdateEvent):
            guild_id = snowflakes.Snowflake(payload["id"])
            old = self._cache.get_guild(guild_id) if self._cache else None
            event = self._event_factory.deserialize_guild_update_event(shard, payload, old_guild=old)

            # We also filter here to prevent iterating over them and calling a function that won't do anything
            emojis = event.emojis if self._cache_enabled_for(config.CacheComponents.EMOJIS) else None
            guild = event.guild if self._cache_enabled_for(config.CacheComponents.GUILDS) else None
            roles = event.roles if self._cache_enabled_for(config.CacheComponents.ROLES) else None

        elif self._cache:
            _LOGGER.log(ux.TRACE, "Skipping on_guild_update raw dispatch due to lack of any registered listeners")
            event = None

            gd = self._entity_factory.deserialize_gateway_guild(payload)
            emojis = gd.emojis() if self._cache_enabled_for(config.CacheComponents.EMOJIS) else None
            guild = gd.guild() if self._cache_enabled_for(config.CacheComponents.GUILDS) else None
            guild_id = gd.id
            roles = gd.roles() if self._cache_enabled_for(config.CacheComponents.ROLES) else None

        else:
            _LOGGER.log(
                ux.TRACE, "Skipping on_guild_update raw dispatch due to lack of any registered listeners or cache need"
            )
            return

        if self._cache:
            if guild:
                self._cache.update_guild(guild)

            if emojis:
                self._cache.clear_emojis_for_guild(guild_id)
                for emoji in emojis.values():
                    self._cache.set_emoji(emoji)

            if roles:
                self._cache.clear_roles_for_guild(guild_id)
                for role in roles.values():
                    self._cache.set_role(role)

        if event:
            await self.dispatch(event)
#  
@event_manager_base.filtered(guild_events.IntegrationCreateEvent)
async def on_integration_create(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(guild_events.IntegrationCreateEvent)
    async def on_integration_create(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        event = self._event_factory.deserialize_integration_create_event(shard, payload)
        await self.dispatch(event)
#  
@event_manager_base.filtered(guild_events.IntegrationDeleteEvent)
async def on_integration_delete(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(guild_events.IntegrationDeleteEvent)
    async def on_integration_delete(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        event = self._event_factory.deserialize_integration_delete_event(shard, payload)
        await self.dispatch(event)
#  
@event_manager_base.filtered(guild_events.IntegrationUpdateEvent)
async def on_integration_update(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(guild_events.IntegrationUpdateEvent)
    async def on_integration_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        event = self._event_factory.deserialize_integration_update_event(shard, payload)
        await self.dispatch(event)
#  
@event_manager_base.filtered(interaction_events.InteractionCreateEvent)
async def on_interaction_create(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(interaction_events.InteractionCreateEvent)
    async def on_interaction_create(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#interaction-create> for more info."""
        await self.dispatch(self._event_factory.deserialize_interaction_create_event(shard, payload))
#  
@event_manager_base.filtered(channel_events.InviteCreateEvent, config.CacheComponents.INVITES)
async def on_invite_create(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(channel_events.InviteCreateEvent, config.CacheComponents.INVITES)
    async def on_invite_create(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#invite-create> for more info."""
        event = self._event_factory.deserialize_invite_create_event(shard, payload)

        if self._cache:
            self._cache.set_invite(event.invite)

        await self.dispatch(event)
#  
@event_manager_base.filtered(channel_events.InviteDeleteEvent, config.CacheComponents.INVITES)
async def on_invite_delete(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(channel_events.InviteDeleteEvent, config.CacheComponents.INVITES)
    async def on_invite_delete(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#invite-delete> for more info."""
        old: typing.Optional[invites.InviteWithMetadata] = None
        if self._cache:
            old = self._cache.delete_invite(payload["code"])

        event = self._event_factory.deserialize_invite_delete_event(shard, payload, old_invite=old)

        await self.dispatch(event)
#  
@event_manager_base.filtered((message_events.GuildMessageCreateEvent, message_events.DMMessageCreateEvent), config.CacheComponents.MESSAGES)
async def on_message_create(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(
        (message_events.GuildMessageCreateEvent, message_events.DMMessageCreateEvent), config.CacheComponents.MESSAGES
    )
    async def on_message_create(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-create> for more info."""
        event = self._event_factory.deserialize_message_create_event(shard, payload)

        if self._cache:
            self._cache.set_message(event.message)

        await self.dispatch(event)
#  
@event_manager_base.filtered((message_events.GuildMessageDeleteEvent, message_events.DMMessageDeleteEvent), config.CacheComponents.MESSAGES)
async def on_message_delete(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(
        (message_events.GuildMessageDeleteEvent, message_events.DMMessageDeleteEvent), config.CacheComponents.MESSAGES
    )
    async def on_message_delete(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-delete> for more info."""
        if self._cache:
            message_id = snowflakes.Snowflake(payload["id"])
            old_message = self._cache.delete_message(message_id)
        else:
            old_message = None

        event = self._event_factory.deserialize_message_delete_event(shard, payload, old_message=old_message)

        await self.dispatch(event)
#  
@event_manager_base.filtered((message_events.GuildMessageDeleteEvent, message_events.DMMessageDeleteEvent), config.CacheComponents.MESSAGES)
async def on_message_delete_bulk(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(
        (message_events.GuildMessageDeleteEvent, message_events.DMMessageDeleteEvent), config.CacheComponents.MESSAGES
    )
    async def on_message_delete_bulk(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-delete-bulk> for more info."""
        old_messages = {}

        if self._cache:
            for message_id in payload["ids"]:
                message_id = snowflakes.Snowflake(message_id)

                if message := self._cache.delete_message(message_id):
                    old_messages[message_id] = message

        await self.dispatch(
            self._event_factory.deserialize_guild_message_delete_bulk_event(shard, payload, old_messages=old_messages)
        )
#  
@event_manager_base.filtered((reaction_events.GuildReactionAddEvent, reaction_events.DMReactionAddEvent))
async def on_message_reaction_add(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered((reaction_events.GuildReactionAddEvent, reaction_events.DMReactionAddEvent))
    async def on_message_reaction_add(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-reaction-add> for more info."""
        await self.dispatch(self._event_factory.deserialize_message_reaction_add_event(shard, payload))
#  
@event_manager_base.filtered((reaction_events.GuildReactionDeleteEvent, reaction_events.DMReactionDeleteEvent))
async def on_message_reaction_remove(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered((reaction_events.GuildReactionDeleteEvent, reaction_events.DMReactionDeleteEvent))
    async def on_message_reaction_remove(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-reaction-remove> for more info."""
        await self.dispatch(self._event_factory.deserialize_message_reaction_remove_event(shard, payload))
#  
@event_manager_base.filtered((reaction_events.GuildReactionDeleteAllEvent, reaction_events.DMReactionDeleteAllEvent))
async def on_message_reaction_remove_all(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(
        (reaction_events.GuildReactionDeleteAllEvent, reaction_events.DMReactionDeleteAllEvent)
    )
    async def on_message_reaction_remove_all(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-reaction-remove-all> for more info."""
        await self.dispatch(self._event_factory.deserialize_message_reaction_remove_all_event(shard, payload))
#  
@event_manager_base.filtered((reaction_events.GuildReactionDeleteEmojiEvent, reaction_events.DMReactionDeleteEmojiEvent))
async def on_message_reaction_remove_emoji(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(
        (reaction_events.GuildReactionDeleteEmojiEvent, reaction_events.DMReactionDeleteEmojiEvent)
    )
    async def on_message_reaction_remove_emoji(
        self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject
    ) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-reaction-remove-emoji> for more info."""
        await self.dispatch(self._event_factory.deserialize_message_reaction_remove_emoji_event(shard, payload))
#  
@event_manager_base.filtered((message_events.GuildMessageUpdateEvent, message_events.DMMessageUpdateEvent), config.CacheComponents.MESSAGES)
async def on_message_update(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(
        (message_events.GuildMessageUpdateEvent, message_events.DMMessageUpdateEvent), config.CacheComponents.MESSAGES
    )
    async def on_message_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#message-update> for more info."""
        old = self._cache.get_message(snowflakes.Snowflake(payload["id"])) if self._cache else None
        event = self._event_factory.deserialize_message_update_event(shard, payload, old_message=old)

        if self._cache:
            self._cache.update_message(event.message)

        await self.dispatch(event)
#  
@event_manager_base.filtered(guild_events.PresenceUpdateEvent, config.CacheComponents.PRESENCES)
async def on_presence_update(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(guild_events.PresenceUpdateEvent, config.CacheComponents.PRESENCES)
    async def on_presence_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#presence-update> for more info."""
        old: typing.Optional[presences_.MemberPresence] = None

        if self._cache:
            old = self._cache.get_presence(
                snowflakes.Snowflake(payload["guild_id"]), snowflakes.Snowflake(payload["user"]["id"])
            )

        event = self._event_factory.deserialize_presence_update_event(shard, payload, old_presence=old)

        if self._cache and event.presence.visible_status is presences_.Status.OFFLINE:
            self._cache.delete_presence(event.presence.guild_id, event.presence.user_id)
        elif self._cache:
            self._cache.update_presence(event.presence)

        # TODO: update user here when partial_user is set self._cache.update_user(event.partial_user)
        await self.dispatch(event)
#  
@event_manager_base.filtered(shard_events.ShardReadyEvent, config.CacheComponents.ME)
async def on_ready(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(shard_events.ShardReadyEvent, config.CacheComponents.ME)
    async def on_ready(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#ready> for more info."""
        # TODO: cache unavailable guilds on startup, I didn't bother for the time being.
        event = self._event_factory.deserialize_ready_event(shard, payload)

        if self._cache:
            self._cache.update_me(event.my_user)

        await self.dispatch(event)
#  
@event_manager_base.filtered(shard_events.ShardResumedEvent)
async def on_resumed(
   self,
   shard: hikari.api.shard.GatewayShard,
   _: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(shard_events.ShardResumedEvent)
    async def on_resumed(self, shard: gateway_shard.GatewayShard, _: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#resumed> for more info."""
        await self.dispatch(self._event_factory.deserialize_resumed_event(shard))
#  
@event_manager_base.filtered((typing_events.GuildTypingEvent, typing_events.DMTypingEvent))
async def on_typing_start(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered((typing_events.GuildTypingEvent, typing_events.DMTypingEvent))
    async def on_typing_start(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#typing-start> for more info."""
        await self.dispatch(self._event_factory.deserialize_typing_start_event(shard, payload))
#  
@event_manager_base.filtered(user_events.OwnUserUpdateEvent, config.CacheComponents.ME)
async def on_user_update(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(user_events.OwnUserUpdateEvent, config.CacheComponents.ME)
    async def on_user_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#user-update> for more info."""
        old = self._cache.get_me() if self._cache else None
        event = self._event_factory.deserialize_own_user_update_event(shard, payload, old_user=old)

        if self._cache:
            self._cache.update_me(event.user)

        await self.dispatch(event)
#  
@event_manager_base.filtered(voice_events.VoiceServerUpdateEvent)
async def on_voice_server_update(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(voice_events.VoiceServerUpdateEvent)
    async def on_voice_server_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#voice-server-update> for more info."""
        await self.dispatch(self._event_factory.deserialize_voice_server_update_event(shard, payload))
#  
@event_manager_base.filtered(voice_events.VoiceStateUpdateEvent, config.CacheComponents.VOICE_STATES)
async def on_voice_state_update(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(voice_events.VoiceStateUpdateEvent, config.CacheComponents.VOICE_STATES)
    async def on_voice_state_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#voice-state-update> for more info."""
        old: typing.Optional[voices.VoiceState] = None
        if self._cache:
            old = self._cache.get_voice_state(
                snowflakes.Snowflake(payload["guild_id"]), snowflakes.Snowflake(payload["user_id"])
            )

        event = self._event_factory.deserialize_voice_state_update_event(shard, payload, old_state=old)

        if self._cache and event.state.channel_id is None:
            self._cache.delete_voice_state(event.state.guild_id, event.state.user_id)
        elif self._cache:
            self._cache.update_voice_state(event.state)

        await self.dispatch(event)
#  
@event_manager_base.filtered(channel_events.WebhookUpdateEvent)
async def on_webhooks_update(
   self,
   shard: hikari.api.shard.GatewayShard,
   payload: Dict[str, Any]
) -> None:
View Source
    @event_manager_base.filtered(channel_events.WebhookUpdateEvent)
    async def on_webhooks_update(self, shard: gateway_shard.GatewayShard, payload: data_binding.JSONObject) -> None:
        """See <https://discord.com/developers/docs/topics/gateway#webhooks-update> for more info."""
        await self.dispatch(self._event_factory.deserialize_webhook_update_event(shard, payload))
#  def stream(
   self,
   event_type: Type[~EventT],
   /,
   timeout: Union[float, int, NoneType],
   limit: Optional[int] = None
) -> hikari.api.event_manager.EventStream[~EventT]:
View Source
    def stream(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        timeout: typing.Union[float, int, None],
        limit: typing.Optional[int] = None,
    ) -> event_manager_.EventStream[base_events.EventT]:
        self._check_event(event_type, 1)
        return EventStream(self, event_type, timeout=timeout, limit=limit)

Return a stream iterator for the given event and sub-events.

Warning: If you use await stream.open() to start the stream then you must also close it with await stream.close() otherwise it may queue events in memory indefinitely.

Parameters
  • event_type (typing.Type[hikari.events.base_events.Event]): The event type to listen for. This will listen for subclasses of this type additionally.
  • timeout (typing.Optional[int, float]): How long this streamer should wait for the next event before ending the iteration. If None then this will continue until explicitly broken from.
  • limit (typing.Optional[int]): The limit for how many events this should queue at one time before dropping extra incoming events, leave this as None for the cache size to be unlimited.
Returns
  • EventStream[hikari.events.base_events.Event]: The async iterator to handle streamed events. This must be started with with stream: or stream.open() before asynchronously iterating over it.
Examples
with bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id)) as stream:
    async for user_id in stream.map("user_id").limit(50):
        ...

or using open() and close()

stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
stream.open()

async for user_id in stream.map("user_id").limit(50)
    ...

stream.close()
See Also

hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for

#  def subscribe(
   self,
   event_type: Type[Any],
   callback: Callable[[Any], Coroutine[Any, Any, NoneType]],
   *,
   _nested: int = 0
) -> None:
View Source
    def subscribe(
        self,
        event_type: typing.Type[typing.Any],
        callback: event_manager_.CallbackT[typing.Any],
        *,
        _nested: int = 0,
    ) -> None:
        if not inspect.iscoroutinefunction(callback):
            raise TypeError("Cannot subscribe a non-coroutine function callback")

        # `_nested` is used to show the correct source code snippet if an intent
        # warning is triggered.
        self._check_event(event_type, _nested)

        _LOGGER.debug(
            "subscribing callback 'async def %s%s' to event-type %s.%s",
            getattr(callback, "__name__", "<anon>"),
            inspect.signature(callback),
            event_type.__module__,
            event_type.__qualname__,
        )

        try:
            self._listeners[event_type].append(callback)
        except KeyError:
            self._listeners[event_type] = [callback]
            self._increment_listener_group_count(event_type, 1)

Subscribe a given callback to a given event type.

Parameters
  • event_type (typing.Type[T]): The event type to listen for. This will also listen for any subclasses of the given type. T must be a subclass of hikari.events.base_events.Event.
  • callback: Must be a coroutine function to invoke. This should consume an instance of the given event, or an instance of a valid subclass if one exists. Any result is discarded.
Example

The following demonstrates subscribing a callback to message creation events.

from hikari.events.messages import MessageCreateEvent

async def on_message(event):
    ...

bot.subscribe(MessageCreateEvent, on_message)
See Also

hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.unsubscribe
hikari.api.event_manager.EventManager.wait_for

#  def unsubscribe(
   self,
   event_type: Type[Any],
   callback: Callable[[Any], Coroutine[Any, Any, NoneType]]
) -> None:
View Source
    def unsubscribe(
        self,
        event_type: typing.Type[typing.Any],
        callback: event_manager_.CallbackT[typing.Any],
    ) -> None:
        if listeners := self._listeners.get(event_type):
            _LOGGER.debug(
                "unsubscribing callback %s%s from event-type %s.%s",
                getattr(callback, "__name__", "<anon>"),
                inspect.signature(callback),
                event_type.__module__,
                event_type.__qualname__,
            )
            listeners.remove(callback)
            if not listeners:
                del self._listeners[event_type]
                self._increment_listener_group_count(event_type, -1)

Unsubscribe a given callback from a given event type, if present.

Parameters
  • event_type (typing.Type[T]): The event type to unsubscribe from. This must be the same exact type as was originally subscribed with to be removed correctly. T must derive from hikari.events.base_events.Event.
  • callback: The callback to unsubscribe.
Example

The following demonstrates unsubscribing a callback from a message creation event.

from hikari.events.messages import MessageCreateEvent

async def on_message(event):
    ...

bot.unsubscribe(MessageCreateEvent, on_message)
See Also

hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.wait_for

#  async def wait_for(
   self,
   event_type: Type[~EventT],
   /,
   timeout: Union[float, int, NoneType],
   predicate: Optional[Callable[[~EventT], bool]] = None
) -> ~EventT:
View Source
    async def wait_for(
        self,
        event_type: typing.Type[base_events.EventT],
        /,
        timeout: typing.Union[float, int, None],
        predicate: typing.Optional[event_manager_.PredicateT[base_events.EventT]] = None,
    ) -> base_events.EventT:
        if not inspect.isclass(event_type) or not issubclass(event_type, base_events.Event):
            raise TypeError("Cannot wait for a non-Event type")

        self._check_event(event_type, 1)

        future: asyncio.Future[base_events.EventT] = asyncio.get_running_loop().create_future()

        try:
            waiter_set = self._waiters[event_type]
        except KeyError:
            waiter_set = set()
            self._waiters[event_type] = waiter_set
            self._increment_waiter_group_count(event_type, 1)

        pair = (predicate, future)

        waiter_set.add(pair)  # type: ignore[arg-type]
        try:
            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            waiter_set.remove(pair)  # type: ignore[arg-type]
            if not waiter_set:
                del self._waiters[event_type]
                self._increment_waiter_group_count(event_type, -1)

            raise

Wait for a given event to occur once, then return the event.

Warning: Async predicates are not supported.

Parameters
  • event_type (typing.Type[hikari.events.base_events.Event]): The event type to listen for. This will listen for subclasses of this type additionally.
  • predicate: A function taking the event as the single parameter. This should return True if the event is one you want to return, or False if the event should not be returned. If left as None (the default), then the first matching event type that the bot receives (or any subtype) will be the one returned.
  • timeout (typing.Union[float, int, None]): The amount of time to wait before raising an asyncio.TimeoutError and giving up instead. This is measured in seconds. If None, then no timeout will be waited for (no timeout can result in "leaking" of coroutines that never complete if called in an uncontrolled way, so is not recommended).
Returns
Raises
  • asyncio.TimeoutError: If the timeout is not None and is reached before an event is received that the predicate returns True for.
See Also

hikari.api.event_manager.EventManager.dispatch
hikari.api.event_manager.EventManager.listen
hikari.api.event_manager.EventManager.stream
hikari.api.event_manager.EventManager.subscribe
hikari.api.event_manager.EventManager.unsubscribe