Skip to content

Commit

Permalink
Merge branch 'aa/doc/EN-535-document-odin' into aa/doc/EN-535-documen…
Browse files Browse the repository at this point in the history
…t-odin2
  • Loading branch information
amab8901 committed Nov 14, 2023
2 parents ae1e109 + 095fa31 commit aff16d2
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 1 deletion.
5 changes: 5 additions & 0 deletions drasil-hugin/src/protocol/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub trait IntoFrame {
fn into_frame(self) -> Frame;
}

/// The transaction types that can be executed in Drasil.
#[derive(Debug)]
pub enum Command {
BuildContract(BuildContract),
Expand All @@ -49,6 +50,10 @@ pub enum Command {
}

impl Command {
/// Parse compressed data from a network connection into human-readable form.
/// Example: if Heimdallr (client) sends a transaction to Odin (server) via
/// network connection, then Odin can use this function to convert the
/// parse the data into human-readable form.
pub fn from_frame(frame: Frame) -> crate::Result<Command> {
let mut parse = Parse::new(frame)?;
log::debug!("FromFrame: {:?}", &parse);
Expand Down
5 changes: 5 additions & 0 deletions drasil-hugin/src/protocol/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,25 @@ use tokio::io::BufWriter;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

/// Wrapper around `TcpStream` connection that adds the ability to store I/O data in buffer.
#[derive(Debug)]
pub struct Connection {
stream: BufWriter<TcpStream>,
buffer: BytesMut,
}

impl Connection {

/// Create new connection that allows you to store I/O data in buffer.
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream: BufWriter::new(stream),
buffer: BytesMut::with_capacity(50 * 1024),
}
}

/// Read data sent by your counterpart in the given connection.
/// Example: Odin server reading data sent by a client.
pub async fn read_frame(&mut self) -> crate::Result<Option<Frame>> {
loop {
if let Some(frame) = self.parse_frame().await? {
Expand Down
4 changes: 4 additions & 0 deletions drasil-hugin/src/protocol/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use std::io::Cursor;
use std::num::TryFromIntError;
use std::string::FromUtf8Error;

/// Data package sent through a network connection to instruct the recipient which
/// Drasil transaction to execute.
/// Example: Heimdallr (client) sending compressed data to Odin (server) via TCP
/// connection to tell Odin which transaction to perform
#[derive(Clone, Debug)]
pub enum Frame {
Simple(String),
Expand Down
4 changes: 4 additions & 0 deletions drasil-hugin/src/protocol/shutdown.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use tokio::sync::broadcast;

/// Specifies the state of a given connection between server and client.
/// This can be used as a trigger mechanism for whether the server should
/// continue listening to a connection or drop it.
#[derive(Debug)]
pub struct Shutdown {
shutdown: bool,
Expand All @@ -18,6 +21,7 @@ impl Shutdown {
self.shutdown
}

///
pub async fn recv(&mut self) {
if self.shutdown {
return;
Expand Down
29 changes: 28 additions & 1 deletion services/odin/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
/// Odin is a server that receives transaction requests from Heimdallr clients. Odin can establish a TCP
/// connection with a given number of Heimdallr clients. Each Heimdallr client can send a transaction
/// request through the TCP connection to Odin server, which will cause the Odin server to
/// parse the transaction request from compressed form into human-readable form (also the form expected by
/// other parts of Drasil). The parsed transaction request is then passed to Hugin for further processing.

extern crate pretty_env_logger;
use drasil_hugin::protocol::{connection::Connection, Shutdown};
use drasil_hugin::Command;
Expand All @@ -11,15 +17,20 @@ use tokio::time::{self, Duration};
use std::env;
use std::str;

/// Wrapper that extends the server (`TcpListener`) with additional network capabilities.
struct Listener {
/// basic server implementation
listener: TcpListener,
/// maximal number of clients that can be simultaneously connected to the server
limit_connections: Arc<Semaphore>,

notify_shutdown: broadcast::Sender<()>,
shutdown_complete_rx: mpsc::Receiver<()>,
shutdown_complete_tx: mpsc::Sender<()>,
}

/// Wrapper around `Connection` connection that prevents the number of connections from exceeding
/// a maximal number, and enables the checking of whether the connection is alive.
struct Handler {
connection: Connection,
limit_connections: Arc<Semaphore>,
Expand All @@ -28,10 +39,12 @@ struct Handler {
_shutdown_complete: mpsc::Sender<()>,
}

/// Default address exposed by Odin server if another address isn't specified.
const DEFAULT_HOST: &str = "127.0.0.1";
const DEFAULT_PORT: &str = "6142";
const MAX_CONNECTIONS: usize = 1000;

/// Run the Odin server until Odin receives ctrl_c shutdown command.
pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> {
let (notify_shutdown, _) = broadcast::channel(1);
let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);
Expand All @@ -43,24 +56,27 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<
shutdown_complete_rx,
};

// Run Odin server, until Odin receives ctrl+C which causes the server to turn off
tokio::select! {
res = server.run() => {
if let Err(err) = res {
log::error!("failed to accept: {:?}",err);
log::error!("failed to accept: {:?}", err);
}
}
_ = shutdown => {
log::info!("shutting down")
}
}

// Separate out Odin's channels in preparation for shutdown
let Listener {
mut shutdown_complete_rx,
shutdown_complete_tx,
notify_shutdown,
..
} = server;

// Shut down the broadcast & mpsc channels
drop(notify_shutdown);
drop(shutdown_complete_tx);
let _ = shutdown_complete_rx.recv().await;
Expand All @@ -69,12 +85,15 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<
}

impl Listener {
/// Run Odin server by establishing connections between Odin server
/// and (one or) many remote clients.
async fn run(&mut self) -> crate::Result<()> {
log::info!(
"accepting inbound connections at {:?}",
self.listener.local_addr()?
);

// each iteration represents a single connection
loop {
self.limit_connections.acquire().await?.forget();
let socket = self.accept().await?;
Expand All @@ -94,6 +113,9 @@ impl Listener {
}
}

/// Accept incoming connection from remote client. This function checks for
/// connection requests over and over. If remote client hasn't made any connection
/// requests, this function will wait twice as long as last time before checking again.
async fn accept(&mut self) -> crate::Result<TcpStream> {
log::info!("accepted connection");
let mut backoff = 1;
Expand All @@ -113,6 +135,8 @@ impl Listener {
}

impl Handler {
/// Continuously listen for incoming instructions from the given remote client
/// to Odin server until the connection is shut down.
async fn run(&mut self) -> crate::Result<()> {
log::debug!("started new handler");
while !self.shutdown.is_shutdown() {
Expand All @@ -135,12 +159,15 @@ impl Handler {
}
}

/// Increase the number of allowed connections to Odin server by 1 when
/// current connection is terminated
impl Drop for Handler {
fn drop(&mut self) {
self.limit_connections.add_permits(1);
}
}

/// Specify the address that Odin server will expose, and then run the Odin server.
use tokio::signal;
#[tokio::main]
pub async fn main() -> crate::Result<()> {
Expand Down

0 comments on commit aff16d2

Please sign in to comment.