Skip to content

Commit

Permalink
fixed the leak (#30)
Browse files Browse the repository at this point in the history
* fixed the leak

* version

* metrics
  • Loading branch information
janlegner authored Jun 3, 2023
1 parent 264a7e1 commit 930b351
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 34 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ warp = "0.3"
base64 = "0.13.0"
bincode = "1.3.3"
enum_dispatch = "0.3.8"
memory-stats = "1.1.0"

[build-dependencies]
tonic-build = { version = "0.8.0", features = ["prost"] }
2 changes: 1 addition & 1 deletion client/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use log::info;
use solana_sdk::signature::read_keypair_file;
use structopt::StructOpt;

pub const VERSION: &str = "rust-0.0.0-alpha";
pub const VERSION: &str = "rust-0.0.7-beta";

#[derive(Debug, StructOpt)]
struct Params {
Expand Down
18 changes: 17 additions & 1 deletion client/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::grpc_client::pb::{self, RequestMessageEnvelope};
use lazy_static::lazy_static;
use log::error;
use prometheus::{register_int_counter, Encoder, IntCounter, TextEncoder};
use memory_stats::memory_stats;
use prometheus::{
register_int_counter, register_int_gauge, Encoder, IntCounter, IntGauge, TextEncoder,
};
use std::time::Duration;
use tokio::time::sleep;

Expand All @@ -26,27 +29,40 @@ lazy_static! {
"How many transactions failed on the client side"
)
.unwrap();
pub static ref QUIC_FORWARDER_PERMITS_USED_MAX: IntGauge = register_int_gauge!(
"quic_forwarder_available_permits_max",
"QUIC concurrent tasks created at any single moment since the last metric feed to the server"
)
.unwrap();
}

/// Raises the `QUIC_FORWARDER_PERMITS_USED_MAX` gauge to `permits_used` when that
/// value is larger than what the gauge currently holds; otherwise the gauge keeps
/// its current value.
///
/// Despite the `available_permits` in the function name, the argument is the number
/// of permits currently in use.
///
/// NOTE(review): the gauge is read and then written in two separate steps, so this
/// is not an atomic "max" update — concurrent callers may overwrite each other's
/// observation. Confirm that this imprecision is acceptable for the metric.
pub fn observe_quic_forwarded_available_permits(permits_used: usize) {
    let previous_peak = QUIC_FORWARDER_PERMITS_USED_MAX.get();
    let observed = permits_used as i64;
    let new_peak = if observed > previous_peak {
        observed
    } else {
        previous_peak
    };
    QUIC_FORWARDER_PERMITS_USED_MAX.set(new_peak);
}

fn spawn_feeder() -> tokio::sync::mpsc::Receiver<RequestMessageEnvelope> {
let (metrics_sender, metrics_receiver) =
tokio::sync::mpsc::channel::<RequestMessageEnvelope>(METRICS_BUFFER_SIZE);
tokio::spawn(async move {
loop {
let memory_physical = memory_stats().map_or(0, |usage| usage.physical_mem as u64);
if let Err(err) = metrics_sender
.send(pb::RequestMessageEnvelope {
metrics: Some(pb::Metrics {
tx_received: TX_RECEIVED_COUNT.get(),
tx_forward_succeeded: TX_FORWARD_SUCCEEDED_COUNT.get(),
tx_forward_failed: TX_FORWARD_FAILED_COUNT.get(),
version: crate::VERSION.to_string(),
quic_forwarder_permits_used_max: QUIC_FORWARDER_PERMITS_USED_MAX.get() as u64,
memory_physical,
}),
..Default::default()
})
.await
{
error!("Failed to feed client metrics: {}", err);
}
QUIC_FORWARDER_PERMITS_USED_MAX.set(0);
sleep(Duration::from_secs(METRICS_SYNC_TIME_IN_S)).await;
}
});
Expand Down
50 changes: 18 additions & 32 deletions client/quic_forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,16 @@ use solana_client::{
};
use solana_sdk::{signature::Keypair, transport::TransportError};
use std::{net::IpAddr, sync::Arc};
use tokio::{
sync::Semaphore,
time::{timeout, Duration},
};

const SEND_TRANSACTION_TIMEOUT_MS: u64 = 10000;
use tokio::sync::Semaphore;

/// Forwards transactions over QUIC while limiting how many sends run concurrently.
pub struct QuicForwarder {
    /// Number of permits the semaphore was created with; kept so the number of
    /// permits in use can be derived from the semaphore's available permits.
    max_permits: usize,
    /// Semaphore from which a permit is acquired before a transaction is sent.
    throttle_parallel: Arc<Semaphore>,
    /// Shared connection cache used to obtain the connection for a send.
    connection_cache: Arc<ConnectionCache>,
}

impl QuicForwarder {
pub fn new(
identity: Option<Keypair>,
tpu_addr: Option<IpAddr>,
throttle_parallel: usize,
) -> Self {
pub fn new(identity: Option<Keypair>, tpu_addr: Option<IpAddr>, max_permits: usize) -> Self {
let mut connection_cache = ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE);
if let (Some(identity), Some(tpu_addr)) = (identity, tpu_addr) {
if let Err(err) = connection_cache.update_client_certificate(&identity, tpu_addr) {
Expand All @@ -36,7 +28,8 @@ impl QuicForwarder {
}

Self {
throttle_parallel: Arc::new(Semaphore::new(throttle_parallel)),
max_permits,
throttle_parallel: Arc::new(Semaphore::new(max_permits)),
connection_cache: Arc::new(connection_cache),
}
}
Expand All @@ -45,40 +38,33 @@ impl QuicForwarder {
let tpu = tpu.clone();
let throttle_parallel = self.throttle_parallel.clone();
let connection_cache = self.connection_cache.clone();
let max_permits = self.max_permits;

tokio::spawn(async move {
metrics::observe_quic_forwarded_available_permits(
max_permits - throttle_parallel.available_permits(),
);

let tpu = tpu.parse().unwrap();
let wire_transaction = decode(transaction.data).unwrap();

let throttle_permit = throttle_parallel.acquire_owned().await.unwrap();

info!("Tx {} -> {}", transaction.signature, &tpu);
let conn = connection_cache.get_nonblocking_connection(&tpu);
let request_result = conn.send_wire_transaction(&wire_transaction).await;
Self::handle_send_result(request_result);

let result = timeout(
Duration::from_millis(SEND_TRANSACTION_TIMEOUT_MS),
conn.send_wire_transaction(&wire_transaction),
)
.await;

Self::handle_send_result(result);
drop(throttle_permit);
});
}

fn handle_send_result(result: Result<Result<(), TransportError>, tokio::time::error::Elapsed>) {
match result {
Ok(result) => {
if let Err(err) = result {
error!("Failed to send the transaction: {}", err);
metrics::TX_FORWARD_FAILED_COUNT.inc();
} else {
metrics::TX_FORWARD_SUCCEEDED_COUNT.inc();
}
}
Err(err) => {
error!("Timed out sending transaction {:?}", err);
}
/// Updates the forwarding counters according to the outcome of a send.
///
/// On success `TX_FORWARD_SUCCEEDED_COUNT` is incremented; on failure the error is
/// logged and `TX_FORWARD_FAILED_COUNT` is incremented.
fn handle_send_result(result: Result<(), TransportError>) {
    match result {
        Ok(()) => metrics::TX_FORWARD_SUCCEEDED_COUNT.inc(),
        Err(err) => {
            error!("Failed to send the transaction: {}", err);
            metrics::TX_FORWARD_FAILED_COUNT.inc();
        }
    }
}
}
Expand Down
2 changes: 2 additions & 0 deletions proto/mtransaction.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,6 @@ message Metrics {
uint64 tx_forward_succeeded = 2;
uint64 tx_forward_failed = 3;
string version = 4;
uint64 quic_forwarder_permits_used_max = 5;
uint64 memory_physical = 6;
}
6 changes: 6 additions & 0 deletions server/grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ fn handle_client_metrics(identity: &String, token: &String, metrics: pb::Metrics
metrics::CLIENT_TX_FORWARD_FAILED
.with_label_values(&[&identity])
.set(metrics.tx_forward_failed as i64);
metrics::CLIENT_QUIC_FORWARDER_PERMITS_USED_MAX
.with_label_values(&[&identity])
.set(metrics.quic_forwarder_permits_used_max as i64);
metrics::CLIENT_MEMORY_PHYSICAL
.with_label_values(&[&identity])
.set(metrics.memory_physical as i64);
}

fn handle_client_pong(identity: &String, pong: pb::Pong, last_ping: &Ping) {
Expand Down
12 changes: 12 additions & 0 deletions server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ lazy_static! {
&["identity"]
)
.unwrap();
pub static ref CLIENT_QUIC_FORWARDER_PERMITS_USED_MAX: IntGaugeVec = register_int_gauge_vec!(
"mtx_client_quic_forwarder_available_permits_max",
"QUIC concurrent tasks created on the client side at any single moment since the last metric feed to the server",
&["identity"]
)
.unwrap();
pub static ref CLIENT_MEMORY_PHYSICAL: IntGaugeVec = register_int_gauge_vec!(
"mtx_client_memory_physical",
"Memory used by the client",
&["identity"]
)
.unwrap();
pub static ref CLIENT_PING_RTT: HistogramVec = register_histogram_vec!(
"mtx_client_ping_rtt",
"Latency to the client based on ping times",
Expand Down

0 comments on commit 930b351

Please sign in to comment.