diff --git a/core/bin/core_api.rs b/core/bin/core_api.rs
index 3a8156f2fe54..e40a5f562fb5 100644
--- a/core/bin/core_api.rs
+++ b/core/bin/core_api.rs
@@ -35,6 +35,7 @@ use dust::{
     blocks::block::BlockType,
     data_sources::{
         data_source::{self, Section},
+        node::Node,
         qdrant::QdrantClients,
     },
     databases::{
@@ -2240,7 +2241,11 @@ async fn tables_upsert(
     )
     .await
     {
-        Ok(table) => match state.search_store.index_table(table.clone()).await {
+        Ok(table) => match state
+            .search_store
+            .index_node(Node::from(table.clone()))
+            .await
+        {
             Ok(_) => (
                 StatusCode::OK,
                 Json(APIResponse {
@@ -2964,7 +2969,11 @@ async fn folders_upsert(
             "Failed to upsert folder",
             Some(e),
         ),
-        Ok(folder) => match state.search_store.index_folder(folder.clone()).await {
+        Ok(folder) => match state
+            .search_store
+            .index_node(Node::from(folder.clone()))
+            .await
+        {
             Ok(_) => (
                 StatusCode::OK,
                 Json(APIResponse {
@@ -3103,15 +3112,19 @@ async fn folders_delete(
     let project = project::Project::new_from_id(project_id);
 
     let result = async {
-        state
+        let folder = match state
             .store
             .load_data_source_folder(&project, &data_source_id, &folder_id)
-            .await?;
+            .await?
+        {
+            Some(folder) => folder,
+            None => return Ok(()),
+        };
         state
            .store
            .delete_data_source_folder(&project, &data_source_id, &folder_id)
            .await?;
-        state.search_store.delete_node(folder_id).await?;
+        state.search_store.delete_node(Node::from(folder)).await?;
        Ok(())
    }
    .await;
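The three handlers above (`tables_upsert`, `folders_upsert`, `folders_delete`) now funnel through the generic `index_node`/`delete_node` entry points, converting with `Node::from` instead of calling per-type methods. A minimal sketch of the pattern this enables; the helper itself is hypothetical, the `dust::...` paths are assumed from the imports above, and the trait is presumed object-safe via `async_trait` (its `clone_box` returning `Box<dyn SearchStore + Sync + Send>` suggests as much):

```rust
use anyhow::Result;
use dust::{data_sources::node::Node, search_stores::search_store::SearchStore};

// Hypothetical helper, not part of this diff: anything with a
// `From<T> for Node` impl (Document, Table, Folder) can share one
// indexing path through the unified trait method.
async fn index_as_node<T>(store: &(dyn SearchStore + Sync + Send), item: T) -> Result<()>
where
    Node: From<T>,
{
    store.index_node(Node::from(item)).await
}
```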
diff --git a/core/src/data_sources/data_source.rs b/core/src/data_sources/data_source.rs
index c74ac1834566..f1fbf0e992eb 100644
--- a/core/src/data_sources/data_source.rs
+++ b/core/src/data_sources/data_source.rs
@@ -75,7 +75,7 @@ pub struct Chunk {
     pub expanded_offsets: Vec<usize>,
 }
 
-/// Document is used as a data-strucutre for insertion into the SQL store (no
+/// Document is used as a data-structure for insertion into the SQL store (no
 /// chunks, they are directly inserted in the vector search db). It is also used
 /// as a result from search (only the retrieved chunks are provided in the
 /// result). `hash` covers both the original document id and text and the
@@ -142,6 +142,7 @@ pub struct Chunk {
 #[derive(Debug, Serialize, Clone)]
 pub struct Document {
     pub data_source_id: String,
+    pub data_source_internal_id: String,
     pub created: u64,
     pub document_id: String,
     pub timestamp: u64,
@@ -163,6 +164,7 @@
 impl Document {
     pub fn new(
         data_source_id: &str,
+        data_source_internal_id: &str,
         document_id: &str,
         timestamp: u64,
         title: &str,
@@ -176,6 +178,7 @@
     ) -> Result<Self> {
         Ok(Document {
             data_source_id: data_source_id.to_string(),
+            data_source_internal_id: data_source_internal_id.to_string(),
             created: utils::now(),
             document_id: document_id.to_string(),
             timestamp,
@@ -220,6 +223,7 @@ impl From<Document> for Node {
     fn from(document: Document) -> Node {
         Node::new(
             &document.data_source_id,
+            &document.data_source_internal_id,
             &document.document_id,
             NodeType::Document,
             document.timestamp,
@@ -692,6 +696,7 @@ impl DataSource {
 
         let document = Document::new(
             &self.data_source_id,
+            &self.internal_id,
             document_id,
             timestamp,
             title.as_deref().unwrap_or(document_id),
@@ -766,7 +771,7 @@ impl DataSource {
             .await?;
 
         // Upsert document in search index.
-        search_store.index_document(document.clone()).await?;
+        search_store.index_node(Node::from(document)).await?;
 
         // Clean-up old superseded versions.
         self.scrub_document_superseded_versions(store, &document_id)
@@ -1725,6 +1730,14 @@
         search_store: Box<dyn SearchStore + Sync + Send>,
         document_id: &str,
     ) -> Result<()> {
+        let document = match store
+            .load_data_source_document(&self.project, &self.data_source_id, document_id, &None)
+            .await?
+        {
+            Some(document) => document,
+            None => return Ok(()),
+        };
+
         // Delete the document in the main embedder collection.
         self.delete_document_for_embedder(self.embedder_config(), &qdrant_clients, document_id)
             .await?;
@@ -1748,7 +1761,7 @@
             .await?;
 
         // Delete document from search index.
-        search_store.delete_node(document_id.to_string()).await?;
+        search_store.delete_node(Node::from(document)).await?;
 
         // We also scrub it directly. We used to scrub async but now that we store a GCS version
         // for each data_source_documents entry we can scrub directly at the time of delete.
diff --git a/core/src/data_sources/folder.rs b/core/src/data_sources/folder.rs
index 9d08f37759c7..027a3e1ec0a4 100644
--- a/core/src/data_sources/folder.rs
+++ b/core/src/data_sources/folder.rs
@@ -5,6 +5,7 @@ use super::node::{Node, NodeType};
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Folder {
     data_source_id: String,
+    data_source_internal_id: String,
     folder_id: String,
     timestamp: u64,
     title: String,
@@ -16,6 +17,7 @@ pub struct Folder {
 impl Folder {
     pub fn new(
         data_source_id: String,
+        data_source_internal_id: String,
         folder_id: String,
         timestamp: u64,
         title: String,
@@ -25,6 +27,7 @@
     ) -> Self {
         Folder {
             data_source_id,
+            data_source_internal_id,
             folder_id,
             timestamp,
             title,
@@ -37,6 +40,9 @@
     pub fn data_source_id(&self) -> &str {
         &self.data_source_id
     }
+    pub fn data_source_internal_id(&self) -> &str {
+        &self.data_source_internal_id
+    }
     pub fn timestamp(&self) -> u64 {
         self.timestamp
     }
@@ -61,6 +67,7 @@ impl From<Folder> for Node {
     fn from(folder: Folder) -> Node {
         Node::new(
             &folder.data_source_id,
+            &folder.data_source_internal_id,
             &folder.folder_id,
             NodeType::Folder,
             folder.timestamp,
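The reordering in `delete_document` above matters: `delete_node` now needs the full `Node` (its Elasticsearch id is derived from the node, see `unique_id()` in the next file), so the document must be loaded before anything is deleted, and deleting an already-absent document becomes a silent no-op. A sketch of the resulting contract, with the crate paths assumed as in the previous note:

```rust
use anyhow::Result;
use dust::data_sources::{data_source::Document, node::Node};
use dust::search_stores::search_store::SearchStore;

// Sketch, not part of this diff: unindexing requires the loaded Document,
// since only the full Node can reproduce the Elasticsearch document id.
async fn unindex_document(
    search_store: &(dyn SearchStore + Sync + Send),
    document: Document,
) -> Result<()> {
    search_store.delete_node(Node::from(document)).await
}
```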
diff --git a/core/src/data_sources/node.rs b/core/src/data_sources/node.rs
index b6b03e5da25d..177394d48c28 100644
--- a/core/src/data_sources/node.rs
+++ b/core/src/data_sources/node.rs
@@ -23,6 +23,7 @@ impl fmt::Display for NodeType {
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Node {
     pub data_source_id: String,
+    pub data_source_internal_id: String,
     pub node_id: String,
     pub node_type: NodeType,
     pub timestamp: u64,
@@ -35,6 +36,7 @@ pub struct Node {
 impl Node {
     pub fn new(
         data_source_id: &str,
+        data_source_internal_id: &str,
         node_id: &str,
         node_type: NodeType,
         timestamp: u64,
@@ -45,6 +47,7 @@
     ) -> Self {
         Node {
             data_source_id: data_source_id.to_string(),
+            data_source_internal_id: data_source_internal_id.to_string(),
             node_id: node_id.to_string(),
             node_type,
             timestamp,
@@ -58,6 +61,9 @@
     pub fn data_source_id(&self) -> &str {
         &self.data_source_id
     }
+    pub fn data_source_internal_id(&self) -> &str {
+        &self.data_source_internal_id
+    }
     pub fn timestamp(&self) -> u64 {
         self.timestamp
     }
@@ -81,6 +87,7 @@
     pub fn into_folder(self) -> Folder {
         Folder::new(
             self.data_source_id,
+            self.data_source_internal_id,
             self.node_id,
             self.timestamp,
             self.title,
@@ -89,6 +96,11 @@
             self.mime_type,
         )
     }
+
+    // Computes a globally unique id for the node.
+    pub fn unique_id(&self) -> String {
+        format!("{}__{}", self.data_source_internal_id, self.node_id)
+    }
 }
 
 impl From for Node {
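`unique_id()` above is the pivot of this diff: it is what the search store now uses as the Elasticsearch document id. The motivation, as a worked example with made-up values, is that node ids are only unique within a data source, so prefixing them with the data source internal id keeps ids from colliding across data sources:

```rust
fn main() {
    // Mirrors format!("{}__{}", data_source_internal_id, node_id).
    let a = format!("{}__{}", "ds_internal_a", "doc_123");
    let b = format!("{}__{}", "ds_internal_b", "doc_123");
    assert_eq!(a, "ds_internal_a__doc_123");
    // The same node_id in two data sources yields two distinct ids.
    assert_ne!(a, b);
}
```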
diff --git a/core/src/databases/table.rs b/core/src/databases/table.rs
index 24a9359fd5d8..f0694e3c7cd1 100644
--- a/core/src/databases/table.rs
+++ b/core/src/databases/table.rs
@@ -52,6 +52,7 @@ pub fn get_table_type_for_tables(tables: Vec<&Table>) -> Result<TableType> {
 pub struct Table {
     project: Project,
     data_source_id: String,
+    data_source_internal_id: String,
     created: u64,
 
     table_id: String,
@@ -75,6 +76,7 @@ impl Table {
     pub fn new(
         project: Project,
         data_source_id: String,
+        data_source_internal_id: String,
         created: u64,
         table_id: String,
         name: String,
@@ -93,6 +95,7 @@
         Table {
             project,
             data_source_id,
+            data_source_internal_id,
             created,
             table_id,
             name,
@@ -210,9 +213,7 @@ impl Table {
 
         // Delete the table node from the search index.
         if let Some(search_store) = search_store {
-            search_store
-                .delete_node(self.table_id().to_string())
-                .await?;
+            search_store.delete_node(Node::from(self.clone())).await?;
         }
 
         Ok(())
@@ -239,6 +240,7 @@ impl From<Table> for Node {
     fn from(table: Table) -> Node {
         Node::new(
             &table.data_source_id,
+            &table.data_source_internal_id,
             &table.table_id,
             NodeType::Table,
             table.timestamp,
@@ -583,6 +585,7 @@ mod tests {
         let table = Table::new(
             Project::new_from_id(42),
             "data_source_id".to_string(),
+            "data_source_internal_id".to_string(),
             utils::now(),
             "table_id".to_string(),
             "test_dbml".to_string(),
diff --git a/core/src/search_stores/indices/data_sources_nodes_2.mappings.json b/core/src/search_stores/indices/data_sources_nodes_2.mappings.json
new file mode 100644
index 000000000000..a7ff9c56e81d
--- /dev/null
+++ b/core/src/search_stores/indices/data_sources_nodes_2.mappings.json
@@ -0,0 +1,39 @@
+{
+  "dynamic": "strict",
+  "properties": {
+    "data_source_id": {
+      "type": "keyword"
+    },
+    "data_source_internal_id": {
+      "type": "keyword"
+    },
+    "timestamp": {
+      "type": "date"
+    },
+    "node_type": {
+      "type": "keyword"
+    },
+    "node_id": {
+      "type": "keyword"
+    },
+    "title": {
+      "type": "text",
+      "analyzer": "standard",
+      "fields": {
+        "edge": {
+          "type": "text",
+          "analyzer": "edge_analyzer"
+        }
+      }
+    },
+    "parents": {
+      "type": "keyword"
+    },
+    "parent_id": {
+      "type": "keyword"
+    },
+    "mime_type": {
+      "type": "keyword"
+    }
+  }
+}
diff --git a/core/src/search_stores/indices/data_sources_nodes_2.settings.local.json b/core/src/search_stores/indices/data_sources_nodes_2.settings.local.json
new file mode 100644
index 000000000000..29121fca2a05
--- /dev/null
+++ b/core/src/search_stores/indices/data_sources_nodes_2.settings.local.json
@@ -0,0 +1,36 @@
+{
+  "number_of_shards": 1,
+  "number_of_replicas": 0,
+  "refresh_interval": "30s",
+  "analysis": {
+    "analyzer": {
+      "icu_analyzer": {
+        "type": "custom",
+        "tokenizer": "icu_tokenizer",
+        "filter": [
+          "icu_folding",
+          "lowercase",
+          "asciifolding",
+          "preserve_word_delimiter"
+        ]
+      },
+      "edge_analyzer": {
+        "type": "custom",
+        "tokenizer": "icu_tokenizer",
+        "filter": ["lowercase", "edge_ngram_filter"]
+      }
+    },
+    "filter": {
+      "preserve_word_delimiter": {
+        "type": "word_delimiter",
+        "split_on_numerics": false,
+        "split_on_case_change": false
+      },
+      "edge_ngram_filter": {
+        "type": "edge_ngram",
+        "min_gram": 2,
+        "max_gram": 20
+      }
+    }
+  }
+}
diff --git a/core/src/search_stores/indices/data_sources_nodes_2.settings.us-central-1.json b/core/src/search_stores/indices/data_sources_nodes_2.settings.us-central-1.json
new file mode 100644
index 000000000000..624e7236a703
--- /dev/null
+++ b/core/src/search_stores/indices/data_sources_nodes_2.settings.us-central-1.json
@@ -0,0 +1,36 @@
+{
+  "number_of_shards": 2,
+  "number_of_replicas": 1,
+  "refresh_interval": "30s",
+  "analysis": {
+    "analyzer": {
+      "icu_analyzer": {
+        "type": "custom",
+        "tokenizer": "icu_tokenizer",
+        "filter": [
+          "icu_folding",
+          "lowercase",
+          "asciifolding",
+          "preserve_word_delimiter"
+        ]
+      },
+      "edge_analyzer": {
+        "type": "custom",
+        "tokenizer": "icu_tokenizer",
+        "filter": ["lowercase", "edge_ngram_filter"]
+      }
+    },
+    "filter": {
+      "preserve_word_delimiter": {
+        "type": "word_delimiter",
+        "split_on_numerics": false,
+        "split_on_case_change": false
+      },
+      "edge_ngram_filter": {
+        "type": "edge_ngram",
+        "min_gram": 2,
+        "max_gram": 20
+      }
+    }
+  }
+}
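The new index is strictly mapped (unknown fields are rejected) and keeps ids, parents, and mime types as exact-match `keyword` fields; only `title` is analyzed, with an `edge` subfield whose edge-ngram filter (2 to 20 characters) supports prefix-style, search-as-you-type matching. A sketch of a query shape this layout appears designed for; the field names come from the mappings file, everything else (index values, the query itself) is illustrative:

```rust
use serde_json::json;

fn main() {
    // Prefix-style title search scoped to a single data source.
    let body = json!({
        "query": {
            "bool": {
                "must": { "match": { "title.edge": "quarterly rep" } },
                "filter": { "term": { "data_source_internal_id": "ds_internal_a" } }
            }
        }
    });
    println!("{body}");
}
```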
diff --git a/core/src/search_stores/search_store.rs b/core/src/search_stores/search_store.rs
index 53d2c9baabb8..4b360c68e1f3 100644
--- a/core/src/search_stores/search_store.rs
+++ b/core/src/search_stores/search_store.rs
@@ -8,11 +8,7 @@ use elasticsearch::{
 use serde_json::json;
 use url::Url;
 
-use crate::{data_sources::node::Node, databases::table::Table};
-use crate::{
-    data_sources::{data_source::Document, folder::Folder},
-    utils,
-};
+use crate::{data_sources::node::Node, utils};
 use tracing::{error, info};
 
 #[derive(serde::Deserialize)]
@@ -37,11 +33,7 @@
     ) -> Result<Vec<Node>>;
 
     async fn index_node(&self, node: Node) -> Result<()>;
-    async fn index_document(&self, document: Document) -> Result<()>;
-    async fn index_table(&self, table: Table) -> Result<()>;
-    async fn index_folder(&self, folder: Folder) -> Result<()>;
-
-    async fn delete_node(&self, node_id: String) -> Result<()>;
+    async fn delete_node(&self, node: Node) -> Result<()>;
     async fn delete_data_source_nodes(&self, data_source_id: &str) -> Result<()>;
 
     fn clone_box(&self) -> Box<dyn SearchStore + Sync + Send>;
@@ -152,7 +144,7 @@ impl SearchStore for ElasticsearchSearchStore {
         let now = utils::now();
         match self
             .client
-            .index(IndexParts::IndexId(NODES_INDEX_NAME, &node.node_id))
+            .index(IndexParts::IndexId(NODES_INDEX_NAME, &node.unique_id()))
             .timeout("200ms")
             .body(node.clone())
             .send()
@@ -161,7 +153,7 @@
             Ok(_) => {
                 info!(
                     duration = utils::now() - now,
-                    node_id = node.node_id,
+                    globally_unique_id = node.unique_id(),
                     "[ElasticsearchSearchStore] Indexed {}",
                     node.node_type.to_string()
                 );
@@ -171,7 +163,7 @@
                 error!(
                     error = %e,
                     duration = utils::now() - now,
-                    node_id = node.node_id,
+                    globally_unique_id = node.unique_id(),
                     "[ElasticsearchSearchStore] Failed to index {}",
                     node.node_type.to_string()
                 );
@@ -180,24 +172,9 @@
         }
     }
 
-    async fn index_document(&self, document: Document) -> Result<()> {
-        let node = Node::from(document);
-        self.index_node(node).await
-    }
-
-    async fn index_table(&self, table: Table) -> Result<()> {
-        let node = Node::from(table);
-        self.index_node(node).await
-    }
-
-    async fn index_folder(&self, folder: Folder) -> Result<()> {
-        let node = Node::from(folder);
-        self.index_node(node).await
-    }
-
-    async fn delete_node(&self, node_id: String) -> Result<()> {
+    async fn delete_node(&self, node: Node) -> Result<()> {
         self.client
-            .delete(DeleteParts::IndexId(NODES_INDEX_NAME, &node_id))
+            .delete(DeleteParts::IndexId(NODES_INDEX_NAME, &node.unique_id()))
             .send()
             .await?;
         Ok(())
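With the per-type methods gone, the trait surface is just `index_node`/`delete_node`, and both address Elasticsearch by `node.unique_id()`. Since Elasticsearch's index API overwrites an existing document at the same id, re-upserting a node updates it in place rather than duplicating it, and a later delete targets exactly that entry. A hypothetical round-trip helper illustrating the symmetry (not part of the diff; paths assumed as before):

```rust
use anyhow::Result;
use dust::{data_sources::node::Node, search_stores::search_store::SearchStore};

// Index-then-delete round trip: both calls resolve to the same
// Elasticsearch document id, node.unique_id().
async fn reindex_then_remove(
    store: &(dyn SearchStore + Sync + Send),
    node: Node,
) -> Result<()> {
    store.index_node(node.clone()).await?;
    store.delete_node(node).await
}
```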
diff --git a/core/src/stores/postgres.rs b/core/src/stores/postgres.rs
index fd4971b6b815..589158efee0d 100644
--- a/core/src/stores/postgres.rs
+++ b/core/src/stores/postgres.rs
@@ -1307,14 +1307,14 @@
 
         let r = c
             .query(
-                "SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
+                "select id, internal_id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
                 &[&project_id, &data_source_id],
             )
             .await?;
 
-        let data_source_row_id: i64 = match r.len() {
+        let (data_source_row_id, data_source_internal_id): (i64, String) = match r.len() {
             0 => Err(anyhow!("Unknown DataSource: {}", data_source_id))?,
-            1 => r[0].get(0),
+            1 => (r[0].get(0), r[0].get(1)),
             _ => unreachable!(),
         };
 
@@ -1393,6 +1393,7 @@
                 node_mime_type,
             )) => Ok(Some(Document {
                 data_source_id: data_source_id.clone(),
+                data_source_internal_id: data_source_internal_id.clone(),
                 created: created as u64,
                 timestamp: timestamp as u64,
                 title: node_title.unwrap_or(document_id.clone()),
@@ -1829,14 +1830,14 @@
 
         let r = c
             .query(
-                "SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
+                "select id, internal_id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
                 &[&project_id, &data_source_id],
             )
             .await?;
 
-        let data_source_row_id: i64 = match r.len() {
+        let (data_source_row_id, data_source_internal_id): (i64, String) = match r.len() {
             0 => Err(anyhow!("Unknown DataSource: {}", data_source_id))?,
-            1 => r[0].get(0),
+            1 => (r[0].get(0), r[0].get(1)),
             _ => unreachable!(),
         };
 
@@ -1890,6 +1891,7 @@
 
         let document = Document {
             data_source_id,
+            data_source_internal_id: data_source_internal_id.to_string(),
             title,
             mime_type,
             created: created as u64,
@@ -1945,14 +1947,14 @@
 
         let r = c
             .query(
-                "SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
+                "select id, internal_id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
                 &[&project_id, &data_source_id],
             )
             .await?;
 
-        let data_source_row_id: i64 = match r.len() {
+        let (data_source_row_id, data_source_internal_id): (i64, String) = match r.len() {
             0 => Err(anyhow!("Unknown DataSource: {}", data_source_id))?,
-            1 => r[0].get(0),
+            1 => (r[0].get(0), r[0].get(1)),
             _ => unreachable!(),
         };
 
@@ -2048,6 +2050,7 @@
 
                 Ok(Document {
                     data_source_id: data_source_id.clone(),
+                    data_source_internal_id: data_source_internal_id.clone(),
                     created: created as u64,
                     timestamp: timestamp as u64,
                     title: node_title.unwrap_or(document_id.clone()),
@@ -2599,13 +2602,13 @@
         let tx = c.transaction().await?;
 
         let r = tx
             .query(
-                "SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
+                "select id, internal_id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
                 &[&project_id, &data_source_id],
             )
             .await?;
-        let data_source_row_id: i64 = match r.len() {
+        let (data_source_row_id, data_source_internal_id): (i64, String) = match r.len() {
             0 => Err(anyhow!("Unknown DataSource: {}", data_source_id))?,
-            1 => r[0].get(0),
+            1 => (r[0].get(0), r[0].get(1)),
             _ => unreachable!(),
         };
@@ -2664,6 +2667,7 @@
         let table = Table::new(
             project,
             data_source_id,
+            data_source_internal_id,
             table_created,
             upsert_params.table_id,
             upsert_params.name,
@@ -2853,13 +2857,13 @@
 
         // Get the data source row id.
         let stmt = c
             .prepare(
-                "SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
+                "select id, internal_id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
             )
             .await?;
         let r = c.query(&stmt, &[&project_id, &data_source_id]).await?;
-        let data_source_row_id: i64 = match r.len() {
+        let (data_source_row_id, data_source_internal_id): (i64, String) = match r.len() {
             0 => Err(anyhow!("Unknown DataSource: {}", data_source_id))?,
-            1 => r[0].get(0),
+            1 => (r[0].get(0), r[0].get(1)),
             _ => unreachable!(),
         };
@@ -2938,6 +2942,7 @@
                 Ok(Some(Table::new(
                     project.clone(),
                     data_source_id.clone(),
+                    data_source_internal_id.clone(),
                     created as u64,
                     table_id,
                     name,
@@ -2974,14 +2979,14 @@
 
         // get the data source row id
         let r = c
             .query(
-                "SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
+                "select id, internal_id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
                 &[&project_id, &data_source_id],
             )
             .await?;
 
-        let data_source_row_id: i64 = match r.len() {
+        let (data_source_row_id, data_source_internal_id): (i64, String) = match r.len() {
             0 => Err(anyhow!("Unknown DataSource: {}", data_source_id))?,
-            1 => r[0].get(0),
+            1 => (r[0].get(0), r[0].get(1)),
             _ => unreachable!(),
         };
@@ -3080,6 +3085,7 @@
                 Ok(Table::new(
                     project.clone(),
                     data_source_id.clone(),
+                    data_source_internal_id.clone(),
                     created as u64,
                     table_id,
                     name,
@@ -3181,13 +3187,14 @@
         let tx = c.transaction().await?;
 
         let r = tx
             .query(
-                "SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
+                "select id, internal_id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
                 &[&project_id, &data_source_id],
             )
             .await?;
-        let data_source_row_id: i64 = match r.len() {
+
+        let (data_source_row_id, data_source_internal_id): (i64, String) = match r.len() {
             0 => Err(anyhow!("Unknown DataSource: {}", data_source_id))?,
-            1 => r[0].get(0),
+            1 => (r[0].get(0), r[0].get(1)),
             _ => unreachable!(),
         };
@@ -3218,6 +3225,7 @@
 
         let folder = Folder::new(
             data_source_id,
+            data_source_internal_id,
             upsert_params.folder_id,
             created as u64,
             upsert_params.title,
@@ -3299,14 +3307,14 @@
 
         // get the data source row id
         let r = c
             .query(
-                "SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
+                "select id, internal_id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
                 &[&project_id, &data_source_id],
             )
             .await?;
 
-        let data_source_row_id: i64 = match r.len() {
+        let (data_source_row_id, data_source_internal_id): (i64, String) = match r.len() {
             0 => Err(anyhow!("Unknown DataSource: {}", data_source_id))?,
-            1 => r[0].get(0),
+            1 => (r[0].get(0), r[0].get(1)),
             _ => unreachable!(),
         };
@@ -3395,6 +3403,7 @@
 
                 Ok(Folder::new(
                     data_source_id.clone(),
+                    data_source_internal_id.clone(),
                     node_id,
                     timestamp as u64,
                     title,
@@ -3460,14 +3469,14 @@
 
         let r = c
             .query(
-                "SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
+                "select id, internal_id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
                 &[&project_id, &data_source_id],
             )
             .await?;
 
-        let data_source_row_id: i64 = match r.len() {
+        let (data_source_row_id, data_source_internal_id): (i64, String) = match r.len() {
             0 => Err(anyhow!("Unknown DataSource: {}", data_source_id))?,
-            1 => r[0].get(0),
+            1 => (r[0].get(0), r[0].get(1)),
             _ => unreachable!(),
         };
@@ -3500,6 +3509,7 @@
                 Ok(Some((
                     Node::new(
                         &data_source_id,
+                        &data_source_internal_id,
                         &node_id,
                         node_type,
                         timestamp as u64,
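Across all of these store methods the change is mechanical: every data source lookup now selects `internal_id` alongside the row id, and the single-row result is destructured positionally into a tuple. If the repeated pattern were factored out, it might look like the following sketch (hypothetical helper, not part of the diff; `tokio_postgres::Row` assumed as the row type returned by the queries):

```rust
use anyhow::{anyhow, Result};
use tokio_postgres::Row;

// One query returns both columns; Row::get extracts them by position,
// with the result types driven by the (i64, String) annotation.
fn data_source_row(rows: &[Row], data_source_id: &str) -> Result<(i64, String)> {
    match rows.len() {
        0 => Err(anyhow!("Unknown DataSource: {}", data_source_id)),
        1 => Ok((rows[0].get(0), rows[0].get(1))),
        _ => unreachable!(),
    }
}
```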