Skip to content

Commit 148594f

Browse files
authored
Improve IndexWriter customisation via builder (#2562)
* Improve `IndexWriter` customisation via builder
* Remove change noise from PR
* Correct documentation
* Resolve comments and add test
1 parent 8edb439 commit 148594f

File tree

4 files changed

+98
-56
lines changed

4 files changed

+98
-56
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ fastdivide = "0.4.0"
5555
itertools = "0.13.0"
5656
measure_time = "0.9.0"
5757
arc-swap = "1.5.0"
58+
bon = "3.3.1"
5859

5960
columnar = { version = "0.3", path = "./columnar", package = "tantivy-columnar" }
6061
sstable = { version = "0.3", path = "./sstable", package = "tantivy-sstable", optional = true }

src/index/index.rs

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ use crate::directory::MmapDirectory;
1515
use crate::directory::{Directory, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK};
1616
use crate::error::{DataCorruption, TantivyError};
1717
use crate::index::{IndexMeta, SegmentId, SegmentMeta, SegmentMetaInventory};
18-
use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN};
18+
use crate::indexer::index_writer::{
19+
IndexWriterOptions, MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN,
20+
};
1921
use crate::indexer::segment_updater::save_metas;
2022
use crate::indexer::{IndexWriter, SingleSegmentIndexWriter};
2123
use crate::reader::{IndexReader, IndexReaderBuilder};
@@ -24,8 +26,6 @@ use crate::schema::{Field, FieldType, Schema};
2426
use crate::tokenizer::{TextAnalyzer, TokenizerManager};
2527
use crate::SegmentReader;
2628

27-
const DEFAULT_NUM_MERGE_THREADS: usize = 4;
28-
2929
fn load_metas(
3030
directory: &dyn Directory,
3131
inventory: &SegmentMetaInventory,
@@ -521,30 +521,24 @@ impl Index {
521521
load_metas(self.directory(), &self.inventory)
522522
}
523523

524-
/// Open a new index writer. Attempts to acquire a lockfile.
524+
/// Open a new index writer with the given options. Attempts to acquire a lockfile.
525525
///
526526
/// The lockfile should be deleted on drop, but it is possible
527527
/// that due to a panic or other error, a stale lockfile will be
528528
/// left in the index directory. If you are sure that no other
529529
/// `IndexWriter` on the system is accessing the index directory,
530530
/// it is safe to manually delete the lockfile.
531531
///
532-
/// - `num_threads` defines the number of indexing workers that should work at the same time.
533-
///
534-
/// - `overall_memory_budget_in_bytes` sets the amount of memory allocated for all indexing
535-
/// thread.
536-
///
537-
/// Each thread will receive a budget of `overall_memory_budget_in_bytes / num_threads`.
532+
/// - `options` defines the writer configuration, which includes things like buffer sizes,
533+
/// indexer threads, etc.
538534
///
539535
/// # Errors
540-
/// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IoError`.
536+
/// If the lockfile already exists, returns `TantivyError::LockFailure`.
541537
/// If the memory arena per thread is too small or too big, returns
542538
/// `TantivyError::InvalidArgument`
543-
pub fn writer_with_num_threads_and_num_merge_threads<D: Document>(
539+
pub fn writer_with_options<D: Document>(
544540
&self,
545-
num_threads: usize,
546-
overall_memory_budget_in_bytes: usize,
547-
num_merge_threads: usize,
541+
options: IndexWriterOptions,
548542
) -> crate::Result<IndexWriter<D>> {
549543
let directory_lock = self
550544
.directory
@@ -560,32 +554,40 @@ impl Index {
560554
),
561555
)
562556
})?;
563-
let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
564-
IndexWriter::new(
565-
self,
566-
num_threads,
567-
memory_arena_in_bytes_per_thread,
568-
directory_lock,
569-
num_merge_threads,
570-
)
557+
558+
IndexWriter::new(self, options, directory_lock)
571559
}
572560

