Skip to content

Commit 8c71154

Browse files
committed
fix: status indicator works now + fixed async/sync context
1 parent 3230bb6 commit 8c71154

File tree

7 files changed

+188
-189
lines changed

7 files changed

+188
-189
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@
22
**.log
33
tmp/
44
.vscode
5-
db/
5+
db/
6+
archives

tinydancer/src/consensus.rs

Lines changed: 95 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use crate::tinydancer::{endpoint, ClientService, ClientStatus, Cluster};
21
use crate::sampler::{ArchiveConfig, SlotSubscribeResponse};
2+
use crate::tinydancer::{endpoint, ClientService, ClientStatus, Cluster};
33
use crate::{convert_to_websocket, send_rpc_call, try_coerce_shred};
44
use anyhow::anyhow;
55
use async_trait::async_trait;
@@ -12,42 +12,42 @@ use rayon::prelude::*;
1212
use reqwest::Request;
1313
use rocksdb::{ColumnFamily, Options as RocksOptions, DB};
1414
use serde::de::DeserializeOwned;
15+
use serde_derive::Deserialize;
16+
use serde_derive::Serialize;
1517
use solana_ledger::shred::{ShredId, ShredType};
1618
use solana_ledger::{
17-
ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash},
18-
blockstore::Blockstore,
19-
// blockstore_db::columns::ShredCode,
20-
shred::{Nonce, Shred, ShredCode, ShredData, ShredFetchStats, SIZE_OF_NONCE},
19+
ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash},
20+
blockstore::Blockstore,
21+
// blockstore_db::columns::ShredCode,
22+
shred::{Nonce, Shred, ShredCode, ShredData, ShredFetchStats, SIZE_OF_NONCE},
2123
};
2224
use solana_sdk::hash::hashv;
2325
use solana_sdk::{
24-
clock::Slot,
25-
genesis_config::ClusterType,
26-
hash::{Hash, HASH_BYTES},
27-
packet::PACKET_DATA_SIZE,
28-
pubkey::{Pubkey, PUBKEY_BYTES},
29-
signature::{Signable, Signature, Signer, SIGNATURE_BYTES},
30-
signer::keypair::Keypair,
31-
timing::{duration_as_ms, timestamp},
26+
clock::Slot,
27+
genesis_config::ClusterType,
28+
hash::{Hash, HASH_BYTES},
29+
packet::PACKET_DATA_SIZE,
30+
pubkey::{Pubkey, PUBKEY_BYTES},
31+
signature::{Signable, Signature, Signer, SIGNATURE_BYTES},
32+
signer::keypair::Keypair,
33+
timing::{duration_as_ms, timestamp},
3234
};
3335
use std::str::FromStr;
3436
use std::sync::atomic::{AtomicU32, Ordering};
35-
use std::sync::{Arc, Mutex, MutexGuard};
37+
use std::sync::Arc;
3638
use std::{error::Error, ops::Add};
3739
use std::{
38-
net::{SocketAddr, UdpSocket},
39-
thread::Builder,
40+
net::{SocketAddr, UdpSocket},
41+
thread::Builder,
4042
};
4143
use tiny_logger::logs::{debug, error, info};
4244
use tokio::{
43-
sync::mpsc::UnboundedSender,
44-
task::{JoinError, JoinHandle},
45-
sync::Mutex as TokioMutex,
45+
sync::mpsc::UnboundedSender,
46+
sync::{Mutex, MutexGuard},
47+
task::{JoinError, JoinHandle},
4648
};
4749
use tungstenite::{connect, Message};
4850
use url::Url;
49-
use serde_derive::Deserialize;
50-
use serde_derive::Serialize;
5151

