Skip to content

Commit

Permalink
Make balancer fan out to multiple consumers and multiple copies (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
kronael authored Jun 1, 2024
1 parent 19ad6c9 commit 2cddcea
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 49 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mtransaction"
version = "0.2.2"
version = "0.2.3"
edition = "2021"

[[bin]]
Expand Down
43 changes: 38 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,24 @@
name = mtransaction

# Print a short usage summary of the main targets.
# NOTE(review): the `build` and `image` targets advertised below are not visible in
# this part of the Makefile (and are not in .PHONY) — confirm they exist.
help:
@echo "Usage:"
@echo " build-server builds the dev version of the mtransaction server"
@echo " build compile the package"
@echo " image builds $(name) docker image"
@echo " help show this help"

.PHONY: build-all build-server build-server-release run-server clean run-client run-client-local
.DEFAULT_GOAL := build-all

cert-server:
./scripts/cert-server.bash $(cmd) $(host)
CERT_DIR = certs
JWT_KEY = $(CERT_DIR)/jwtRS256.key

# Create a self-signed CA certificate (CN=Marinade, valid for 3650 days) at
# $(CERT_DIR)/ca.cert, together with a new unencrypted (-nodes) 4096-bit RSA key
# at $(CERT_DIR)/ca.key.
# NOTE(review): openssl.server.conf is a prerequisite but is not passed to the
# command below — confirm the dependency is intentional.
$(CERT_DIR)/ca.cert: $(CERT_DIR)/openssl.server.conf
openssl req -x509 -newkey rsa:4096 -days 3650 -nodes -keyout "$(CERT_DIR)/ca.key" -out "$(CERT_DIR)/ca.cert" -subj "/CN=Marinade"

# Generate the RSA key pair for JWTs: a 4096-bit PEM private key with an empty
# passphrase at $(JWT_KEY), and its PEM public key at $(JWT_KEY).pub.
# ssh-keygen also writes its own $@.pub; the openssl call runs second and
# overwrites that file with the PEM-encoded public key, so the order matters.
$(JWT_KEY):
ssh-keygen -t rsa -b 4096 -m PEM -f $@ -N ""
openssl rsa -in $@ -pubout -outform PEM -out $@.pub

cert-client:
./scripts/cert-client.bash $(cmd) $(validator)
Expand Down Expand Up @@ -35,16 +51,33 @@ run-server: build-server
--tls-grpc-ca-cert ./certs/ca.cert \
--jwt-public-key ./certs/jwtRS256.key.pub

run-client-local: build-client
# Run mtx-server locally with the localhost TLS certificate/key and the stake
# override flags set for the placeholder identities "foo" and "bar".
# NOTE(review): the last recipe line ends with a line continuation and, unlike
# run-server, no --jwt-public-key is passed — confirm no argument is missing.
run-server-local: build-server
cargo run --bin mtx-server -- \
--stake-override-identity foo bar \
--stake-override-sol 1000000 2000000 \
--tls-grpc-server-cert ./certs/localhost.cert \
--tls-grpc-server-key ./certs/localhost.key \
--tls-grpc-ca-cert ./certs/ca.cert \

# Run mtx-client with the gRPC URL list from ./client.yml, pointing --rpc-url at
# the public Solana devnet endpoint.
# The TLS client key/cert are selected by $(client); presumably it is supplied on
# the command line (e.g. `make run-client-rpc-devnet client=NAME`) — confirm.
run-client-rpc-devnet: build-client
cargo run --bin mtx-client -- \
--tls-grpc-ca-cert ./certs/ca.cert \
--grpc-urls-file ./client.yml \
--tls-grpc-client-key ./certs/client.$(client).key \
--tls-grpc-client-cert ./certs/client.$(client).cert \
--rpc-url https://api.devnet.solana.com

run-client-rpc-mainnet: build-client
cargo run --bin mtx-client -- \
--tls-grpc-ca-cert ./certs/ca.cert \
--grpc-urls-file ./client-config.yaml \
--grpc-urls-file ./client.yml \
--tls-grpc-client-key ./certs/client.$(client).key \
--tls-grpc-client-cert ./certs/client.$(client).cert \
--rpc-url http://127.0.0.1:8899
--rpc-url `<mainnet-rpc.txt`

run-client-blackhole: build-client
cargo run --bin mtx-client -- \
--grpc-urls-file ./client.yml \
--tls-grpc-ca-cert ./certs/ca.cert \
--tls-grpc-client-key ./certs/client.$(client).key \
--tls-grpc-client-cert ./certs/client.$(client).cert \
Expand Down
4 changes: 2 additions & 2 deletions client/grpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ pub async fn spawn_grpc_client(
)
.await
.map(|v| {
info!("Stream ended from gRPC server: {:?}, {:?}", grpc_host, v);
v
info!("Stream ended from gRPC server: {:?}, {:?}", grpc_host, v);
v
})
}
4 changes: 1 addition & 3 deletions client/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
params.throttle_parallel,
);

let all_tasks: RwLock<HashMap<String, JoinHandle<()>>> =
RwLock::new(HashMap::new());
let all_tasks: RwLock<HashMap<String, JoinHandle<()>>> = RwLock::new(HashMap::new());

build_tasks(
grpc_urls_from_file.clone(),
Expand Down Expand Up @@ -271,7 +270,6 @@ async fn build_tasks(
) {
let mut all_tasks = all_tasks.write().await;
for i in grpc_urls {

let grpc_parsed_url: Uri = i.parse().expect("failed to parse grpc url");

let tls_grpc_ca_cert = params.tls_grpc_ca_cert.clone();
Expand Down
79 changes: 47 additions & 32 deletions server/balancer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::grpc_server::{self, build_tx_message_envelope};
use crate::metrics;
use crate::solana_service::{get_tpu_by_identity, leaders_stream};
use crate::{N_CONSUMERS, N_COPIES, NODES_REFRESH_SECONDS};
use jsonrpc_http_server::*;
use log::{error, info};
use rand::rngs::StdRng;
Expand Down Expand Up @@ -32,6 +33,7 @@ impl Drop for TxConsumer {
}
}

#[derive(Default)]
pub struct Balancer {
pub tx_consumers: HashMap<String, TxConsumer>,
pub stake_weights: HashMap<String, u64>,
Expand All @@ -41,16 +43,6 @@ pub struct Balancer {
}

impl Balancer {
pub fn new() -> Self {
Self {
tx_consumers: Default::default(),
stake_weights: Default::default(),
total_connected_stake: 0,
leaders: Default::default(),
leader_tpus: Default::default(),
}
}

pub fn subscribe(
&mut self,
identity: String,
Expand Down Expand Up @@ -86,39 +78,61 @@ impl Balancer {
}
}

pub fn pick_random_tx_consumer_with_target(&self) -> Option<(&TxConsumer, Vec<String>)> {
let mut rng: StdRng = SeedableRng::from_entropy();
if self.total_connected_stake == 0 {
return None;
}
pub fn sample_consumer_stake_weighted(&self, rng: &mut StdRng) -> Option<&TxConsumer> {
let total_weight = if self.total_connected_stake == 0 {
self.tx_consumers.len() as u64
} else {
self.total_connected_stake
};

let random_stake_point = rng.gen_range(0..self.total_connected_stake);
let random_stake_point = rng.gen_range(0..total_weight);
let mut accumulated_sum = 0;

for (_, tx_consumer) in self.tx_consumers.iter() {
accumulated_sum += tx_consumer.stake;
accumulated_sum += tx_consumer.stake + 1;
if random_stake_point < accumulated_sum {
return Some((tx_consumer, self.leader_tpus.clone()));
return Some(tx_consumer);
}
}
return None;
}

pub fn pick_leader_tx_consumers_with_targets(&self) -> Vec<(&TxConsumer, Vec<String>)> {
self.tx_consumers
pub fn pick_consumers(&self) -> Vec<(&TxConsumer, Vec<String>)> {
// If some of the consumers are leaders, pick them.
// Pick the rest randomly.
let mut consumers: Vec<_> = self
.tx_consumers
.iter()
.flat_map(|(identity, tx_consumer)| match self.leaders.get(identity) {
Some(tpu) => Some((tx_consumer, vec![tpu.clone()])),
_ => None,
})
.collect()
}

pub fn pick_tx_consumers(&self) -> Vec<(&TxConsumer, Vec<String>)> {
let mut consumers: Vec<_> = self.pick_leader_tx_consumers_with_targets();
.collect();

let mut rng = SeedableRng::from_entropy();

for _ in 0..N_CONSUMERS - consumers.len().min(N_CONSUMERS) {
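// TODO: make sure the sample cannot degenerate — each pick is drawn independently, so the same consumer (even one already selected as a leader) may be pushed more than once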
match self.sample_consumer_stake_weighted(&mut rng) {
Some(pick) => {
consumers.push((pick, vec![]));
}
None => {
break;
}
}
}

if let Some(random_tx_consumer) = self.pick_random_tx_consumer_with_target() {
consumers.push(random_tx_consumer);
let n = consumers.len();
if n > 0 && !self.leader_tpus.is_empty() {
let chunk_len = self.leader_tpus.len() / n;
for (i, tpus) in self.leader_tpus.chunks(chunk_len).enumerate() {
for tpu in tpus {
for j in 0..N_COPIES {
consumers[(i + j) % n].1.push(tpu.clone());
}
}
}
}

consumers
Expand Down Expand Up @@ -154,7 +168,7 @@ impl Balancer {
) -> std::result::Result<(), Box<dyn std::error::Error>> {
info!("Forwarding tx {}...", &signature);

let tx_consumers = self.pick_tx_consumers();
let tx_consumers = self.pick_consumers();

if tx_consumers.len() == 0 {
error!("Dropping tx, no available clients");
Expand Down Expand Up @@ -193,7 +207,7 @@ pub fn balancer_updater(
);

let mut refresh_cluster_nodes_hint = Box::pin(
tokio_stream::iter(std::iter::repeat(())).throttle(tokio::time::Duration::from_secs(3600)),
tokio_stream::iter(std::iter::repeat(())).throttle(tokio::time::Duration::from_secs(NODES_REFRESH_SECONDS)),
);

let mut tpu_by_identity = Default::default();
Expand All @@ -206,19 +220,20 @@ pub fn balancer_updater(
Ok(result) => {
info!("Updated TPUs by identity (count: {})", result.len());
tpu_by_identity = result;
info!("identities updated # {}", serde_json::to_string(&tpu_by_identity).unwrap_or_else(|_| "null".to_string()));
},
Err(err) => error!("Failed to update TPUs by identity: {}", err)
};
},
Some(leaders) = rx_leaders.next() => {
let leaders_with_tpus = leaders
.iter()
.filter_map(|identity| match tpu_by_identity.get(identity) {
.into_iter()
.filter_map(|identity| match tpu_by_identity.get(&identity) {
Some(tpu) => Some((identity.clone(), tpu.clone())),
_=> None,
})
.collect();
info!("New leaders {:?}", &leaders_with_tpus);
info!("new leaders # {:?}", &leaders_with_tpus);
balancer.write().await.set_leaders(leaders_with_tpus);
},
}
Expand Down
8 changes: 7 additions & 1 deletion server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ use std::sync::Arc;
use structopt::StructOpt;
use tokio::sync::RwLock;

/// Number of consumer slots each leader TPU address is assigned to when the
/// balancer distributes targets among the picked consumers.
pub const N_COPIES: usize = 2;
/// Number of schedule lookups made per slot notification: the current slot and
/// then every 4th slot after it.
pub const N_LEADERS: u64 = 7;
/// Number of consumers the balancer tries to reach per transaction: connected
/// leaders are always taken, and random picks top the list up to this count.
pub const N_CONSUMERS: usize = 3;
/// Throttle period, in seconds, between refreshes of the leader schedule.
pub const LEADER_REFRESH_SECONDS: u64 = 3600;
/// Throttle period, in seconds, of the balancer's cluster-nodes refresh hint
/// (presumably what drives the identity -> TPU map update — confirm).
pub const NODES_REFRESH_SECONDS: u64 = 60;

#[derive(Debug, StructOpt)]
struct Params {
#[structopt(long = "tls-grpc-server-cert")]
Expand Down Expand Up @@ -70,7 +76,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let params = Params::from_args();

let client = Arc::new(solana_client(params.rpc_url, params.rpc_commitment));
let balancer = Arc::new(RwLock::new(Balancer::new()));
let balancer = Arc::new(RwLock::new(Balancer::default()));

let pubsub_client = Arc::new(PubsubClient::new(&params.ws_rpc_url).await?);

Expand Down
9 changes: 5 additions & 4 deletions server/solana_service.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::{ N_LEADERS, LEADER_REFRESH_SECONDS };
use crate::{metrics, rpc_server::Mode};
use log::{debug, error, info};
use solana_client::{
Expand Down Expand Up @@ -86,7 +87,6 @@ pub fn get_tpu_by_identity(
.collect())
}

const MAX_LEADERS: u64 = 5;
pub fn leaders_stream(
client: Arc<RpcClient>,
pubsub_client: Arc<PubsubClient>,
Expand All @@ -96,7 +96,7 @@ pub fn leaders_stream(
tokio::spawn(async move {
let mut refresh_leaders_schedule_hint = Box::pin(
tokio_stream::iter(std::iter::repeat(()))
.throttle(tokio::time::Duration::from_secs(3600)),
.throttle(tokio::time::Duration::from_secs(LEADER_REFRESH_SECONDS)),
); // todo implement some sound logic to refresh
let (mut slot_notifications, _slot_unsubscribe) = pubsub_client.slot_subscribe().await?;

Expand All @@ -106,11 +106,12 @@ pub fn leaders_stream(
loop {
tokio::select! {
_ = refresh_leaders_schedule_hint.next() => {
info!("Will refresh leaders..");
info!("Will refresh leaders...");
schedule = get_leader_schedule(client.as_ref())?;
info!("leaders refreshed # {}", serde_json::to_string(&schedule).unwrap_or_else(|_| "null".to_string()));
},
Some(slot_info) = slot_notifications.next() => {
let current_leaders: HashSet<_> = (0..MAX_LEADERS)
let current_leaders: HashSet<_> = (0..N_LEADERS)
.map(|nth_leader| nth_leader * 4 + (slot_info.slot % 432000))
.map(|slot| schedule.get(&slot))
.flatten()
Expand Down

0 comments on commit 2cddcea

Please sign in to comment.