diff --git a/mitmproxy-linux/README.md b/mitmproxy-linux/README.md index 39c583ac..c70d2d34 100644 --- a/mitmproxy-linux/README.md +++ b/mitmproxy-linux/README.md @@ -16,7 +16,8 @@ This package requires the following software to build (via https://aya-rs.dev/bo 1. Install build dependencies (see above). 2. Install mitmproxy_linux as editable: `pip install -e .` -3. Run something along the lines of `mitmdump --mode local:curl`. +3. Remove `$VIRTUAL_ENV/bin/mitmproxy-linux-redirector` +4. Run something along the lines of `mitmdump --mode local:curl`. You should see a `Development mode: Compiling mitmproxy-linux-redirector...` message. diff --git a/mitmproxy-linux/src/main2.rs b/mitmproxy-linux/src/main2.rs index 8bb407e6..b1fb09f7 100644 --- a/mitmproxy-linux/src/main2.rs +++ b/mitmproxy-linux/src/main2.rs @@ -109,18 +109,18 @@ async fn main() -> anyhow::Result<()> { std::process::exit(0); }); - let mut ipc_buf = BytesMut::with_capacity(IPC_BUF_SIZE); + let mut ipc_buf = Vec::with_capacity(IPC_BUF_SIZE); let mut dev_buf = BytesMut::with_capacity(IPC_BUF_SIZE); loop { + ipc_buf.clear(); select! { r = ipc.recv_buf(&mut ipc_buf) => { match r { Ok(len) if len > 0 => { - let Ok(FromProxy { message: Some(message)}) = FromProxy::decode(&mut ipc_buf) else { + let Ok(FromProxy { message: Some(message)}) = FromProxy::decode(ipc_buf.as_slice()) else { return Err(anyhow!("Received invalid IPC message: {:?}", &ipc_buf[..len])); }; - assert_eq!(ipc_buf.len(), 0); // debug!("Received IPC message: {message:?}"); match message { @@ -161,10 +161,12 @@ async fn main() -> anyhow::Result<()> { }; packet.encode(&mut ipc_buf)?; - let encoded = ipc_buf.split(); - // debug!("Sending packet to proxy: {} {:?}", encoded.len(), &encoded); - ipc.send(&encoded).await?; + ipc.send(ipc_buf.as_slice()).await?; + + // Reclaim space in dev_buf. + drop(packet); + assert!(dev_buf.try_reclaim(IPC_BUF_SIZE)); }, } } diff --git a/src/packet_sources/macos.rs b/src/packet_sources/macos.rs index 8a2947d6..74a6b826 100644 --- a/src/packet_sources/macos.rs +++ b/src/packet_sources/macos.rs @@ -152,11 +152,8 @@ impl PacketSourceTask for MacOsTask { }, // pipe through changes to the intercept list Some(conf) = self.conf_rx.recv() => { - let msg = ipc::InterceptConf::from(conf); - let len = msg.encoded_len(); - let mut buf = BytesMut::with_capacity(len); - msg.encode(&mut buf)?; - control_channel.send(buf.freeze()).await.context("Failed to write to control channel")?; + let msg = ipc::InterceptConf::from(conf).encode_to_vec(); + control_channel.send(Bytes::from(msg)).await.context("Failed to write to control channel")?; }, } } @@ -191,12 +188,12 @@ impl ConnectionTask { .read_u32() .await .context("Failed to read handshake.")? as usize; - let mut buf = BytesMut::zeroed(len); + let mut buf = vec![0; len]; self.stream .read_exact(&mut buf) .await .context("Failed to read handshake contents.")?; - NewFlow::decode(&buf[..]).context("Invalid handshake IPC")? + NewFlow::decode(buf.as_slice()).context("Invalid handshake IPC")? }; match new_flow { diff --git a/src/packet_sources/mod.rs b/src/packet_sources/mod.rs index 45b71679..53aea639 100755 --- a/src/packet_sources/mod.rs +++ b/src/packet_sources/mod.rs @@ -6,7 +6,7 @@ use crate::messages::{ use crate::network::add_network_layer; use crate::{ipc, shutdown, MAX_PACKET_SIZE}; use anyhow::{anyhow, Context, Result}; -use prost::bytes::{Bytes, BytesMut}; +use prost::bytes::Bytes; use prost::Message; use std::future::Future; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; @@ -53,11 +53,12 @@ async fn forward_packets( mut conf_rx: UnboundedReceiver, shutdown: shutdown::Receiver, ) -> Result<()> { - let mut buf = BytesMut::with_capacity(IPC_BUF_SIZE); + let mut buf = Vec::with_capacity(IPC_BUF_SIZE); let (mut network_task_handle, net_tx, mut net_rx) = add_network_layer(transport_events_tx, transport_commands_rx, shutdown); loop { + buf.clear(); tokio::select! { // Monitor the network task for errors or planned shutdown. // This way we implicitly monitor the shutdown channel. @@ -68,9 +69,7 @@ async fn forward_packets( message: Some(ipc::from_proxy::Message::InterceptConf(conf.into())), }; msg.encode(&mut buf)?; - - // debug!("Sending IPC message to redirector: {} {:?}", buf.len(), buf); - channel.write_all_buf(&mut buf).await.context("failed to propagate interception config update")?; + channel.write_all(&buf).await.context("failed to propagate interception config update")?; }, // read packets from the IPC pipe into our network stack. _ = channel.read_buf(&mut buf) => { @@ -85,10 +84,9 @@ async fn forward_packets( return Err(anyhow!("redirect daemon exited prematurely.")); } - let Ok(PacketWithMeta { data, tunnel_info}) = PacketWithMeta::decode(&mut buf) else { + let Ok(PacketWithMeta { data, tunnel_info}) = PacketWithMeta::decode(buf.as_slice()) else { return Err(anyhow!("Received invalid IPC message from redirector: {:?}", &buf)); }; - assert!(buf.is_empty()); // TODO: Use Bytes in SmolPacket to avoid copy let data = data.to_vec(); @@ -121,10 +119,9 @@ async fn forward_packets( match e { NetworkCommand::SendPacket(packet) => { let packet = ipc::FromProxy { message: Some(ipc::from_proxy::Message::Packet( ipc::Packet { data: Bytes::from(packet.into_inner()) }))}; - assert!(buf.is_empty()); packet.encode(&mut buf)?; // debug!("Sending packet: {} {:?}", buf.len(), &packet.message.as_ref().unwrap()); - channel.write_all_buf(&mut buf).await.context("failed to send packet")?; + channel.write_all(&buf).await.context("failed to send packet")?; } } }