Skip to content

Commit

Permalink
adds otel; improves docker build
Browse files Browse the repository at this point in the history
  • Loading branch information
dalton-oliveira committed Dec 8, 2023
1 parent e7a437a commit 86b01d3
Show file tree
Hide file tree
Showing 9 changed files with 1,119 additions and 401 deletions.
1,306 changes: 990 additions & 316 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 6 additions & 11 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,25 +1,20 @@
FROM messense/rust-musl-cross:x86_64-musl as chef
RUN cargo install cargo-chef wasm-bindgen-cli
RUN rustup target add wasm32-unknown-unknown
RUN apt update && apt install wget
RUN wget https://dmej8g5cpdyqd.cloudfront.net/downloads/noip-duc_3.0.0-beta.7.tar.gz && tar xf noip-duc_3.0.0-beta.7.tar.gz
# RUN ls noip-duc_3.0.0-beta.7/binaries -lah
# RUN noip-duc_3.0.0-beta.7/binaries/noip-duc_3.0.0-beta.7_x86_64-musl

WORKDIR /snake

FROM chef AS planner
# Copy source code from previous stage
COPY . .
# Generate info for caching dependencies
RUN cargo chef prepare --recipe-path recipe-wasm.json
RUN cargo chef prepare --recipe-path recipe.json

FROM chef AS builder
COPY --from=planner /snake/recipe.json recipe.json
# Build & cache dependencies
RUN cargo chef cook --release --recipe-path recipe.json
# Copy source code from previous stage
COPY --from=planner /snake/recipe-wasm.json recipe-wasm.json
RUN cargo chef cook --release --target wasm32-unknown-unknown --recipe-path recipe-wasm.json -p wasm-render
RUN cargo chef cook --release --recipe-path recipe.json -p snake-web

COPY . .
# Build application
RUN bash build.sh

# Create a new stage with a minimal image
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ cargo run -p snake-termion
## Roadmap

