From ca875e8294a4fdfe377ed0291e5aef5ee5bec405 Mon Sep 17 00:00:00 2001 From: iceboy Date: Sat, 30 Mar 2024 23:07:59 -0700 Subject: [PATCH] net/proxy support close in stream and datagram --- net/proxy/datagram.h | 4 +++- net/proxy/misc/random-handler.cc | 8 +------- net/proxy/misc/zero-handler.cc | 8 +------- net/proxy/shadowsocks/connector.cc | 7 ++----- net/proxy/shadowsocks/handler.cc | 18 ++++-------------- net/proxy/socks/handler.cc | 18 ++++-------------- net/proxy/stream.h | 4 +++- net/proxy/system/tcp-socket-stream.cc | 7 ++++++- net/proxy/system/tcp-socket-stream.h | 7 +++---- net/proxy/system/udp-socket-datagram.cc | 5 +++++ net/proxy/system/udp-socket-datagram.h | 7 +++---- 11 files changed, 35 insertions(+), 58 deletions(-) diff --git a/net/proxy/datagram.h b/net/proxy/datagram.h index 1d9ed35..b91984a 100644 --- a/net/proxy/datagram.h +++ b/net/proxy/datagram.h @@ -17,7 +17,6 @@ class Datagram { using executor_type = any_io_executor; virtual ~Datagram() = default; - virtual any_io_executor get_executor() = 0; virtual void async_receive_from( absl::Span buffers, @@ -29,6 +28,9 @@ class Datagram { const udp::endpoint &endpoint, absl::AnyInvocable callback) = 0; + virtual any_io_executor get_executor() = 0; + virtual void close() = 0; + template void async_receive_from( const BuffersT &buffers, diff --git a/net/proxy/misc/random-handler.cc b/net/proxy/misc/random-handler.cc index 87eb29d..b355994 100644 --- a/net/proxy/misc/random-handler.cc +++ b/net/proxy/misc/random-handler.cc @@ -25,7 +25,7 @@ class StreamConnection : public boost::intrusive_ref_counter< private: void read(); void write(); - void close() { stream_.reset(); } + void close() { stream_->close(); } std::unique_ptr stream_; absl::FixedArray read_buffer_; @@ -58,9 +58,6 @@ StreamConnection::StreamConnection(std::unique_ptr stream) } void StreamConnection::read() { - if (!stream_) { - return; - } stream_->async_read_some( buffer(read_buffer_.data(), read_buffer_.size()), [connection = boost::intrusive_ptr(this)]( @@ -74,9 +71,6 @@ void StreamConnection::read() { } void StreamConnection::write() { - if (!stream_) { - return; - } stream_->async_write_some( const_buffer(write_buffer_.data(), write_buffer_.size()), [connection = boost::intrusive_ptr(this)]( diff --git a/net/proxy/misc/zero-handler.cc b/net/proxy/misc/zero-handler.cc index 10b30a7..2f30f40 100644 --- a/net/proxy/misc/zero-handler.cc +++ b/net/proxy/misc/zero-handler.cc @@ -24,7 +24,7 @@ class StreamConnection : public boost::intrusive_ref_counter< private: void read(); void write(); - void close() { stream_.reset(); } + void close() { stream_->close(); } std::unique_ptr stream_; absl::FixedArray read_buffer_; @@ -55,9 +55,6 @@ StreamConnection::StreamConnection(std::unique_ptr stream) write_buffer_(8192) {} void StreamConnection::read() { - if (!stream_) { - return; - } stream_->async_read_some( buffer(read_buffer_.data(), read_buffer_.size()), [connection = boost::intrusive_ptr(this)]( @@ -71,9 +68,6 @@ void StreamConnection::read() { } void StreamConnection::write() { - if (!stream_) { - return; - } stream_->async_write_some( const_buffer(write_buffer_.data(), write_buffer_.size()), [connection = boost::intrusive_ptr(this)]( diff --git a/net/proxy/shadowsocks/connector.cc b/net/proxy/shadowsocks/connector.cc index 6b57069..411f78c 100644 --- a/net/proxy/shadowsocks/connector.cc +++ b/net/proxy/shadowsocks/connector.cc @@ -43,7 +43,8 @@ class Connector::TcpStream : public proxy::Stream { absl::Span buffers, absl::AnyInvocable callback) override; - any_io_executor get_executor() override; + any_io_executor get_executor() override { return connector_.executor_; } + void close() override { base_stream_->close(); } private: void connect(absl::AnyInvocable callback); @@ -455,10 +456,6 @@ void Connector::TcpStream::async_write_some( }); } -any_io_executor Connector::TcpStream::get_executor() { - return connector_.executor_; -} - } // namespace shadowsocks } // namespace proxy } // namespace net diff --git a/net/proxy/shadowsocks/handler.cc b/net/proxy/shadowsocks/handler.cc index fa932a4..7574a7d 100644 --- a/net/proxy/shadowsocks/handler.cc +++ b/net/proxy/shadowsocks/handler.cc @@ -85,9 +85,6 @@ Handler::TcpConnection::TcpConnection( write_header_(handler_.pre_shared_key_.method().is_spec_2022()) {} void Handler::TcpConnection::forward_read() { - if (!stream_) { - return; - } BufferSpan read_buffer = decryptor_.buffer(); stream_->async_read_some( buffer(read_buffer.data(), read_buffer.size()), @@ -342,9 +339,6 @@ void Handler::TcpConnection::forward_parse_host(size_t header_length) { } void Handler::TcpConnection::forward_write() { - if (!remote_stream_) { - return; - } async_write( *remote_stream_, buffer(decryptor_.pop_buffer(read_length_), read_length_), @@ -361,9 +355,6 @@ void Handler::TcpConnection::forward_write() { } void Handler::TcpConnection::backward_read() { - if (!remote_stream_) { - return; - } remote_stream_->async_read_some( buffer(backward_read_buffer_.data(), backward_read_buffer_.size()), [connection = boost::intrusive_ptr(this)]( @@ -378,9 +369,6 @@ void Handler::TcpConnection::backward_read() { } void Handler::TcpConnection::backward_write() { - if (!stream_) { - return; - } ConstBufferSpan read_buffer( backward_read_buffer_.data(), backward_read_size_); do { @@ -420,8 +408,10 @@ void Handler::TcpConnection::backward_write() { } void Handler::TcpConnection::close() { - remote_stream_.reset(); - stream_.reset(); + if (remote_stream_) { + remote_stream_->close(); + } + stream_->close(); } } // namespace shadowsocks diff --git a/net/proxy/socks/handler.cc b/net/proxy/socks/handler.cc index 8e472f2..fca0948 100644 --- a/net/proxy/socks/handler.cc +++ b/net/proxy/socks/handler.cc @@ -76,9 +76,6 @@ Handler::TcpConnection::TcpConnection( backward_buffer_(4096) {} void Handler::TcpConnection::forward_read() { - if (!stream_) { - return; - } stream_->async_read_some( buffer( &forward_buffer_[forward_size_], @@ -246,9 +243,6 @@ void Handler::TcpConnection::connect_host(ConstBufferSpan buffer) { } void Handler::TcpConnection::forward_write() { - if (!remote_stream_) { - return; - } async_write( *remote_stream_, buffer(forward_buffer_.data(), forward_size_), @@ -279,9 +273,6 @@ void Handler::TcpConnection::reply() { } void Handler::TcpConnection::backward_write() { - if (!stream_) { - return; - } async_write( *stream_, buffer(backward_buffer_.data(), backward_size_), @@ -309,9 +300,6 @@ void Handler::TcpConnection::backward_dispatch() { } void Handler::TcpConnection::backward_read() { - if (!remote_stream_) { - return; - } remote_stream_->async_read_some( buffer( &backward_buffer_[backward_size_], @@ -328,8 +316,10 @@ void Handler::TcpConnection::backward_read() { } void Handler::TcpConnection::close() { - remote_stream_.reset(); - stream_.reset(); + if (remote_stream_) { + remote_stream_->close(); + } + stream_->close(); } } // namespace socks diff --git a/net/proxy/stream.h b/net/proxy/stream.h index 022e8de..e7ec186 100644 --- a/net/proxy/stream.h +++ b/net/proxy/stream.h @@ -17,7 +17,6 @@ class Stream { using executor_type = any_io_executor; virtual ~Stream() = default; - virtual any_io_executor get_executor() = 0; virtual void async_read_some( absl::Span buffers, @@ -27,6 +26,9 @@ class Stream { absl::Span buffers, absl::AnyInvocable callback) = 0; + virtual any_io_executor get_executor() = 0; + virtual void close() = 0; + template void async_read_some( const BuffersT &buffers, diff --git a/net/proxy/system/tcp-socket-stream.cc b/net/proxy/system/tcp-socket-stream.cc index 59d683e..007e7e9 100644 --- a/net/proxy/system/tcp-socket-stream.cc +++ b/net/proxy/system/tcp-socket-stream.cc @@ -8,7 +8,7 @@ namespace system { TcpSocketStream::TcpSocketStream(tcp::socket socket, TimerList &timer_list) : socket_(std::move(socket)), - timer_(timer_list, [this]() { socket_.close(); }) {} + timer_(timer_list, [this]() { close(); }) {} void TcpSocketStream::async_read_some( absl::Span buffers, @@ -28,6 +28,11 @@ void TcpSocketStream::async_write_some( timer_.update(); } +void TcpSocketStream::close() { + boost::system::error_code ec; + socket_.close(ec); +} + } // namespace proxy } // namespace system } // namespace net diff --git a/net/proxy/system/tcp-socket-stream.h b/net/proxy/system/tcp-socket-stream.h index c49fb5e..d839ab9 100644 --- a/net/proxy/system/tcp-socket-stream.h +++ b/net/proxy/system/tcp-socket-stream.h @@ -16,10 +16,6 @@ class TcpSocketStream : public Stream { TcpSocketStream(const TcpSocketStream &) = delete; TcpSocketStream &operator=(const TcpSocketStream &) = delete; - any_io_executor get_executor() override { - return socket_.get_executor(); - } - void async_read_some( absl::Span buffers, absl::AnyInvocable callback) override; @@ -28,6 +24,9 @@ class TcpSocketStream : public Stream { absl::Span buffers, absl::AnyInvocable callback) override; + any_io_executor get_executor() override { return socket_.get_executor(); } + void close() override; + using Stream::async_read_some; using Stream::async_write_some; diff --git a/net/proxy/system/udp-socket-datagram.cc b/net/proxy/system/udp-socket-datagram.cc index 3859ccf..c26c467 100644 --- a/net/proxy/system/udp-socket-datagram.cc +++ b/net/proxy/system/udp-socket-datagram.cc @@ -29,6 +29,11 @@ void UdpSocketDatagram::async_send_to( std::move(callback)); } +void UdpSocketDatagram::close() { + boost::system::error_code ec; + socket_.close(ec); +} + } // namespace proxy } // namespace system } // namespace net diff --git a/net/proxy/system/udp-socket-datagram.h b/net/proxy/system/udp-socket-datagram.h index 6a81b5a..ed74bff 100644 --- a/net/proxy/system/udp-socket-datagram.h +++ b/net/proxy/system/udp-socket-datagram.h @@ -15,10 +15,6 @@ class UdpSocketDatagram : public Datagram { UdpSocketDatagram(const UdpSocketDatagram &) = delete; UdpSocketDatagram &operator=(const UdpSocketDatagram &) = delete; - any_io_executor get_executor() override { - return socket_.get_executor(); - } - void async_receive_from( absl::Span buffers, udp::endpoint &endpoint, @@ -29,6 +25,9 @@ class UdpSocketDatagram : public Datagram { const udp::endpoint &endpoint, absl::AnyInvocable callback) override; + any_io_executor get_executor() override { return socket_.get_executor(); } + void close() override; + using Datagram::async_receive_from; using Datagram::async_send_to;