@@ -27,8 +27,8 @@ const CMD_COMPACT: u8 = 0x02;
27
27
28
28
const DEFAULT_LOG_ITEM_BATCH_CAP : usize = 64 ;
29
29
const MAX_LOG_BATCH_BUFFER_CAP : usize = 8 * 1024 * 1024 ;
30
- // 2GiB, The maximum content length accepted by lz4 compression.
31
- const MAX_LOG_ENTRIES_SIZE_PER_BATCH : usize = i32 :: MAX as usize ;
30
+ // LZ4_MAX_INPUT_SIZE = 0x7E000000
31
+ const MAX_LOG_ENTRIES_SIZE_PER_BATCH : usize = 0x7E000000 ;
32
32
33
33
/// `MessageExt` trait allows for probing log index from a specific type of
34
34
/// protobuf messages.
@@ -421,15 +421,15 @@ impl LogItemBatch {
421
421
422
422
#[ inline]
423
423
pub fn pop_save_point ( & mut self ) {
424
- self . save_points . pop ( ) ;
424
+ self . save_points . pop ( ) . unwrap ( ) ;
425
425
}
426
426
427
427
#[ inline]
428
- pub fn rollback ( & mut self ) {
429
- let ( a, b, c) = self . save_points . last ( ) . unwrap ( ) ;
430
- self . items . truncate ( * a) ;
431
- self . item_size = * b;
432
- self . entries_size = * c;
428
+ pub fn rollback_to_save_point ( & mut self ) {
429
+ let ( a, b, c) = self . save_points . pop ( ) . unwrap ( ) ;
430
+ self . items . truncate ( a) ;
431
+ self . item_size = b;
432
+ self . entries_size = c;
433
433
}
434
434
435
435
pub ( crate ) fn finish_populate ( & mut self , compression_type : CompressionType ) {
@@ -675,15 +675,16 @@ impl LogBatch {
675
675
#[ inline]
676
676
pub fn pop_save_point ( & mut self ) {
677
677
assert ! ( self . buf_state == BufState :: Open ) ;
678
+ self . item_batch . pop_save_point ( ) ;
678
679
self . save_points . pop ( ) ;
679
680
}
680
681
681
682
/// No-op if there's no save point to rollback to.
682
683
#[ inline]
683
- pub fn rollback ( & mut self ) {
684
+ pub fn rollback_to_save_point ( & mut self ) {
684
685
assert ! ( self . buf_state == BufState :: Open ) ;
685
- self . item_batch . rollback ( ) ;
686
- self . buf . truncate ( * self . save_points . last ( ) . unwrap ( ) ) ;
686
+ self . item_batch . rollback_to_save_point ( ) ;
687
+ self . buf . truncate ( self . save_points . pop ( ) . unwrap ( ) ) ;
687
688
}
688
689
689
690
/// Adds some protobuf log entries into the log batch.
@@ -1001,7 +1002,9 @@ fn verify_checksum_with_signature(buf: &[u8], signature: Option<u32>) -> Result<
1001
1002
mod tests {
1002
1003
use super :: * ;
1003
1004
use crate :: pipe_log:: { LogQueue , Version } ;
1004
- use crate :: test_util:: { catch_unwind_silent, generate_entries, generate_entry_indexes_opt} ;
1005
+ use crate :: test_util:: {
1006
+ catch_unwind_silent, generate_entries, generate_entry_indexes_opt, PanicGuard ,
1007
+ } ;
1005
1008
use protobuf:: parse_from_bytes;
1006
1009
use raft:: eraftpb:: Entry ;
1007
1010
use strum:: IntoEnumIterator ;
@@ -1438,6 +1441,57 @@ mod tests {
1438
1441
1439
1442
#[ test]
1440
1443
fn test_save_point ( ) {
1444
+ let ops = [
1445
+ |b : & mut LogBatch | {
1446
+ b. add_entries :: < Entry > ( 1 , & generate_entries ( 1 , 11 , None ) )
1447
+ . unwrap ( )
1448
+ } ,
1449
+ |b : & mut LogBatch | {
1450
+ b. add_entries :: < Entry > ( 7 , & generate_entries ( 1 , 11 , Some ( & vec ! [ b'x' ; 1024 ] ) ) )
1451
+ . unwrap ( )
1452
+ } ,
1453
+ |b : & mut LogBatch | b. add_command ( 17 , Command :: Clean ) ,
1454
+ |b : & mut LogBatch | b. put ( 27 , b"key27" . to_vec ( ) , b"value27" . to_vec ( ) ) ,
1455
+ |b : & mut LogBatch | b. delete ( 37 , b"key37" . to_vec ( ) ) ,
1456
+ |b : & mut LogBatch | b. add_command ( 47 , Command :: Compact { index : 777 } ) ,
1457
+ |b : & mut LogBatch | {
1458
+ b. add_entries :: < Entry > ( 57 , & generate_entries ( 1 , 51 , None ) )
1459
+ . unwrap ( )
1460
+ } ,
1461
+ ] ;
1462
+ for start in 0 ..ops. len ( ) {
1463
+ for stripe in 1 ..=5 {
1464
+ for num in ops. len ( ) ..ops. len ( ) * 5 {
1465
+ for repeat in 1 ..=5 {
1466
+ let _guard = PanicGuard :: with_prompt ( format ! (
1467
+ "case: [{}, {}, {}, {}]" ,
1468
+ start, stripe, num, repeat
1469
+ ) ) ;
1470
+ let mut to_verify = Vec :: new ( ) ;
1471
+ let mut batch = LogBatch :: default ( ) ;
1472
+ let mut op_idx = start;
1473
+ let mut total_op = 0 ;
1474
+ for _ in 0 ..num {
1475
+ for _ in 0 ..repeat {
1476
+ ops[ op_idx % ops. len ( ) ] ( & mut batch) ;
1477
+ to_verify. push ( batch. clone ( ) ) ;
1478
+ batch. set_save_point ( ) ;
1479
+ total_op += 1 ;
1480
+ if total_op % 5 == 0 {
1481
+ to_verify. pop ( ) . unwrap ( ) ;
1482
+ batch. pop_save_point ( ) ;
1483
+ }
1484
+ }
1485
+ op_idx += stripe;
1486
+ }
1487
+ while let Some ( b) = to_verify. pop ( ) {
1488
+ batch. rollback_to_save_point ( ) ;
1489
+ assert_eq ! ( batch, b) ;
1490
+ }
1491
+ }
1492
+ }
1493
+ }
1494
+ }
1441
1495
}
1442
1496
1443
1497
#[ cfg( feature = "nightly" ) ]
0 commit comments