- Trace backend and front-end calls with [Open Telemetry](https://github.com/open-telemetry/opentelemetry-rust)
- Add unit and integration tests
- Experiment WebRTC in order to reduce latency
- Add unit and integration tests
- Run it on a embedded system with restricted memory and processing power
- Large world where the snake can navigate to stress test chosen data structures
- Other game elements such as walls and wormholes
Expand Down
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
set -e
cargo build -p wasm-render --target wasm32-unknown-unknown --release
wasm-bindgen --target web --out-dir snake-web/www/wasm --no-typescript ./target/wasm32-unknown-unknown/release/wasm_render.wasm
cargo build --release
cargo build -p snake-web --release
12 changes: 9 additions & 3 deletions snake-web/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@ edition.workspace = true

[dependencies]
snake = { path = "../core" }
tokio = { version = "1.32.0", features = ["macros"] }
salvo = { version = "0.58.0", features = ["websocket", "serve-static"] }
tokio = { version = "1", features = ["macros"] }
salvo = { version = "0.59.0", features = ["websocket", "serve-static", "otel", "affix"] }
once_cell = "1"
futures-util = { version = "0.3", default-features = false }
rust-embed = "8.0.0"
bincode = "2.0.0-rc.3"
tracing = "0.1.39"
tracing = "0.1"
tracing-subscriber = "0.3.17"
opentelemetry = { version = "0.21" }
opentelemetry_sdk = { version = "0.21.1", features = ["rt-tokio"] }
tracing-opentelemetry = { version = "0.22.0" }
sentry = "0.32.0"
sentry-tracing = "0.32.0"
openssl = { version = "0.10.54", features = ["vendored"] }
34 changes: 12 additions & 22 deletions snake-web/src/input_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,15 @@ use salvo::websocket::WebSocket;
use snake::{game::Game, types::Direction, utils::decode};
use std::{sync::Arc, time::SystemTime};
use tokio::sync::RwLock;
use tracing::debug;
use tracing::{instrument, Span};

use crate::{websocket_game::DIRECTION, websocket_game::PING, DirectionArc};
use crate::{websocket_game::DIRECTION, websocket_game::PING};

pub fn rx_commands(
direction_arc: DirectionArc,
snake_id: u16,
mut rx: SplitStream<WebSocket>,
game: Arc<RwLock<Game>>,
) {
#[instrument(skip_all)]
pub fn rx_commands(snake_id: u16, mut rx: SplitStream<WebSocket>, game: Arc<RwLock<Game>>) {
let fut = async move {
let mut direction = direction_arc.read().await.clone();
while let Some(result) = rx.next().await {
let now = SystemTime::now();
match result {
Err(_msg) => break,
Ok(msg) => {
Expand All @@ -25,15 +21,10 @@ pub fn rx_commands(
continue;
}
match msg {
[PING, ..] => log_ping(msg, snake_id),
[PING, ..] => log_ping(msg, now, snake_id),
[DIRECTION, code] => {
if let Some(next_direction) = to_direction(*code) {
if direction == next_direction {
continue;
}
let mut d = RwLock::write(&direction_arc).await;
*d = next_direction;
direction = next_direction;
RwLock::write(&game).await.head_to(snake_id, next_direction);
}
}
_ => continue,
Expand All @@ -46,13 +37,12 @@ pub fn rx_commands(
tokio::task::spawn(fut);
}

fn log_ping(msg: &[u8], snake_id: u16) {
#[instrument(fields(ping_ms, ping_μs) skip(msg, now))]
fn log_ping(msg: &[u8], now: SystemTime, snake_id: u16) {
let (past, _size): (SystemTime, usize) = decode(&msg[1..msg.len()]).unwrap();
let μs = SystemTime::now()
.duration_since(past)
.expect("ping measure error")
.as_millis();
debug!("snake: {snake_id} ping: {:?} ms", μs);
let duration = now.duration_since(past).expect("ping measure error");
Span::current().record("ping_ms", duration.as_millis());
Span::current().record("ping_μs", duration.as_micros());
}

fn to_direction(msg: u8) -> Option<Direction> {
Expand Down
51 changes: 48 additions & 3 deletions snake-web/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
use std::sync::Arc;

use once_cell::sync::Lazy;

use opentelemetry::global;
use opentelemetry::trace::TracerProvider;
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::Tracer};

use salvo::otel::Tracing;

use rust_embed::RustEmbed;
use salvo::prelude::*;
use salvo::serve_static::static_embed;
use salvo::websocket::WebSocketUpgrade;
use snake_web::websocket_game::WsGame;
use tracing::level_filters::LevelFilter;
use tracing::Level;
use tracing_subscriber;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{self};

#[derive(RustEmbed)]
#[folder = "www/"]
Expand All @@ -15,12 +25,43 @@ struct Assets;
static GAME: Lazy<WsGame> = Lazy::new(|| WsGame::new());
const PORT_BIND: &str = "80";

fn init_tracer() -> Tracer {
global::set_text_map_propagator(TraceContextPropagator::new());
let provider = opentelemetry_sdk::trace::TracerProvider::builder().build();
let tracer = provider.tracer("snake-web");
global::set_tracer_provider(provider);

let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer.clone());

tracing_subscriber::registry()
.with(sentry_tracing::layer())
.with(LevelFilter::from_level(Level::DEBUG))
.with(opentelemetry)
.init();

return tracer;
}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt().with_max_level(Level::INFO).init();
let tracer = init_tracer();

let _guard = std::env::var("SENTRY_SDN").ok().map(|sdn| {
println!("{}", sdn);
sentry::init((
sdn,
sentry::ClientOptions {
release: sentry::release_name!(),
traces_sample_rate: 1.0,
..Default::default()
},
))
});

GAME.start_game();
let router = Router::new()
.hoop(affix::inject(Arc::new(tracer.clone())))
.hoop(Tracing::new(tracer))
.push(Router::with_path("game_data").goal(user_connected))
.push(Router::with_path("<*path>").get(static_embed::<Assets>().fallback("index.html")));

Expand All @@ -33,7 +74,11 @@ async fn main() {
}

#[handler]
async fn user_connected(req: &mut Request, res: &mut Response) -> Result<(), StatusError> {
async fn user_connected(
req: &mut Request,
res: &mut Response,
_depot: &mut Depot,
) -> Result<(), StatusError> {
WebSocketUpgrade::new()
.upgrade(req, res, |ws| GAME.ingress_user(ws))
.await
Expand Down
94 changes: 51 additions & 43 deletions snake-web/src/websocket_game.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{input_thread::rx_commands, DirectionArc, Directions};
use crate::input_thread::rx_commands;
use futures_util::SinkExt;
use futures_util::StreamExt;
use salvo::websocket::{Message, WebSocket};
Expand All @@ -19,21 +19,24 @@ use tokio::{
},
time::{sleep, Instant},
};
use tracing::{error, instrument};
use tracing::error;
use tracing::error_span;
use tracing::info_span;
use tracing::span;
use tracing::Level;

const CONFIG: GameConfig = GameConfig {
size: 5,
start: (1, 0),
dim: (30, 20),
direction: Direction::Right,
};
const TICK_INTERVAL: u128 = 250 * 1000;
const TICK_INTERVAL: u128 = 251 * 1000;
#[derive(Debug)]
pub struct WsGame {
pub game: Arc<RwLock<Game>>,
directions: Directions,
tx: Arc<RwLock<Sender<Message>>>,
rx: Receiver<Message>,
game_data_sender: Arc<RwLock<Sender<Message>>>,
game_data_receiver: Receiver<Message>,
}

pub const GAME_DATA: u8 = 1;
Expand All @@ -45,64 +48,65 @@ impl WsGame {
pub fn new() -> WsGame {
let game = Game::new(CONFIG);
let game = Arc::new(RwLock::new(game));
let data: Message = Message::binary(vec![]);
let (tx, rx) = watch::channel(data);
let (game_data_sender, game_data_receiver) = watch::channel(Message::binary(vec![]));

WsGame {
game,
directions: Directions::default(),
tx: Arc::new(RwLock::new(tx)),
rx,
game_data_sender: Arc::new(RwLock::new(game_data_sender)),
game_data_receiver,
}
}

#[instrument]
async fn add_user(&self) -> (u16, DirectionArc) {
let direction = Arc::new(RwLock::new(CONFIG.direction.clone()));
let mut directions = RwLock::write(&self.directions).await;

let snake_id = RwLock::write(&self.game).await.add_snake();
directions.insert(snake_id, Arc::clone(&direction));

return (snake_id, direction);
}

#[instrument(skip(ws))]
pub async fn ingress_user(&self, ws: WebSocket) {
let (mut ws_tx, ws_rx) = ws.split();

let (snake_id, direction) = self.add_user().await;
let snake_id = RwLock::write(&self.game).await.add_snake();
let game = Arc::clone(&self.game);
let mut rx = self.rx.clone();
let mut game_data_receiver = self.game_data_receiver.clone();
tokio::task::spawn(async move {
let notify = Message::binary(to_command(NOTIFY, encode(snake_id).unwrap()));
if let Err(_msg) = ws_tx.send(notify).await {
RwLock::write(&game).await.remove_snake(snake_id);
}
loop {
if let Ok(()) = rx.changed().await {
let ping =
Message::binary(to_command(PING, encode(SystemTime::now()).unwrap()));
if let Err(_msg) = ws_tx.send(ping).await {
if let Ok(()) = game_data_receiver.changed().await {
let loop_span = span!(Level::INFO, "game_data", snake_id);
let _enter = loop_span.enter();

let game_span = info_span!("game_data");
let game_data = game_data_receiver.borrow_and_update().to_owned();

if let Err(_msg) = ws_tx.send(game_data).await {
error_span!("game_data");
break;
}
let game_data = rx.borrow_and_update().to_owned();
drop(game_span);

if let Err(_msg) = ws_tx.send(game_data).await {
let ping_span = info_span!("ping");
let ping =
Message::binary(to_command(PING, encode(SystemTime::now()).unwrap()));
if let Err(_msg) = ws_tx.send(ping).await {
// @todo how to send error spans
span!(Level::ERROR, "ping_error");
break;
}
drop(ping_span);
} else {
break;
}
}
let span = span!(Level::INFO, "remove", snake_id);

RwLock::write(&game).await.remove_snake(snake_id);
drop(span);
});

rx_commands(direction, snake_id, ws_rx, Arc::clone(&self.game));
rx_commands(snake_id, ws_rx, Arc::clone(&self.game))
}

pub fn start_game(&self) {
let game_arc = Arc::clone(&self.game);
let directions = Arc::clone(&self.directions);
let tx = Arc::clone(&self.tx);
let game_data_sender = Arc::clone(&self.game_data_sender);
let fut = async move {
{
let mut game = RwLock::write(&game_arc).await;
Expand All @@ -115,26 +119,30 @@ impl WsGame {
loop {
let now = Instant::now();
{
let root = info_span!("game_loop");
let _enter = root.enter();

let mut game = RwLock::write(&game_arc).await;
let tx = RwLock::write(&tx).await;
if game.state == GameState::Over {
break;
}

// copy received directions on game
for (snake_id, direction) in RwLock::read(&directions).await.iter() {
let direction = RwLock::read(direction).await.clone();
game.head_to(*snake_id, direction);
}

let span = info_span!("game_tick");
game.tick();
game.add_missing_food();
drop(span);

let span = info_span!("encode_game_data");
let game_data = game.encode_game_data();
let game_data = Message::binary(to_command(GAME_DATA, game_data));
if let Err(msg) = tx.send(game_data) {
drop(span);

let span = info_span!("send_game_data");
let game_data_sender = RwLock::write(&game_data_sender).await;
if let Err(msg) = game_data_sender.send(game_data) {
error!("error sending game_data {msg:?}");
}
drop(span);
}

let elapsed_micro = now.elapsed().as_micros();
Expand All @@ -144,7 +152,7 @@ impl WsGame {
}
};

tokio::task::spawn(fut);
tokio::spawn(fut);
}
}

Expand Down
2 changes: 1 addition & 1 deletion wasm-render/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition.workspace = true
[dependencies]
js-sys = "0.3.64"
snake = { path = "../core" }
wasm-bindgen = "0.2.88"
wasm-bindgen = "0.2"
bincode = "2.0.0-rc.3"

[lib]
Expand Down

0 comments on commit 86b01d3

Please sign in to comment.