diff --git a/columnar/src/column/dictionary_encoded.rs b/columnar/src/column/dictionary_encoded.rs index f87603ee72..90cdfc1d2a 100644 --- a/columnar/src/column/dictionary_encoded.rs +++ b/columnar/src/column/dictionary_encoded.rs @@ -30,6 +30,13 @@ impl fmt::Debug for BytesColumn { } impl BytesColumn { + pub fn empty(num_docs: u32) -> BytesColumn { + BytesColumn { + dictionary: Arc::new(Dictionary::empty()), + term_ord_column: Column::build_empty_column(num_docs), + } + } + /// Fills the given `output` buffer with the term associated to the ordinal `ord`. /// /// Returns `false` if the term does not exist (e.g. `term_ord` is greater or equal to the @@ -77,7 +84,7 @@ impl From<StrColumn> for BytesColumn { } impl StrColumn { - pub(crate) fn wrap(bytes_column: BytesColumn) -> StrColumn { + pub fn wrap(bytes_column: BytesColumn) -> StrColumn { StrColumn(bytes_column) } diff --git a/columnar/src/dynamic_column.rs b/columnar/src/dynamic_column.rs index 267b8f28db..0c566382eb 100644 --- a/columnar/src/dynamic_column.rs +++ b/columnar/src/dynamic_column.rs @@ -247,7 +247,7 @@ impl DynamicColumnHandle { } /// Returns the `u64` fast field reader reader associated with `fields` of types - /// Str, u64, i64, f64, or datetime. + /// Str, u64, i64, f64, bool, or datetime. /// /// If not, the fastfield reader will returns the u64-value associated with the original /// FastValue. @@ -258,9 +258,12 @@ impl DynamicColumnHandle { let column: BytesColumn = crate::column::open_column_bytes(column_bytes)?; Ok(Some(column.term_ord_column)) } - ColumnType::Bool => Ok(None), ColumnType::IpAddr => Ok(None), - ColumnType::I64 | ColumnType::U64 | ColumnType::F64 | ColumnType::DateTime => { + ColumnType::Bool + | ColumnType::I64 + | ColumnType::U64 + | ColumnType::F64 + | ColumnType::DateTime => { let column = crate::column::open_column_u64::<u64>(column_bytes)?; Ok(Some(column)) } diff --git a/src/aggregation/agg_req_with_accessor.rs b/src/aggregation/agg_req_with_accessor.rs index 67cd401284..cee1d65990 100644 --- a/src/aggregation/agg_req_with_accessor.rs +++ b/src/aggregation/agg_req_with_accessor.rs @@ -43,6 +43,11 @@ pub struct AggregationWithAccessor { pub(crate) sub_aggregation: AggregationsWithAccessor, pub(crate) limits: ResourceLimitGuard, pub(crate) column_block_accessor: ColumnBlockAccessor, + /// Used for the missing term aggregation, which checks all columns for existence. + /// By convention the missing aggregation is chosen when this property is set + /// (instead of being set in `agg`). + /// If this needs to be used by other aggregations, we need to refactor this.
+ pub(crate) accessors: Vec<Column<u64>>, pub(crate) agg: Aggregation, } @@ -54,38 +59,59 @@ impl AggregationWithAccessor { reader: &SegmentReader, limits: AggregationLimits, ) -> crate::Result<Vec<AggregationWithAccessor>> { - let mut missing_value_term_agg = None; - let mut str_dict_column = None; + let add_agg_with_accessor = |accessor: Column<u64>, + column_type: ColumnType, + aggs: &mut Vec<AggregationWithAccessor>| + -> crate::Result<()> { + let res = AggregationWithAccessor { + accessor, + accessors: Vec::new(), + field_type: column_type, + sub_aggregation: get_aggs_with_segment_accessor_and_validate( + sub_aggregation, + reader, + &limits, + )?, + agg: agg.clone(), + limits: limits.new_guard(), + missing_value_for_accessor: None, + str_dict_column: None, + column_block_accessor: Default::default(), + }; + aggs.push(res); + Ok(()) + }; + + let mut res: Vec<AggregationWithAccessor> = Vec::new(); use AggregationVariants::*; - let acc_field_types: Vec<(Column<u64>, ColumnType)> = match &agg.agg { + match &agg.agg { Range(RangeAggregation { field: field_name, .. - }) => vec![get_ff_reader( - reader, - field_name, - Some(get_numeric_or_date_column_types()), - )?], + }) => { + let (accessor, column_type) = + get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?; + add_agg_with_accessor(accessor, column_type, &mut res)?; + } Histogram(HistogramAggregation { field: field_name, .. - }) => vec![get_ff_reader( - reader, - field_name, - Some(get_numeric_or_date_column_types()), - )?], + }) => { + let (accessor, column_type) = + get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?; + add_agg_with_accessor(accessor, column_type, &mut res)?; + } DateHistogram(DateHistogramAggregationReq { field: field_name, .. - }) => vec![get_ff_reader( - reader, - field_name, - Some(get_numeric_or_date_column_types()), - )?], + }) => { + let (accessor, column_type) = + get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?; + add_agg_with_accessor(accessor, column_type, &mut res)?; + } Terms(TermsAggregation { field: field_name, missing, .. }) => { - missing_value_term_agg = missing.clone(); - str_dict_column = reader.fast_fields().str(field_name)?; + let str_dict_column = reader.fast_fields().str(field_name)?; let allowed_column_types = [ ColumnType::I64, ColumnType::U64, @@ -105,12 +131,76 @@ impl AggregationWithAccessor { Key::F64(_) => ColumnType::F64, }) .unwrap_or(ColumnType::U64); - get_all_ff_reader_or_empty( + let column_and_types = get_all_ff_reader_or_empty( reader, field_name, Some(&allowed_column_types), fallback_type, - )?
+ )?; + let missing_and_more_than_one_col = column_and_types.len() > 1 && missing.is_some(); + let text_on_non_text_col = column_and_types.len() == 1 + && column_and_types[0].1.numerical_type().is_some() + && missing + .as_ref() + .map(|m| matches!(m, Key::Str(_))) + .unwrap_or(false); + + let use_special_missing_agg = missing_and_more_than_one_col || text_on_non_text_col; + if use_special_missing_agg { + let column_and_types = + get_all_ff_reader_or_empty(reader, field_name, None, fallback_type)?; + + let accessors: Vec<Column<u64>> = + column_and_types.iter().map(|(a, _)| a.clone()).collect(); + let agg_with_acc = AggregationWithAccessor { + missing_value_for_accessor: None, + accessor: accessors[0].clone(), + accessors, + field_type: ColumnType::U64, + sub_aggregation: get_aggs_with_segment_accessor_and_validate( + sub_aggregation, + reader, + &limits, + )?, + agg: agg.clone(), + str_dict_column: str_dict_column.clone(), + limits: limits.new_guard(), + column_block_accessor: Default::default(), + }; + res.push(agg_with_acc); + } + + for (accessor, column_type) in column_and_types { + let missing_value_term_agg = if use_special_missing_agg { + None + } else { + missing.clone() + }; + + let missing_value_for_accessor = + if let Some(missing) = missing_value_term_agg.as_ref() { + get_missing_val(column_type, missing, agg.agg.get_fast_field_name())? + } else { + None + }; + + let agg = AggregationWithAccessor { + missing_value_for_accessor, + accessor, + accessors: Vec::new(), + field_type: column_type, + sub_aggregation: get_aggs_with_segment_accessor_and_validate( + sub_aggregation, + reader, + &limits, + )?, + agg: agg.clone(), + str_dict_column: str_dict_column.clone(), + limits: limits.new_guard(), + column_block_accessor: Default::default(), + }; + res.push(agg); + } } Average(AverageAggregation { field: field_name, .. @@ -130,48 +220,21 @@ impl AggregationWithAccessor { | Sum(SumAggregation { field: field_name, .. }) => { - let (accessor, field_type) = + let (accessor, column_type) = get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?; - - vec![(accessor, field_type)] + add_agg_with_accessor(accessor, column_type, &mut res)?; } Percentiles(percentiles) => { - let (accessor, field_type) = get_ff_reader( + let (accessor, column_type) = get_ff_reader( reader, percentiles.field_name(), Some(get_numeric_or_date_column_types()), )?; - vec![(accessor, field_type)] + add_agg_with_accessor(accessor, column_type, &mut res)?; } }; - let aggs: Vec<AggregationWithAccessor> = acc_field_types - .into_iter() - .map(|(accessor, column_type)| { - let missing_value_for_accessor = - if let Some(missing) = missing_value_term_agg.as_ref() { - get_missing_val(column_type, missing, agg.agg.get_fast_field_name())?
- } else { - None - }; - - Ok(AggregationWithAccessor { - missing_value_for_accessor, - accessor, - field_type: column_type, - sub_aggregation: get_aggs_with_segment_accessor_and_validate( - sub_aggregation, - reader, - &limits, - )?, - agg: agg.clone(), - str_dict_column: str_dict_column.clone(), - limits: limits.new_guard(), - column_block_accessor: Default::default(), - }) - }) - .collect::>()?; - Ok(aggs) + Ok(res) } } diff --git a/src/aggregation/bucket/mod.rs b/src/aggregation/bucket/mod.rs index 5404935a95..cd6d980cdd 100644 --- a/src/aggregation/bucket/mod.rs +++ b/src/aggregation/bucket/mod.rs @@ -25,6 +25,7 @@ mod histogram; mod range; mod term_agg; +mod term_missing_agg; use std::collections::HashMap; @@ -32,6 +33,7 @@ pub use histogram::*; pub use range::*; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; pub use term_agg::*; +pub use term_missing_agg::*; /// Order for buckets in a bucket aggregation. #[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, Default)] diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index 5092d7d3a2..0fa977d1d0 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -1,6 +1,6 @@ use std::fmt::Debug; -use columnar::ColumnType; +use columnar::{BytesColumn, ColumnType, StrColumn}; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; @@ -470,8 +470,10 @@ impl SegmentTermCollector { let term_dict = agg_with_accessor .str_dict_column .as_ref() - .expect("internal error: term dictionary not found for term aggregation"); - + .cloned() + .unwrap_or_else(|| { + StrColumn::wrap(BytesColumn::empty(agg_with_accessor.accessor.num_docs())) + }); let mut buffer = String::new(); for (term_id, doc_count) in entries { let intermediate_entry = into_intermediate_bucket_entry(term_id, doc_count)?; @@ -1811,69 +1813,4 @@ mod tests { Ok(()) } - - #[test] - #[ignore] - // TODO: This is not yet implemented - fn terms_aggregation_missing_mixed_type() -> crate::Result<()> { - let mut schema_builder = Schema::builder(); - let json = schema_builder.add_json_field("json", FAST); - let schema = schema_builder.build(); - let index = Index::create_in_ram(schema); - let mut index_writer = index.writer_for_tests().unwrap(); - // => Segment with all values numeric - index_writer - .add_document(doc!(json => json!({"mixed_type": 10.0}))) - .unwrap(); - index_writer.add_document(doc!())?; - index_writer.commit().unwrap(); - //// => Segment with all values text - index_writer - .add_document(doc!(json => json!({"mixed_type": "blue"}))) - .unwrap(); - index_writer.add_document(doc!())?; - index_writer.commit().unwrap(); - - // => Segment with mixed values - index_writer - .add_document(doc!(json => json!({"mixed_type": "red"}))) - .unwrap(); - index_writer - .add_document(doc!(json => json!({"mixed_type": -20.5}))) - .unwrap(); - index_writer - .add_document(doc!(json => json!({"mixed_type": true}))) - .unwrap(); - index_writer.add_document(doc!())?; - - index_writer.commit().unwrap(); - - let agg_req: Aggregations = serde_json::from_value(json!({ - "replace_null": { - "terms": { - "field": "json.mixed_type", - "missing": "NULL" - }, - }, - "replace_num": { - "terms": { - "field": "json.mixed_type", - "missing": 1337 - }, - }, - })) - .unwrap(); - - let res = exec_request_with_query(agg_req, &index, None)?; - - // text field - assert_eq!(res["replace_null"]["buckets"][0]["key"], "NULL"); - assert_eq!(res["replace_null"]["buckets"][0]["doc_count"], 4); // WRONG should be 3 - 
assert_eq!(res["replace_num"]["buckets"][0]["key"], 1337.0); - assert_eq!(res["replace_num"]["buckets"][0]["doc_count"], 5); // WRONG should be 3 - assert_eq!(res["replace_null"]["sum_other_doc_count"], 0); - assert_eq!(res["replace_null"]["doc_count_error_upper_bound"], 0); - - Ok(()) - } } diff --git a/src/aggregation/bucket/term_missing_agg.rs b/src/aggregation/bucket/term_missing_agg.rs new file mode 100644 index 0000000000..c85d6c4286 --- /dev/null +++ b/src/aggregation/bucket/term_missing_agg.rs @@ -0,0 +1,476 @@ +use rustc_hash::FxHashMap; + +use crate::aggregation::agg_req_with_accessor::AggregationsWithAccessor; +use crate::aggregation::intermediate_agg_result::{ + IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult, + IntermediateKey, IntermediateTermBucketEntry, IntermediateTermBucketResult, +}; +use crate::aggregation::segment_agg_result::{ + build_segment_agg_collector, SegmentAggregationCollector, +}; + +/// The specialized missing term aggregation. +#[derive(Default, Debug, Clone)] +pub struct TermMissingAgg { + missing_count: u32, + accessor_idx: usize, + sub_agg: Option>, +} +impl TermMissingAgg { + pub(crate) fn new( + accessor_idx: usize, + sub_aggregations: &mut AggregationsWithAccessor, + ) -> crate::Result { + let has_sub_aggregations = !sub_aggregations.is_empty(); + let sub_agg = if has_sub_aggregations { + let sub_aggregation = build_segment_agg_collector(sub_aggregations)?; + Some(sub_aggregation) + } else { + None + }; + + Ok(Self { + accessor_idx, + sub_agg, + ..Default::default() + }) + } +} + +impl SegmentAggregationCollector for TermMissingAgg { + fn add_intermediate_aggregation_result( + self: Box, + agg_with_accessor: &AggregationsWithAccessor, + results: &mut IntermediateAggregationResults, + ) -> crate::Result<()> { + let name = agg_with_accessor.aggs.keys[self.accessor_idx].to_string(); + let agg_with_accessor = &agg_with_accessor.aggs.values[self.accessor_idx]; + let term_agg = agg_with_accessor + .agg + .agg + .as_term() + .expect("TermMissingAgg collector must be term agg req"); + let missing = term_agg + .missing + .as_ref() + .expect("TermMissingAgg collector, but no missing found in agg req") + .clone(); + let mut entries: FxHashMap = + Default::default(); + + let mut missing_entry = IntermediateTermBucketEntry { + doc_count: self.missing_count, + sub_aggregation: Default::default(), + }; + if let Some(sub_agg) = self.sub_agg { + let mut res = IntermediateAggregationResults::default(); + sub_agg.add_intermediate_aggregation_result( + &agg_with_accessor.sub_aggregation, + &mut res, + )?; + missing_entry.sub_aggregation = res; + } + + entries.insert(missing.into(), missing_entry); + + let bucket = IntermediateBucketResult::Terms(IntermediateTermBucketResult { + entries, + sum_other_doc_count: 0, + doc_count_error_upper_bound: 0, + }); + + results.push(name, IntermediateAggregationResult::Bucket(bucket))?; + + Ok(()) + } + + fn collect( + &mut self, + doc: crate::DocId, + agg_with_accessor: &mut AggregationsWithAccessor, + ) -> crate::Result<()> { + let agg = &mut agg_with_accessor.aggs.values[self.accessor_idx]; + let has_value = agg.accessors.iter().any(|acc| acc.index.has_value(doc)); + if !has_value { + self.missing_count += 1; + if let Some(sub_agg) = self.sub_agg.as_mut() { + sub_agg.collect(doc, &mut agg.sub_aggregation)?; + } + } + Ok(()) + } + + fn collect_block( + &mut self, + docs: &[crate::DocId], + agg_with_accessor: &mut AggregationsWithAccessor, + ) -> crate::Result<()> { + for doc in docs { + 
self.collect(*doc, agg_with_accessor)?; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use crate::aggregation::agg_req::Aggregations; + use crate::aggregation::tests::exec_request_with_query; + use crate::schema::{Schema, FAST}; + use crate::Index; + + #[test] + fn terms_aggregation_missing_mixed_type_mult_seg_sub_agg() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let json = schema_builder.add_json_field("json", FAST); + let score = schema_builder.add_f64_field("score", FAST); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_for_tests().unwrap(); + // => Segment with all values numeric + index_writer + .add_document(doc!(score => 1.0, json => json!({"mixed_type": 10.0}))) + .unwrap(); + index_writer.add_document(doc!(score => 5.0))?; + // index_writer.commit().unwrap(); + //// => Segment with all values text + index_writer + .add_document(doc!(score => 1.0, json => json!({"mixed_type": "blue"}))) + .unwrap(); + index_writer.add_document(doc!(score => 5.0))?; + // index_writer.commit().unwrap(); + + // => Segment with mixed values + index_writer.add_document(doc!(json => json!({"mixed_type": "red"})))?; + index_writer.add_document(doc!(json => json!({"mixed_type": -20.5})))?; + index_writer.add_document(doc!(json => json!({"mixed_type": true})))?; + index_writer.add_document(doc!(score => 5.0))?; + + index_writer.commit().unwrap(); + let agg_req: Aggregations = serde_json::from_value(json!({ + "replace_null": { + "terms": { + "field": "json.mixed_type", + "missing": "NULL" + }, + "aggs": { + "sum_score": { + "sum": { + "field": "score" + } + } + } + }, + })) + .unwrap(); + + let res = exec_request_with_query(agg_req, &index, None)?; + + // text field + assert_eq!(res["replace_null"]["buckets"][0]["key"], "NULL"); + assert_eq!(res["replace_null"]["buckets"][0]["doc_count"], 3); + assert_eq!( + res["replace_null"]["buckets"][0]["sum_score"]["value"], + 15.0 + ); + assert_eq!(res["replace_null"]["sum_other_doc_count"], 0); + assert_eq!(res["replace_null"]["doc_count_error_upper_bound"], 0); + + Ok(()) + } + + #[test] + fn terms_aggregation_missing_mixed_type_sub_agg_reg1() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let json = schema_builder.add_json_field("json", FAST); + let score = schema_builder.add_f64_field("score", FAST); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_for_tests().unwrap(); + // => Segment with all values numeric + index_writer.add_document(doc!(score => 1.0, json => json!({"mixed_type": 10.0})))?; + index_writer.add_document(doc!(score => 5.0))?; + index_writer.add_document(doc!(score => 5.0))?; + + index_writer.commit().unwrap(); + let agg_req: Aggregations = serde_json::from_value(json!({ + "replace_null": { + "terms": { + "field": "json.mixed_type", + "missing": "NULL" + }, + "aggs": { + "sum_score": { + "sum": { + "field": "score" + } + } + } + }, + })) + .unwrap(); + + let res = exec_request_with_query(agg_req, &index, None)?; + + // text field + assert_eq!(res["replace_null"]["buckets"][0]["key"], "NULL"); + assert_eq!(res["replace_null"]["buckets"][0]["doc_count"], 2); + assert_eq!( + res["replace_null"]["buckets"][0]["sum_score"]["value"], + 10.0 + ); + assert_eq!(res["replace_null"]["sum_other_doc_count"], 0); + assert_eq!(res["replace_null"]["doc_count_error_upper_bound"], 0); + + Ok(()) + } + + #[test] + fn terms_aggregation_missing_mult_seg_empty() -> 
crate::Result<()> { + let mut schema_builder = Schema::builder(); + let score = schema_builder.add_f64_field("score", FAST); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_for_tests().unwrap(); + + index_writer.add_document(doc!(score => 5.0))?; + index_writer.commit().unwrap(); + index_writer.add_document(doc!(score => 5.0))?; + index_writer.commit().unwrap(); + index_writer.add_document(doc!(score => 5.0))?; + + index_writer.commit().unwrap(); + let agg_req: Aggregations = serde_json::from_value(json!({ + "replace_null": { + "terms": { + "field": "json.mixed_type", + "missing": "NULL" + }, + "aggs": { + "sum_score": { + "sum": { + "field": "score" + } + } + } + }, + })) + .unwrap(); + + let res = exec_request_with_query(agg_req, &index, None)?; + + // text field + assert_eq!(res["replace_null"]["buckets"][0]["key"], "NULL"); + assert_eq!(res["replace_null"]["buckets"][0]["doc_count"], 3); + assert_eq!( + res["replace_null"]["buckets"][0]["sum_score"]["value"], + 15.0 + ); + assert_eq!(res["replace_null"]["sum_other_doc_count"], 0); + assert_eq!(res["replace_null"]["doc_count_error_upper_bound"], 0); + + Ok(()) + } + + #[test] + fn terms_aggregation_missing_single_seg_empty() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let score = schema_builder.add_f64_field("score", FAST); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_for_tests().unwrap(); + + index_writer.add_document(doc!(score => 5.0))?; + index_writer.add_document(doc!(score => 5.0))?; + index_writer.add_document(doc!(score => 5.0))?; + + index_writer.commit().unwrap(); + let agg_req: Aggregations = serde_json::from_value(json!({ + "replace_null": { + "terms": { + "field": "json.mixed_type", + "missing": "NULL" + }, + "aggs": { + "sum_score": { + "sum": { + "field": "score" + } + } + } + }, + })) + .unwrap(); + + let res = exec_request_with_query(agg_req, &index, None)?; + + // text field + assert_eq!(res["replace_null"]["buckets"][0]["key"], "NULL"); + assert_eq!(res["replace_null"]["buckets"][0]["doc_count"], 3); + assert_eq!( + res["replace_null"]["buckets"][0]["sum_score"]["value"], + 15.0 + ); + assert_eq!(res["replace_null"]["sum_other_doc_count"], 0); + assert_eq!(res["replace_null"]["doc_count_error_upper_bound"], 0); + + Ok(()) + } + + #[test] + fn terms_aggregation_missing_mixed_type_mult_seg() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let json = schema_builder.add_json_field("json", FAST); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_for_tests().unwrap(); + // => Segment with all values numeric + index_writer + .add_document(doc!(json => json!({"mixed_type": 10.0}))) + .unwrap(); + index_writer.add_document(doc!())?; + index_writer.commit().unwrap(); + //// => Segment with all values text + index_writer + .add_document(doc!(json => json!({"mixed_type": "blue"}))) + .unwrap(); + index_writer.add_document(doc!())?; + index_writer.commit().unwrap(); + + // => Segment with mixed values + index_writer + .add_document(doc!(json => json!({"mixed_type": "red"}))) + .unwrap(); + index_writer + .add_document(doc!(json => json!({"mixed_type": -20.5}))) + .unwrap(); + index_writer + .add_document(doc!(json => json!({"mixed_type": true}))) + .unwrap(); + index_writer.add_document(doc!())?; + + index_writer.commit().unwrap(); + let agg_req: Aggregations = 
serde_json::from_value(json!({ + "replace_null": { + "terms": { + "field": "json.mixed_type", + "missing": "NULL" + }, + }, + "replace_num": { + "terms": { + "field": "json.mixed_type", + "missing": 1337 + }, + }, + })) + .unwrap(); + + let res = exec_request_with_query(agg_req, &index, None)?; + + // text field + assert_eq!(res["replace_null"]["buckets"][0]["key"], "NULL"); + assert_eq!(res["replace_null"]["buckets"][0]["doc_count"], 3); + assert_eq!(res["replace_num"]["buckets"][0]["key"], 1337.0); + assert_eq!(res["replace_num"]["buckets"][0]["doc_count"], 3); + assert_eq!(res["replace_null"]["sum_other_doc_count"], 0); + assert_eq!(res["replace_null"]["doc_count_error_upper_bound"], 0); + + Ok(()) + } + + #[test] + fn terms_aggregation_missing_str_on_numeric_field() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let json = schema_builder.add_json_field("json", FAST); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_for_tests().unwrap(); + // => Segment with all values numeric + index_writer + .add_document(doc!(json => json!({"mixed_type": 10.0}))) + .unwrap(); + index_writer.add_document(doc!())?; + index_writer.add_document(doc!())?; + + index_writer + .add_document(doc!(json => json!({"mixed_type": -20.5}))) + .unwrap(); + index_writer.add_document(doc!())?; + + index_writer.commit().unwrap(); + + let agg_req: Aggregations = serde_json::from_value(json!({ + "replace_null": { + "terms": { + "field": "json.mixed_type", + "missing": "NULL" + }, + }, + })) + .unwrap(); + + let res = exec_request_with_query(agg_req, &index, None)?; + + // text field + assert_eq!(res["replace_null"]["buckets"][0]["key"], "NULL"); + assert_eq!(res["replace_null"]["buckets"][0]["doc_count"], 3); + assert_eq!(res["replace_null"]["sum_other_doc_count"], 0); + assert_eq!(res["replace_null"]["doc_count_error_upper_bound"], 0); + + Ok(()) + } + + #[test] + fn terms_aggregation_missing_mixed_type_one_seg() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let json = schema_builder.add_json_field("json", FAST); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_for_tests().unwrap(); + // => Segment with all values numeric + index_writer + .add_document(doc!(json => json!({"mixed_type": 10.0}))) + .unwrap(); + index_writer.add_document(doc!())?; + //// => Segment with all values text + index_writer + .add_document(doc!(json => json!({"mixed_type": "blue"}))) + .unwrap(); + index_writer.add_document(doc!())?; + + // => Segment with mixed values + index_writer + .add_document(doc!(json => json!({"mixed_type": "red"}))) + .unwrap(); + index_writer + .add_document(doc!(json => json!({"mixed_type": -20.5}))) + .unwrap(); + index_writer + .add_document(doc!(json => json!({"mixed_type": true}))) + .unwrap(); + index_writer.add_document(doc!())?; + + index_writer.commit().unwrap(); + + let agg_req: Aggregations = serde_json::from_value(json!({ + "replace_null": { + "terms": { + "field": "json.mixed_type", + "missing": "NULL" + }, + }, + })) + .unwrap(); + + let res = exec_request_with_query(agg_req, &index, None)?; + + // text field + assert_eq!(res["replace_null"]["buckets"][0]["key"], "NULL"); + assert_eq!(res["replace_null"]["buckets"][0]["doc_count"], 3); + assert_eq!(res["replace_null"]["sum_other_doc_count"], 0); + assert_eq!(res["replace_null"]["doc_count_error_upper_bound"], 0); + + Ok(()) + } +} diff --git 
a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs index 4a20611545..209834d516 100644 --- a/src/aggregation/intermediate_agg_result.rs +++ b/src/aggregation/intermediate_agg_result.rs @@ -111,9 +111,6 @@ impl IntermediateAggregationResults { } /// Convert intermediate result and its aggregation request to the final result. - /// - /// Internal function, AggregationsInternal is used instead Aggregations, which is optimized - /// for internal processing, by splitting metric and buckets into separate groups. pub(crate) fn into_final_result_internal( self, req: &Aggregations, @@ -121,7 +118,14 @@ impl IntermediateAggregationResults { ) -> crate::Result<AggregationResults> { let mut results: FxHashMap<String, AggregationResult> = FxHashMap::default(); for (key, agg_res) in self.aggs_res.into_iter() { - let req = req.get(key.as_str()).unwrap(); + let req = req.get(key.as_str()).unwrap_or_else(|| { + panic!( + "Could not find key {:?} in request keys {:?}. This probably means that \ + add_intermediate_aggregation_result passed the wrong agg object.", + key, + req.keys().collect::<Vec<_>>() + ) + }); results.insert(key, agg_res.into_final_result(req, limits)?); } // Handle empty results diff --git a/src/aggregation/segment_agg_result.rs b/src/aggregation/segment_agg_result.rs index 853779689f..e575796477 100644 --- a/src/aggregation/segment_agg_result.rs +++ b/src/aggregation/segment_agg_result.rs @@ -15,6 +15,7 @@ use super::metric::{ SegmentPercentilesCollector, SegmentStatsCollector, SegmentStatsType, StatsAggregation, SumAggregation, }; +use crate::aggregation::bucket::TermMissingAgg; pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug { fn add_intermediate_aggregation_result( @@ -80,12 +81,21 @@ pub(crate) fn build_single_agg_segment_collector( ) -> crate::Result<Box<dyn SegmentAggregationCollector>> { use AggregationVariants::*; match &req.agg.agg { - Terms(terms_req) => Ok(Box::new(SegmentTermCollector::from_req_and_validate( - terms_req, - &mut req.sub_aggregation, - req.field_type, - accessor_idx, - )?)), + Terms(terms_req) => { + if req.accessors.is_empty() { + Ok(Box::new(SegmentTermCollector::from_req_and_validate( + terms_req, + &mut req.sub_aggregation, + req.field_type, + accessor_idx, + )?)) + } else { + Ok(Box::new(TermMissingAgg::new( + accessor_idx, + &mut req.sub_aggregation, + )?)) + } + } Range(range_req) => Ok(Box::new(SegmentRangeCollector::from_req_and_validate( range_req, &mut req.sub_aggregation, diff --git a/src/fastfield/readers.rs b/src/fastfield/readers.rs index c8e8b1d4a1..6d6f8512ff 100644 --- a/src/fastfield/readers.rs +++ b/src/fastfield/readers.rs @@ -276,7 +276,7 @@ impl FastFieldReaders { } /// Returns the all `u64` column used to represent any `u64`-mapped typed (String/Bytes term - /// ids, i64, u64, f64, DateTime). + /// ids, i64, u64, f64, bool, DateTime). /// /// In case of JSON, there may be two columns. One for term and one for numerical types. (This /// may change later to 3 types if JSON handles DateTime)
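Illustrative usage sketch (not part of the patch, mirroring the new tests in term_missing_agg.rs): after this change a `terms` aggregation that sets `missing` also works on JSON fast fields whose columns mix text and numeric values; documents without any value for the field are counted by the new `TermMissingAgg` collector. `exec_request_with_query` and the `index` are assumed to be the test helper and fixture used above.

    // Request a terms aggregation with a "missing" replacement value.
    let agg_req: Aggregations = serde_json::from_value(json!({
        "replace_null": {
            "terms": { "field": "json.mixed_type", "missing": "NULL" }
        }
    }))
    .unwrap();
    let res = exec_request_with_query(agg_req, &index, None)?;
    // Docs with no value in any column of `json.mixed_type` land in the "NULL" bucket.
    assert_eq!(res["replace_null"]["buckets"][0]["key"], "NULL");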