Skip to content

Commit

Permalink
Start advertising shards on persist, fetch, or recovery (#5072)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Jun 4, 2024
1 parent 5b90428 commit 6fc5a64
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 37 deletions.
38 changes: 30 additions & 8 deletions quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ impl BroadcastLocalShardsTask {
.shards
.iter()
.filter_map(|(queue_id, shard)| {
if !shard.is_replica() {
if shard.is_advertisable && !shard.is_replica() {
Some((queue_id.clone(), shard.shard_state))
} else {
None
Expand Down Expand Up @@ -479,22 +479,44 @@ mod tests {
let mut state_guard = state.lock_partially().await.unwrap();

let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
let queue_id_00 = queue_id(&index_uid, "test-source", &ShardId::from(0));
let shard_00 = IngesterShard::new_solo(
ShardState::Open,
Position::Beginning,
Position::Beginning,
Instant::now(),
);
state_guard.shards.insert(queue_id_00.clone(), shard_00);

let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1));
let shard = IngesterShard::new_solo(
let mut shard_01 = IngesterShard::new_solo(
ShardState::Open,
Position::Beginning,
Position::Beginning,
Instant::now(),
);
state_guard.shards.insert(queue_id_01.clone(), shard);
shard_01.is_advertisable = true;
state_guard.shards.insert(queue_id_01.clone(), shard_01);

let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default());
let rate_meter = RateMeter::default();
let queue_id_02 = queue_id(&index_uid, "test-source", &ShardId::from(2));
let mut shard_02 = IngesterShard::new_replica(
NodeId::from("test-leader"),
ShardState::Open,
Position::Beginning,
Position::Beginning,
Instant::now(),
);
shard_02.is_advertisable = true;
state_guard.shards.insert(queue_id_02.clone(), shard_02);

state_guard
.rate_trackers
.insert(queue_id_01.clone(), (rate_limiter, rate_meter));
for queue_id in [queue_id_00, queue_id_01, queue_id_02] {
let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default());
let rate_meter = RateMeter::default();

state_guard
.rate_trackers
.insert(queue_id, (rate_limiter, rate_meter));
}
drop(state_guard);

let new_snapshot = task.snapshot_local_shards().await.unwrap();
Expand Down
73 changes: 45 additions & 28 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,6 @@ impl Ingester {
};
return Ok(persist_response);
}

// first verify if we would locally accept each subrequest
{
let mut total_requested_capacity = bytesize::ByteSize::b(0);
Expand All @@ -491,6 +490,11 @@ impl Ingester {
persist_failures.push(persist_failure);
continue;
};
// A router can only know about a newly opened shard if it has been informed by the
// control plane, which confirms that the shard was correctly opened in the
// metastore.
shard.is_advertisable = true;

if shard.is_closed() {
let persist_failure = PersistFailure {
subrequest_id: subrequest.subrequest_id,
Expand Down Expand Up @@ -597,7 +601,6 @@ impl Ingester {
}
}
}

// replicate to the follower
{
let mut replicate_futures = FuturesUnordered::new();
Expand Down Expand Up @@ -846,17 +849,22 @@ impl Ingester {
open_fetch_stream_request: OpenFetchStreamRequest,
) -> IngestV2Result<ServiceStream<IngestV2Result<FetchMessage>>> {
let queue_id = open_fetch_stream_request.queue_id();
let shard_status_rx = self
.state
.lock_partially()
.await?
.shards
.get(&queue_id)
.ok_or_else(|| IngestV2Error::ShardNotFound {
shard_id: open_fetch_stream_request.shard_id().clone(),
})?
.shard_status_rx
.clone();

let mut state_guard = self.state.lock_partially().await?;

let shard =
state_guard
.shards
.get_mut(&queue_id)
.ok_or_else(|| IngestV2Error::ShardNotFound {
shard_id: open_fetch_stream_request.shard_id().clone(),
})?;
// An indexer can only know about a newly opened shard if it has been scheduled by the
// control plane, which confirms that the shard was correctly opened in the
// metastore.
shard.is_advertisable = true;

let shard_status_rx = shard.shard_status_rx.clone();
let mrecordlog = self.state.mrecordlog();
let (service_stream, _fetch_task_handle) = FetchStreamTask::spawn(
open_fetch_stream_request,
Expand Down Expand Up @@ -1478,6 +1486,7 @@ mod tests {
solo_shard_02.assert_is_closed();
solo_shard_02.assert_replication_position(Position::offset(1u64));
solo_shard_02.assert_truncation_position(Position::offset(0u64));
assert!(solo_shard_02.is_advertisable);

state_guard
.mrecordlog
Expand All @@ -1495,21 +1504,32 @@ mod tests {
let mut state_guard = ingester.state.lock_fully().await.unwrap();
let index_uid: IndexUid = IndexUid::for_test("test-index", 0);

let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1));
let shard = IngesterShard::new_solo(
let queue_id_00 = queue_id(&index_uid, "test-source", &ShardId::from(0));
let shard_00 = IngesterShard::new_solo(
ShardState::Open,
Position::Beginning,
Position::Beginning,
Instant::now(),
);
state_guard.shards.insert(queue_id_01.clone(), shard);

let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default());
let rate_meter = RateMeter::default();
state_guard
.rate_trackers
.insert(queue_id_01.clone(), (rate_limiter, rate_meter));
state_guard.shards.insert(queue_id_00.clone(), shard_00);

let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1));
let mut shard_01 = IngesterShard::new_solo(
ShardState::Open,
Position::Beginning,
Position::Beginning,
Instant::now(),
);
shard_01.is_advertisable = true;
state_guard.shards.insert(queue_id_01.clone(), shard_01);

for queue_id in [&queue_id_00, &queue_id_01] {
let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default());
let rate_meter = RateMeter::default();
state_guard
.rate_trackers
.insert(queue_id.clone(), (rate_limiter, rate_meter));
}
drop(state_guard);

tokio::time::sleep(Duration::from_millis(100)).await;
Expand Down Expand Up @@ -2548,12 +2568,9 @@ mod tests {
.await
.unwrap();

state_guard
.shards
.get(&queue_id)
.unwrap()
.notify_shard_status();

let shard = state_guard.shards.get(&queue_id).unwrap();
assert!(shard.is_advertisable);
shard.notify_shard_status();
drop(state_guard);

let fetch_response = fetch_stream.next().await.unwrap().unwrap();
Expand Down
15 changes: 15 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ pub(super) struct IngesterShard {
pub replication_position_inclusive: Position,
/// Position up to which the shard has been truncated.
pub truncation_position_inclusive: Position,
/// Whether the shard should be advertised to other nodes (routers) via gossip.
///
/// Because shards are created in multiple steps, (e.g., init shard on leader, create shard in
/// metastore), we must receive a "signal" from the control plane confirming that a shard
/// was successfully opened before advertising it. Currently, this confirmation comes in the
/// form of `PersistRequest` or `FetchRequest`.
pub is_advertisable: bool,
pub shard_status_tx: watch::Sender<ShardStatus>,
pub shard_status_rx: watch::Receiver<ShardStatus>,
/// Instant at which the shard was last written to.
Expand All @@ -65,6 +72,7 @@ impl IngesterShard {
shard_state,
replication_position_inclusive,
truncation_position_inclusive,
is_advertisable: false,
shard_status_tx,
shard_status_rx,
last_write_instant: now,
Expand All @@ -85,6 +93,9 @@ impl IngesterShard {
shard_state,
replication_position_inclusive,
truncation_position_inclusive,
// This is irrelevant for replica shards since they are not advertised via gossip
// anyway.
is_advertisable: false,
shard_status_tx,
shard_status_rx,
last_write_instant: now,
Expand All @@ -104,6 +115,7 @@ impl IngesterShard {
shard_state,
replication_position_inclusive,
truncation_position_inclusive,
is_advertisable: false,
shard_status_tx,
shard_status_rx,
last_write_instant: now,
Expand Down Expand Up @@ -240,6 +252,7 @@ mod tests {
primary_shard.truncation_position_inclusive,
Position::Beginning
);
assert!(!primary_shard.is_advertisable);
}

#[test]
Expand All @@ -265,6 +278,7 @@ mod tests {
replica_shard.truncation_position_inclusive,
Position::Beginning
);
assert!(!replica_shard.is_advertisable);
}

#[test]
Expand All @@ -286,5 +300,6 @@ mod tests {
solo_shard.truncation_position_inclusive,
Position::Beginning
);
assert!(!solo_shard.is_advertisable);
}
}
4 changes: 3 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,14 @@ impl IngesterState {
} else {
Position::offset(*position_range.start() - 1)
};
let solo_shard = IngesterShard::new_solo(
let mut solo_shard = IngesterShard::new_solo(
ShardState::Closed,
replication_position_inclusive,
truncation_position_inclusive,
now,
);
// We want to advertise the shard as read-only right away.
solo_shard.is_advertisable = true;
inner_guard.shards.insert(queue_id.clone(), solo_shard);

let rate_limiter = RateLimiter::from_settings(rate_limiter_settings);
Expand Down

0 comments on commit 6fc5a64

Please sign in to comment.