Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
12 changes: 9 additions & 3 deletions .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ env:

jobs:
clippy:
strategy:
matrix:
variant: ["multi-process", "single-process"]
runs-on: ubuntu-latest

steps:
Expand All @@ -24,12 +27,15 @@ jobs:
with:
tool: cargo-generate

- run: cargo generate --path . --name instance --template-values-file .github/template-values.toml
- run: cargo generate ${{ matrix.variant }} --path . --name instance --values-file .github/values.toml

- run: cargo clippy
working-directory: instance

fmt:
rustfmt:
strategy:
matrix:
variant: ["multi-process", "single-process"]
runs-on: ubuntu-latest

steps:
Expand All @@ -45,7 +51,7 @@ jobs:
with:
tool: cargo-generate

- run: cargo generate --path . --name instance --template-values-file .github/template-values.toml
- run: cargo generate ${{ matrix.variant }} --path . --name instance --values-file .github/values.toml

- run: cargo fmt -- --check
working-directory: instance
37 changes: 31 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,42 @@
# Twilight Template

An opinionated [Twilight] bot template.
A set of opinionated [Twilight] bot templates.

The template is built in a modular fashion and provides the following features:
## Usage

1. Install [cargo-generate]: `cargo install cargo-generate`
1. Create a bot based upon a select template: `cargo generate twilight-rs/template`

## Variants

- Single-process: fast restarts
- Multi-process: zero downtime restarts

### Single-process

* Gateway session resumption between processes
This template is built in a modular fashion and provides the following features:

* Gateway session resumption between restarts
* Clean shutdown that awaits event handler completion
* An administrative `/restart <shard>` command

## Usage
### Multi-process

1. Install [cargo-generate]: `cargo install cargo-generate`
1. Create a bot based upon this template: `cargo generate twilight-rs/template`
This template is split into `gateway` and `worker` crates, where the `gateway`
forwards events and provides state to the `worker`. It otherwise mirrors the
single-process template:

* gateway:
* Gateway session resumption between restarts
* Clean shutdown that awaits event forwarder completion
* worker:
* Clean shutdown that awaits event handler completion

**Note**: this adds IPC overhead compared to the single-process template. You
may spread the load of the single-process template:

1. Across threads with the multi-threaded tokio runtime
1. Across machines by partitioning your shards (i.e. each machine runs 1/X shards)

[cargo-generate]: https://github.com/cargo-generate/cargo-generate
[Twilight]: https://github.com/twilight-rs/twilight
9 changes: 1 addition & 8 deletions cargo-generate.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,2 @@
[template]
ignore = [".github", "Cargo.lock", "README.md", "target"]

[hooks]
init = ["init-script.rhai"]

[placeholders]
admin_guild_id = { prompt = "Enter the admin guild ID (where admin commands are available)", type = "string" }
application_id = { prompt = "Enter the application ID (available in the applications dashboard: <https://discord.com/developers/applications>)", type = "string" }
sub_templates = ["multi-process", "single-process"]
1 change: 0 additions & 1 deletion init-script.rhai

This file was deleted.

File renamed without changes.
26 changes: 26 additions & 0 deletions multi-process/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[workspace]
members = ["gateway", "worker"]
resolver = "3"

[workspace.package]
edition = "2024"

[workspace.dependencies]
anyhow = "1"
axum = "0.8"
futures-util = "0.3"
reqwest = "0.13"
rustls = "0.23"
serde = "1"
serde_json = "1"
tokio = "1"
tokio-stream = "0.1"
tokio-util = "0.7"
tokio-websockets = "0.13"
tracing = "0.1"
tracing-subscriber = "0.3"
twilight-cache-inmemory = "0.17"
twilight-gateway = "0.17"
twilight-http = "0.17"
twilight-model = "0.17"
twilight-util = "0.17"
5 changes: 5 additions & 0 deletions multi-process/cargo-generate.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[template]
ignore = ["Cargo.lock", "target"]

[placeholders]
application_id = { prompt = "Enter the application ID (available in the applications dashboard: <https://discord.com/developers/applications>)", type = "string" }
19 changes: 19 additions & 0 deletions multi-process/gateway/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "gateway"
version = "0.1.0"
edition.workspace = true

[dependencies]
anyhow.workspace = true
axum = { workspace = true, features = ["ws"] }
rustls.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
tokio = { workspace = true, features = ["fs", "macros", "net", "rt", "signal"] }
tokio-stream.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
twilight-cache-inmemory.workspace = true
twilight-gateway.workspace = true
twilight-http.workspace = true
twilight-model.workspace = true
26 changes: 26 additions & 0 deletions multi-process/gateway/src/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use std::{ops::Deref, sync::OnceLock};
use tokio::sync::Notify;
use twilight_cache_inmemory::InMemoryCache;

pub static CONTEXT: Ref = Ref(OnceLock::new());

#[derive(Debug)]
pub struct Context {
pub cache: InMemoryCache,
pub notify: Notify,
}

pub fn init(cache: InMemoryCache, notify: Notify) {
let context = Context { cache, notify };
assert!(CONTEXT.0.set(context).is_ok());
}

pub struct Ref(OnceLock<Context>);

