Skip to content

Commit 0a983af

Browse files
committed
feat(c-bridge): implement new watch_dog actor
1 parent 8023c56 commit 0a983af

File tree

3 files changed

+380
-4
lines changed

3 files changed

+380
-4
lines changed

bridges/centralized-ethereum/src/actors/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,6 @@ pub mod eth_poller;
1212

1313
/// wit_poller actor module
1414
pub mod wit_poller;
15+
16+
/// watch_dog actor module
17+
pub mod watch_dog;
Lines changed: 364 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,364 @@
1+
use crate::{
2+
actors::dr_database::{CountDrsPerState, DrDatabase},
3+
config::Config,
4+
};
5+
use actix::prelude::*;
6+
use async_jsonrpc_client::{transports::tcp::TcpSocket, Transport};
7+
use futures_util::compat::Compat01As03;
8+
use serde::{Deserialize, Serialize};
9+
use serde_json::json;
10+
use std::{
11+
sync::Arc,
12+
time::{Duration, Instant},
13+
};
14+
use web3::{
15+
contract::Contract,
16+
transports::Http,
17+
types::{H160, U256},
18+
};
19+
use witnet_net::client::tcp::{jsonrpc, JsonRpcClient};
20+
use witnet_node::utils::stop_system_if_panicking;
21+
22+
/// EthPoller actor reads periodically new requests from the WRB Contract and includes them
23+
/// in the DrDatabase
24+
#[derive(Default)]
25+
pub struct WatchDog {
26+
/// JSON RPC connection to Wit/node
27+
pub wit_jsonrpc_socket: String,
28+
/// Bridge UTXO min value threshold
29+
pub wit_utxo_min_value_threshold: u64,
30+
/// Web3 object
31+
pub eth_jsonrpc_url: String,
32+
/// Web3 signer address
33+
pub eth_account: H160,
34+
/// WitOracle bridge contract
35+
pub eth_contract: Option<Arc<Contract<web3::transports::Http>>>,
36+
/// Polling period for global status
37+
pub polling_rate_ms: u64,
38+
/// Instant at which the actor is created
39+
pub start_ts: Option<Instant>,
40+
/// Eth balance upon first metric report:
41+
pub start_eth_balance: Option<f64>,
42+
/// Wit balance upon last refund
43+
pub start_wit_balance: Option<f64>,
44+
}
45+
46+
#[derive(Serialize, Deserialize)]
47+
struct WatchDogOutput {
48+
pub running_secs: u64,
49+
}
50+
51+
impl Drop for WatchDog {
52+
fn drop(&mut self) {
53+
log::trace!("Dropping WatchDog");
54+
stop_system_if_panicking("WatchDog");
55+
}
56+
}
57+
58+
/// Make actor from EthPoller
59+
impl Actor for WatchDog {
60+
/// Every actor has to provide execution Context in which it can run.
61+
type Context = Context<Self>;
62+
63+
/// Method to be executed when the actor is started
64+
fn started(&mut self, ctx: &mut Self::Context) {
65+
log::debug!("WatchDog actor has been started!");
66+
67+
self.watch_global_status(None, None, ctx, Duration::from_millis(self.polling_rate_ms));
68+
}
69+
}
70+
71+
/// Required trait for being able to retrieve WatchDog address from system registry
72+
impl actix::Supervised for WatchDog {}
73+
impl SystemService for WatchDog {}
74+
75+
impl WatchDog {
76+
/// Initialize from config
77+
pub fn from_config(config: &Config, eth_contract: Arc<Contract<Http>>) -> Self {
78+
Self {
79+
wit_jsonrpc_socket: config.witnet_jsonrpc_socket.to_string(),
80+
wit_utxo_min_value_threshold: config.witnet_utxo_min_value_threshold,
81+
eth_account: config.eth_from,
82+
eth_contract: Some(eth_contract),
83+
eth_jsonrpc_url: config.eth_jsonrpc_url.clone(),
84+
polling_rate_ms: config.watch_dog_polling_rate_ms,
85+
start_ts: Some(Instant::now()),
86+
start_eth_balance: None,
87+
start_wit_balance: None,
88+
}
89+
}
90+
91+
fn watch_global_status(
92+
&mut self,
93+
eth_balance: Option<f64>,
94+
wit_balance: Option<f64>,
95+
ctx: &mut Context<Self>,
96+
period: Duration,
97+
) {
98+
if self.start_eth_balance.is_none() && eth_balance.is_some() {
99+
self.start_eth_balance = eth_balance;
100+
}
101+
if let Some(wit_balance) = wit_balance {
102+
if wit_balance > self.start_wit_balance.unwrap_or_default() {
103+
self.start_wit_balance = Some(wit_balance);
104+
log::warn!("Wit account refunded to {} $WIT", wit_balance);
105+
}
106+
}
107+
let start_eth_balance = self.start_eth_balance;
108+
let start_wit_balance = self.start_wit_balance;
109+
let wit_jsonrpc_socket = self.wit_jsonrpc_socket.clone();
110+
let wit_utxo_min_value_threshold = self.wit_utxo_min_value_threshold;
111+
let eth_jsonrpc_url = self.eth_jsonrpc_url.clone();
112+
let eth_account = self.eth_account;
113+
let eth_contract_address = self.eth_contract.clone().unwrap().address();
114+
let running_secs = self.start_ts.unwrap().elapsed().as_secs();
115+
116+
let fut = async move {
117+
let mut status = "up-and-running".to_string();
118+
119+
if let Err(err) = check_wit_connection_status(&wit_jsonrpc_socket).await {
120+
status = err;
121+
}
122+
let wit_client = JsonRpcClient::start(&wit_jsonrpc_socket)
123+
.expect("cannot start JSON/WIT connection");
124+
let wit_account = match fetch_wit_account(&wit_client).await {
125+
Ok(pkh) => pkh,
126+
Err(err) => {
127+
if status.eq("up-and-running") {
128+
status = err;
129+
}
130+
None
131+
}
132+
};
133+
134+
let wit_balance = match wit_account.clone() {
135+
Some(pkh) => match fetch_wit_account_balance(&wit_client, pkh.as_str()).await {
136+
Ok(wit_balance) => wit_balance,
137+
Err(err) => {
138+
if status.eq("up-and-running") {
139+
status = err;
140+
}
141+
None
142+
}
143+
},
144+
None => None,
145+
};
146+
147+
let wit_utxos_above_threshold = match wit_account.clone() {
148+
Some(pkh) => {
149+
match fetch_wit_account_count_utxos_above(
150+
&wit_client,
151+
pkh.as_str(),
152+
wit_utxo_min_value_threshold,
153+
)
154+
.await
155+
{
156+
Ok(wit_utxos_above_threshold) => wit_utxos_above_threshold,
157+
Err(err) => {
158+
if status.eq("up-and-running") {
159+
status = err;
160+
}
161+
None
162+
}
163+
}
164+
}
165+
None => None,
166+
};
167+
168+
let eth_balance = match check_eth_account_balance(&eth_jsonrpc_url, eth_account).await {
169+
Ok(Some(eth_balance)) => {
170+
let eth_balance: f64 = eth_balance.to_string().parse().unwrap_or_default();
171+
//Some(Unit::Wei(&eth_balance.to_string()).to_eth_str().unwrap_or_default()),
172+
Some(eth_balance / 1000000000000000000.0)
173+
}
174+
Ok(None) => None,
175+
Err(err) => {
176+
if status.eq("up-and-running") {
177+
status = err;
178+
}
179+
None
180+
}
181+
};
182+
183+
let dr_database = DrDatabase::from_registry();
184+
let (_, drs_pending, drs_finished, _) =
185+
dr_database.send(CountDrsPerState).await.unwrap().unwrap();
186+
187+
let mut metrics: String = "{".to_string();
188+
metrics.push_str(&format!("\"drsFinished\": {drs_finished}, "));
189+
metrics.push_str(&format!("\"drsPending\": {drs_pending}, "));
190+
metrics.push_str(&format!("\"evmAccount\": \"{eth_account}\", "));
191+
if eth_balance.is_some() {
192+
let eth_balance = eth_balance.unwrap();
193+
metrics.push_str(&format!("\"evmBalance\": {:.5}, ", eth_balance));
194+
metrics.push_str(&format!("\"evmContract\": \"{eth_contract_address}\", "));
195+
if let Some(start_eth_balance) = start_eth_balance {
196+
let eth_hourly_earnings =
197+
((eth_balance - start_eth_balance) / running_secs as f64) * 3600_f64;
198+
metrics.push_str(&format!(
199+
"\"evmHourlyEarnings\": {:.5}, ",
200+
eth_hourly_earnings
201+
));
202+
}
203+
}
204+
if wit_account.is_some() {
205+
metrics.push_str(&format!("\"witAccount\": {:?}, ", wit_account.unwrap()));
206+
}
207+
if wit_balance.is_some() {
208+
let wit_balance = wit_balance.unwrap();
209+
metrics.push_str(&format!("\"witBalance\": {:.5}, ", wit_balance));
210+
if let Some(start_wit_balance) = start_wit_balance {
211+
let wit_hourly_expenditure =
212+
((start_wit_balance - wit_balance) / running_secs as f64) * 3600_f64;
213+
metrics.push_str(&format!(
214+
"\"witHourlyExpenditure\": {:.1}, ",
215+
wit_hourly_expenditure
216+
));
217+
}
218+
}
219+
metrics.push_str(&format!("\"witNodeSocket\": \"{}\", ", wit_jsonrpc_socket));
220+
if wit_utxos_above_threshold.is_some() {
221+
metrics.push_str(&format!(
222+
"\"witUtxosAboveThreshold\": {}, ",
223+
wit_utxos_above_threshold.unwrap()
224+
));
225+
}
226+
metrics.push_str(&format!("\"runningSecs\": {running_secs}, "));
227+
metrics.push_str(&format!("\"status\": \"{status}\""));
228+
metrics.push_str("}}");
229+
log::info!("{metrics}");
230+
231+
(eth_balance, wit_balance)
232+
};
233+
234+
ctx.spawn(
235+
fut.into_actor(self)
236+
.then(move |(eth_balance, wit_balance), _act, ctx| {
237+
// Schedule next iteration only when finished,
238+
// as to avoid multiple tasks running in parallel
239+
ctx.run_later(period, move |act, ctx| {
240+
act.watch_global_status(eth_balance, wit_balance, ctx, period);
241+
});
242+
actix::fut::ready(())
243+
}),
244+
);
245+
}
246+
}
247+
248+
async fn check_eth_account_balance(
249+
eth_jsonrpc_url: &str,
250+
eth_account: H160,
251+
) -> Result<Option<U256>, String> {
252+
let web3_http = web3::transports::Http::new(eth_jsonrpc_url)
253+
.map_err(|_e| "evm-disconnect".to_string())
254+
.unwrap();
255+
256+
let web3 = web3::Web3::new(web3_http);
257+
match web3.eth().syncing().await {
258+
Ok(syncing) => match syncing {
259+
web3::types::SyncState::NotSyncing => {
260+
match web3.eth().balance(eth_account, None).await {
261+
Ok(balance) => Ok(Some(balance)),
262+
_ => Ok(None),
263+
}
264+
}
265+
web3::types::SyncState::Syncing(_) => Err("evm-syncing".to_string()),
266+
},
267+
Err(_e) => Err("evm-errors".to_string()),
268+
}
269+
}
270+
271+
async fn check_wit_connection_status(wit_jsonrpc_socket: &str) -> Result<(), String> {
272+
let (_handle, wit_client) = TcpSocket::new(wit_jsonrpc_socket).unwrap();
273+
let wit_client = Arc::new(wit_client);
274+
let res = wit_client.execute("syncStatus", json!(null));
275+
let res = Compat01As03::new(res);
276+
let res = tokio::time::timeout(Duration::from_secs(5), res).await;
277+
278+
match res {
279+
Ok(Ok(_)) => Ok(()),
280+
Ok(Err(_)) => Err("wit-syncing".to_string()),
281+
Err(_elapse) => Err("wit-disconnect".to_string()),
282+
}
283+
}
284+
285+
async fn fetch_wit_account(wit_client: &Addr<JsonRpcClient>) -> Result<Option<String>, String> {
286+
let req = jsonrpc::Request::method("getPkh").timeout(Duration::from_secs(5));
287+
let res = wit_client.send(req).await;
288+
match res {
289+
Ok(Ok(res)) => match serde_json::from_value::<String>(res) {
290+
Ok(pkh) => Ok(Some(pkh)),
291+
Err(_) => Ok(None),
292+
},
293+
Ok(Err(_)) => Ok(None),
294+
Err(_) => Err("wit-errors-getPkh".to_string()),
295+
}
296+
}
297+
298+
async fn fetch_wit_account_balance(
299+
wit_client: &Addr<JsonRpcClient>,
300+
wit_account: &str,
301+
) -> Result<Option<f64>, String> {
302+
let req = jsonrpc::Request::method("getBalance")
303+
.timeout(Duration::from_secs(5))
304+
.params(vec![wit_account, "true"])
305+
.expect("getBalance wrong params");
306+
307+
let res = wit_client.send(req).await;
308+
let res = match res {
309+
Ok(res) => res,
310+
Err(_) => {
311+
return Err("wit-errors-getBalance".to_string());
312+
}
313+
};
314+
315+
match res {
316+
Ok(value) => match value.get("total") {
317+
Some(value) => match value.as_f64() {
318+
Some(value) => Ok(Some(value / 1000000000.0)),
319+
None => Ok(None),
320+
},
321+
None => Ok(None),
322+
},
323+
Err(_) => Err("wit-errors-getBalance".to_string()),
324+
}
325+
}
326+
327+
async fn fetch_wit_account_count_utxos_above(
328+
wit_client: &Addr<JsonRpcClient>,
329+
wit_account: &str,
330+
threshold: u64,
331+
) -> Result<Option<u64>, String> {
332+
let req = jsonrpc::Request::method("getUtxoInfo")
333+
.timeout(Duration::from_secs(5))
334+
.params(wit_account)
335+
.expect("getUtxoInfo wrong params");
336+
337+
let res = wit_client.send(req).await;
338+
let res = match res {
339+
Ok(res) => res,
340+
Err(_) => {
341+
return Err("wit-errors-getUtxoInfo".to_string());
342+
}
343+
};
344+
345+
match res {
346+
Ok(utxo_info) => {
347+
if let Some(utxos) = utxo_info["utxos"].as_array() {
348+
let mut counter: u64 = u64::default();
349+
for utxo in utxos {
350+
if let Some(value) = utxo["value"].as_u64() {
351+
if value >= threshold {
352+
counter += 1;
353+
}
354+
}
355+
}
356+
357+
Ok(Some(counter))
358+
} else {
359+
Ok(None)
360+
}
361+
}
362+
Err(_) => Err("wit-errors-getUtxoInfo".to_string()),
363+
}
364+
}

0 commit comments

Comments
 (0)