Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
538 changes: 537 additions & 1 deletion packages/fastlane/Cargo.lock

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions packages/fastlane/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pipeit-fastlane"
version = "0.1.3"
version = "0.1.4"
edition = "2021"
description = "Native QUIC client for direct Solana TPU transaction submission"
license = "MIT"
Expand All @@ -26,6 +26,8 @@ solana-sdk = "3.0"
# Async runtime
tokio = { version = "1", features = ["full"] }
futures-util = "0.3"
yellowstone-grpc-client = { version = "10.2.0" }
yellowstone-grpc-proto = { version = "10.1.1" }

# Data structures
dashmap = "6"
Expand All @@ -37,8 +39,9 @@ thiserror = "1"
[build-dependencies]
napi-build = "2"

[patch.crates-io]
protobuf-src = { path = "vendor/protobuf-src" }

[profile.release]
lto = false
opt-level = 3


2 changes: 1 addition & 1 deletion packages/fastlane/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pipeit/fastlane",
"version": "0.1.3",
"version": "0.1.4",
"description": "Native QUIC client for direct Solana TPU transaction submission",
"main": "index.js",
"types": "index.d.ts",
Expand Down
16 changes: 13 additions & 3 deletions packages/fastlane/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ pub struct TpuClientConfig {
pub rpc_url: String,
/// WebSocket URL for slot update subscriptions.
pub ws_url: String,
/// Optional gRPC URL for Yellowstone slot subscriptions.
/// When set, this takes precedence over WebSocket tracking.
pub grpc_url: Option<String>,
/// Optional gRPC x-token for authenticated Yellowstone endpoints.
pub grpc_x_token: Option<String>,
/// Number of upcoming leaders to send transactions to (default: 2).
pub fanout: Option<u32>,
/// Whether to pre-warm connections to upcoming leaders (default: true).
Expand Down Expand Up @@ -145,7 +150,12 @@ impl TpuClient {

// Initialize leader tracker
let leader_tracker = runtime.block_on(async {
LeaderTracker::new(config.rpc_url.clone(), config.ws_url.clone())
LeaderTracker::new(
config.rpc_url.clone(),
config.ws_url.clone(),
config.grpc_url.clone(),
config.grpc_x_token.clone(),
)
.await
.context("Failed to create leader tracker")
}).map_err(anyhow_to_napi)?;
Expand Down Expand Up @@ -182,11 +192,11 @@ impl TpuClient {
let _ = lt_for_slots.run_slot_listener().await;
});

// Start socket updater (every 60 seconds)
// Start socket updater (every 10 seconds for fresher TPU sockets)
let lt_for_sockets = lt_clone.clone();
let socket_updater = tokio::spawn(async move {
lt_for_sockets
.run_socket_updater(Duration::from_secs(60))
.run_socket_updater(Duration::from_secs(10))
.await;
});

Expand Down
131 changes: 116 additions & 15 deletions packages/fastlane/src/tracker/leader_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub struct LeaderInfo {

/// TPU socket addresses for a validator.
/// Stores both normal and forwards ports for flexible routing.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TpuSockets {
/// Standard TPU QUIC socket address.
pub tpu_socket: Option<String>,
Expand All @@ -52,6 +52,10 @@ pub struct LeaderTracker {
rpc_url: String,
/// WebSocket URL for subscriptions.
ws_url: String,
/// Optional Yellowstone gRPC URL for slot subscriptions.
grpc_url: Option<String>,
/// Optional Yellowstone gRPC x-token for authenticated endpoints.
grpc_x_token: Option<String>,
/// Real-time slot tracker.
pub slots_tracker: RwLock<SlotsTracker>,
/// Leader schedule tracker.
Expand All @@ -69,7 +73,12 @@ impl LeaderTracker {
///
/// * `rpc_url` - RPC endpoint URL
/// * `ws_url` - WebSocket endpoint URL
pub async fn new(rpc_url: String, ws_url: String) -> Result<Self> {
pub async fn new(
rpc_url: String,
ws_url: String,
grpc_url: Option<String>,
grpc_x_token: Option<String>,
) -> Result<Self> {
let rpc_client = RpcClient::new(rpc_url.clone());

let schedule_tracker = ScheduleTracker::new(&rpc_client)
Expand All @@ -79,6 +88,8 @@ impl LeaderTracker {
Ok(Self {
rpc_url,
ws_url,
grpc_url,
grpc_x_token,
slots_tracker: RwLock::new(SlotsTracker::new()),
schedule_tracker: RwLock::new(schedule_tracker),
leader_sockets: RwLock::new(HashMap::new()),
Expand Down Expand Up @@ -272,7 +283,7 @@ impl LeaderTracker {
/// Updates the leader socket addresses from cluster nodes.
///
/// Fetches both normal TPU QUIC and TPU forwards QUIC addresses.
/// Should be called periodically (e.g., every 60 seconds) as
/// Should be called periodically (e.g., every 10 seconds) as
/// validator IPs can change.
pub async fn update_leader_sockets(&self) -> Result<()> {
let rpc_client = RpcClient::new(self.rpc_url.clone());
Expand All @@ -282,29 +293,39 @@ impl LeaderTracker {
.await
.context("Failed to fetch cluster nodes")?;

let mut new_sockets = HashMap::new();
let mut sockets = self.leader_sockets.write().await;
let mut seen = HashSet::new();

for node in nodes {
let pubkey = node.pubkey.to_string();
seen.insert(pubkey.clone());

// Standard TPU QUIC socket (full SocketAddr from RPC)
let tpu_socket = node.tpu_quic.map(|addr| addr.to_string());

// TPU forwards QUIC socket (preferred by validators)
let tpu_forwards_socket = node.tpu_forwards_quic.map(|addr| addr.to_string());

// Only add if at least one socket is available
// Only store if at least one socket is available
if tpu_socket.is_some() || tpu_forwards_socket.is_some() {
new_sockets.insert(
node.pubkey.to_string(),
TpuSockets {
tpu_socket,
tpu_forwards_socket,
},
);
let new_entry = TpuSockets {
tpu_socket,
tpu_forwards_socket,
};

let needs_update = sockets
.get(&pubkey)
.map(|existing| existing != &new_entry)
.unwrap_or(true);

if needs_update {
sockets.insert(pubkey, new_entry);
}
}
}

let mut sockets = self.leader_sockets.write().await;
*sockets = new_sockets;
// Remove validators no longer present in the cluster nodes response.
sockets.retain(|pubkey, _| seen.contains(pubkey));

Ok(())
}
Expand All @@ -324,9 +345,18 @@ impl LeaderTracker {
}
}

/// Inner slot listener that handles the WebSocket connection.
/// Inner slot listener that handles the configured connection.
/// Returns when the connection ends (either normally or due to error).
async fn run_slot_listener_inner(&self) -> Result<()> {
if let Some(grpc_url) = self.grpc_url.as_ref() {
return self.run_grpc_slot_listener_inner(grpc_url).await;
}

self.run_wss_slot_listener_inner().await
}

/// Inner slot listener that handles the WebSocket connection.
async fn run_wss_slot_listener_inner(&self) -> Result<()> {
let ws_client = PubsubClient::new(&self.ws_url)
.await
.context("Failed to connect to WebSocket")?;
Expand All @@ -350,6 +380,76 @@ impl LeaderTracker {
Ok(())
}

/// Inner slot listener that handles the Yellowstone gRPC connection.
async fn run_grpc_slot_listener_inner(&self, grpc_url: &str) -> Result<()> {
use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcBuilder};
use yellowstone_grpc_proto::geyser::{
SubscribeRequest, SubscribeRequestFilterSlots,
subscribe_update::UpdateOneof,
};

let mut builder = GeyserGrpcBuilder::from_shared(grpc_url.to_string())
.context("Failed to build gRPC client")?;
let x_token = self.grpc_x_token.clone();
builder = builder.x_token(x_token).context("Failed to set gRPC x-token")?;

let mut client = builder
.tls_config(ClientTlsConfig::default().with_enabled_roots())
.context("Failed to configure gRPC TLS")?
.connect()
.await
.context("Failed to connect to gRPC endpoint")?;

let subscribe_request = SubscribeRequest {
slots: std::collections::HashMap::from([(
"fastlane-slot-tracker".to_string(),
SubscribeRequestFilterSlots {
interslot_updates: Some(true),
..Default::default()
},
)]),
..Default::default()
};

let mut stream = client
.subscribe_once(subscribe_request)
.await
.context("Failed to subscribe to gRPC slot updates")?;

let mut ready_set = false;
while let Some(result) = stream.next().await {
let update = result.context("gRPC slot stream error")?;
if let Some(UpdateOneof::Slot(slot_update)) = update.update_oneof {
let slot = slot_update.slot;

// Record the slot update (monotonic source; bypass outlier filtering)
let curr_slot = {
let mut tracker = self.slots_tracker.write().await;
tracker.record_monotonic(slot)
};

// Mark as ready once we start receiving updates
if !ready_set {
let mut ready = self.ready.write().await;
*ready = true;
ready_set = true;
}

// Check if we need to rotate to next epoch (keep schedule fresh across epoch boundaries)
let needs_rotation = {
let schedule_tracker = self.schedule_tracker.read().await;
curr_slot >= schedule_tracker.next_epoch_slot_start()
};

if needs_rotation {
self.rotate_epoch(curr_slot).await?;
}
}
}

Ok(())
}

/// Handles a single slot update event.
async fn handle_slot_event(&self, slot_update: SlotUpdate) -> Result<()> {
// Convert to our SlotEvent type
Expand Down Expand Up @@ -405,6 +505,7 @@ impl std::fmt::Debug for LeaderTracker {
f.debug_struct("LeaderTracker")
.field("rpc_url", &self.rpc_url)
.field("ws_url", &self.ws_url)
.field("grpc_url", &self.grpc_url)
.finish()
}
}
Expand Down
15 changes: 14 additions & 1 deletion packages/fastlane/src/tracker/slots_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,20 @@ impl SlotsTracker {
self.record(SlotEvent::End(slot))
}

/// Records a slot update from a monotonic source (e.g., gRPC).
///
/// Ignores out-of-order or duplicate slots and bypasses outlier filtering.
pub fn record_monotonic(&mut self, slot: Slot) -> Slot {
if slot <= self.current_slot {
return self.current_slot;
}

self.current_slot = slot;
self.recent_events.clear();
self.recent_events.push_back(SlotEvent::Start(slot));
self.current_slot
}

/// Estimates the current slot based on recent events.
///
/// Uses a median-based approach to filter outliers:
Expand Down Expand Up @@ -182,4 +196,3 @@ mod tests {
}
}


10 changes: 10 additions & 0 deletions packages/fastlane/vendor/protobuf-src/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "protobuf-src"
version = "1.1.0+21.5"
edition = "2021"
license = "Apache-2.0"
publish = false
description = "Local override to supply a prebuilt protoc without building C++ protobuf"

[dependencies]
protoc-bin-vendored = "2"
29 changes: 29 additions & 0 deletions packages/fastlane/vendor/protobuf-src/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::path::PathBuf;

/// Returns the path to a protoc binary.
///
/// Prefers the PROTOC env var if set, otherwise falls back to a vendored binary.
pub fn protoc() -> PathBuf {
if let Ok(path) = std::env::var("PROTOC") {
return PathBuf::from(path);
}

protoc_bin_vendored::protoc_bin_path()
.expect("protoc-bin-vendored should provide a protoc binary")
}

/// Returns the path to the protobuf include directory if available.
pub fn include() -> PathBuf {
if let Ok(path) = std::env::var("PROTOC_INCLUDE") {
return PathBuf::from(path);
}

let protoc_path = protoc();
if let Some(bin_dir) = protoc_path.parent() {
if let Some(root_dir) = bin_dir.parent() {
return root_dir.join("include");
}
}

PathBuf::new()
}