impl Deref for Ref {
type Target = Context;

fn deref(&self) -> &Self::Target {
self.0.get().unwrap()
}
}
158 changes: 158 additions & 0 deletions multi-process/gateway/src/forward.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
use crate::{CONTEXT, ResumeInfo};
use anyhow::anyhow;
use axum::{
body::Bytes,
extract::{WebSocketUpgrade, ws::Message as SocketMessage},
response::Response,
};
use serde::de::DeserializeSeed as _;
use std::{collections::VecDeque, error::Error, pin::pin};
use tokio::{
signal,
sync::broadcast::{self, error::RecvError},
};
use tokio_stream::StreamExt as _;
use twilight_gateway::{CloseFrame, Event, EventTypeFlags, Message as ShardMessage, Shard};
use twilight_model::gateway::{OpCode, event::GatewayEventDeserializer};

const BUFFER_LIMIT: usize = 1000;
const EVENT_TYPES: EventTypeFlags = EventTypeFlags::INTERACTION_CREATE
.union(EventTypeFlags::READY)
.union(EventTypeFlags::ROLE_CREATE)
.union(EventTypeFlags::ROLE_DELETE)
.union(EventTypeFlags::ROLE_UPDATE);

fn parse(input: &str) -> anyhow::Result<Option<Event>> {
let deserializer =
GatewayEventDeserializer::from_json(input).ok_or_else(|| anyhow!("missing opcode"))?;
let opcode = OpCode::from(deserializer.op()).ok_or_else(|| anyhow!("unknown opcode"))?;
let event_type = EventTypeFlags::try_from((opcode, deserializer.event_type()))
.map_err(|_| anyhow!("missing event type"))?;

Ok(EVENT_TYPES
.contains(event_type)
.then(|| {
let mut json_deserializer = serde_json::Deserializer::from_str(input);
deserializer.deserialize(&mut json_deserializer)
})
.transpose()?
.map(Into::into))
}

enum ShardState {
Active,
Shutdown,
}

impl ShardState {
fn is_active(&self) -> bool {
matches!(self, Self::Active)
}

fn is_shutdown(&self) -> bool {
matches!(self, Self::Shutdown)
}
}

#[tracing::instrument(fields(id = shard.id().number()), skip_all)]
pub async fn shard(event_tx: broadcast::Sender<Bytes>, mut shard: Shard) -> ResumeInfo {
let mut notified = None;
let mut buffer = VecDeque::new();
let mut shutdown = pin!(signal::ctrl_c());
let mut state = ShardState::Active;

loop {
tokio::select! {
_ = notified.as_mut().unwrap(), if notified.is_some() => {
notified = None;
loop {
let inner = CONTEXT.notify.notified();
match event_tx.send(buffer.pop_front().unwrap()) {
Ok(_) if buffer.is_empty() => break,
Ok(_) => {}
Err(error) => {
notified = Some(Box::pin(inner));
buffer.push_front(error.0);
break;
}
}
}
}
_ = &mut shutdown, if !state.is_shutdown() => {
if state.is_active() {
shard.close(CloseFrame::RESUME);
}
state = ShardState::Shutdown;
}
event = shard.next() => {
match event {
Some(Ok(ShardMessage::Close(_))) if !state.is_active() => break,
Some(Ok(ShardMessage::Close(_))) => {}
Some(Ok(ShardMessage::Text(json))) => {
match parse(&json) {
Ok(Some(event)) => {
CONTEXT.cache.update(&event);

let inner = notified.is_none().then(|| CONTEXT.notify.notified());
if let Err(error) = event_tx.send(json.into()) {
if let Some(inner) = inner {
notified = Some(Box::pin(inner));
}

if buffer.len() == BUFFER_LIMIT {
buffer.pop_front();
}
buffer.push_back(error.0);
}
}
Ok(_) => {}
Err(error) => tracing::warn!(error = &*error, "failed to deserialize event"),
}
}
Some(Err(error)) => tracing::warn!(error = &error as &dyn Error, "shard failed to receive event"),
None => break,
}
}
}
}

return ResumeInfo::from(&shard);
}

#[tracing::instrument(skip_all)]
pub async fn socket(ws: WebSocketUpgrade, weak: broadcast::WeakSender<Bytes>) -> Response {
ws.on_upgrade(async move |mut socket| {
if let Some(mut event_rx) = weak.upgrade().map(|tx| tx.subscribe()) {
tracing::info!("worker connected");
CONTEXT.notify.notify_waiters();

loop {
tokio::select! {
message = socket.recv() => {
match message {
Some(Ok(SocketMessage::Close(_))) => return,
Some(Err(error)) => tracing::warn!(error = &error as &dyn Error, "socket failed to receive event"),
None => return,
_ => {}
}
}
event = event_rx.recv() => {
match event {
Ok(event) => {
if let Err(error) = socket.send(SocketMessage::Text(event.try_into().unwrap())).await {
tracing::warn!(error = &error as &dyn Error, "socket failed to send event");
return;
}
}
Err(RecvError::Closed) => return,
Err(RecvError::Lagged(count)) => tracing::warn!("socket lagged {count} events"),
}
}
}
}
}

// Drive socket to completion.
while socket.recv().await.is_some() {}
})
}
Loading