Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add a configurable max timeout for outgoing predicate payload requests #642

Merged
merged 6 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions components/chainhook-cli/src/config/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub struct ConfigFile {
pub storage: StorageConfigFile,
pub pox_config: Option<PoxConfigFile>,
pub http_api: Option<PredicatesApiConfigFile>,
pub predicates: Option<PredicatesConfigFile>,
pub event_source: Option<Vec<EventSourceConfigFile>>,
pub limits: LimitsConfigFile,
pub network: NetworkConfigFile,
Expand Down Expand Up @@ -64,6 +65,11 @@ pub struct NetworkConfigFile {
pub stacks_events_ingestion_port: Option<u16>,
}

#[derive(Deserialize, Debug, Clone)]
pub struct PredicatesConfigFile {
pub payload_http_request_timeout_ms: Option<u64>,
}

#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "snake_case")]
pub enum NetworkConfigMode {
Expand Down
23 changes: 22 additions & 1 deletion components/chainhook-cli/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub mod generator;

use chainhook_sdk::chainhooks::types::{ChainhookStore, PoxConfig};
pub use chainhook_sdk::indexer::IndexerConfig;
use chainhook_sdk::observer::EventObserverConfig;
use chainhook_sdk::observer::{EventObserverConfig, PredicatesConfig};
use chainhook_sdk::types::{
BitcoinBlockSignaling, BitcoinNetwork, StacksNetwork, StacksNodeConfig,
};
Expand All @@ -30,6 +30,7 @@ pub struct Config {
pub storage: StorageConfig,
pub pox_config: PoxConfig,
pub http_api: PredicatesApi,
pub predicates: PredicatesConfig,
pub event_sources: Vec<EventSourceConfig>,
pub limits: LimitsConfig,
pub network: IndexerConfig,
Expand Down Expand Up @@ -117,6 +118,9 @@ impl Config {
EventObserverConfig {
bitcoin_rpc_proxy_enabled: true,
registered_chainhooks: ChainhookStore::new(),
predicates_config: PredicatesConfig {
payload_http_request_timeout_ms: self.predicates.payload_http_request_timeout_ms,
},
bitcoind_rpc_username: self.network.bitcoind_rpc_username.clone(),
bitcoind_rpc_password: self.network.bitcoind_rpc_password.clone(),
bitcoind_rpc_url: self.network.bitcoind_rpc_url.clone(),
Expand Down Expand Up @@ -193,6 +197,14 @@ impl Config {
}),
},
},
predicates: match config_file.predicates {
None => PredicatesConfig {
payload_http_request_timeout_ms: None,
},
Some(predicates) => PredicatesConfig {
payload_http_request_timeout_ms: predicates.payload_http_request_timeout_ms,
},
},
event_sources,
limits: LimitsConfig {
max_number_of_stacks_predicates: config_file
Expand Down Expand Up @@ -357,6 +369,9 @@ impl Config {
},
pox_config: PoxConfig::devnet_default(),
http_api: PredicatesApi::Off,
predicates: PredicatesConfig {
payload_http_request_timeout_ms: None,
},
event_sources: vec![],
limits: LimitsConfig {
max_number_of_bitcoin_predicates: BITCOIN_MAX_PREDICATE_REGISTRATION,
Expand Down Expand Up @@ -390,6 +405,9 @@ impl Config {
},
pox_config: PoxConfig::testnet_default(),
http_api: PredicatesApi::Off,
predicates: PredicatesConfig {
payload_http_request_timeout_ms: None,
},
event_sources: vec![EventSourceConfig::StacksTsvUrl(UrlConfig {
file_url: DEFAULT_TESTNET_STACKS_TSV_ARCHIVE.into(),
})],
Expand Down Expand Up @@ -425,6 +443,9 @@ impl Config {
},
pox_config: PoxConfig::mainnet_default(),
http_api: PredicatesApi::Off,
predicates: PredicatesConfig {
payload_http_request_timeout_ms: None,
},
event_sources: vec![EventSourceConfig::StacksTsvUrl(UrlConfig {
file_url: DEFAULT_MAINNET_STACKS_TSV_ARCHIVE.into(),
})],
Expand Down
2 changes: 1 addition & 1 deletion components/chainhook-cli/src/scan/bitcoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ pub async fn execute_predicates_action<'a>(
gather_proofs(&trigger, &mut proofs, config, ctx);
}
let predicate_uuid = &trigger.chainhook.uuid;
match handle_bitcoin_hook_action(trigger, &proofs) {
match handle_bitcoin_hook_action(trigger, &proofs, &config) {
Err(e) => {
warn!(
ctx.expect_logger(),
Expand Down
26 changes: 19 additions & 7 deletions components/chainhook-cli/src/scan/stacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ pub async fn get_canonical_fork_from_tsv(
for result in reader_builder.deserialize() {
line += 1;
let record: Record = result.unwrap();
if let RecordKind::StacksBlockReceived = &record.kind { if let Err(_e) = record_tx.send(Some((record, line))) {
break;
} };
if let RecordKind::StacksBlockReceived = &record.kind {
if let Err(_e) = record_tx.send(Some((record, line))) {
break;
}
};
}
let _ = record_tx.send(None);
})
Expand Down Expand Up @@ -338,7 +340,12 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
apply: hits_per_blocks,
rollback: vec![],
};
let res = match handle_stacks_hook_action(trigger, &proofs, ctx) {
let res = match handle_stacks_hook_action(
trigger,
&proofs,
&config.get_event_observer_config(),
ctx,
) {
Err(e) => {
warn!(
ctx.expect_logger(),
Expand Down Expand Up @@ -487,7 +494,9 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
let mut tsv_line = String::new();
while tsv_current_line < tsv_line_number {
tsv_line.clear();
let bytes_read = tsv_reader.read_line(&mut tsv_line).map_err(|e| e.to_string())?;
let bytes_read = tsv_reader
.read_line(&mut tsv_line)
.map_err(|e| e.to_string())?;
if bytes_read == 0 {
return Err("Unexpected EOF when reading TSV".to_string());
}
Expand Down Expand Up @@ -525,7 +534,8 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
apply: hits_per_blocks,
rollback: vec![],
};
match handle_stacks_hook_action(trigger, &proofs, ctx) {
match handle_stacks_hook_action(trigger, &proofs, &config.get_event_observer_config(), ctx)
{
Err(e) => {
error!(ctx.expect_logger(), "unable to handle action {}", e);
}
Expand Down Expand Up @@ -604,7 +614,9 @@ pub async fn consolidate_local_stacks_chainstate_using_csv(
let mut tsv_line = String::new();
while tsv_current_line < tsv_line_number {
tsv_line.clear();
let bytes_read = tsv_reader.read_line(&mut tsv_line).map_err(|e| e.to_string())?;
let bytes_read = tsv_reader
.read_line(&mut tsv_line)
.map_err(|e| e.to_string())?;
if bytes_read == 0 {
return Err("Unexpected EOF when reading TSV".to_string());
}
Expand Down
26 changes: 11 additions & 15 deletions components/chainhook-cli/src/service/tests/helpers/mock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::service::{
PredicateStatus, Service,
};
use chainhook_sdk::chainhooks::types::PoxConfig;
use chainhook_sdk::observer::PredicatesConfig;
use chainhook_sdk::{
chainhooks::stacks::StacksChainhookSpecificationNetworkMap,
chainhooks::types::{ChainhookInstance, ChainhookSpecificationNetworkMap},
Expand Down Expand Up @@ -82,12 +83,12 @@ pub async fn filter_predicate_status_from_all_predicates(
match matching_predicate {
Some(predicate) => match predicate.get("status") {
Some(status) => {
return serde_json::from_value(status.clone()).map_err(|e| {
format!("failed to parse status {}", e)
});
return serde_json::from_value(status.clone())
.map_err(|e| format!("failed to parse status {}", e));
}
None => {
return Err("no status field on matching get predicates result".to_string())
return Err("no status field on matching get predicates result"
.to_string())
}
},
None => {
Expand All @@ -98,7 +99,9 @@ pub async fn filter_predicate_status_from_all_predicates(
}
}
None => {
return Err("failed to parse get predicate response's result field".to_string())
return Err(
"failed to parse get predicate response's result field".to_string()
)
}
},
None => {
Expand Down Expand Up @@ -267,10 +270,7 @@ pub fn flush_redis(port: u16) {
let client = redis::Client::open(format!("redis://localhost:{port}/"))
.expect("unable to connect to redis");
let mut predicate_db_conn = client.get_connection().expect("unable to connect to redis");
let db_keys: Vec<String> = predicate_db_conn
.scan_match("*")
.unwrap()
.collect();
let db_keys: Vec<String> = predicate_db_conn.scan_match("*").unwrap().collect();
for k in db_keys {
predicate_db_conn.del::<_, ()>(&k).unwrap();
}
Expand All @@ -293,6 +293,7 @@ pub fn get_chainhook_config(
};
Config {
http_api: PredicatesApi::On(api_config),
predicates: PredicatesConfig::default(),
pox_config: PoxConfig::devnet_default(),
storage: StorageConfig {
working_dir: working_dir.into(),
Expand Down Expand Up @@ -343,12 +344,7 @@ pub async fn start_chainhook_service(
);
let _ = hiro_system_kit::nestable_block_on(future);
})
.map_err(|e| {
format!(
"failed to start chainhook service thread, {}",
e
)
})?;
.map_err(|e| format!("failed to start chainhook service thread, {}", e))?;

// Loop to check if the server is ready
let mut attempts = 0;
Expand Down
3 changes: 2 additions & 1 deletion components/chainhook-cli/src/service/tests/observer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{sync::mpsc::channel, thread::sleep, time::Duration};

use chainhook_sdk::{
chainhooks::types::ChainhookStore,
observer::{start_event_observer, EventObserverConfig},
observer::{start_event_observer, EventObserverConfig, PredicatesConfig},
types::{BitcoinNetwork, StacksNodeConfig},
utils::Context,
};
Expand Down Expand Up @@ -190,6 +190,7 @@ async fn it_responds_200_for_unimplemented_endpoints(
});
let config = EventObserverConfig {
registered_chainhooks: ChainhookStore::new(),
predicates_config: PredicatesConfig::default(),
bitcoin_rpc_proxy_enabled: false,
bitcoind_rpc_username: String::new(),
bitcoind_rpc_password: String::new(),
Expand Down
11 changes: 8 additions & 3 deletions components/chainhook-sdk/src/chainhooks/bitcoin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::types::{
append_error_context, validate_txid, ChainhookInstance, ExactMatchingRule, HookAction,
MatchingRule, PoxConfig, TxinPredicate,
};
use crate::utils::{Context, MAX_BLOCK_HEIGHTS_ENTRIES};
use crate::{observer::EventObserverConfig, utils::{Context, MAX_BLOCK_HEIGHTS_ENTRIES}};

use bitcoincore_rpc_json::bitcoin::{address::Payload, Address};
use chainhook_types::{
Expand All @@ -21,7 +21,7 @@ use serde::{de, Deserialize, Deserializer};
use serde_json::Value as JsonValue;
use std::{
collections::{BTreeMap, HashMap, HashSet},
str::FromStr,
str::FromStr, time::Duration,
};

use reqwest::RequestBuilder;
Expand Down Expand Up @@ -760,10 +760,15 @@ pub fn serialize_bitcoin_transactions_to_json(
pub fn handle_bitcoin_hook_action<'a>(
trigger: BitcoinTriggerChainhook<'a>,
proofs: &HashMap<&'a TransactionIdentifier, String>,
config: &EventObserverConfig,
) -> Result<BitcoinChainhookOccurrence, String> {
match &trigger.chainhook.action {
HookAction::HttpPost(http) => {
let client = Client::builder()
let mut client_builder = Client::builder();
if let Some(timeout) = config.predicates_config.payload_http_request_timeout_ms {
client_builder = client_builder.timeout(Duration::from_millis(timeout));
}
let client = client_builder
.build()
.map_err(|e| format!("unable to build http client: {}", e))?;
let host = http.url.to_string();
Expand Down
9 changes: 8 additions & 1 deletion components/chainhook-sdk/src/chainhooks/stacks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::observer::EventObserverConfig;
use crate::utils::{AbstractStacksBlock, Context, MAX_BLOCK_HEIGHTS_ENTRIES};

use super::types::{
Expand All @@ -22,6 +23,7 @@ use schemars::JsonSchema;
use serde_json::Value as JsonValue;
use std::collections::{BTreeMap, HashMap};
use std::io::Cursor;
use std::time::Duration;

use reqwest::RequestBuilder;

Expand Down Expand Up @@ -1325,11 +1327,16 @@ pub fn serialize_stacks_payload_to_json<'a>(
pub fn handle_stacks_hook_action<'a>(
trigger: StacksTriggerChainhook<'a>,
proofs: &HashMap<&'a TransactionIdentifier, String>,
config: &EventObserverConfig,
ctx: &Context,
) -> Result<StacksChainhookOccurrence, String> {
match &trigger.chainhook.action {
HookAction::HttpPost(http) => {
let client = Client::builder()
let mut client_builder = Client::builder();
if let Some(timeout) = config.predicates_config.payload_http_request_timeout_ms {
client_builder = client_builder.timeout(Duration::from_millis(timeout));
}
let client = client_builder
.build()
.map_err(|e| format!("unable to build http client: {}", e))?;
let host = http.url.to_string();
Expand Down
12 changes: 9 additions & 3 deletions components/chainhook-sdk/src/chainhooks/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ use super::{
},
types::{ExactMatchingRule, FileHook},
};
use crate::{chainhooks::stacks::serialize_stacks_payload_to_json, utils::Context};
use crate::{
chainhooks::stacks::serialize_stacks_payload_to_json,
observer::EventObserverConfig,
utils::Context,
};
use crate::{
chainhooks::{
tests::fixtures::{get_expected_occurrence, get_test_event_payload_by_type},
Expand Down Expand Up @@ -735,7 +739,8 @@ fn test_stacks_hook_action_noop() {
logger: None,
tracer: false,
};
let occurrence = handle_stacks_hook_action(trigger, &proofs, &ctx).unwrap();
let occurrence =
handle_stacks_hook_action(trigger, &proofs, &EventObserverConfig::default(), &ctx).unwrap();
if let StacksChainhookOccurrence::Data(data) = occurrence {
assert_eq!(data.apply.len(), 1);
assert_eq!(
Expand Down Expand Up @@ -812,7 +817,8 @@ fn test_stacks_hook_action_file_append() {
logger: None,
tracer: false,
};
let occurrence = handle_stacks_hook_action(trigger, &proofs, &ctx).unwrap();
let occurrence =
handle_stacks_hook_action(trigger, &proofs, &EventObserverConfig::default(), &ctx).unwrap();
if let StacksChainhookOccurrence::File(path, bytes) = occurrence {
assert_eq!(path, "./".to_string());
let json: JsonValue = serde_json::from_slice(&bytes).unwrap();
Expand Down
Loading
Loading