Skip to content

Commit

Permalink
Feat 🛠️ / Implement A/B testing (#25)
Browse files Browse the repository at this point in the history
* feat: implement A/B testing
  • Loading branch information
butonium authored Mar 20, 2023
1 parent 7fc948b commit 8157592
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 33 deletions.
7 changes: 7 additions & 0 deletions server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ struct Params {
#[structopt(long = "rpc-commitment", default_value = "finalized")]
rpc_commitment: String,

#[structopt(long = "test-partners")]
test_partners: Option<Vec<String>>,

#[structopt(long = "stake-override-identity")]
stake_override_identity: Vec<String>,

Expand Down Expand Up @@ -121,10 +124,14 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
}
});
}
if let Some(partners) = &params.test_partners {
info!("Test partners loaded: {:?}", &partners);
}

let _rpc_server = spawn_rpc_server(
params.rpc_addr.parse().unwrap(),
params.jwt_public_key,
params.test_partners,
balancer.clone(),
tx_signatures,
);
Expand Down
17 changes: 10 additions & 7 deletions server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,22 @@ lazy_static! {
vec![0.002, 0.004, 0.008, 0.016, 0.032, 0.064, 0.128]
)
.unwrap();
pub static ref CHAIN_TX_FINALIZED: IntCounter = register_int_counter!(
pub static ref CHAIN_TX_FINALIZED: IntGaugeVec = register_int_gauge_vec!(
"mtx_chain_tx_finalized",
"How many transactions were finalized on chain"
"How many transactions were finalized on chain",
&["partner", "mode"]
)
.unwrap();
pub static ref CHAIN_TX_TIMEOUT: IntCounter = register_int_counter!(
pub static ref CHAIN_TX_TIMEOUT: IntGaugeVec = register_int_gauge_vec!(
"mtx_chain_tx_timeout",
"How many transactions we were unable to confirm as finalized"
"How many transactions we were unable to confirm as finalized",
&["partner", "mode"]
)
.unwrap();
pub static ref CHAIN_TX_EXECUTION_SUCCESS: IntCounter = register_int_counter!(
pub static ref CHAIN_TX_EXECUTION_SUCCESS: IntGaugeVec = register_int_gauge_vec!(
"mtx_chain_tx_execution_success",
"How many transactions ended on chain without errors"
"How many transactions ended on chain without errors",
&["partner", "mode"]
)
.unwrap();
pub static ref CHAIN_TX_EXECUTION_ERROR: IntCounter = register_int_counter!(
Expand All @@ -55,7 +58,7 @@ lazy_static! {
pub static ref SERVER_RPC_TX_ACCEPTED: IntGaugeVec = register_int_gauge_vec!(
"mtx_server_rpc_tx_accepted",
"How many transactions were accepted by the server",
&["partner"]
&["partner", "mode"]
)
.unwrap();
pub static ref SERVER_RPC_TX_BYTES_IN: IntCounter = register_int_counter!(
Expand Down
63 changes: 53 additions & 10 deletions server/rpc_server.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,39 @@
use crate::auth::{authenticate, load_public_key, Auth};
use crate::solana_service::SignatureWrapper;
use crate::{balancer::*, metrics};
use bincode::config::Options;
use jsonrpc_core::{BoxFuture, MetaIoHandler, Metadata, Result};
use jsonrpc_derive::rpc;
use jsonrpc_http_server::hyper::http::header::AUTHORIZATION;
use jsonrpc_http_server::*;
use log::{error, info};
use rand::Rng;
use serde::{Deserialize, Serialize};
use solana_sdk::{
packet::PACKET_DATA_SIZE, signature::Signature, transaction::VersionedTransaction,
};
use std::sync::Arc;
use solana_sdk::{packet::PACKET_DATA_SIZE, transaction::VersionedTransaction};
use std::{fmt, sync::Arc};
use tokio::sync::{mpsc::UnboundedSender, RwLock};

#[derive(Clone, Debug, PartialEq)]
pub enum Mode {
BLACKHOLE,
FORWARD,
}

impl fmt::Display for Mode {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Mode::BLACKHOLE => write!(f, "BLACKHOLE"),
Mode::FORWARD => write!(f, "FORWARD"),
}
}
}

#[derive(Clone)]
pub struct RpcMetadata {
auth: std::result::Result<Auth, String>,
balancer: Arc<RwLock<Balancer>>,
tx_signatures: UnboundedSender<Signature>,
tx_signatures: UnboundedSender<SignatureWrapper>,
mode: Mode,
}
impl Metadata for RpcMetadata {}

Expand Down Expand Up @@ -68,7 +84,7 @@ impl Rpc for RpcServer {

info!("RPC method sendPriorityTransaction called: {:?}", auth);
metrics::SERVER_RPC_TX_ACCEPTED
.with_label_values(&[&auth.to_string()])
.with_label_values(&[&auth.to_string(), &meta.mode.to_string()])
.inc();
metrics::SERVER_RPC_TX_BYTES_IN.inc_by(data.len() as u64);

Expand Down Expand Up @@ -100,10 +116,19 @@ impl Rpc for RpcServer {
Err(err) => return Box::pin(async move { Err(err) }),
};

if let Err(err) = meta.tx_signatures.send(signature.clone()) {
if let Err(err) = meta.tx_signatures.send(SignatureWrapper {
signature: signature.clone(),
partner_name: auth.to_string(),
mode: meta.mode.clone(),
}) {
error!("Failed to propagate signature to the watcher: {}", err);
}

if meta.mode == Mode::BLACKHOLE {
info!("Transaction blackholed: {}", signature.to_string());
return Box::pin(async move { Ok(signature.to_string()) });
}

Box::pin(async move {
match meta
.balancer
Expand Down Expand Up @@ -134,17 +159,33 @@ pub fn get_io_handler() -> MetaIoHandler<RpcMetadata> {
io
}

fn select_mode(auth: Option<Auth>, partners: Vec<String>) -> Mode {
let mut mode = Mode::FORWARD;

if let Some(auth) = auth {
if partners.contains(&auth.to_string()) {
let mut rng = rand::thread_rng();
if rng.gen::<bool>() {
mode = Mode::BLACKHOLE;
}
}
}
mode
}

pub fn spawn_rpc_server(
rpc_addr: std::net::SocketAddr,
jwt_public_key_path: String,
test_partners: Option<Vec<String>>,
balancer: Arc<RwLock<Balancer>>,
tx_signatures: UnboundedSender<Signature>,
tx_signatures: UnboundedSender<SignatureWrapper>,
) -> Server {
info!("Spawning RPC server.");
let public_key = Arc::new(
load_public_key(jwt_public_key_path)
.expect("Failed to load public key used to verify JWTs"),
);
let partners = test_partners.unwrap_or_default();

ServerBuilder::with_meta_extractor(
get_io_handler(),
Expand All @@ -153,12 +194,14 @@ pub fn spawn_rpc_server(
.headers()
.get(AUTHORIZATION)
.map(|header_value| header_value.to_str().unwrap().to_string());
let auth =
authenticate((*public_key).clone(), auth_header).map_err(|err| err.to_string());

RpcMetadata {
auth: authenticate((*public_key).clone(), auth_header)
.map_err(|err| err.to_string()),
auth: auth.clone(),
balancer: balancer.clone(),
tx_signatures: tx_signatures.clone(),
mode: select_mode(auth.clone().ok(), partners.clone()),
}
},
)
Expand Down
77 changes: 61 additions & 16 deletions server/solana_service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::metrics;
use crate::{metrics, rpc_server::Mode};
use log::{debug, error, info};
use solana_client::{
nonblocking::pubsub_client::PubsubClient, rpc_client::RpcClient,
Expand Down Expand Up @@ -137,11 +137,21 @@ pub fn leaders_stream(
struct SignatureRecord {
created_at: tokio::time::Instant,
signature: Signature,
mode: Mode,
partner_name: String,
}

#[derive(Debug, Clone)]
pub struct SignatureWrapper {
pub signature: Signature,
pub partner_name: String,
pub mode: Mode,
}

pub fn spawn_tx_signature_watcher(
client: Arc<RpcClient>,
) -> Result<UnboundedSender<Signature>, Box<dyn Error + Send + Sync>> {
let (tx_signature, rx_signature) = unbounded_channel::<Signature>();
) -> Result<UnboundedSender<SignatureWrapper>, Box<dyn Error + Send + Sync>> {
let (tx_signature, rx_signature) = unbounded_channel::<SignatureWrapper>();

let mut rx_signature = UnboundedReceiverStream::new(rx_signature);

Expand Down Expand Up @@ -171,18 +181,24 @@ pub fn spawn_tx_signature_watcher(
break;
}
{
let bundle = signature_queue.drain(0..to_be_bundled_count).map(|r| r.signature).collect::<Vec<_>>();
let bundle = signature_queue.drain(0..to_be_bundled_count).map(|r| SignatureWrapper {
signature: r.signature,
partner_name: r.partner_name,
mode: r.mode,
}).collect::<Vec<_>>();

spawn_signature_checker(client.clone(), bundle);
}
}
},
Some(signature) = rx_signature.next() => {
Some(wrapper) = rx_signature.next() => {
signature_queue.push_back(SignatureRecord {
created_at: tokio::time::Instant::now(),
signature: signature.clone(),
signature: wrapper.signature.clone(),
partner_name: wrapper.partner_name,
mode: wrapper.mode,
});
info!("Will watch for {:?}", &signature);
info!("Will watch for {:?}", &wrapper.signature);
},
else => break,
}
Expand All @@ -194,26 +210,55 @@ pub fn spawn_tx_signature_watcher(
Ok(tx_signature)
}

fn spawn_signature_checker(client: Arc<RpcClient>, bundle: Vec<Signature>) {
fn spawn_signature_checker(client: Arc<RpcClient>, bundle: Vec<SignatureWrapper>) {
tokio::spawn(async move {
match client.get_signature_statuses(&bundle) {
match client.get_signature_statuses(
&bundle
.iter()
.map(|f| f.signature)
.collect::<Vec<Signature>>(),
) {
Ok(response) => {
for signature_status in response.value {
for (i, signature_status) in response.value.iter().enumerate() {
let wrapper = bundle.get(i).unwrap();
if let Some(known_status) = signature_status {
info!("Signature status {:?}", known_status);
info!(
"Signature status {:?} | Partner: {:?} | Mode: {:?}",
known_status,
wrapper.partner_name,
wrapper.mode.to_string()
);
match known_status.err {
Some(_) => metrics::CHAIN_TX_EXECUTION_SUCCESS.inc(),
_ => metrics::CHAIN_TX_EXECUTION_SUCCESS.inc(),
Some(_) => metrics::CHAIN_TX_EXECUTION_SUCCESS
.with_label_values(&[
&wrapper.partner_name,
&wrapper.mode.to_string(),
])
.inc(),
_ => metrics::CHAIN_TX_EXECUTION_SUCCESS
.with_label_values(&[
&wrapper.partner_name,
&wrapper.mode.to_string(),
])
.inc(),
};
metrics::CHAIN_TX_FINALIZED.inc();
metrics::CHAIN_TX_FINALIZED
.with_label_values(&[&wrapper.partner_name, &wrapper.mode.to_string()])
.inc();
} else {
metrics::CHAIN_TX_TIMEOUT.inc();
metrics::CHAIN_TX_TIMEOUT
.with_label_values(&[&wrapper.partner_name, &wrapper.mode.to_string()])
.inc();
}
}
}
Err(err) => {
error!("Failed to get signature statuses: {}", err);
metrics::CHAIN_TX_TIMEOUT.inc_by(bundle.len() as u64);
for tx in bundle {
metrics::CHAIN_TX_TIMEOUT
.with_label_values(&[&tx.partner_name, &tx.mode.to_string()])
.inc();
}
}
}
});
Expand Down

0 comments on commit 8157592

Please sign in to comment.