5252
pub struct ConsensusService {
5353
consensus_indices: Vec<u64>,
@@ -58,20 +58,20 @@ pub struct ConsensusServiceConfig {
5858
pub cluster: Cluster,
5959
pub archive_config: ArchiveConfig,
6060
pub instance: Arc<rocksdb::DB>,
61-
pub status_consensus: Arc<TokioMutex<ClientStatus>>,
61+
pub client_status: Arc<Mutex<ClientStatus>>,
6262
pub sample_qty: usize,
6363
}
6464

6565
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
6666
#[serde(rename_all = "camelCase")]
67-
pub struct RpcBlockCommitment<T> {
67+
pub struct RpcBlockCommitment<T> {
6868
pub commitment: Option<T>,
6969
pub total_stake: u64,
7070
}
7171

7272
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
7373
#[serde(rename_all = "camelCase")]
74-
pub struct GetCommittmentResponse {
74+
pub struct GetCommittmentResponse {
7575
pub jsonrpc: String,
7676
pub result: RpcBlockCommitment<BlockCommitmentArray>,
7777
pub id: i64,
@@ -84,96 +84,95 @@ pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64;
8484

8585
#[async_trait]
8686
impl ClientService<ConsensusServiceConfig> for ConsensusService {
87-
type ServiceError = tokio::task::JoinError;
87+
type ServiceError = tokio::task::JoinError;
8888

89-
fn new(config: ConsensusServiceConfig) -> Self {
90-
let consensus_handler = tokio::spawn(async move {
91-
let rpc_url = endpoint(config.cluster);
92-
let pub_sub = convert_to_websocket!(rpc_url);
89+
fn new(config: ConsensusServiceConfig) -> Self {
90+
let consensus_handler = tokio::spawn(async move {
91+
let rpc_url = endpoint(config.cluster);
92+
let pub_sub = convert_to_websocket!(rpc_url);
9393

94-
let mut threads = Vec::default();
94+
let mut threads = Vec::default();
9595

96-
let (slot_update_tx, slot_update_rx) = crossbeam::channel::unbounded::<u64>();
96+
let (slot_update_tx, slot_update_rx) = crossbeam::channel::unbounded::<u64>();
9797

98-
let status_arc = config.status_consensus.clone();
98+
let status_arc = config.client_status.clone();
9999

100-
// waits on new slots => triggers slot_verify_loop
101-
threads.push(tokio::spawn(slot_update_loop(
102-
slot_update_tx,
103-
pub_sub,
104-
config.status_consensus,
105-
)));
100+
// waits on new slots => triggers slot_verify_loop
101+
threads.push(tokio::spawn(slot_update_loop(
102+
slot_update_tx,
103+
pub_sub,
104+
config.client_status,
105+
)));
106106

107-
// verify slot votes
108-
threads.push(tokio::spawn(slot_verify_loop(
109-
slot_update_rx,
110-
rpc_url,
111-
status_arc,
112-
)));
107+
// verify slot votes
108+
threads.push(tokio::spawn(slot_verify_loop(
109+
slot_update_rx,
110+
rpc_url,
111+
status_arc,
112+
)));
113113

114+
for thread in threads {
115+
thread.await;
116+
}
117+
});
114118

115-
for thread in threads {
116-
thread.await;
119+
Self {
120+
consensus_handler,
121+
consensus_indices: Vec::default(),
117122
}
118-
});
119-
120-
Self {
121-
consensus_handler,
122-
consensus_indices: Vec::default(),
123123
}
124-
}
125124

126-
async fn join(self) -> std::result::Result<(), Self::ServiceError> {
127-
self.consensus_handler.await
128-
}
125+
async fn join(self) -> std::result::Result<(), Self::ServiceError> {
126+
self.consensus_handler.await
127+
}
129128
}
130129

131130
pub async fn slot_update_loop(
132131
slot_update_tx: Sender<u64>,
133132
pub_sub: String,
134-
status_sampler: Arc<TokioMutex<ClientStatus>>,
133+
client_status: Arc<Mutex<ClientStatus>>,
135134
) -> anyhow::Result<()> {
136-
let result = match connect(Url::parse(pub_sub.as_str()).unwrap()) {
137-
Ok((socket, _response)) => Some((socket, _response)),
138-
Err(_) => {
139-
let mut status = status_sampler.lock().await;
140-
*status = ClientStatus::Crashed(String::from("Client can't connect to socket"));
141-
None
142-
}
143-
};
144-
145-
if result.is_none() {
146-
return Err(anyhow!(""));
147-
}
148-
149-
let (mut socket, _response) = result.unwrap();
150-
151-
socket.write_message(Message::Text(
152-
r#"{ "jsonrpc": "2.0", "id": 1, "method": "slotSubscribe" }"#.into(),
153-
))?;
135+
let result = match connect(Url::parse(pub_sub.as_str()).unwrap()) {
136+
Ok((socket, _response)) => Some((socket, _response)),
137+
Err(_) => {
138+
let mut status = client_status.lock().await;
139+
*status = ClientStatus::Crashed(String::from("Client can't connect to socket"));
140+
None
141+
}
142+
};
154143

155-
loop {
156-
match socket.read_message() {
157-
Ok(msg) => {
158-
let res = serde_json::from_str::<SlotSubscribeResponse>(msg.to_string().as_str());
144+
if result.is_none() {
145+
return Err(anyhow!(""));
146+
}
159147

160-
// info!("res: {:?}", msg.to_string().as_str());
161-
if let Ok(res) = res {
162-
match slot_update_tx.send(res.params.result.root as u64) {
163-
Ok(_) => {
164-
info!("slot updated: {:?}", res.params.result.root);
165-
}
166-
Err(e) => {
167-
info!("error here: {:?} {:?}", e, res.params.result.root as u64);
168-
continue; // @TODO: we should add retries here incase send fails for some reason
148+
let (mut socket, _response) = result.unwrap();
149+
150+
socket.write_message(Message::Text(
151+
r#"{ "jsonrpc": "2.0", "id": 1, "method": "slotSubscribe" }"#.into(),
152+
))?;
153+
154+
loop {
155+
match socket.read_message() {
156+
Ok(msg) => {
157+
let res = serde_json::from_str::<SlotSubscribeResponse>(msg.to_string().as_str());
158+
159+
// info!("res: {:?}", msg.to_string().as_str());
160+
if let Ok(res) = res {
161+
match slot_update_tx.send(res.params.result.root as u64) {
162+
Ok(_) => {
163+
info!("slot updated: {:?}", res.params.result.root);
164+
}
165+
Err(e) => {
166+
info!("error here: {:?} {:?}", e, res.params.result.root as u64);
167+
continue; // @TODO: we should add retries here incase send fails for some reason
168+
}
169169
}
170170
}
171171
}
172+
Err(e) => info!("err: {:?}", e),
172173
}
173-
Err(e) => info!("err: {:?}", e),
174174
}
175175
}
176-
}
177176

178177
// verifies the total vote on the slot > 2/3
179178
fn verify_slot(slot_commitment: RpcBlockCommitment<BlockCommitmentArray>) -> bool {
@@ -191,20 +190,20 @@ fn verify_slot(slot_commitment: RpcBlockCommitment<BlockCommitmentArray>) -> boo
191190
pub async fn slot_verify_loop(
192191
slot_update_rx: Receiver<u64>,
193192
endpoint: String,
194-
status_sampler: Arc<tokio::sync::Mutex<ClientStatus>>,
193+
client_status: Arc<Mutex<ClientStatus>>,
195194
) -> anyhow::Result<()> {
196-
loop {
197-
let mut status = status_sampler.lock().await;
195+
loop {
196+
let mut status = client_status.lock().await;
198197
if let ClientStatus::Crashed(_) = &*status {
199198
return Err(anyhow!("Client crashed"));
200199
} else {
201-
*status = ClientStatus::Active(String::from(
202-
"Monitoring Tinydancer: Verifying consensus",
203-
));
200+
*status =
201+
ClientStatus::Active(String::from("Monitoring Tinydancer: Verifying consensus"));
204202
}
203+
drop(status);
205204
if let Ok(slot) = slot_update_rx.recv() {
206205
let slot_commitment_result = request_slot_voting(slot, &endpoint).await;
207-
206+
208207
if let Err(e) = slot_commitment_result {
209208
println!("Error {}", e);
210209
info!("{}", e);
@@ -229,7 +228,6 @@ pub async fn request_slot_voting(
229228
slot: u64,
230229
endpoint: &String,
231230
) -> Result<GetCommittmentResponse, serde_json::Error> {
232-
233231
let request = serde_json::json!({
234232
"jsonrpc": "2.0",
235233
"id": 1,

tinydancer/src/macros.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,17 @@ macro_rules! block_on {
1717
rt.handle().block_on($func).expect($error);
1818
};
1919
}
20+
#[macro_export]
21+
macro_rules! block_on_async {
22+
($func:expr) => {{
23+
let rt = tokio::runtime::Builder::new_current_thread()
24+
.enable_all()
25+
.build()
26+
.unwrap();
27+
rt.block_on($func)
28+
}};
29+
}
30+
2031
#[macro_export]
2132
macro_rules! try_coerce_shred {
2233
($response:expr) => {{

tinydancer/src/main.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ use std::{
4646
use tinydancer::{endpoint, Cluster, TinyDancer, TinyDancerConfig};
4747
mod macros;
4848
use colored::Colorize;
49+
mod consensus;
4950
mod rpc_wrapper;
5051
mod sampler;
51-
mod consensus;
5252
mod ui;
5353

5454
use anyhow::{anyhow, Result};
@@ -149,7 +149,7 @@ async fn main() -> Result<()> {
149149
archive_path,
150150
shred_archive_duration,
151151
tui_monitor,
152-
consensus_mode
152+
consensus_mode,
153153
} => {
154154
let config_file =
155155
get_config_file().map_err(|_| anyhow!("tinydancer config not set"))?;
@@ -234,8 +234,6 @@ async fn main() -> Result<()> {
234234
}
235235
}
236236
ConfigSubcommands::Set { log_path, cluster } => {
237-
// println!("{:?}", fs::create_dir_all("~/.config/tinydancer"));
238-
239237
let home_path = std::env::var("HOME").unwrap();
240238
let tinydancer_dir = home_path + "/.config/tinydancer";
241239

0 commit comments

Comments
 (0)