Skip to content

Commit

Permalink
Always reclaim buffer space (#229)
Browse files Browse the repository at this point in the history
* simplify some buffer usage with `Vec<u8>`, fix a bug where we didn't reclaim capacity

* fixup
  • Loading branch information
mhils authored Feb 17, 2025
1 parent 49e5d6c commit eb29a8a
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 23 deletions.
3 changes: 2 additions & 1 deletion mitmproxy-linux/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ This package requires the following software to build (via https://aya-rs.dev/bo

1. Install build dependencies (see above).
2. Install mitmproxy_linux as editable: `pip install -e .`
3. Run something along the lines of `mitmdump --mode local:curl`.
3. Remove `$VIRTUAL_ENV/bin/mitmproxy-linux-redirector`
4. Run something along the lines of `mitmdump --mode local:curl`.
You should see a `Development mode: Compiling mitmproxy-linux-redirector...` message.


Expand Down
14 changes: 8 additions & 6 deletions mitmproxy-linux/src/main2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,18 @@ async fn main() -> anyhow::Result<()> {
std::process::exit(0);
});

let mut ipc_buf = BytesMut::with_capacity(IPC_BUF_SIZE);
let mut ipc_buf = Vec::with_capacity(IPC_BUF_SIZE);
let mut dev_buf = BytesMut::with_capacity(IPC_BUF_SIZE);

loop {
ipc_buf.clear();
select! {
r = ipc.recv_buf(&mut ipc_buf) => {
match r {
Ok(len) if len > 0 => {
let Ok(FromProxy { message: Some(message)}) = FromProxy::decode(&mut ipc_buf) else {
let Ok(FromProxy { message: Some(message)}) = FromProxy::decode(ipc_buf.as_slice()) else {
return Err(anyhow!("Received invalid IPC message: {:?}", &ipc_buf[..len]));
};
assert_eq!(ipc_buf.len(), 0);
// debug!("Received IPC message: {message:?}");

match message {
Expand Down Expand Up @@ -161,10 +161,12 @@ async fn main() -> anyhow::Result<()> {
};

packet.encode(&mut ipc_buf)?;
let encoded = ipc_buf.split();

// debug!("Sending packet to proxy: {} {:?}", encoded.len(), &encoded);
ipc.send(&encoded).await?;
ipc.send(ipc_buf.as_slice()).await?;

// Reclaim space in dev_buf.
drop(packet);
assert!(dev_buf.try_reclaim(IPC_BUF_SIZE));
},
}
}
Expand Down
11 changes: 4 additions & 7 deletions src/packet_sources/macos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,8 @@ impl PacketSourceTask for MacOsTask {
},
// pipe through changes to the intercept list
Some(conf) = self.conf_rx.recv() => {
let msg = ipc::InterceptConf::from(conf);
let len = msg.encoded_len();
let mut buf = BytesMut::with_capacity(len);
msg.encode(&mut buf)?;
control_channel.send(buf.freeze()).await.context("Failed to write to control channel")?;
let msg = ipc::InterceptConf::from(conf).encode_to_vec();
control_channel.send(Bytes::from(msg)).await.context("Failed to write to control channel")?;
},
}
}
Expand Down Expand Up @@ -191,12 +188,12 @@ impl ConnectionTask {
.read_u32()
.await
.context("Failed to read handshake.")? as usize;
let mut buf = BytesMut::zeroed(len);
let mut buf = vec![0; len];
self.stream
.read_exact(&mut buf)
.await
.context("Failed to read handshake contents.")?;
NewFlow::decode(&buf[..]).context("Invalid handshake IPC")?
NewFlow::decode(buf.as_slice()).context("Invalid handshake IPC")?
};

match new_flow {
Expand Down
15 changes: 6 additions & 9 deletions src/packet_sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::messages::{
use crate::network::add_network_layer;
use crate::{ipc, shutdown, MAX_PACKET_SIZE};
use anyhow::{anyhow, Context, Result};
use prost::bytes::{Bytes, BytesMut};
use prost::bytes::Bytes;
use prost::Message;
use std::future::Future;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
Expand Down Expand Up @@ -53,11 +53,12 @@ async fn forward_packets<T: AsyncRead + AsyncWrite + Unpin>(
mut conf_rx: UnboundedReceiver<InterceptConf>,
shutdown: shutdown::Receiver,
) -> Result<()> {
let mut buf = BytesMut::with_capacity(IPC_BUF_SIZE);
let mut buf = Vec::with_capacity(IPC_BUF_SIZE);
let (mut network_task_handle, net_tx, mut net_rx) =
add_network_layer(transport_events_tx, transport_commands_rx, shutdown);

loop {
buf.clear();
tokio::select! {
// Monitor the network task for errors or planned shutdown.
// This way we implicitly monitor the shutdown channel.
Expand All @@ -68,9 +69,7 @@ async fn forward_packets<T: AsyncRead + AsyncWrite + Unpin>(
message: Some(ipc::from_proxy::Message::InterceptConf(conf.into())),
};
msg.encode(&mut buf)?;

// debug!("Sending IPC message to redirector: {} {:?}", buf.len(), buf);
channel.write_all_buf(&mut buf).await.context("failed to propagate interception config update")?;
channel.write_all(&buf).await.context("failed to propagate interception config update")?;
},
// read packets from the IPC pipe into our network stack.
_ = channel.read_buf(&mut buf) => {
Expand All @@ -85,10 +84,9 @@ async fn forward_packets<T: AsyncRead + AsyncWrite + Unpin>(
return Err(anyhow!("redirect daemon exited prematurely."));
}

let Ok(PacketWithMeta { data, tunnel_info}) = PacketWithMeta::decode(&mut buf) else {
let Ok(PacketWithMeta { data, tunnel_info}) = PacketWithMeta::decode(buf.as_slice()) else {
return Err(anyhow!("Received invalid IPC message from redirector: {:?}", &buf));
};
assert!(buf.is_empty());

// TODO: Use Bytes in SmolPacket to avoid copy
let data = data.to_vec();
Expand Down Expand Up @@ -121,10 +119,9 @@ async fn forward_packets<T: AsyncRead + AsyncWrite + Unpin>(
match e {
NetworkCommand::SendPacket(packet) => {
let packet = ipc::FromProxy { message: Some(ipc::from_proxy::Message::Packet( ipc::Packet { data: Bytes::from(packet.into_inner()) }))};
assert!(buf.is_empty());
packet.encode(&mut buf)?;
// debug!("Sending packet: {} {:?}", buf.len(), &packet.message.as_ref().unwrap());
channel.write_all_buf(&mut buf).await.context("failed to send packet")?;
channel.write_all(&buf).await.context("failed to send packet")?;
}
}
}
Expand Down

0 comments on commit eb29a8a

Please sign in to comment.