Skip to content

Commit

Permalink
chore: rebase fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
merklefruit committed Jan 24, 2024
1 parent 1436582 commit 0e74e5a
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 11 deletions.
18 changes: 9 additions & 9 deletions msg-socket/src/req/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
use tokio::sync::{mpsc, oneshot};
use tokio_util::codec::Framed;

use crate::{req::SocketState, ReqMessage};
use crate::req::SocketState;

use super::{Command, ReqError, ReqOptions};
use msg_wire::{
Expand Down Expand Up @@ -197,28 +197,28 @@ where

// Check for outgoing messages from the socket handle
match this.from_socket.poll_recv(cx) {
Poll::Ready(Some(Command::Send { message, response })) => {
// Queue the message for sending
Poll::Ready(Some(Command::Send {
mut message,
response,
})) => {
let start = std::time::Instant::now();

let mut msg = ReqMessage::new(message);

let len_before = msg.payload().len();
let len_before = message.payload().len();
if len_before > this.options.min_compress_size {
if let Some(ref compressor) = this.compressor {
if let Err(e) = msg.compress(compressor.as_ref()) {
if let Err(e) = message.compress(compressor.as_ref()) {
tracing::error!("Failed to compress message: {:?}", e);
}

tracing::debug!(
"Compressed message from {} to {} bytes",
len_before,
msg.payload().len()
message.payload().len()
);
}
}

let msg = msg.into_wire(this.id_counter);
let msg = message.into_wire(this.id_counter);
let msg_id = msg.id();
this.id_counter = this.id_counter.wrapping_add(1);
this.egress_queue.push_back(msg);
Expand Down
2 changes: 1 addition & 1 deletion msg-socket/src/req/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub enum ReqError {

pub enum Command {
Send {
message: Bytes,
message: ReqMessage,
response: oneshot::Sender<Result<Bytes, ReqError>>,
},
}
Expand Down
5 changes: 4 additions & 1 deletion msg-socket/src/req/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use msg_wire::{auth, reqrep};

use super::{Command, ReqDriver, ReqError, ReqOptions, DEFAULT_BUFFER_SIZE};
use crate::backoff::ExponentialBackoff;
use crate::ReqMessage;
use crate::{req::stats::SocketStats, req::SocketState};

/// The request socket.
Expand Down Expand Up @@ -62,11 +63,13 @@ where
pub async fn request(&self, message: Bytes) -> Result<Bytes, ReqError> {
let (response_tx, response_rx) = oneshot::channel();

let msg = ReqMessage::new(message);

self.to_driver
.as_ref()
.ok_or(ReqError::SocketClosed)?
.send(Command::Send {
message,
message: msg,
response: response_tx,
})
.await
Expand Down

0 comments on commit 0e74e5a

Please sign in to comment.