Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 8 additions & 23 deletions src/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub enum ShardRestartResult {
}

impl ShardRestartResult {
pub fn is_forced(&self) -> bool {
pub fn is_forced(self) -> bool {
matches!(self, Self::ForcedRestart)
}
}
Expand Down Expand Up @@ -90,24 +90,12 @@ impl State {
}
}

pub struct Dispatcher<'a> {
#[allow(dead_code)]
pub shard: &'a mut Shard,
tracker: &'a TaskTracker,
}

impl<'a> Dispatcher<'a> {
fn new(shard: &'a mut Shard, tracker: &'a TaskTracker) -> Self {
Self { shard, tracker }
}

pub fn dispatch(self, future: impl Future<Output = ()> + Send + 'static) {
self.tracker.spawn(future);
}
}

#[tracing::instrument(name = "dispatcher", fields(shard.id = shard.id().number()), skip_all)]
pub async fn run(mut shard: Shard, mut event_handler: impl FnMut(Dispatcher, Event)) -> ResumeInfo {
pub async fn run<Fut: Future<Output = ()> + Send + 'static, S>(
mut event_handler: impl FnMut(Event, S) -> Fut,
mut shard: Shard,
mut state_provider: impl FnMut(&mut Shard) -> S,
) -> ResumeInfo {
let mut receiver = ShardHandle::insert(shard.id());
let mut shutdown = pin!(signal::ctrl_c());
let tracker = TaskTracker::new();
Expand All @@ -132,11 +120,8 @@ pub async fn run(mut shard: Shard, mut event_handler: impl FnMut(Dispatcher, Eve
event = shard.next_event(EVENT_TYPES) => {
match event {
Some(Ok(Event::GatewayClose(_))) if !state.is_active() => break,
Some(Ok(event)) => event_handler(Dispatcher::new(&mut shard, &tracker), event),
Some(Err(error)) => {
tracing::warn!(error = &error as &dyn Error, "shard failed to receive an event");
continue;
}
Some(Ok(event)) => _ = tracker.spawn(event_handler(event, state_provider(&mut shard))),
Some(Err(error)) => tracing::warn!(error = &error as &dyn Error, "shard failed to receive event"),
None => break,
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async fn main() -> anyhow::Result<()> {

let tasks = shards
.into_iter()
.map(|shard| tokio::spawn(dispatch::run(shard, event_handler)))
.map(|shard| tokio::spawn(dispatch::run(event_handler, shard, |_shard| ())))
.collect::<Vec<_>>();

signal::ctrl_c().await?;
Expand All @@ -92,20 +92,20 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}

fn event_handler(dispatcher: dispatch::Dispatcher, event: Event) {
async fn event_handler(event: Event, _state: ()) {
async fn log_err(future: Instrumented<impl Future<Output = anyhow::Result<()>>>) {
let mut future = pin!(future);
if let Err(error) = future.as_mut().await {
let _enter = future.span().enter();
tracing::warn!(error = &*error, "event handler failed");
tracing::warn!(error = &*error, "failed to handle event");
}
}

#[allow(clippy::single_match)]
match event {
Event::InteractionCreate(event) => {
let span = tracing::info_span!(parent: None, "interaction", id = %event.id);
dispatcher.dispatch(log_err(command::interaction(event).instrument(span)))
let span = tracing::info_span!("interaction", id = %event.id);
log_err(command::interaction(event).instrument(span)).await;
}
_ => {}
}
Expand Down