Skip to content

Commit dff022b

Browse files
committed
NOBUG Added setting merge policy.
1 parent cefb09c commit dff022b

File tree

6 files changed

+84
-14
lines changed

6 files changed

+84
-14
lines changed

src/indexer/index_writer.rs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use core::SerializableSegment;
55
use core::Index;
66
use core::Segment;
77
use std::thread::JoinHandle;
8+
use indexer::{MergePolicy, DefaultMergePolicy};
89
use indexer::SegmentWriter;
910
use super::directory_lock::DirectoryLock;
1011
use std::clone::Clone;
@@ -15,6 +16,7 @@ use indexer::merger::IndexMerger;
1516
use core::SegmentId;
1617
use datastruct::stacker::Heap;
1718
use std::mem::swap;
19+
use std::sync::{Arc, Mutex};
1820
use chan;
1921
use core::SegmentMeta;
2022
use super::segment_updater::{SegmentUpdater, SegmentUpdate, SegmentUpdateSender};
@@ -53,6 +55,8 @@ pub struct IndexWriter {
5355
// lifetime of the lock with that of the IndexWriter.
5456
_directory_lock: DirectoryLock,
5557

58+
_merge_policy: Arc<Mutex<Box<MergePolicy>>>,
59+
5660
index: Index,
5761
heap_size_in_bytes_per_thread: usize,
5862

@@ -204,12 +208,16 @@ impl IndexWriter {
204208
let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) =
205209
chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);
206210

207-
let (segment_update_sender, segment_update_thread) = SegmentUpdater::start_updater(index.clone());
211+
let merge_policy: Arc<Mutex<Box<MergePolicy>>> = Arc::new(Mutex::new(box DefaultMergePolicy::default()));
212+
213+
let (segment_update_sender, segment_update_thread) = SegmentUpdater::start_updater(index.clone(), merge_policy.clone());
208214

