Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always reclaim buffer space #229

Merged
merged 2 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion mitmproxy-linux/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ This package requires the following software to build (via https://aya-rs.dev/bo

1. Install build dependencies (see above).
2. Install mitmproxy_linux as editable: `pip install -e .`
3. Run something along the lines of `mitmdump --mode local:curl`.
3. Remove `$VIRTUAL_ENV/bin/mitmproxy-linux-redirector`
4. Run something along the lines of `mitmdump --mode local:curl`.
You should see a `Development mode: Compiling mitmproxy-linux-redirector...` message.


Expand Down
14 changes: 8 additions & 6 deletions mitmproxy-linux/src/main2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,18 @@ async fn main() -> anyhow::Result<()> {
std::process::exit(0);
});

let mut ipc_buf = BytesMut::with_capacity(IPC_BUF_SIZE);
let mut ipc_buf = Vec::with_capacity(IPC_BUF_SIZE);
let mut dev_buf = BytesMut::with_capacity(IPC_BUF_SIZE);

loop {
ipc_buf.clear();
select! {
r = ipc.recv_buf(&mut ipc_buf) => {
match r {
Ok(len) if len > 0 => {
let Ok(FromProxy { message: Some(message)}) = FromProxy::decode(&mut ipc_buf) else {
let Ok(FromProxy { message: Some(message)}) = FromProxy::decode(ipc_buf.as_slice()) else {
return Err(anyhow!("Received invalid IPC message: {:?}", &ipc_buf[..len]));
};
assert_eq!(ipc_buf.len(), 0);
// debug!("Received IPC message: {message:?}");

match message {
Expand Down Expand Up @@ -161,10 +161,12 @@ async fn main() -> anyhow::Result<()> {
};

packet.encode(&mut ipc_buf)?;
let encoded = ipc_buf.split();

// debug!("Sending packet to proxy: {} {:?}", encoded.len(), &encoded);
ipc.send(&encoded).await?;
ipc.send(ipc_buf.as_slice()).await?;

// Reclaim space in dev_buf.
drop(packet);
assert!(dev_buf.try_reclaim(IPC_BUF_SIZE));
},
}
}
Expand Down
11 changes: 4 additions & 7 deletions src/packet_sources/macos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,8 @@ impl PacketSourceTask for MacOsTask {
},
// pipe through changes to the intercept list
Some(conf) = self.conf_rx.recv() => {
let msg = ipc::InterceptConf::from(conf);
let len = msg.encoded_len();
let mut buf = BytesMut::with_capacity(len);
msg.encode(&mut buf)?;
control_channel.send(buf.freeze()).await.context("Failed to write to control channel")?;
let msg = ipc::InterceptConf::from(conf).encode_to_vec();
control_channel.send(Bytes::from(msg)).await.context("Failed to write to control channel")?;
},
}
}
Expand Down Expand Up @@ -191,12 +188,12 @@ impl ConnectionTask {
.read_u32()
.await
.context("Failed to read handshake.")? as usize;
let mut buf = BytesMut::zeroed(len);
let mut buf = vec![0; len];
self.stream
.read_exact(&mut buf)
.await
.context("Failed to read handshake contents.")?;
NewFlow::decode(&buf[..]).context("Invalid handshake IPC")?
NewFlow::decode(buf.as_slice()).context("Invalid handshake IPC")?
};

match new_flow {
Expand Down
15 changes: 6 additions & 9 deletions src/packet_sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::messages::{
use crate::network::add_network_layer;
use crate::{ipc, shutdown, MAX_PACKET_SIZE};
use anyhow::{anyhow, Context, Result};
use prost::bytes::{Bytes, BytesMut};
use prost::bytes::Bytes;
use prost::Message;
use std::future::Future;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
Expand Down Expand Up @@ -53,11 +53,12 @@ async fn forward_packets<T: AsyncRead + AsyncWrite + Unpin>(
mut conf_rx: UnboundedReceiver<InterceptConf>,
shutdown: shutdown::Receiver,
) -> Result<()> {
let mut buf = BytesMut::with_capacity(IPC_BUF_SIZE);
let mut buf = Vec::with_capacity(IPC_BUF_SIZE);
let (mut network_task_handle, net_tx, mut net_rx) =
add_network_layer(transport_events_tx, transport_commands_rx, shutdown);

loop {
buf.clear();
tokio::select! {
// Monitor the network task for errors or planned shutdown.
// This way we implicitly monitor the shutdown channel.
Expand All @@ -68,9 +69,7 @@ async fn forward_packets<T: AsyncRead + AsyncWrite + Unpin>(
message: Some(ipc::from_proxy::Message::InterceptConf(conf.into())),
};
msg.encode(&mut buf)?;

// debug!("Sending IPC message to redirector: {} {:?}", buf.len(), buf);
channel.write_all_buf(&mut buf).await.context("failed to propagate interception config update")?;
channel.write_all(&buf).await.context("failed to propagate interception config update")?;
},
// read packets from the IPC pipe into our network stack.
_ = channel.read_buf(&mut buf) => {
Expand All @@ -85,10 +84,9 @@ async fn forward_packets<T: AsyncRead + AsyncWrite + Unpin>(
return Err(anyhow!("redirect daemon exited prematurely."));
}

let Ok(PacketWithMeta { data, tunnel_info}) = PacketWithMeta::decode(&mut buf) else {
let Ok(PacketWithMeta { data, tunnel_info}) = PacketWithMeta::decode(buf.as_slice()) else {
return Err(anyhow!("Received invalid IPC message from redirector: {:?}", &buf));
};
assert!(buf.is_empty());

// TODO: Use Bytes in SmolPacket to avoid copy
let data = data.to_vec();
Expand Down Expand Up @@ -121,10 +119,9 @@ async fn forward_packets<T: AsyncRead + AsyncWrite + Unpin>(
match e {
NetworkCommand::SendPacket(packet) => {
let packet = ipc::FromProxy { message: Some(ipc::from_proxy::Message::Packet( ipc::Packet { data: Bytes::from(packet.into_inner()) }))};
assert!(buf.is_empty());
packet.encode(&mut buf)?;
// debug!("Sending packet: {} {:?}", buf.len(), &packet.message.as_ref().unwrap());
channel.write_all_buf(&mut buf).await.context("failed to send packet")?;
channel.write_all(&buf).await.context("failed to send packet")?;
}
}
}
Expand Down