Commands and Events
hikari-clusters comes with a built-in IPC system that uses websockets. The IPC has some built-in commands and events, but you should write your own commands and events to avoid confusing behaviour. Built-in commands are non-public and may change at any time.
For the most part, you will only be writing and using commands or events from the bot, but you can also do it from Brain and Server.
Each bot instance always has a cluster attribute, as it is set by the cluster launcher. The cluster attribute has an ipc attribute, which has a commands attribute. (I know it's a lot, you'll get used to it though.) Defining commands or events is done like this:
import hikari
import hikari_clusters

MY_COMMANDS = hikari_clusters.commands.CommandGroup()


@MY_COMMANDS.add("get_stats")
async def get_bot_stats(pl: hikari_clusters.payload.COMMAND, bot: "MyBot"):
    # Return the number of cached guilds.
    return {"guilds": len(bot.cache.get_guilds_view())}


class MyBot(hikari.GatewayBot):
    cluster: hikari_clusters.Cluster  # purely optional type hint

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    async def start(self, *args, **kwargs) -> None:
        # Make the bot available to command callbacks as the "bot" argument.
        self.cluster.ipc.commands.cmd_kwargs["bot"] = self
        self.cluster.ipc.commands.include(MY_COMMANDS)
        await super().start(*args, **kwargs)
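To make the registration step less magical, here is a minimal sketch of how a command group with injected keyword arguments could work. MiniCommandGroup and its dispatch method are hypothetical stand-ins written for this illustration; they are not the hikari-clusters classes, and the real implementation may differ.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Callback = Callable[..., Awaitable[Any]]


class MiniCommandGroup:
    """Hypothetical, simplified stand-in; NOT the hikari-clusters CommandGroup."""

    def __init__(self) -> None:
        self.callbacks: Dict[str, Callback] = {}
        self.cmd_kwargs: Dict[str, Any] = {}

    def add(self, name: str) -> Callable[[Callback], Callback]:
        # Decorator that registers a callback under the given name.
        def decorator(func: Callback) -> Callback:
            self.callbacks[name] = func
            return func

        return decorator

    def include(self, other: "MiniCommandGroup") -> None:
        # Copy every callback from another group into this one.
        self.callbacks.update(other.callbacks)

    async def dispatch(self, name: str, payload: Any) -> Any:
        # Call the named callback, passing the shared keyword arguments.
        return await self.callbacks[name](payload, **self.cmd_kwargs)


GROUP = MiniCommandGroup()


@GROUP.add("get_stats")
async def get_stats(pl: Any, bot: dict) -> dict:
    return {"guilds": bot["guild_count"]}


root = MiniCommandGroup()
root.cmd_kwargs["bot"] = {"guild_count": 42}  # a plain dict stands in for the bot
root.include(GROUP)
result = asyncio.run(root.dispatch("get_stats", None))
```

The point of the sketch is that the callback never looks the bot up itself: whatever is stored under the "bot" key is handed to every callback that the group dispatches.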
from hikari_clusters import payload


async def get_stats_from_all_clusters(bot: MyBot):
    responses = await bot.cluster.ipc.send_command(
        bot.cluster.ipc.clusters,  # send this command to all clusters
        "get_stats",  # call the "get_stats" command
    )
    # responses will be a dictionary that looks like this:
    # {
    #     0: payload.Payload[payload.ResponseOk],
    #     1: payload.Payload[payload.ResponseOk],
    #     ...
    # }
    # A cluster might return a different payload type. The possibilities are:
    # - payload.Payload[payload.ResponseOk]: worked as expected
    # - payload.Payload[payload.ResponseNotFound]: the command was not registered
    # - payload.Payload[payload.ResponseTraceback]: the command raised an exception
    # - callbacks.NoResponse: the client simply didn't respond. Most likely, it disconnected.
    #
    # Because of this, you should always check that each response is a payload.Payload,
    # and that its .data is a ResponseOk. It could look something like this:
    total_guilds = 0
    for resp in responses.values():
        if not isinstance(resp, payload.Payload):
            continue
        if not isinstance(resp.data, payload.ResponseOk):
            continue
        # the final .data is the dict returned by the command
        total_guilds += resp.data.data["guilds"]
    return total_guilds
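The filtering logic above can be tried out without a running cluster. The following is a rough sketch that uses hypothetical stand-in classes (Payload, ResponseOk, ResponseTraceback, NoResponse) defined locally purely for illustration; they only mimic the shapes described in the comments and are not imported from hikari-clusters. Unlike the loop above, the summarize helper (also hypothetical) records which clusters failed, so that a low total is not silently mistaken for a real one.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple


# Hypothetical stand-ins that only mimic the shapes described above.
@dataclass
class ResponseOk:
    data: Any


@dataclass
class ResponseTraceback:
    traceback: str


class NoResponse:
    pass


@dataclass
class Payload:
    data: Any


def summarize(responses: Dict[int, object]) -> Tuple[int, List[int]]:
    """Sum "guilds" over successful responses and list the cluster ids that failed."""
    total = 0
    failed: List[int] = []
    for cluster_id, resp in responses.items():
        if isinstance(resp, Payload) and isinstance(resp.data, ResponseOk):
            total += resp.data.data["guilds"]
        else:
            failed.append(cluster_id)
    return total, sorted(failed)


example = {
    0: Payload(ResponseOk({"guilds": 120})),
    1: Payload(ResponseTraceback("...")),  # the command raised an exception
    2: NoResponse(),  # the cluster did not answer
    3: Payload(ResponseOk({"guilds": 80})),
}
total, failed = summarize(example)
```

Here total counts only clusters 0 and 3, while failed holds the ids of the clusters whose responses were skipped.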
Events are different from commands in one big way: they don't return a response. There are a few reasons you might use an event instead of a command:
- You don't need a response.
- It's not a huge deal if you don't know whether the callback succeeded.
- You want this to be fast. ipc.send_command waits for responses, but ipc.send_event triggers the send and then continues.
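The difference between waiting for responses and triggering a send can be illustrated with plain asyncio. This is only an analogy under stated assumptions: send_command_like, send_event_like, and handler are hypothetical names invented for this sketch, and nothing here reflects how hikari-clusters implements its IPC internally.

```python
import asyncio
from typing import List


async def handler(log: List[str], name: str) -> str:
    # Simulates a remote callback that takes a little time to run.
    await asyncio.sleep(0.01)
    log.append(f"{name} handled")
    return f"{name} ok"


async def send_command_like(log: List[str], names: List[str]) -> List[str]:
    # Command-style: wait for every handler and collect the results.
    results = await asyncio.gather(*(handler(log, n) for n in names))
    log.append("command returned")
    return list(results)


async def send_event_like(log: List[str], names: List[str]) -> List["asyncio.Task[str]"]:
    # Event-style: schedule the handlers and return without waiting for them.
    tasks = [asyncio.create_task(handler(log, n)) for n in names]
    log.append("event returned")
    return tasks


async def demo():
    command_log: List[str] = []
    results = await send_command_like(command_log, ["a", "b"])

    event_log: List[str] = []
    tasks = await send_event_like(event_log, ["a", "b"])
    snapshot = list(event_log)  # state of the log right after the send
    await asyncio.gather(*tasks)  # only so the demo exits cleanly
    return results, command_log, snapshot


results, command_log, snapshot = asyncio.run(demo())
```

In the command-style call the handlers have all finished before the caller continues, so their results are available. In the event-style call the caller continues first, and no results come back.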
Events are registered in almost the exact same way:
MY_EVENTS = hikari_clusters.events.EventGroup()


@MY_EVENTS.add("shutdown")
async def shutdown(pl: hikari_clusters.payload.EVENT, bot: "MyBot"):
    await bot.close()
class MyBot(hikari.GatewayBot):
    cluster: hikari_clusters.Cluster  # purely optional type hint

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    async def start(self, *args, **kwargs) -> None:
        self.cluster.ipc.events.event_kwargs["bot"] = self
        self.cluster.ipc.events.include(MY_EVENTS)
        await super().start(*args, **kwargs)
Events are triggered in almost the same way as commands as well:
async def shutdown_bot(bot: MyBot):
    await bot.cluster.ipc.send_event(bot.cluster.ipc.clusters, "shutdown")