Skip to content

Commit

Permalink
[Keyword search] Index tables and folders in elasticsearch (#9466)
Browse files Browse the repository at this point in the history
* refactor: generic method to index node

* fix search clause

* deny unknown fields

* upsert tables

* upsert folders

* avoid double clone
  • Loading branch information
philipperolet authored Dec 18, 2024
1 parent 61042dd commit b00b771
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 50 deletions.
66 changes: 40 additions & 26 deletions core/bin/core_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2202,21 +2202,27 @@ async fn tables_upsert(
)
.await
{
Ok(table) => (
StatusCode::OK,
Json(APIResponse {
error: None,
response: Some(json!({ "table": table })),
}),
),
Err(e) => {
return error_response(
Ok(table) => match state.search_store.index_table(table.clone()).await {
Ok(_) => (
StatusCode::OK,
Json(APIResponse {
error: None,
response: Some(json!({ "table": table })),
}),
),
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
"Failed to upsert table",
"Failed to index table",
Some(e),
)
}
),
},
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
"Failed to upsert table",
Some(e),
),
}
}

Expand Down Expand Up @@ -2890,23 +2896,29 @@ async fn folders_upsert(
)
.await
{
Err(e) => {
return error_response(
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
"Failed to upsert folder",
Some(e),
),
Ok(folder) => match state.search_store.index_folder(folder.clone()).await {
Ok(_) => (
StatusCode::OK,
Json(APIResponse {
error: None,
response: Some(json!({
"folder": folder
})),
}),
),
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
"Failed to upsert folder",
"Failed to index folder",
Some(e),
)
}
Ok(folder) => (
StatusCode::OK,
Json(APIResponse {
error: None,
response: Some(json!({
"folder": folder
})),
}),
),
),
},
}
}

Expand Down Expand Up @@ -3077,6 +3089,7 @@ async fn folders_delete(
}

#[derive(serde::Deserialize)]
#[serde(deny_unknown_fields)]
struct NodesSearchPayload {
query: String,
filter: Vec<DatasourceViewFilter>,
Expand Down Expand Up @@ -3573,6 +3586,7 @@ fn main() {
post(databases_query_run),
)
.route("/sqlite_workers", delete(sqlite_workers_delete))

// Folders
.route(
"/projects/:project_id/data_sources/:data_source_id/folders",
Expand Down
2 changes: 1 addition & 1 deletion core/src/data_sources/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ impl DataSource {
.await?;

// Upsert document in search index.
search_store.index_document(&document).await?;
search_store.index_document(document.clone()).await?;

// Clean-up old superseded versions.
self.scrub_document_superseded_versions(store, &document_id)
Expand Down
17 changes: 17 additions & 0 deletions core/src/data_sources/folder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use serde::{Deserialize, Serialize};

use super::node::{Node, NodeType};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Folder {
data_source_id: String,
Expand Down Expand Up @@ -51,3 +53,18 @@ impl Folder {
&self.parents
}
}

/// Converts a `Folder` into the generic `Node` representation used by the
/// search index, tagging it with `NodeType::Folder`.
///
/// Consumes the folder and forwards its identifying fields to `Node::new`.
impl From<Folder> for Node {
    fn from(folder: Folder) -> Node {
        Node::new(
            &folder.data_source_id,
            &folder.folder_id,
            NodeType::Folder,
            folder.timestamp,
            &folder.title,
            // NOTE(review): `FOLDER_MIMETYPE` is not visible in the imports shown
            // in this diff — presumably a module-level constant; confirm it is in
            // scope in folder.rs.
            FOLDER_MIMETYPE,
            folder.parent_id,
            folder.parents,
        )
    }
}
27 changes: 19 additions & 8 deletions core/src/data_sources/node.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use serde::{Deserialize, Serialize};
use std::fmt;

use super::folder::Folder;

Expand All @@ -9,16 +10,26 @@ pub enum NodeType {
Folder,
}

/// Renders the variant name ("Document", "Table", or "Folder"), used in
/// indexing log messages.
impl fmt::Display for NodeType {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            NodeType::Document => "Document",
            NodeType::Table => "Table",
            NodeType::Folder => "Folder",
        };
        write!(f, "{}", label)
    }
}

/// Generic representation of a searchable entity (document, table, or folder)
/// stored in the Elasticsearch nodes index.
///
/// Field order is significant for serde serialization output; do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
    // Id of the data source this node belongs to; matched by the
    // `data_source_id` term filter in node search.
    pub data_source_id: String,
    // Unique id of the node; also used as the Elasticsearch document id
    // when indexing.
    pub node_id: String,
    // Discriminates documents, tables, and folders.
    pub node_type: NodeType,
    // NOTE(review): presumably a unix epoch timestamp — units (seconds vs
    // milliseconds) are not visible here; confirm against producers.
    pub timestamp: u64,
    // Display title of the node.
    pub title: String,
    pub mime_type: String,
    // Direct parent id, if any.
    pub parent_id: Option<String>,
    // Ancestor ids; matched by the `parents` terms filter in node search.
    pub parents: Vec<String>,
}

