Skip to content

Commit

Permalink
feat: adds PING support to momento proxy resp impl (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
eaddingtonwhite authored Nov 23, 2022
1 parent 0ada0f9 commit e3e79ab
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 0 deletions.
15 changes: 15 additions & 0 deletions src/protocol/resp/src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ use std::io::{Error, ErrorKind};
use std::sync::Arc;

mod get;
mod ping;
mod set;

pub use get::GetRequest;
pub use ping::PingRequest;
pub use set::SetRequest;

#[derive(Default)]
Expand Down Expand Up @@ -93,6 +95,9 @@ impl Parse<Request> for RequestParser {
Some(b"set") | Some(b"SET") => {
SetRequest::try_from(message).map(Request::from)
}
Some(b"ping") | Some(b"PING") => {
PingRequest::try_from(message).map(Request::from)
}
_ => Err(Error::new(ErrorKind::Other, "unknown command")),
},
_ => {
Expand All @@ -115,6 +120,7 @@ impl Compose for Request {
match self {
Self::Get(r) => r.compose(buf),
Self::Set(r) => r.compose(buf),
Self::Ping(r) => r.compose(buf),
}
}
}
Expand All @@ -123,6 +129,7 @@ impl Compose for Request {
pub enum Request {
Get(GetRequest),
Set(SetRequest),
Ping(PingRequest),
}

impl From<GetRequest> for Request {
Expand All @@ -137,10 +144,17 @@ impl From<SetRequest> for Request {
}
}

impl From<PingRequest> for Request {
fn from(other: PingRequest) -> Self {
Self::Ping(other)
}
}

#[derive(Debug, PartialEq, Eq)]
pub enum Command {
Get,
Set,
Ping,
}

impl TryFrom<&[u8]> for Command {
Expand All @@ -150,6 +164,7 @@ impl TryFrom<&[u8]> for Command {
match other {
b"get" | b"GET" => Ok(Command::Get),
b"set" | b"SET" => Ok(Command::Set),
b"ping" | b"PING" => Ok(Command::Ping),
_ => Err(()),
}
}
Expand Down
60 changes: 60 additions & 0 deletions src/protocol/resp/src/request/ping.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2022 Twitter, Inc.
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0

use super::*;
use std::io::{Error, ErrorKind};

#[derive(Debug, PartialEq, Eq)]
#[allow(clippy::redundant_allocation)]
pub struct PingRequest {}

impl TryFrom<Message> for PingRequest {
type Error = Error;

fn try_from(other: Message) -> Result<Self, Error> {
if let Message::Array(array) = other {
if array.inner.is_none() {
return Err(Error::new(ErrorKind::Other, "malformed command"));
}
Ok(Self {})
} else {
Err(Error::new(ErrorKind::Other, "malformed command"))
}
}
}

impl PingRequest {
pub fn new() -> Self {
Self {}
}
}

impl From<&PingRequest> for Message {
fn from(_: &PingRequest) -> Message {
Message::Array(Array {
inner: Some(vec![Message::BulkString(BulkString::new(b"Ping"))]),
})
}
}

impl Compose for PingRequest {
fn compose(&self, buf: &mut dyn BufMut) -> usize {
let message = Message::from(self);
message.compose(buf)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn parser() {
let parser = RequestParser::new();
assert_eq!(
parser.parse(b"PING\r\n").unwrap().into_inner(),
Request::Ping(PingRequest::new())
);
}
}
5 changes: 5 additions & 0 deletions src/proxy/momento/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ pub(crate) async fn handle_resp_client(
break;
}
}
resp::Request::Ping(_) => {
if resp::ping(&mut socket).await.is_err() {
break;
}
}
}
buf.advance(consumed);
}
Expand Down
2 changes: 2 additions & 0 deletions src/proxy/momento/src/protocol/resp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
pub use protocol_resp::{Request, RequestParser};

mod get;
mod ping;
mod set;

pub use get::*;
pub use ping::*;
pub use set::*;
23 changes: 23 additions & 0 deletions src/proxy/momento/src/protocol/resp/ping.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2022 Twitter, Inc.
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0

use crate::Error;
use net::TCP_SEND_BYTE;
use session::{SESSION_SEND, SESSION_SEND_BYTE, SESSION_SEND_EX};
use tokio::io::AsyncWriteExt;

const PONG_RSP: &[u8; 7] = b"+PONG\r\n";

pub async fn ping(socket: &mut tokio::net::TcpStream) -> Result<(), Error> {
let mut response_buf = Vec::new();
response_buf.extend_from_slice(PONG_RSP);
SESSION_SEND.increment();
SESSION_SEND_BYTE.add(response_buf.len() as _);
TCP_SEND_BYTE.add(response_buf.len() as _);
if let Err(e) = socket.write_all(&response_buf).await {
SESSION_SEND_EX.increment();
return Err(e);
}
Ok(())
}

0 comments on commit e3e79ab

Please sign in to comment.