Skip to content

Commit d4e2844

Browse files
DaMandal0rian and claude
committed
core: Consolidate duplicate shard assignment logic
Addressed Copilot review feedback to reduce code duplication:

- Extract common shard calculation into `calculate_shard_id()` helper method
- Both `get_shard_for_deployment()` and `get_or_create_subgraph_state()` now use the same logic
- Fix compilation issue by removing non-existent `deployment_id()` method
- Use data source name directly for deployment hash calculation
- Fix trigger count variable usage to avoid borrow-after-move error

This ensures consistent behavior across all shard assignment operations and improves code maintainability.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
1 parent bf3121b commit d4e2844

File tree

1 file changed

+30
-23
lines changed

1 file changed

+30
-23
lines changed

core/src/subgraph/trigger_processor.rs

Lines changed: 30 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -124,7 +124,13 @@ impl SubgraphTriggerProcessor {
124124
}
125125
}
126126

127-
/// Get or assign a shard for a deployment using consistent hashing
127+
/// Calculate shard ID for a deployment using consistent hashing
128+
fn calculate_shard_id(&self, deployment: &DeploymentHash) -> usize {
129+
let mut hasher = DefaultHasher::new();
130+
deployment.hash(&mut hasher);
131+
(hasher.finish() as usize) % self.semaphores.len()
132+
}
133+
128134
fn get_shard_for_deployment(&self, deployment: &DeploymentHash) -> usize {
129135
// Check if already assigned
130136
{
@@ -134,10 +140,8 @@ impl SubgraphTriggerProcessor {
134140
}
135141
}
136142

137-
// Assign new shard using DefaultHasher
138-
let mut hasher = DefaultHasher::new();
139-
deployment.hash(&mut hasher);
140-
let shard_id = (hasher.finish() as usize) % self.semaphores.len();
143+
// Assign new shard using consistent hashing
144+
let shard_id = self.calculate_shard_id(deployment);
141145

142146
// Track the assignment
143147
let state = SubgraphState {
@@ -165,10 +169,8 @@ impl SubgraphTriggerProcessor {
165169
return state.clone();
166170
}
167171

168-
// Assign new shard using DefaultHasher
169-
let mut hasher = DefaultHasher::new();
170-
deployment.hash(&mut hasher);
171-
let shard_id = (hasher.finish() as usize) % self.semaphores.len();
172+
// Assign new shard using consistent hashing
173+
let shard_id = self.calculate_shard_id(deployment);
172174

173175
// Track the assignment
174176
let state = SubgraphState {
@@ -205,7 +207,6 @@ impl SubgraphTriggerProcessor {
205207
}
206208
}
207209

208-
209210
/// Get comprehensive metrics for monitoring
210211
pub async fn get_metrics(&self) -> HashMap<String, usize> {
211212
let mut metrics = HashMap::new();
@@ -313,16 +314,17 @@ where
313314
}
314315

315316
// Create a synthetic deployment hash from data source name for consistent sharding.
316-
// This ensures triggers from the same data source/subgraph are always routed to
317+
// This ensures triggers from the same data source/subgraph are always routed to
317318
// the same shard, maintaining cache locality.
318319
let data_source_name = triggers[0].host.data_source().name();
319-
// Use a unique fallback for invalid data source names to avoid sharding hotspots.
320-
let deployment_id = triggers[0].host.deployment_id().as_str();
321-
let deployment_hash = DeploymentHash::new(data_source_name)
322-
.unwrap_or_else(|_| {
323-
let fallback = format!("{}_{}", deployment_id, data_source_name);
324-
DeploymentHash::new(&fallback).unwrap()
325-
});
320+
// Use data source name directly for deployment hash
321+
let deployment_hash = DeploymentHash::new(data_source_name).unwrap_or_else(|_| {
322+
// Use a hash of the name as fallback to ensure valid deployment hash
323+
let mut hasher = DefaultHasher::new();
324+
data_source_name.hash(&mut hasher);
325+
let fallback = format!("deployment_{:x}", hasher.finish());
326+
DeploymentHash::new(&fallback).unwrap()
327+
});
326328

327329
// Determine shard assignment
328330
let shard_id = if self.config.enable_sharding {
@@ -344,15 +346,18 @@ where
344346
self.apply_backpressure(logger, &deployment_hash, projected_queue_depth)
345347
.await;
346348

349+
// Save trigger count before moving triggers into async block
350+
let trigger_count = triggers.len();
351+
347352
// Only increment queue depth after backpressure check passes
348353
subgraph_state
349354
.queue_depth
350-
.fetch_add(triggers.len(), Ordering::Relaxed);
355+
.fetch_add(trigger_count, Ordering::Relaxed);
351356

352357
debug!(logger, "Processing triggers";
353358
"deployment" => deployment_hash.to_string(),
354359
"shard" => shard_id,
355-
"trigger_count" => triggers.len(),
360+
"trigger_count" => trigger_count,
356361
"sharding_enabled" => self.config.enable_sharding
357362
);
358363

@@ -422,11 +427,13 @@ where
422427

423428
// Execute processing and ensure queue depth cleanup regardless of outcome
424429
let result = process_result.await;
425-
430+
426431
// Always decrement queue depth by the number of processed triggers
427432
// This ensures cleanup even if processing failed partway through
428-
if !triggers.is_empty() {
429-
subgraph_state.queue_depth.fetch_sub(triggers.len(), Ordering::Relaxed);
433+
if trigger_count > 0 {
434+
subgraph_state
435+
.queue_depth
436+
.fetch_sub(trigger_count, Ordering::Relaxed);
430437
}
431438

432439
result

0 commit comments

Comments (0)