573-
/// Creates a multithreaded writer with 4 merge threads.
561+
/// Open a new index writer. Attempts to acquire a lockfile.
562+
///
563+
/// The lockfile should be deleted on drop, but it is possible
564+
/// that due to a panic or other error, a stale lockfile will be
565+
/// left in the index directory. If you are sure that no other
566+
/// `IndexWriter` on the system is accessing the index directory,
567+
/// it is safe to manually delete the lockfile.
568+
///
569+
/// - `num_threads` defines the number of indexing workers that should work at the same time.
570+
///
571+
/// - `overall_memory_budget_in_bytes` sets the amount of memory allocated for all indexing
572+
/// threads.
573+
///
574+
/// Each thread will receive a budget of `overall_memory_budget_in_bytes / num_threads`, so `num_threads` must be non-zero.
574575
///
575576
/// # Errors
576-
/// If the lockfile already exists, returns `Error::FileAlreadyExists`.
577+
/// If the lockfile already exists, returns `TantivyError::LockFailure`.
577578
/// If the memory arena per thread is too small or too big, returns
578579
/// `TantivyError::InvalidArgument`
579580
pub fn writer_with_num_threads<D: Document>(
580581
&self,
581582
num_threads: usize,
582583
overall_memory_budget_in_bytes: usize,
583584
) -> crate::Result<IndexWriter<D>> {
584-
self.writer_with_num_threads_and_num_merge_threads(
585-
num_threads,
586-
overall_memory_budget_in_bytes,
587-
DEFAULT_NUM_MERGE_THREADS,
588-
)
585+
let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
586+
let options = IndexWriterOptions::builder()
587+
.num_worker_threads(num_threads)
588+
.memory_budget_per_thread(memory_arena_in_bytes_per_thread)
589+
.build();
590+
self.writer_with_options(options)
589591
}
590592

591593
/// Helper to create an index writer for tests.

src/indexer/index_writer.rs

Lines changed: 64 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,23 @@ fn error_in_index_worker_thread(context: &str) -> TantivyError {
4545
))
4646
}
4747

