Skip to content

Commit

Permalink
Fix/reduce sync requests (#62)
Browse files Browse the repository at this point in the history
* Split client subscribe method into parts: subscribe_markets, subscribe_oracles, subscribe_blockhashes
* subscribe_markets and subscribe_oracles receive a list of markets to subscribe for
* Prefix DriftClient sync methods with try_
* Refactor OracleMap and public functions to take MarketIds
* WebSocketAccountSubscriber don't fetch account on start by default
* Replace marketmap GPA with individual subscriptions
* Fix doc comments+spelling
  • Loading branch information
jordy25519 authored Oct 21, 2024
1 parent 9e08dbe commit ad92773
Show file tree
Hide file tree
Showing 14 changed files with 626 additions and 570 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ rpc_tests = []

[dependencies]
abi_stable = "0.11"
ahash = "0.8.11"
anchor-lang = { version = "0.30", features = ["derive"] }
base64 = "0.22"
bytemuck = "1.17"
dashmap = "6"
env_logger = "0.11"
fnv = "1"
futures-util = "0.3"
log = "0.4"
rayon = { version = "1.9.0", optional = true }
Expand All @@ -49,4 +49,4 @@ hex = "0.4"
hex-literal = "0.4"

[build-dependencies]
drift-idl-gen = { version = "0.1.1", path = "crates/drift-idl-gen"}
drift-idl-gen = { version = "0.1.1", path = "crates/drift-idl-gen"}
31 changes: 16 additions & 15 deletions crates/src/account_map.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use std::sync::{Arc, Mutex, RwLock};

use anchor_lang::AccountDeserialize;
use fnv::FnvHashMap;
use dashmap::DashMap;
use log::debug;
use solana_sdk::{clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey};

use crate::{
utils::get_ws_url, websocket_account_subscriber::WebsocketAccountSubscriber, SdkResult,
UnsubHandle,
};

const LOG_TARGET: &str = "accountmap";

#[derive(Clone, Default)]
pub struct AccountSlot {
raw: Vec<u8>,
Expand All @@ -24,7 +27,7 @@ pub struct DataAndSlot<T> {
pub struct AccountMap {
endpoint: String,
commitment: CommitmentConfig,
inner: RwLock<FnvHashMap<Pubkey, AccountSub<Subscribed>>>,
inner: DashMap<Pubkey, AccountSub<Subscribed>, ahash::RandomState>,
}

impl AccountMap {
Expand All @@ -37,26 +40,23 @@ impl AccountMap {
}
/// Subscribe user account
pub async fn subscribe_account(&self, account: &Pubkey) -> SdkResult<()> {
{
let map = self.inner.read().expect("acquired");
if map.contains_key(account) {
return Ok(());
}
if self.inner.contains_key(account) {
return Ok(());
}
debug!(target: LOG_TARGET, "subscribing: {account:?}");

let user = AccountSub::new(&self.endpoint, self.commitment, *account);
let user = user.subscribe().await?;

let mut map = self.inner.write().expect("acquired");
map.insert(*account, user);
self.inner.insert(*account, user);

Ok(())
}
/// Unsubscribe user account
pub fn unsubscribe_account(&self, account: &Pubkey) {
let mut map = self.inner.write().expect("acquired");
if let Some(u) = map.remove(account) {
let _ = u.unsubscribe();
if let Some((acc, unsub)) = self.inner.remove(account) {
debug!(target: LOG_TARGET, "unsubscribing: {acc:?}");
let _ = unsub.unsubscribe();
}
}
/// Return data of the given `account` as T, if it exists
Expand All @@ -68,8 +68,9 @@ impl AccountMap {
&self,
account: &Pubkey,
) -> Option<DataAndSlot<T>> {
let accounts = self.inner.read().expect("read");
accounts.get(account).map(|u| u.get_account_data_and_slot())
self.inner
.get(account)
.map(|u| u.get_account_data_and_slot())
}
}

Expand Down Expand Up @@ -111,7 +112,7 @@ impl AccountSub<Unsubscribed> {
let data_and_slot = Arc::new(RwLock::new(AccountSlot::default()));
let unsub = self
.subscription
.subscribe(Self::SUBSCRIPTION_ID, {
.subscribe(Self::SUBSCRIPTION_ID, true, {
let data_and_slot = Arc::clone(&data_and_slot);
move |update| {
let mut guard = data_and_slot.write().expect("acquired");
Expand Down
12 changes: 6 additions & 6 deletions crates/src/event_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use std::{
time::Duration,
};

use ahash::HashSet;
use anchor_lang::{AnchorDeserialize, Discriminator};
use base64::Engine;
use fnv::FnvHashSet;
use futures_util::{future::BoxFuture, stream::FuturesOrdered, FutureExt, Stream, StreamExt};
use log::{debug, info, warn};
use regex::Regex;
Expand Down Expand Up @@ -656,15 +656,15 @@ impl DriftEvent {
/// fixed capacity cache of tx signatures
struct TxSignatureCache {
capacity: usize,
entries: FnvHashSet<String>,
entries: HashSet<String>,
age: VecDeque<String>,
}

impl TxSignatureCache {
fn new(capacity: usize) -> Self {
Self {
capacity,
entries: FnvHashSet::<String>::with_capacity_and_hasher(capacity, Default::default()),
entries: HashSet::<String>::with_capacity_and_hasher(capacity, Default::default()),
age: VecDeque::with_capacity(capacity),
}
}
Expand All @@ -689,9 +689,9 @@ impl TxSignatureCache {

#[cfg(test)]
mod test {
use ahash::HashMap;
use anchor_lang::prelude::*;
use base64::Engine;
use fnv::FnvHashMap;
use futures_util::future::ready;
use solana_sdk::{
hash::Hash,
Expand Down Expand Up @@ -852,7 +852,7 @@ mod test {
async fn polled_event_stream_caching() {
let _ = env_logger::try_init();
struct MockRpcProvider {
tx_responses: FnvHashMap<String, EncodedTransactionWithStatusMeta>,
tx_responses: HashMap<String, EncodedTransactionWithStatusMeta>,
signatures: tokio::sync::Mutex<Vec<String>>,
}

Expand Down Expand Up @@ -952,7 +952,7 @@ mod test {
let signatures: Vec<String> = (0..order_events.len())
.map(|_| Signature::new_unique().to_string())
.collect();
let mut tx_responses = FnvHashMap::<String, EncodedTransactionWithStatusMeta>::default();
let mut tx_responses = HashMap::<String, EncodedTransactionWithStatusMeta>::default();
for s in signatures.iter() {
let (oar, or) = order_events.pop().unwrap();
tx_responses.insert(
Expand Down
14 changes: 5 additions & 9 deletions crates/src/jit_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
accounts::User,
build_accounts,
constants::{self, state_account, JIT_PROXY_ID},
DriftClient, MarketId, MarketType, PostOnlyParam, ReferrerInfo, SdkError, SdkResult,
drift_idl, DriftClient, MarketId, MarketType, PostOnlyParam, ReferrerInfo, SdkError, SdkResult,
TransactionBuilder, Wallet,
};

Expand Down Expand Up @@ -131,7 +131,7 @@ impl JitProxyClient {
let program_data = tx_builder.program_data();
let account_data = tx_builder.account_data();

let writable_markets = match order.market_type {
let writable_markets = match order.market_type.into() {
MarketType::Perp => {
vec![MarketId::perp(order.market_index)]
}
Expand Down Expand Up @@ -161,18 +161,14 @@ impl JitProxyClient {
accounts.push(AccountMeta::new(referrer_info.referrer_stats(), false));
}

if order.market_type == MarketType::Spot {
if order.market_type == drift_idl::types::MarketType::Spot {
let spot_market_vault = self
.drift_client
.get_spot_market_account_and_slot(order.market_index)
.expect("spot market exists")
.data
.try_get_spot_market_account(order.market_index)?
.vault;
let quote_spot_market_vault = self
.drift_client
.get_spot_market_account_and_slot(MarketId::QUOTE_SPOT.index())
.expect("quote market exists")
.data
.try_get_spot_market_account(MarketId::QUOTE_SPOT.index())?
.vault;
accounts.push(AccountMeta::new_readonly(spot_market_vault, false));
accounts.push(AccountMeta::new_readonly(quote_spot_market_vault, false));
Expand Down
Loading

0 comments on commit ad92773

Please sign in to comment.