Skip to content

Commit 7fe2532

Browse files
committed
graph,node,docs: Improve log store configurability and error handling
Address code review feedback to improve flexibility, debuggability, docs
1 parent 5e3bb06 commit 7fe2532

File tree

6 files changed

+53
-12
lines changed

6 files changed

+53
-12
lines changed

docs/log-store.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,6 @@ query {
567567
```graphql
568568
query {
569569
_logs(
570-
subgraphId: "QmYourSubgraphDeploymentHash"
571570
level: ERROR
572571
from: "2024-01-01T00:00:00Z"
573572
to: "2024-01-31T23:59:59Z"

graph/src/components/log_store/elasticsearch.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,18 @@ pub struct ElasticsearchLogStore {
1616
password: Option<String>,
1717
client: Client,
1818
index: String,
19+
timeout: Duration,
1920
}
2021

2122
impl ElasticsearchLogStore {
22-
pub fn new(config: ElasticLoggingConfig, index: String) -> Self {
23+
pub fn new(config: ElasticLoggingConfig, index: String, timeout: Duration) -> Self {
2324
Self {
2425
endpoint: config.endpoint,
2526
username: config.username,
2627
password: config.password,
2728
client: config.client,
2829
index,
30+
timeout,
2931
}
3032
}
3133

@@ -97,7 +99,7 @@ impl ElasticsearchLogStore {
9799
.client
98100
.post(&url)
99101
.json(&query_body)
100-
.timeout(Duration::from_secs(10));
102+
.timeout(self.timeout);
101103

102104
// Add basic auth if credentials provided
103105
if let (Some(username), Some(password)) = (&self.username, &self.password) {
@@ -112,12 +114,17 @@ impl ElasticsearchLogStore {
112114

113115
if !response.status().is_success() {
114116
let status = response.status();
115-
// Do not include the response body in the error to avoid leaking
116-
// sensitive Elasticsearch internals
117-
return Err(LogStoreError::QueryFailed(anyhow::anyhow!(
118-
"Elasticsearch query failed with status {}",
119-
status
120-
)));
117+
// Include response body in error context for debugging
118+
// The body is part of the error chain but not the main error message to avoid
119+
// leaking sensitive Elasticsearch internals in logs
120+
let body_text = response
121+
.text()
122+
.await
123+
.unwrap_or_else(|_| "<failed to read response body>".to_string());
124+
return Err(LogStoreError::QueryFailed(
125+
anyhow::anyhow!("Elasticsearch query failed with status {}", status)
126+
.context(format!("Response body: {}", body_text)),
127+
));
121128
}
122129

123130
let response_body: ElasticsearchResponse = response.json().await.map_err(|e| {

graph/src/components/log_store/mod.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub enum LogStoreConfig {
3737
username: Option<String>,
3838
password: Option<String>,
3939
index: String,
40+
timeout_secs: u64,
4041
},
4142

4243
/// Loki (Grafana's log aggregation system)
@@ -134,9 +135,11 @@ impl LogStoreFactory {
134135
username,
135136
password,
136137
index,
138+
timeout_secs,
137139
} => {
140+
let timeout = std::time::Duration::from_secs(timeout_secs);
138141
let client = reqwest::Client::builder()
139-
.timeout(std::time::Duration::from_secs(10))
142+
.timeout(timeout)
140143
.build()
141144
.map_err(|e| LogStoreError::InitializationFailed(e.into()))?;
142145

@@ -148,7 +151,7 @@ impl LogStoreFactory {
148151
};
149152

150153
Ok(Arc::new(elasticsearch::ElasticsearchLogStore::new(
151-
config, index,
154+
config, index, timeout,
152155
)))
153156
}
154157

@@ -219,11 +222,21 @@ impl LogStoreFactory {
219222
"subgraph",
220223
);
221224

225+
// Default: 10 seconds query timeout
226+
// Configurable via GRAPH_LOG_STORE_ELASTICSEARCH_TIMEOUT environment variable
227+
let timeout_secs = config::read_u64_with_fallback(
228+
&logger,
229+
"GRAPH_LOG_STORE_ELASTICSEARCH_TIMEOUT",
230+
"GRAPH_ELASTICSEARCH_TIMEOUT",
231+
10,
232+
);
233+
222234
Ok(LogStoreConfig::Elasticsearch {
223235
endpoint,
224236
username,
225237
password,
226238
index,
239+
timeout_secs,
227240
})
228241
}
229242

@@ -264,13 +277,17 @@ impl LogStoreFactory {
264277
})
265278
.map(PathBuf::from)?;
266279

280+
// Default: 100MB per file (104857600 bytes)
281+
// Configurable via GRAPH_LOG_STORE_FILE_MAX_SIZE environment variable
267282
let max_file_size = config::read_u64_with_fallback(
268283
&logger,
269284
"GRAPH_LOG_STORE_FILE_MAX_SIZE",
270285
"GRAPH_LOG_FILE_MAX_SIZE",
271-
100 * 1024 * 1024, // 100MB default
286+
100 * 1024 * 1024,
272287
);
273288

289+
// Default: 30 days retention
290+
// Configurable via GRAPH_LOG_STORE_FILE_RETENTION_DAYS environment variable
274291
let retention_days = config::read_u32_with_fallback(
275292
&logger,
276293
"GRAPH_LOG_STORE_FILE_RETENTION_DAYS",

graph/src/log/factory.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ impl LoggerFactory {
122122
username,
123123
password,
124124
index,
125+
..
125126
}) => {
126127
// Build ElasticLoggingConfig on-demand
127128
let elastic_config = ElasticLoggingConfig {

node/src/launcher.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,12 @@ pub async fn run(
470470
.or_else(|| std::env::var("GRAPH_ELASTIC_SEARCH_INDEX").ok())
471471
.unwrap_or_else(|| "subgraph".to_string());
472472

473+
let timeout_secs = std::env::var("GRAPH_LOG_STORE_ELASTICSEARCH_TIMEOUT")
474+
.or_else(|_| std::env::var("GRAPH_ELASTICSEARCH_TIMEOUT"))
475+
.ok()
476+
.and_then(|s| s.parse::<u64>().ok())
477+
.unwrap_or(10);
478+
473479
graph::components::log_store::LogStoreConfig::Elasticsearch {
474480
endpoint,
475481
username: opt
@@ -481,6 +487,7 @@ pub async fn run(
481487
.clone()
482488
.or_else(|| opt.elasticsearch_password.clone()),
483489
index,
490+
timeout_secs,
484491
}
485492
})
486493
}
@@ -522,12 +529,19 @@ pub async fn run(
522529
.or_else(|| std::env::var("GRAPH_ELASTIC_SEARCH_INDEX").ok())
523530
.unwrap_or_else(|| "subgraph".to_string());
524531

532+
let timeout_secs = std::env::var("GRAPH_LOG_STORE_ELASTICSEARCH_TIMEOUT")
533+
.or_else(|_| std::env::var("GRAPH_ELASTICSEARCH_TIMEOUT"))
534+
.ok()
535+
.and_then(|s| s.parse::<u64>().ok())
536+
.unwrap_or(10);
537+
525538
Some(
526539
graph::components::log_store::LogStoreConfig::Elasticsearch {
527540
endpoint: opt.elasticsearch_url.clone().unwrap(),
528541
username: opt.elasticsearch_user.clone(),
529542
password: opt.elasticsearch_password.clone(),
530543
index,
544+
timeout_secs,
531545
},
532546
)
533547
} else {

node/src/log_config_provider.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ mod tests {
147147
username: Some("user".to_string()),
148148
password: Some("pass".to_string()),
149149
index: "test-index".to_string(),
150+
timeout_secs: 10,
150151
};
151152

152153
let provider = LogStoreConfigProvider::new(LogStoreConfigSources {
@@ -161,6 +162,7 @@ mod tests {
161162
username,
162163
password,
163164
index,
165+
..
164166
}) = config
165167
{
166168
assert_eq!(endpoint, "http://localhost:9200");
@@ -182,6 +184,7 @@ mod tests {
182184
username: None,
183185
password: None,
184186
index: "test-index".to_string(),
187+
timeout_secs: 10,
185188
};
186189

187190
let provider = LogStoreConfigProvider::new(LogStoreConfigSources {

0 commit comments

Comments
 (0)