48+
#[derive(Clone, bon::Builder)]
49+
/// Options for creating a new [`IndexWriter`] for an index, built via `IndexWriterOptions::builder()`.
50+
pub struct IndexWriterOptions {
51+
#[builder(default = MEMORY_BUDGET_NUM_BYTES_MIN)]
52+
/// The memory budget per indexer thread.
53+
///
54+
/// When an indexer thread has buffered this much data in memory
55+
/// it will flush the segment to disk (although it is not searchable until `commit` is called).
56+
memory_budget_per_thread: usize,
57+
#[builder(default = 1)]
58+
/// The number of indexer worker threads to use.
59+
num_worker_threads: usize,
60+
#[builder(default = 4)]
61+
/// The number of merge threads to use.
62+
num_merge_threads: usize,
63+
}
64+
4865
/// `IndexWriter` is the user entry-point to add document to an index.
4966
///
5067
/// It manages a small number of indexing threads, as well as a shared
@@ -58,8 +75,7 @@ pub struct IndexWriter<D: Document = TantivyDocument> {
5875

5976
index: Index,
6077

61-
// The memory budget per thread, after which a commit is triggered.
62-
memory_budget_in_bytes_per_thread: usize,
78+
options: IndexWriterOptions,
6379

6480
workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,
6581

@@ -70,9 +86,6 @@ pub struct IndexWriter<D: Document = TantivyDocument> {
7086

7187
worker_id: usize,
7288

73-
num_threads: usize,
74-
num_merge_threads: usize,
75-
7689
delete_queue: DeleteQueue,
7790

7891
stamper: Stamper,
@@ -266,24 +279,27 @@ impl<D: Document> IndexWriter<D> {
266279
/// `TantivyError::InvalidArgument`
267280
pub(crate) fn new(
268281
index: &Index,
269-
num_threads: usize,
270-
memory_budget_in_bytes_per_thread: usize,
282+
options: IndexWriterOptions,
271283
directory_lock: DirectoryLock,
272-
num_merge_threads: usize,
273284
) -> crate::Result<Self> {
274-
if memory_budget_in_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
285+
if options.memory_budget_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
275286
let err_msg = format!(
276287
"The memory arena in bytes per thread needs to be at least \
277288
{MEMORY_BUDGET_NUM_BYTES_MIN}."
278289
);
279290
return Err(TantivyError::InvalidArgument(err_msg));
280291
}
281-
if memory_budget_in_bytes_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
292+
if options.memory_budget_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
282293
let err_msg = format!(
283294
"The memory arena in bytes per thread cannot exceed {MEMORY_BUDGET_NUM_BYTES_MAX}"
284295
);
285296
return Err(TantivyError::InvalidArgument(err_msg));
286297
}
298+
if options.num_worker_threads == 0 {
299+
let err_msg = "At least one worker thread is required, got 0".to_string();
300+
return Err(TantivyError::InvalidArgument(err_msg));
301+
}
302+
287303
let (document_sender, document_receiver) =
288304
crossbeam_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
289305

@@ -297,23 +313,20 @@ impl<D: Document> IndexWriter<D> {
297313
index.clone(),
298314
stamper.clone(),
299315
&delete_queue.cursor(),
300-
num_merge_threads,
316+
options.num_merge_threads,
301317
)?;
302318

303319
let mut index_writer = Self {
304320
_directory_lock: Some(directory_lock),
305321

306-
memory_budget_in_bytes_per_thread,
322+
options: options.clone(),
307323
index: index.clone(),
308324
index_writer_status: IndexWriterStatus::from(document_receiver),
309325
operation_sender: document_sender,
310326

311327
segment_updater,
312328

313329
workers_join_handle: vec![],
314-
num_threads,
315-
316-
num_merge_threads,
317330

318331
delete_queue,
319332

@@ -406,7 +419,7 @@ impl<D: Document> IndexWriter<D> {
406419

407420
let mut delete_cursor = self.delete_queue.cursor();
408421

409-
let mem_budget = self.memory_budget_in_bytes_per_thread;
422+
let mem_budget = self.options.memory_budget_per_thread;
410423
let index = self.index.clone();
411424
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
412425
.name(format!("thrd-tantivy-index{}", self.worker_id))
@@ -459,7 +472,7 @@ impl<D: Document> IndexWriter<D> {
459472
}
460473

461474
fn start_workers(&mut self) -> crate::Result<()> {
462-
for _ in 0..self.num_threads {
475+
for _ in 0..self.options.num_worker_threads {
463476
self.add_indexing_worker()?;
464477
}
465478
Ok(())
@@ -561,13 +574,7 @@ impl<D: Document> IndexWriter<D> {
561574
.take()
562575
.expect("The IndexWriter does not have any lock. This is a bug, please report.");
563576

564-
let new_index_writer = IndexWriter::new(
565-
&self.index,
566-
self.num_threads,
567-
self.memory_budget_in_bytes_per_thread,
568-
directory_lock,
569-
self.num_merge_threads,
570-
)?;
577+
let new_index_writer = IndexWriter::new(&self.index, self.options.clone(), directory_lock)?;
571578

572579
// the current `self` is dropped right away because of this call.
573580
//
@@ -821,7 +828,7 @@ mod tests {
821828
use crate::directory::error::LockError;
822829
use crate::error::*;
823830
use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
824-
use crate::indexer::NoMergePolicy;
831+
use crate::indexer::{IndexWriterOptions, NoMergePolicy};
825832
use crate::query::{QueryParser, TermQuery};
826833
use crate::schema::{
827834
self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, JsonObjectOptions,
@@ -2542,4 +2549,36 @@ mod tests {
25422549
index_writer.commit().unwrap();
25432550
Ok(())
25442551
}
2552+
2553+
#[test]
2554+
fn test_writer_options_validation() {
2555+
let mut schema_builder = Schema::builder();
2556+
let field = schema_builder.add_bool_field("example", STORED);
2557+
let index = Index::create_in_ram(schema_builder.build());
2558+
2559+
let opt_wo_threads = IndexWriterOptions::builder().num_worker_threads(0).build();
2560+
let result = index.writer_with_options::<TantivyDocument>(opt_wo_threads);
2561+
assert!(result.is_err(), "Writer should reject 0 thread count");
2562+
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
2563+
2564+
let opt_with_low_memory = IndexWriterOptions::builder()
2565+
.memory_budget_per_thread(10 << 10)
2566+
.build();
2567+
let result = index.writer_with_options::<TantivyDocument>(opt_with_low_memory);
2568+
assert!(
2569+
result.is_err(),
2570+
"Writer should reject options with too low memory size"
2571+
);
2572+
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
2573+
2574+
let opt_with_low_memory = IndexWriterOptions::builder()
2575+
.memory_budget_per_thread(5 << 30)
2576+
.build();
2577+
let result = index.writer_with_options::<TantivyDocument>(opt_with_low_memory);
2578+
assert!(
2579+
result.is_err(),
2580+
"Writer should reject options with too high memory size"
2581+
);
2582+
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
2583+
}
25452584
}

src/indexer/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ mod stamper;
3131
use crossbeam_channel as channel;
3232
use smallvec::SmallVec;
3333

34-
pub use self::index_writer::IndexWriter;
34+
pub use self::index_writer::{IndexWriter, IndexWriterOptions};
3535
pub use self::log_merge_policy::LogMergePolicy;
3636
pub use self::merge_operation::MergeOperation;
3737
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};

0 commit comments

Comments
 (0)