209215
let mut index_writer = IndexWriter {
210216

211217
_directory_lock: directory_lock,
212218

219+
_merge_policy: merge_policy,
220+
213221
heap_size_in_bytes_per_thread: heap_size_in_bytes_per_thread,
214222
index: index.clone(),
215223

@@ -229,7 +237,18 @@ impl IndexWriter {
229237
try!(index_writer.start_workers());
230238
Ok(index_writer)
231239
}
232-
240+
241+
242+
/// Returns a clone of the index_writer merge policy.
243+
pub fn get_merge_policy(&self) -> Box<MergePolicy> {
244+
self._merge_policy.lock().unwrap().box_clone()
245+
}
246+
247+
/// Set the merge policy.
248+
pub fn set_merge_policy(&self, merge_policy: Box<MergePolicy>) {
249+
*self._merge_policy.lock().unwrap() = merge_policy;
250+
}
251+
233252
fn start_workers(&mut self) -> Result<()> {
234253
for _ in 0..self.num_threads {
235254
try!(self.add_indexing_worker());
@@ -445,6 +464,7 @@ mod tests {
445464
use Index;
446465
use Term;
447466
use Error;
467+
use indexer::NoMergePolicy;
448468

449469
#[test]
450470
fn test_lockfile_stops_duplicates() {
@@ -456,6 +476,17 @@ mod tests {
456476
_ => panic!("Expected FileAlreadyExists error"),
457477
}
458478
}
479+
480+
#[test]
481+
fn test_set_merge_policy() {
482+
let schema_builder = schema::SchemaBuilder::default();
483+
let index = Index::create_in_ram(schema_builder.build());
484+
let index_writer = index.writer(40_000_000).unwrap();
485+
assert_eq!(format!("{:?}", index_writer.get_merge_policy()), "LogMergePolicy { min_merge_size: 8, min_layer_size: 10000, level_log_size: 0.75 }");
486+
let merge_policy = box NoMergePolicy::default();
487+
index_writer.set_merge_policy(merge_policy);
488+
assert_eq!(format!("{:?}", index_writer.get_merge_policy()), "NoMergePolicy");
489+
}
459490

460491
#[test]
461492
fn test_lockfile_released_on_drop() {

src/indexer/log_merge_policy.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ const DEFAULT_LEVEL_LOG_SIZE: f64 = 0.75;
88
const DEFAULT_MIN_LAYER_SIZE: u32 = 10_000;
99
const DEFAULT_MIN_MERGE_SIZE: usize = 8;
1010

11+
12+
/// LogMergePolicy tries tries to merge segments that have a similar number of
13+
/// documents.
14+
#[derive(Debug, Clone)]
1115
pub struct LogMergePolicy {
1216
min_merge_size: usize,
1317
min_layer_size: u32,
@@ -20,7 +24,7 @@ impl LogMergePolicy {
2024
}
2125

2226
/// Set the minimum number of segment that may be merge together.
23-
pub fn set_min_merge_size(&mut self, min_merge_size: usize) {
27+
pub fn set_min_merge_size(&mut self, min_merge_size: usize) {
2428
self.min_merge_size = min_merge_size;
2529
}
2630

@@ -30,7 +34,6 @@ impl LogMergePolicy {
3034
self.min_layer_size = min_layer_size;
3135
}
3236

33-
3437
/// Set the ratio between two consecutive levels.
3538
///
3639
/// Segment are group in levels according to their sizes.
@@ -83,6 +86,10 @@ impl MergePolicy for LogMergePolicy {
8386

8487
result
8588
}
89+
90+
fn box_clone(&self) -> Box<MergePolicy> {
91+
box self.clone()
92+
}
8693
}
8794

8895
impl Default for LogMergePolicy {

src/indexer/merge_policy.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,30 @@
11
use core::SegmentId;
22
use core::SegmentMeta;
33
use std::marker;
4+
use std::fmt::Debug;
45

56

7+
/// Set of segment suggested for a merge.
68
#[derive(Debug, Clone)]
79
pub struct MergeCandidate(pub Vec<SegmentId>);
810

9-
pub trait MergePolicy: marker::Send {
11+
12+
/// The Merge policy defines which segments should be merged.
13+
///
14+
/// Every time a the list of segments changes, the segment updater
15+
/// asks the merge policy if some segments should be merged.
16+
pub trait MergePolicy: marker::Send + Debug {
17+
/// Given the list of segment metas, returns the list of merge candidates.
18+
///
19+
/// This call happens on the segment updater thread, and will block
20+
/// other segment updates, so all implementations should happen rapidly.
1021
fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate>;
22+
/// Returns a boxed clone of the MergePolicy.
23+
fn box_clone(&self) -> Box<MergePolicy>;
1124
}
1225

26+
/// Never merge segments.
27+
#[derive(Debug)]
1328
pub struct NoMergePolicy;
1429

1530
impl Default for NoMergePolicy {
@@ -18,10 +33,13 @@ impl Default for NoMergePolicy {
1833
}
1934
}
2035

21-
2236
impl MergePolicy for NoMergePolicy {
2337
fn compute_merge_candidates(&self, _segments: &[SegmentMeta]) -> Vec<MergeCandidate> {
2438
Vec::new()
2539
}
40+
41+
fn box_clone(&self) -> Box<MergePolicy> {
42+
box NoMergePolicy
43+
}
2644
}
2745

src/indexer/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,6 @@ pub use self::log_merge_policy::LogMergePolicy;
1717
pub use self::merge_policy::{NoMergePolicy, MergeCandidate, MergePolicy};
1818
pub use self::segment_manager::SegmentManager;
1919

20+
21+
/// Alias for the default merge policy, which is the LogMergePolicy.
2022
pub type DefaultMergePolicy = LogMergePolicy;

src/indexer/segment_updater.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22

33
use chan;
44
use core::Index;
5+
use std::sync::Mutex;
56
use core::Segment;
67
use core::SegmentId;
78
use core::SegmentMeta;
89
use std::mem;
910
use core::SerializableSegment;
10-
use indexer::{DefaultMergePolicy, MergePolicy};
11+
use indexer::MergePolicy;
1112
use indexer::MergeCandidate;
1213
use indexer::merger::IndexMerger;
1314
use indexer::SegmentSerializer;
@@ -135,20 +136,20 @@ pub struct SegmentUpdater {
135136
segment_update_receiver: SegmentUpdateReceiver,
136137
segment_update_sender: SegmentUpdateSender,
137138
segment_manager_arc: Arc<SegmentManager>,
138-
merge_policy: Box<MergePolicy>,
139+
merge_policy: Arc<Mutex<Box<MergePolicy>>>,
139140
merging_thread_id: usize,
140141
merging_threads: HashMap<usize, JoinHandle<(Vec<SegmentId>, SegmentMeta)> >,
141142
}
142143

143144

144145
impl SegmentUpdater {
145146

146-
pub fn start_updater(index: Index) -> (SegmentUpdateSender, JoinHandle<()>) {
147-
let segment_updater = SegmentUpdater::new(index);
147+
pub fn start_updater(index: Index, merge_policy: Arc<Mutex<Box<MergePolicy>>>) -> (SegmentUpdateSender, JoinHandle<()>) {
148+
let segment_updater = SegmentUpdater::new(index, merge_policy);
148149
(segment_updater.segment_update_sender.clone(), segment_updater.start())
149150
}
150151

151-
fn new(index: Index) -> SegmentUpdater {
152+
fn new(index: Index, merge_policy: Arc<Mutex<Box<MergePolicy>>>) -> SegmentUpdater {
152153
let segment_manager_arc = get_segment_manager(&index);
153154
let (segment_update_sender, segment_update_receiver): (SegmentUpdateSender, SegmentUpdateReceiver) = chan::async();
154155
SegmentUpdater {
@@ -157,7 +158,7 @@ impl SegmentUpdater {
157158
segment_update_sender: segment_update_sender,
158159
segment_update_receiver: segment_update_receiver,
159160
segment_manager_arc: segment_manager_arc,
160-
merge_policy: Box::new(DefaultMergePolicy::default()), // TODO make that configurable
161+
merge_policy: merge_policy,
161162
merging_thread_id: 0,
162163
merging_threads: HashMap::new(),
163164
}
@@ -236,8 +237,9 @@ impl SegmentUpdater {
236237
let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(segment_manager);
237238
// Committed segments cannot be merged with uncommitted_segments.
238239
// We therefore consider merges using these two sets of segments independantly.
239-
let mut merge_candidates = self.merge_policy.compute_merge_candidates(&uncommitted_segments);
240-
let committed_merge_candidates = self.merge_policy.compute_merge_candidates(&committed_segments);
240+
let merge_policy_lock = self.merge_policy.lock().unwrap();
241+
let mut merge_candidates = merge_policy_lock.compute_merge_candidates(&uncommitted_segments);
242+
let committed_merge_candidates = merge_policy_lock.compute_merge_candidates(&committed_segments);
241243
merge_candidates.extend_from_slice(&committed_merge_candidates[..]);
242244
merge_candidates
243245
}

src/lib.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,16 @@ pub use postings::Postings;
112112
pub use postings::SegmentPostingsOption;
113113

114114

115+
116+
/// Tantivy's makes it possible to personalize when
117+
/// the indexer should merge its segments
118+
pub mod merge_policy {
119+
pub use indexer::MergePolicy;
120+
pub use indexer::LogMergePolicy;
121+
pub use indexer::NoMergePolicy;
122+
pub use indexer::DefaultMergePolicy;
123+
}
124+
115125
/// u32 identifying a document within a segment.
116126
/// Documents have their doc id assigned incrementally,
117127
/// as they are added in the segment.

0 commit comments

Comments
 (0)