Skip to content

Commit a84340b

Browse files
committed
Add independent tests on internal state
1 parent 449c6d5 commit a84340b

File tree

1 file changed

+49
-0
lines changed

1 file changed

+49
-0
lines changed

quickwit/quickwit-ingest/src/ingest_v2/publish_tracker.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,55 @@ mod tests {
186186

187187
use super::*;
188188

189+
#[tokio::test]
190+
async fn test_shard_publish_states() {
191+
let mut shard_publish_states = ShardPublishStates::default();
192+
let notifier = Arc::new(Notify::new());
193+
194+
let shard_id_1 = ShardId::from("test-shard-1");
195+
let shard_id_2 = ShardId::from("test-shard-2");
196+
let shard_id_3 = ShardId::from("test-shard-3");
197+
let shard_id_4 = ShardId::from("test-shard-4"); // not tracked
198+
199+
shard_publish_states.shard_tracked(shard_id_1.clone());
200+
shard_publish_states.shard_tracked(shard_id_2.clone());
201+
shard_publish_states.shard_tracked(shard_id_3.clone());
202+
203+
let notifier_receiver = notifier.clone();
204+
let notified_subscription = notifier_receiver.notified();
205+
206+
shard_publish_states.position_persisted(&shard_id_1, &Position::offset(10usize));
207+
assert_eq!(shard_publish_states.awaiting_count, 1);
208+
shard_publish_states.position_persisted(&shard_id_2, &Position::offset(20usize));
209+
assert_eq!(shard_publish_states.awaiting_count, 2);
210+
shard_publish_states.position_published(&shard_id_1, &Position::offset(15usize), &notifier);
211+
assert_eq!(shard_publish_states.awaiting_count, 1);
212+
shard_publish_states.position_published(&shard_id_2, &Position::offset(20usize), &notifier);
213+
assert_eq!(shard_publish_states.awaiting_count, 0);
214+
215+
// check that only the notification that was subscribed before holds a permit
216+
tokio::time::timeout(Duration::from_millis(100), notifier.notified())
217+
.await
218+
.unwrap_err();
219+
tokio::time::timeout(Duration::from_millis(100), notified_subscription)
220+
.await
221+
.unwrap();
222+
223+
let notified_subscription = notifier_receiver.notified();
224+
shard_publish_states.position_published(&shard_id_3, &Position::offset(10usize), &notifier);
225+
assert_eq!(shard_publish_states.awaiting_count, 0);
226+
shard_publish_states.position_persisted(&shard_id_3, &Position::offset(10usize));
227+
assert_eq!(shard_publish_states.awaiting_count, 0);
228+
// no notification expected here as the shard never becomes AwaitingPublish
229+
tokio::time::timeout(Duration::from_millis(100), notified_subscription)
230+
.await
231+
.unwrap_err();
232+
// shard 4 is not tracked
233+
shard_publish_states.position_published(&shard_id_4, &Position::offset(10usize), &notifier);
234+
assert_eq!(shard_publish_states.awaiting_count, 0);
235+
assert!(shard_publish_states.states.get(&shard_id_4).is_none());
236+
}
237+
189238
#[tokio::test]
190239
async fn test_publish_tracker() {
191240
let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0);

0 commit comments

Comments
 (0)