impl Node {
Expand Down
53 changes: 38 additions & 15 deletions core/src/search_stores/search_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ use elasticsearch::{
use serde_json::json;
use url::Url;

use crate::data_sources::node::Node;
use crate::{data_sources::data_source::Document, utils};
use crate::{data_sources::node::Node, databases::table::Table};
use crate::{
data_sources::{data_source::Document, folder::Folder},
utils,
};
use tracing::{error, info};

#[derive(serde::Deserialize)]
Expand All @@ -32,7 +35,10 @@ pub trait SearchStore {
filter: Vec<DatasourceViewFilter>,
options: Option<NodesSearchOptions>,
) -> Result<Vec<Node>>;
async fn index_document(&self, document: &Document) -> Result<()>;
async fn index_document(&self, document: Document) -> Result<()>;
async fn index_table(&self, table: Table) -> Result<()>;
async fn index_folder(&self, folder: Folder) -> Result<()>;
async fn index_node(&self, node: Node) -> Result<()>;
fn clone_box(&self) -> Box<dyn SearchStore + Sync + Send>;
}

Expand Down Expand Up @@ -76,33 +82,48 @@ const NODES_INDEX_NAME: &str = "core.data_sources_nodes";

#[async_trait]
impl SearchStore for ElasticsearchSearchStore {
async fn index_document(&self, document: &Document) -> Result<()> {
let node = Node::from(document.clone());
/// Indexes `document` in the search index: converts it to the generic
/// `Node` representation and delegates to `index_node`.
async fn index_document(&self, document: Document) -> Result<()> {
    self.index_node(Node::from(document)).await
}

async fn index_table(&self, table: Table) -> Result<()> {
let node = Node::from(table);
self.index_node(node).await
}

/// Indexes `folder` in the search index: converts it to the generic
/// `Node` representation and delegates to `index_node`.
async fn index_folder(&self, folder: Folder) -> Result<()> {
    self.index_node(Node::from(folder)).await
}

async fn index_node(&self, node: Node) -> Result<()> {
// todo(kw-search): fail on error
let now = utils::now();
match self
.client
.index(IndexParts::IndexId(NODES_INDEX_NAME, &document.document_id))
.index(IndexParts::IndexId(NODES_INDEX_NAME, &node.node_id))
.timeout("200ms")
.body(node)
.body(node.clone())
.send()
.await
{
Ok(_) => {
info!(
duration = utils::now() - now,
document_id = document.document_id,
"[ElasticsearchSearchStore] Indexed document"
node_id = node.node_id,
"[ElasticsearchSearchStore] Indexed {}",
node.node_type.to_string()
);
Ok(())
}
Err(e) => {
error!(
error = %e,
duration = utils::now() - now,
document_id = document.document_id,
"[ElasticsearchSearchStore] Failed to index document"
node_id = node.node_id,
"[ElasticsearchSearchStore] Failed to index {}",
node.node_type.to_string()
);
Ok(())
}
Expand All @@ -118,12 +139,14 @@ impl SearchStore for ElasticsearchSearchStore {
// First, collect all datasource_ids and their corresponding view_filters
let mut filter_conditions = Vec::new();
for f in filter {
let mut must_clause = Vec::new();
must_clause.push(json!({ "term": { "data_source_id": f.data_source_id } }));
if !f.view_filter.is_empty() {
must_clause.push(json!({ "terms": { "parents": f.view_filter } }));
}
filter_conditions.push(json!({
"bool": {
"must": [
{ "term": { "data_source_id": f.data_source_id } },
{ "terms": { "parents": f.view_filter } }
]
"must": must_clause
}
}));
}
Expand Down

0 comments on commit b00b771

Please sign in to comment.