From b00b771bb82ffc11c9c1f89bc2e0a9849ce8e0fe Mon Sep 17 00:00:00 2001
From: Philippe Rolet
Date: Wed, 18 Dec 2024 11:10:02 +0100
Subject: [PATCH] [Keyword search] Index tables and folders in elasticsearch
 (#9466)

* refactor: generic method to index node

* fix search clause

* deny unkn fields

* upsert tables

* upsert folders

* avoid double clone
---
 core/bin/core_api.rs                   | 66 ++++++++++++++++----------
 core/src/data_sources/data_source.rs   |  2 +-
 core/src/data_sources/folder.rs        | 17 +++++++
 core/src/data_sources/node.rs          | 27 +++++++----
 core/src/search_stores/search_store.rs | 53 +++++++++++++++------
 5 files changed, 115 insertions(+), 50 deletions(-)

diff --git a/core/bin/core_api.rs b/core/bin/core_api.rs
index 0494fda9926d..1cc2716ddf7d 100644
--- a/core/bin/core_api.rs
+++ b/core/bin/core_api.rs
@@ -2202,21 +2202,27 @@ async fn tables_upsert(
     )
     .await
     {
-        Ok(table) => (
-            StatusCode::OK,
-            Json(APIResponse {
-                error: None,
-                response: Some(json!({ "table": table })),
-            }),
-        ),
-        Err(e) => {
-            return error_response(
+        Ok(table) => match state.search_store.index_table(table.clone()).await {
+            Ok(_) => (
+                StatusCode::OK,
+                Json(APIResponse {
+                    error: None,
+                    response: Some(json!({ "table": table })),
+                }),
+            ),
+            Err(e) => error_response(
                 StatusCode::INTERNAL_SERVER_ERROR,
                 "internal_server_error",
-                "Failed to upsert table",
+                "Failed to index table",
                 Some(e),
-            )
-        }
+            ),
+        },
+        Err(e) => error_response(
+            StatusCode::INTERNAL_SERVER_ERROR,
+            "internal_server_error",
+            "Failed to upsert table",
+            Some(e),
+        ),
     }
 }
 
@@ -2890,23 +2896,29 @@ async fn folders_upsert(
     )
     .await
     {
-        Err(e) => {
-            return error_response(
+        Err(e) => error_response(
+            StatusCode::INTERNAL_SERVER_ERROR,
+            "internal_server_error",
+            "Failed to upsert folder",
+            Some(e),
+        ),
+        Ok(folder) => match state.search_store.index_folder(folder.clone()).await {
+            Ok(_) => (
+                StatusCode::OK,
+                Json(APIResponse {
+                    error: None,
+                    response: Some(json!({
+                        "folder": folder
+                    })),
+                }),
+            ),
+            Err(e) => error_response(
                 StatusCode::INTERNAL_SERVER_ERROR,
                 "internal_server_error",
-                "Failed to upsert folder",
+                "Failed to index folder",
                 Some(e),
-            )
-        }
-        Ok(folder) => (
-            StatusCode::OK,
-            Json(APIResponse {
-                error: None,
-                response: Some(json!({
-                    "folder": folder
-                })),
-            }),
-        ),
+            ),
+        },
     }
 }
 
@@ -3077,6 +3089,7 @@ async fn folders_delete(
 }
 
 #[derive(serde::Deserialize)]
+#[serde(deny_unknown_fields)]
 struct NodesSearchPayload {
     query: String,
     filter: Vec<DatasourceViewFilter>,
@@ -3573,6 +3586,7 @@ fn main() {
             post(databases_query_run),
         )
         .route("/sqlite_workers", delete(sqlite_workers_delete))
+        // Folders
         .route(
             "/projects/:project_id/data_sources/:data_source_id/folders",
diff --git a/core/src/data_sources/data_source.rs b/core/src/data_sources/data_source.rs
index 04909ae03867..817287fcc23b 100644
--- a/core/src/data_sources/data_source.rs
+++ b/core/src/data_sources/data_source.rs
@@ -761,7 +761,7 @@ impl DataSource {
             .await?;
 
         // Upsert document in search index.
-        search_store.index_document(&document).await?;
+        search_store.index_document(document.clone()).await?;
 
         // Clean-up old superseded versions.
         self.scrub_document_superseded_versions(store, &document_id)
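
A note on the two handler hunks above: tables_upsert and folders_upsert now return 200 only when both the Postgres upsert and the Elasticsearch indexing succeed, and each failure mode gets its own message ("Failed to upsert ..." vs "Failed to index ..."). The shape of that control flow, as a minimal sketch with hypothetical upsert/index_in_search stand-ins (not the patch's actual functions):

    // Hedged sketch of the nested-match pattern; `upsert` and
    // `index_in_search` are invented stand-ins for the store and
    // search-store calls in the handlers.
    async fn upsert(id: &str) -> Result<String, String> {
        Ok(format!("row:{id}")) // pretend the DB write succeeded
    }

    async fn index_in_search(_row: &str) -> Result<(), String> {
        Ok(()) // pretend the search-index write succeeded
    }

    // The second fallible step runs only if the first succeeded, and
    // each Err maps to a distinct 500 message, as in the handlers.
    async fn upsert_then_index(id: &str) -> Result<String, String> {
        match upsert(id).await {
            Ok(row) => match index_in_search(&row).await {
                Ok(_) => Ok(row),
                Err(e) => Err(format!("Failed to index: {e}")),
            },
            Err(e) => Err(format!("Failed to upsert: {e}")),
        }
    }

    #[tokio::main]
    async fn main() {
        assert_eq!(upsert_then_index("t1").await, Ok("row:t1".to_string()));
    }

One consequence of this ordering: when indexing fails, the row has already been written, so a 500 from these endpoints can leave Postgres ahead of Elasticsearch.
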
diff --git a/core/src/data_sources/folder.rs b/core/src/data_sources/folder.rs
index ddef480f9b5e..360cd56272ee 100644
--- a/core/src/data_sources/folder.rs
+++ b/core/src/data_sources/folder.rs
@@ -1,5 +1,7 @@
 use serde::{Deserialize, Serialize};
 
+use super::node::{Node, NodeType};
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Folder {
     data_source_id: String,
@@ -51,3 +53,18 @@ impl Folder {
         &self.parents
     }
 }
+
+impl From<Folder> for Node {
+    fn from(folder: Folder) -> Node {
+        Node::new(
+            &folder.data_source_id,
+            &folder.folder_id,
+            NodeType::Folder,
+            folder.timestamp,
+            &folder.title,
+            FOLDER_MIMETYPE,
+            folder.parent_id,
+            folder.parents,
+        )
+    }
+}
diff --git a/core/src/data_sources/node.rs b/core/src/data_sources/node.rs
index 77bc7c20b587..552cf1d9b484 100644
--- a/core/src/data_sources/node.rs
+++ b/core/src/data_sources/node.rs
@@ -1,4 +1,5 @@
 use serde::{Deserialize, Serialize};
+use std::fmt;
 
 use super::folder::Folder;
 
@@ -9,16 +10,26 @@ pub enum NodeType {
     Folder,
 }
 
+impl fmt::Display for NodeType {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            NodeType::Document => write!(f, "Document"),
+            NodeType::Table => write!(f, "Table"),
+            NodeType::Folder => write!(f, "Folder"),
+        }
+    }
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Node {
-    data_source_id: String,
-    node_id: String,
-    node_type: NodeType,
-    timestamp: u64,
-    title: String,
-    mime_type: String,
-    parent_id: Option<String>,
-    parents: Vec<String>,
+    pub data_source_id: String,
+    pub node_id: String,
+    pub node_type: NodeType,
+    pub timestamp: u64,
+    pub title: String,
+    pub mime_type: String,
+    pub parent_id: Option<String>,
+    pub parents: Vec<String>,
 }
 
 impl Node {
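
The From<Folder> for Node impl above (together with the Display impl on NodeType, used for log messages) is what feeds the generic indexing path added in the next file; Document and Table evidently have the same conversions, since search_store.rs calls Node::from(document) and Node::from(table). A self-contained sketch of the pattern, with trimmed, hypothetical field lists rather than the real structs:

    // Hedged sketch; these structs are stand-ins, not the real types.
    struct Node {
        node_id: String,
        title: String,
    }

    struct Folder {
        folder_id: String,
        title: String,
    }

    impl From<Folder> for Node {
        fn from(folder: Folder) -> Node {
            // The folder's own id becomes the generic node id.
            Node {
                node_id: folder.folder_id,
                title: folder.title,
            }
        }
    }

    // Anything convertible into a Node can share one indexing helper.
    fn index<T: Into<Node>>(item: T) {
        let node: Node = item.into();
        println!("would index node {} ({})", node.node_id, node.title);
    }

    fn main() {
        index(Folder {
            folder_id: "f1".into(),
            title: "Quarterly reports".into(),
        });
    }

Note that the trait in the next file keeps three concrete index_* methods rather than one generic index<T: Into<Node>>: SearchStore is used behind Box<dyn SearchStore + Sync + Send> (see clone_box), and a generic method would make the trait not object-safe.
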
diff --git a/core/src/search_stores/search_store.rs b/core/src/search_stores/search_store.rs
index d6077f3f4e43..27bd47d7f255 100644
--- a/core/src/search_stores/search_store.rs
+++ b/core/src/search_stores/search_store.rs
@@ -8,8 +8,11 @@ use elasticsearch::{
 use serde_json::json;
 use url::Url;
 
-use crate::data_sources::node::Node;
-use crate::{data_sources::data_source::Document, utils};
+use crate::{data_sources::node::Node, databases::table::Table};
+use crate::{
+    data_sources::{data_source::Document, folder::Folder},
+    utils,
+};
 use tracing::{error, info};
 
 #[derive(serde::Deserialize)]
@@ -32,7 +35,10 @@ pub trait SearchStore {
         filter: Vec<DatasourceViewFilter>,
         options: Option<NodesSearchOptions>,
     ) -> Result<Vec<Node>>;
-    async fn index_document(&self, document: &Document) -> Result<()>;
+    async fn index_document(&self, document: Document) -> Result<()>;
+    async fn index_table(&self, table: Table) -> Result<()>;
+    async fn index_folder(&self, folder: Folder) -> Result<()>;
+    async fn index_node(&self, node: Node) -> Result<()>;
 
     fn clone_box(&self) -> Box<dyn SearchStore + Sync + Send>;
 }
@@ -76,24 +82,38 @@ const NODES_INDEX_NAME: &str = "core.data_sources_nodes";
 #[async_trait]
 impl SearchStore for ElasticsearchSearchStore {
-    async fn index_document(&self, document: &Document) -> Result<()> {
-        let node = Node::from(document.clone());
+    async fn index_document(&self, document: Document) -> Result<()> {
+        let node = Node::from(document);
+        self.index_node(node).await
+    }
+
+    async fn index_table(&self, table: Table) -> Result<()> {
+        let node = Node::from(table);
+        self.index_node(node).await
+    }
 
+    async fn index_folder(&self, folder: Folder) -> Result<()> {
+        let node = Node::from(folder);
+        self.index_node(node).await
+    }
+
+    async fn index_node(&self, node: Node) -> Result<()> {
         // todo(kw-search): fail on error
         let now = utils::now();
         match self
             .client
-            .index(IndexParts::IndexId(NODES_INDEX_NAME, &document.document_id))
+            .index(IndexParts::IndexId(NODES_INDEX_NAME, &node.node_id))
             .timeout("200ms")
-            .body(node)
+            .body(node.clone())
             .send()
             .await
         {
             Ok(_) => {
                 info!(
                     duration = utils::now() - now,
-                    document_id = document.document_id,
-                    "[ElasticsearchSearchStore] Indexed document"
+                    node_id = node.node_id,
+                    "[ElasticsearchSearchStore] Indexed {}",
+                    node.node_type.to_string()
                 );
                 Ok(())
             }
             Err(e) => {
@@ -101,8 +121,9 @@ impl SearchStore for ElasticsearchSearchStore {
                 error!(
                     error = %e,
                     duration = utils::now() - now,
-                    document_id = document.document_id,
-                    "[ElasticsearchSearchStore] Failed to index document"
+                    node_id = node.node_id,
+                    "[ElasticsearchSearchStore] Failed to index {}",
+                    node.node_type.to_string()
                 );
                 Ok(())
             }
@@ -118,12 +139,14 @@ impl SearchStore for ElasticsearchSearchStore {
         // First, collect all datasource_ids and their corresponding view_filters
         let mut filter_conditions = Vec::new();
        for f in filter {
+            let mut must_clause = Vec::new();
+            must_clause.push(json!({ "term": { "data_source_id": f.data_source_id } }));
+            if !f.view_filter.is_empty() {
+                must_clause.push(json!({ "terms": { "parents": f.view_filter } }));
+            }
             filter_conditions.push(json!({
                 "bool": {
-                    "must": [
-                        { "term": { "data_source_id": f.data_source_id } },
-                        { "terms": { "parents": f.view_filter } }
-                    ]
+                    "must": must_clause
                 }
             }));
         }
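
The final hunk is the "fix search clause" bullet from the commit message. The old code unconditionally added a terms clause on parents; in Elasticsearch, a terms query with an empty array matches no documents, so a filter entry without a view_filter silently returned nothing. Building must_clause conditionally produces one of two query shapes, sketched here with invented values:

    // Hedged sketch reproducing the JSON the new loop builds for a
    // single filter entry; the ids and folder names are invented.
    use serde_json::json;

    fn filter_condition(data_source_id: &str, view_filter: &[&str]) -> serde_json::Value {
        let mut must_clause = Vec::new();
        must_clause.push(json!({ "term": { "data_source_id": data_source_id } }));
        if !view_filter.is_empty() {
            must_clause.push(json!({ "terms": { "parents": view_filter } }));
        }
        json!({ "bool": { "must": must_clause } })
    }

    fn main() {
        // Scoped to specific parents: term + terms.
        println!("{}", filter_condition("ds_main", &["folder_a", "folder_b"]));
        // Unscoped: the terms clause is now omitted entirely, where the
        // old code emitted "terms": { "parents": [] } and matched nothing.
        println!("{}", filter_condition("ds_main", &[]));
    }

Also worth noting: in this Elasticsearch implementation, index_node logs indexing failures and returns Ok(()) in both match arms (hence the todo(kw-search): fail on error marker), so the new "Failed to index" branches in the upsert handlers cannot currently be reached through it.
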