diff --git a/bridges/centralized-ethereum/src/actors/dr_reporter.rs b/bridges/centralized-ethereum/src/actors/dr_reporter.rs index 43dc58749..43b73e266 100644 --- a/bridges/centralized-ethereum/src/actors/dr_reporter.rs +++ b/bridges/centralized-ethereum/src/actors/dr_reporter.rs @@ -102,7 +102,7 @@ pub struct Report { /// Data Request's unique query id as known by the WitnetOracle contract pub dr_id: DrId, /// Timestamp at which the reported result was actually generated - pub dr_timestamp: u64, + pub dr_timestamp: i64, /// Hash of the Data Request Transaction in the Witnet blockchain pub dr_tx_hash: Hash, /// Hash of the Data Request Tally Transaction in the Witnet blockchain diff --git a/bridges/centralized-ethereum/src/actors/dr_sender.rs b/bridges/centralized-ethereum/src/actors/dr_sender.rs index 0f80f64f2..991555d6e 100644 --- a/bridges/centralized-ethereum/src/actors/dr_sender.rs +++ b/bridges/centralized-ethereum/src/actors/dr_sender.rs @@ -189,7 +189,7 @@ impl DrSender { dr_reporter_msgs.push(Report { dr_id, - dr_timestamp: u64::from_ne_bytes(get_timestamp().to_ne_bytes()), + dr_timestamp: i64::from_ne_bytes(get_timestamp().to_ne_bytes()), dr_tx_hash: zero_hash, dr_tally_tx_hash: zero_hash, result, diff --git a/bridges/centralized-ethereum/src/actors/wit_poller.rs b/bridges/centralized-ethereum/src/actors/wit_poller.rs index dbf62486f..ae19567ee 100644 --- a/bridges/centralized-ethereum/src/actors/wit_poller.rs +++ b/bridges/centralized-ethereum/src/actors/wit_poller.rs @@ -4,9 +4,8 @@ use actix::prelude::*; use serde_json::json; use witnet_data_structures::{ - chain::{Block, ConsensusConstants, DataRequestInfo, Epoch, EpochConstants, Hash, Hashable}, - get_protocol_version_activation_epoch, get_protocol_version_period, - proto::versioning::ProtocolVersion::{V1_7, V2_0}, + chain::{Block, ConsensusConstants, DataRequestInfo, Hash, Hashable}, + proto::versioning::ProtocolInfo, }; use witnet_net::client::tcp::{jsonrpc, JsonRpcClient}; use witnet_node::utils::stop_system_if_panicking; @@ -25,6 +24,7 @@ use crate::{ #[derive(Default)] pub struct WitPoller { witnet_client: Option>, + witnet_consensus_constants: Option, witnet_dr_txs_polling_rate_ms: u64, witnet_dr_txs_timeout_ms: u64, } @@ -64,6 +64,7 @@ impl WitPoller { pub fn from_config(config: &Config, node_client: Addr) -> Self { Self { witnet_client: Some(node_client), + witnet_consensus_constants: None, witnet_dr_txs_polling_rate_ms: config.witnet_dr_txs_polling_rate_ms, witnet_dr_txs_timeout_ms: config.witnet_dr_txs_timeout_ms, } @@ -71,9 +72,23 @@ impl WitPoller { fn check_tally_pending_drs(&self, ctx: &mut Context, period: Duration) { let witnet_client = self.witnet_client.clone().unwrap(); + let witnet_consensus_constants = self.witnet_consensus_constants.clone(); let timeout_secs = i64::try_from(self.witnet_dr_txs_timeout_ms / 1000).unwrap(); let fut = async move { + let witnet_consensus_constants = match witnet_consensus_constants { + Some(consensus_constants) => consensus_constants, + None => match get_consensus_constants(witnet_client.clone()).await { + Ok(consensus_constants) => { + log::info!("Protocol consensus constants: {:?}", consensus_constants); + consensus_constants + } + Err(_) => { + return None; + } + }, + }; + let dr_database_addr = DrDatabase::from_registry(); let dr_reporter_addr = DrReporter::from_registry(); let pending_drs = dr_database_addr @@ -84,89 +99,100 @@ impl WitPoller { let current_timestamp = get_timestamp(); let mut dr_reporter_msgs = vec![]; - for (dr_id, dr_bytes, dr_tx_hash, dr_tx_creation_timestamp) in pending_drs { - let method = String::from("dataRequestReport"); - let params = json!([dr_tx_hash]); - let req = jsonrpc::Request::method(method) - .timeout(Duration::from_millis(5_000)) - .params(params) - .expect("params failed serialization"); - let report = witnet_client.send(req).await; - let report = match report { - Ok(report) => report, - Err(_) => { - log::error!("Failed to connect to witnet client, will retry later"); - break; + if !pending_drs.is_empty() { + let witnet_protocol_info = match get_protocol_info(witnet_client.clone()).await { + Ok(x) => x, + Err(()) => { + log::error!("Failed to get current protocol info from witnet client, will retry later"); + return Some(witnet_consensus_constants); } }; + for (dr_id, dr_bytes, dr_tx_hash, dr_tx_creation_timestamp) in pending_drs { + let method = String::from("dataRequestReport"); + let params = json!([dr_tx_hash]); + let req = jsonrpc::Request::method(method) + .timeout(Duration::from_millis(5_000)) + .params(params) + .expect("params failed serialization"); + let report = witnet_client.send(req).await; + let report = match report { + Ok(report) => report, + Err(_) => { + log::error!("Failed to connect to witnet client, will retry later"); + break; + } + }; - if let Ok(report) = report { - match serde_json::from_value::>(report) { - Ok(Some(DataRequestInfo { - tally: Some(tally), - block_hash_dr_tx: Some(dr_block_hash), - current_commit_round: dr_commits_round, - .. - })) => { - log::info!("[{}] <= dr_tx = {}", dr_id, dr_tx_hash); + if let Ok(report) = report { + match serde_json::from_value::>(report) { + Ok(Some(DataRequestInfo { + tally: Some(tally), + block_hash_dr_tx: Some(dr_block_hash), + current_commit_round: dr_commits_round, + .. + })) => { + log::info!("[{}] <= dr_tx = {}", dr_id, dr_tx_hash); - let result = tally.tally.clone(); - // Get timestamp of the epoch at which all data request commit txs - // were incuded in the Witnet blockchain: - let dr_timestamp = match get_dr_timestamp( - witnet_client.clone(), - dr_block_hash, - dr_commits_round, - ) - .await - { - Ok(timestamp) => timestamp, - Err(()) => continue, - }; + let result = tally.tally.clone(); + // Get timestamp of the epoch at which all data request commit txs + // were incuded in the Witnet blockchain: + let dr_timestamp = match get_dr_timestamp( + witnet_client.clone(), + &witnet_consensus_constants, + &witnet_protocol_info, + dr_block_hash, + dr_commits_round, + ) + .await + { + Ok(timestamp) => timestamp, + Err(()) => continue, + }; - dr_reporter_msgs.push(Report { - dr_id, - dr_timestamp, - dr_tx_hash, - dr_tally_tx_hash: tally.hash(), - result, - }); - } - Ok(..) => { - // the data request is being resolved, just not yet - } - Err(e) => { - log::error!( - "[{}] => cannot deserialize dr_tx = {}: {:?}", - dr_id, - dr_tx_hash, - e - ); - } - }; - } else { - log::debug!("[{}] <> dr_tx = {}", dr_id, dr_tx_hash); - } + dr_reporter_msgs.push(Report { + dr_id, + dr_timestamp, + dr_tx_hash, + dr_tally_tx_hash: tally.hash(), + result, + }); + } + Ok(..) => { + // the data request is being resolved, just not yet + } + Err(e) => { + log::error!( + "[{}] => cannot deserialize dr_tx = {}: {:?}", + dr_id, + dr_tx_hash, + e + ); + } + }; + } else { + log::debug!("[{}] <> dr_tx = {}", dr_id, dr_tx_hash); + } - let elapsed_secs = current_timestamp - dr_tx_creation_timestamp; - if elapsed_secs >= timeout_secs { - log::warn!( - "[{}] => will retry new dr_tx after {} secs", - dr_id, - elapsed_secs - ); - DrDatabase::from_registry() - .send(SetDrInfoBridge( + let elapsed_secs = current_timestamp - dr_tx_creation_timestamp; + if elapsed_secs >= timeout_secs { + log::warn!( + "[{}] => will retry new dr_tx after {} secs", dr_id, - DrInfoBridge { - dr_bytes, - dr_state: DrState::New, - dr_tx_hash: None, - dr_tx_creation_timestamp: None, - }, - )) - .await - .unwrap(); + elapsed_secs + ); + DrDatabase::from_registry() + .send(SetDrInfoBridge( + dr_id, + DrInfoBridge { + dr_bytes, + dr_state: DrState::New, + dr_tx_hash: None, + dr_tx_creation_timestamp: None, + }, + )) + .await + .unwrap(); + } } } @@ -176,27 +202,64 @@ impl WitPoller { }) .await .unwrap(); + + Some(witnet_consensus_constants) }; - ctx.spawn(fut.into_actor(self).then(move |(), _act, ctx| { - // Wait until the function finished to schedule next call. - // This avoids tasks running in parallel. - ctx.run_later(period, move |act, ctx| { - // Reschedule check_tally_pending_drs - act.check_tally_pending_drs(ctx, period); - }); + ctx.spawn(fut.into_actor(self).then( + move |witnet_consensus_constants: Option, act, ctx| { + act.witnet_consensus_constants = witnet_consensus_constants; + // Wait until the function finished to schedule next call. + // This avoids tasks running in parallel. + ctx.run_later(period, move |act, ctx| { + // Reschedule check_tally_pending_drs + act.check_tally_pending_drs(ctx, period); + }); - actix::fut::ready(()) - })); + actix::fut::ready(()) + }, + )); } } +/// Get network's consensus constants +async fn get_consensus_constants( + witnet_client: Addr, +) -> Result { + let method = String::from("getConsensusConstants"); + let params = json!(null); + let req = jsonrpc::Request::method(method) + .timeout(Duration::from_millis(5_000)) + .params(params) + .expect("params failed serialization"); + let result = witnet_client.send(req).await; + let result = match result { + Ok(result) => result, + Err(_) => { + log::error!("Failed to connect to witnet client, will retry later"); + return Err(()); + } + }; + let consensus_constants = match result { + Ok(value) => serde_json::from_value::(value) + .expect("failed to deserialize consensus constants"), + Err(e) => { + log::error!("error in getConsensusConstants call: {:?}", e); + return Err(()); + } + }; + + Ok(consensus_constants) +} + /// Return the timestamp of this block hash async fn get_dr_timestamp( witnet_client: Addr, + consensus_constants: &ConsensusConstants, + protocol_info: &ProtocolInfo, drt_block_hash: Hash, dr_commits_round: u16, -) -> Result { +) -> Result { let method = String::from("getBlock"); let params = json!([drt_block_hash]); let req = jsonrpc::Request::method(method) @@ -218,64 +281,54 @@ async fn get_dr_timestamp( return Err(()); } }; - let block_epoch = block.block_header.beacon.checkpoint; - match get_consensus_constants(witnet_client.clone()).await { - Ok(x) => x, - Err(()) => { - log::error!("Failed to get consensus constants from witnet client, will retry later"); - return Err(()); - } + let dr_last_commit_epoch = + block.block_header.beacon.checkpoint + u32::from(dr_commits_round + 1); + let protocol_version = if protocol_info + .all_versions + .get_activation_epoch(protocol_info.current_version) + <= dr_last_commit_epoch + { + protocol_info.current_version + } else { + protocol_info.current_version.prev() }; - let epoch_constants = EpochConstants { - checkpoint_zero_timestamp: get_protocol_version_activation_epoch(V1_7).into(), - checkpoints_period: get_protocol_version_period(V1_7), - checkpoint_zero_timestamp_wit2: get_protocol_version_activation_epoch(V2_0).into(), - checkpoints_period_wit2: get_protocol_version_period(V2_0), - }; - let timestamp = convert_block_epoch_to_timestamp( - epoch_constants, - block_epoch + u32::from(dr_commits_round + 1), - ); + let protocol_activation_timestamp = protocol_info + .derive_activation_timestamp(protocol_version, consensus_constants) + .unwrap_or(consensus_constants.checkpoint_zero_timestamp); + let protocol_activation_epoch = protocol_info + .all_versions + .get_activation_epoch(protocol_version); + let protocol_checkpoint_period = protocol_info + .all_checkpoints_periods + .get(&protocol_version) + .unwrap_or(&45u16); - Ok(timestamp) + Ok(protocol_activation_timestamp + + i64::from((dr_last_commit_epoch - protocol_activation_epoch) * u32::from(*protocol_checkpoint_period))) } -async fn get_consensus_constants( - witnet_client: Addr, -) -> Result { - let method = String::from("getConsensusConstants"); +/// Get current protocol info from the Witnet node +async fn get_protocol_info(witnet_client: Addr) -> Result { + let method = String::from("protocol"); let params = json!(null); let req = jsonrpc::Request::method(method) .timeout(Duration::from_millis(5_000)) .params(params) .expect("params failed serialization"); - let result = witnet_client.send(req).await; - let result = match result { - Ok(result) => result, + let report = witnet_client.send(req).await; + let report = match report { + Ok(report) => report, Err(_) => { log::error!("Failed to connect to witnet client, will retry later"); return Err(()); } }; - let consensus_constants = match result { - Ok(value) => serde_json::from_value::(value) - .expect("failed to deserialize consensus constants"), + match report { + Ok(value) => Ok(serde_json::from_value::(value) + .expect("failed to deserialize protocol info")), Err(e) => { - log::error!("error in getConsensusConstants call: {:?}", e); - return Err(()); + log::error!("Error when getting protocol info: {:?}", e); + Err(()) } - }; - - Ok(consensus_constants) -} - -fn convert_block_epoch_to_timestamp(epoch_constants: EpochConstants, epoch: Epoch) -> u64 { - // In case of error, return timestamp 0 - u64::try_from( - epoch_constants - .epoch_timestamp(epoch) - .unwrap_or((0, false)) - .0, - ) - .expect("Epoch timestamp should return a positive value") + } }