@@ -78,7 +78,7 @@ pub struct QueueCoordinator {
78
78
shared_state : QueueSharedState ,
79
79
local_state : QueueLocalState ,
80
80
publish_token : String ,
81
- visible_settings : VisibilitySettings ,
81
+ visibility_settings : VisibilitySettings ,
82
82
}
83
83
84
84
impl fmt:: Debug for QueueCoordinator {
@@ -102,6 +102,9 @@ impl QueueCoordinator {
102
102
metastore : source_runtime. metastore ,
103
103
source_id : source_runtime. pipeline_id . source_id . clone ( ) ,
104
104
index_uid : source_runtime. pipeline_id . index_uid . clone ( ) ,
105
+ reacquire_grace_period : Duration :: from_secs (
106
+ 2 * source_runtime. indexing_setting . commit_timeout_secs as u64 ,
107
+ ) ,
105
108
} ,
106
109
local_state : QueueLocalState :: default ( ) ,
107
110
pipeline_id : source_runtime. pipeline_id ,
@@ -113,7 +116,7 @@ impl QueueCoordinator {
113
116
message_type,
114
117
publish_lock : PublishLock :: default ( ) ,
115
118
publish_token : Ulid :: new ( ) . to_string ( ) ,
116
- visible_settings : VisibilitySettings :: from_commit_timeout (
119
+ visibility_settings : VisibilitySettings :: from_commit_timeout (
117
120
source_runtime. indexing_setting . commit_timeout_secs ,
118
121
) ,
119
122
}
@@ -157,7 +160,7 @@ impl QueueCoordinator {
157
160
async fn poll_messages ( & mut self , ctx : & SourceContext ) -> Result < ( ) , ActorExitStatus > {
158
161
let raw_messages = self
159
162
. queue_receiver
160
- . receive ( 1 , self . visible_settings . deadline_for_receive )
163
+ . receive ( 1 , self . visibility_settings . deadline_for_receive )
161
164
. await ?;
162
165
163
166
let mut format_errors = Vec :: new ( ) ;
@@ -215,7 +218,7 @@ impl QueueCoordinator {
215
218
self . queue . clone ( ) ,
216
219
message. metadata . ack_id . clone ( ) ,
217
220
message. metadata . initial_deadline ,
218
- self . visible_settings . clone ( ) ,
221
+ self . visibility_settings . clone ( ) ,
219
222
) ,
220
223
content : message,
221
224
position,
@@ -254,7 +257,7 @@ impl QueueCoordinator {
254
257
. await ?;
255
258
if in_progress_ref. batch_reader . is_eof ( ) {
256
259
self . local_state
257
- . drop_currently_read ( self . visible_settings . deadline_for_last_extension )
260
+ . drop_currently_read ( self . visibility_settings . deadline_for_last_extension )
258
261
. await ?;
259
262
self . observable_state . num_messages_processed += 1 ;
260
263
}
@@ -319,7 +322,7 @@ mod tests {
319
322
use crate :: source:: doc_file_reader:: file_test_helpers:: { generate_dummy_doc_file, DUMMY_DOC } ;
320
323
use crate :: source:: queue_sources:: memory_queue:: MemoryQueueForTests ;
321
324
use crate :: source:: queue_sources:: message:: PreProcessedPayload ;
322
- use crate :: source:: queue_sources:: shared_state:: shared_state_for_tests:: shared_state_for_tests ;
325
+ use crate :: source:: queue_sources:: shared_state:: shared_state_for_tests:: init_state ;
323
326
use crate :: source:: { SourceActor , BATCH_NUM_BYTES_LIMIT } ;
324
327
325
328
fn setup_coordinator (
@@ -347,7 +350,7 @@ mod tests {
347
350
source_type : SourceType :: Unspecified ,
348
351
storage_resolver : StorageResolver :: for_test ( ) ,
349
352
publish_token : Ulid :: new ( ) . to_string ( ) ,
350
- visible_settings : VisibilitySettings :: from_commit_timeout ( 5 ) ,
353
+ visibility_settings : VisibilitySettings :: from_commit_timeout ( 5 ) ,
351
354
}
352
355
}
353
356
@@ -401,7 +404,7 @@ mod tests {
401
404
#[ tokio:: test]
402
405
async fn test_process_empty_queue ( ) {
403
406
let queue = Arc :: new ( MemoryQueueForTests :: new ( ) ) ;
404
- let shared_state = shared_state_for_tests ( "test-index" , Default :: default ( ) ) ;
407
+ let shared_state = init_state ( "test-index" , Default :: default ( ) ) ;
405
408
let mut coordinator = setup_coordinator ( queue. clone ( ) , shared_state) ;
406
409
let batches = process_messages ( & mut coordinator, queue, & [ ] ) . await ;
407
410
assert_eq ! ( batches. len( ) , 0 ) ;
@@ -410,7 +413,7 @@ mod tests {
410
413
#[ tokio:: test]
411
414
async fn test_process_one_small_message ( ) {
412
415
let queue = Arc :: new ( MemoryQueueForTests :: new ( ) ) ;
413
- let shared_state = shared_state_for_tests ( "test-index" , Default :: default ( ) ) ;
416
+ let shared_state = init_state ( "test-index" , Default :: default ( ) ) ;
414
417
let mut coordinator = setup_coordinator ( queue. clone ( ) , shared_state. clone ( ) ) ;
415
418
let ( dummy_doc_file, _) = generate_dummy_doc_file ( false , 10 ) . await ;
416
419
let test_uri = Uri :: from_str ( dummy_doc_file. path ( ) . to_str ( ) . unwrap ( ) ) . unwrap ( ) ;
@@ -424,7 +427,7 @@ mod tests {
424
427
#[ tokio:: test]
425
428
async fn test_process_one_big_message ( ) {
426
429
let queue = Arc :: new ( MemoryQueueForTests :: new ( ) ) ;
427
- let shared_state = shared_state_for_tests ( "test-index" , Default :: default ( ) ) ;
430
+ let shared_state = init_state ( "test-index" , Default :: default ( ) ) ;
428
431
let mut coordinator = setup_coordinator ( queue. clone ( ) , shared_state) ;
429
432
let lines = BATCH_NUM_BYTES_LIMIT as usize / DUMMY_DOC . len ( ) + 1 ;
430
433
let ( dummy_doc_file, _) = generate_dummy_doc_file ( true , lines) . await ;
@@ -437,7 +440,7 @@ mod tests {
437
440
#[ tokio:: test]
438
441
async fn test_process_two_messages_different_compression ( ) {
439
442
let queue = Arc :: new ( MemoryQueueForTests :: new ( ) ) ;
440
- let shared_state = shared_state_for_tests ( "test-index" , Default :: default ( ) ) ;
443
+ let shared_state = init_state ( "test-index" , Default :: default ( ) ) ;
441
444
let mut coordinator = setup_coordinator ( queue. clone ( ) , shared_state) ;
442
445
let ( dummy_doc_file_1, _) = generate_dummy_doc_file ( false , 10 ) . await ;
443
446
let test_uri_1 = Uri :: from_str ( dummy_doc_file_1. path ( ) . to_str ( ) . unwrap ( ) ) . unwrap ( ) ;
@@ -456,7 +459,7 @@ mod tests {
456
459
#[ tokio:: test]
457
460
async fn test_process_local_duplicate_message ( ) {
458
461
let queue = Arc :: new ( MemoryQueueForTests :: new ( ) ) ;
459
- let shared_state = shared_state_for_tests ( "test-index" , Default :: default ( ) ) ;
462
+ let shared_state = init_state ( "test-index" , Default :: default ( ) ) ;
460
463
let mut coordinator = setup_coordinator ( queue. clone ( ) , shared_state) ;
461
464
let ( dummy_doc_file, _) = generate_dummy_doc_file ( false , 10 ) . await ;
462
465
let test_uri = Uri :: from_str ( dummy_doc_file. path ( ) . to_str ( ) . unwrap ( ) ) . unwrap ( ) ;
@@ -477,11 +480,15 @@ mod tests {
477
480
let partition_id = PreProcessedPayload :: ObjectUri ( test_uri. clone ( ) ) . partition_id ( ) ;
478
481
479
482
let queue = Arc :: new ( MemoryQueueForTests :: new ( ) ) ;
480
- let shared_state = shared_state_for_tests (
483
+ let shared_state = init_state (
481
484
"test-index" ,
482
485
& [ (
483
486
partition_id. clone ( ) ,
484
- ( "existing_token" . to_string ( ) , Position :: eof ( file_size) ) ,
487
+ (
488
+ "existing_token" . to_string ( ) ,
489
+ Position :: eof ( file_size) ,
490
+ false ,
491
+ ) ,
485
492
) ] ,
486
493
) ;
487
494
let mut coordinator = setup_coordinator ( queue. clone ( ) , shared_state. clone ( ) ) ;
@@ -492,30 +499,82 @@ mod tests {
492
499
assert ! ( coordinator. local_state. is_completed( & partition_id) ) ;
493
500
}
494
501
502
+ #[ tokio:: test]
503
+ async fn test_process_existing_messages ( ) {
504
+ let ( dummy_doc_file_1, _) = generate_dummy_doc_file ( false , 10 ) . await ;
505
+ let test_uri_1 = Uri :: from_str ( dummy_doc_file_1. path ( ) . to_str ( ) . unwrap ( ) ) . unwrap ( ) ;
506
+ let partition_id_1 = PreProcessedPayload :: ObjectUri ( test_uri_1. clone ( ) ) . partition_id ( ) ;
507
+
508
+ let ( dummy_doc_file_2, _) = generate_dummy_doc_file ( false , 10 ) . await ;
509
+ let test_uri_2 = Uri :: from_str ( dummy_doc_file_2. path ( ) . to_str ( ) . unwrap ( ) ) . unwrap ( ) ;
510
+ let partition_id_2 = PreProcessedPayload :: ObjectUri ( test_uri_2. clone ( ) ) . partition_id ( ) ;
511
+
512
+ let ( dummy_doc_file_3, _) = generate_dummy_doc_file ( false , 10 ) . await ;
513
+ let test_uri_3 = Uri :: from_str ( dummy_doc_file_3. path ( ) . to_str ( ) . unwrap ( ) ) . unwrap ( ) ;
514
+ let partition_id_3 = PreProcessedPayload :: ObjectUri ( test_uri_3. clone ( ) ) . partition_id ( ) ;
515
+
516
+ let queue = Arc :: new ( MemoryQueueForTests :: new ( ) ) ;
517
+ let shared_state = init_state (
518
+ "test-index" ,
519
+ & [
520
+ (
521
+ partition_id_1. clone ( ) ,
522
+ ( "existing_token_1" . to_string ( ) , Position :: Beginning , true ) ,
523
+ ) ,
524
+ (
525
+ partition_id_2. clone ( ) ,
526
+ (
527
+ "existing_token_2" . to_string ( ) ,
528
+ Position :: offset ( ( DUMMY_DOC . len ( ) + 1 ) * 2 ) ,
529
+ true ,
530
+ ) ,
531
+ ) ,
532
+ (
533
+ partition_id_3. clone ( ) ,
534
+ (
535
+ "existing_token_3" . to_string ( ) ,
536
+ Position :: offset ( ( DUMMY_DOC . len ( ) + 1 ) * 6 ) ,
537
+ false , // should not be processed because not stale yet
538
+ ) ,
539
+ ) ,
540
+ ] ,
541
+ ) ;
542
+ let mut coordinator = setup_coordinator ( queue. clone ( ) , shared_state. clone ( ) ) ;
543
+ let batches = process_messages (
544
+ & mut coordinator,
545
+ queue,
546
+ & [
547
+ ( & test_uri_1, "ack-id-1" ) ,
548
+ ( & test_uri_2, "ack-id-2" ) ,
549
+ ( & test_uri_3, "ack-id-3" ) ,
550
+ ] ,
551
+ )
552
+ . await ;
553
+ assert_eq ! ( batches. len( ) , 2 ) ;
554
+ assert_eq ! ( batches. iter( ) . map( |b| b. docs. len( ) ) . sum:: <usize >( ) , 18 ) ;
555
+ assert ! ( coordinator. local_state. is_awaiting_commit( & partition_id_1) ) ;
556
+ assert ! ( coordinator. local_state. is_awaiting_commit( & partition_id_2) ) ;
557
+ }
558
+
495
559
#[ tokio:: test]
496
560
async fn test_process_multiple_coordinator ( ) {
497
561
let queue = Arc :: new ( MemoryQueueForTests :: new ( ) ) ;
498
- let shared_state = shared_state_for_tests ( "test-index" , Default :: default ( ) ) ;
499
- let mut proc_1 = setup_coordinator ( queue. clone ( ) , shared_state. clone ( ) ) ;
500
- let mut proc_2 = setup_coordinator ( queue. clone ( ) , shared_state. clone ( ) ) ;
562
+ let shared_state = init_state ( "test-index" , Default :: default ( ) ) ;
563
+ let mut coord_1 = setup_coordinator ( queue. clone ( ) , shared_state. clone ( ) ) ;
564
+ let mut coord_2 = setup_coordinator ( queue. clone ( ) , shared_state. clone ( ) ) ;
501
565
let ( dummy_doc_file, _) = generate_dummy_doc_file ( false , 10 ) . await ;
502
566
let test_uri = Uri :: from_str ( dummy_doc_file. path ( ) . to_str ( ) . unwrap ( ) ) . unwrap ( ) ;
503
567
let partition_id = PreProcessedPayload :: ObjectUri ( test_uri. clone ( ) ) . partition_id ( ) ;
504
568
505
- let batches_1 = process_messages ( & mut proc_1 , queue. clone ( ) , & [ ( & test_uri, "ack1" ) ] ) . await ;
506
- let batches_2 = process_messages ( & mut proc_2 , queue, & [ ( & test_uri, "ack2" ) ] ) . await ;
569
+ let batches_1 = process_messages ( & mut coord_1 , queue. clone ( ) , & [ ( & test_uri, "ack1" ) ] ) . await ;
570
+ let batches_2 = process_messages ( & mut coord_2 , queue, & [ ( & test_uri, "ack2" ) ] ) . await ;
507
571
508
572
assert_eq ! ( batches_1. len( ) , 1 ) ;
509
573
assert_eq ! ( batches_1[ 0 ] . docs. len( ) , 10 ) ;
510
- assert ! ( proc_1. local_state. is_awaiting_commit( & partition_id) ) ;
511
- // proc_2 doesn't know for sure what is happening with the message
512
- // (proc_1 might have crashed), so it just acquires it and takes over
513
- // processing
514
- //
515
- // TODO: this test should fail once we implement the grace
516
- // period before a partition can be re-acquired
517
- assert_eq ! ( batches_2. len( ) , 1 ) ;
518
- assert_eq ! ( batches_2[ 0 ] . docs. len( ) , 10 ) ;
519
- assert ! ( proc_2. local_state. is_awaiting_commit( & partition_id) ) ;
574
+ assert ! ( coord_1. local_state. is_awaiting_commit( & partition_id) ) ;
575
+ // proc_2 learns from shared state that the message is likely still
576
+ // being processed and skips it
577
+ assert_eq ! ( batches_2. len( ) , 0 ) ;
578
+ assert ! ( !coord_2. local_state. is_tracked( & partition_id) ) ;
520
579
}
521
580
}
0 commit comments