Commit
[Keyword search] Globally unique id as elasticsearch index (#9558)
* [Keyword search] Globally unique id as elasticsearch index

Description
---
The node id is not globally unique: if, for example, the same Notion is connected to two workspaces, the same page will have the same node id in both. A short sketch of the composite id used to fix this appears just before the file diffs below.

Fixes #9527

Risks
---
None; the index is not used yet, and we'll backfill Elasticsearch from scratch once all data source folders have been backfilled as well.

Deploy
---
core

* updated delete methods

* better separator

* use ds internal id

* fix

* refactor

* correct column name

* add ds internal id to mapping
philipperolet authored Dec 22, 2024
1 parent 3bc927e commit 7a06a48
Showing 10 changed files with 214 additions and 68 deletions.
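As context for the diffs that follow, here is a minimal sketch of the collision described above and of the composite id that avoids it. The workspace and data source values are made up for illustration; only the "{}__{}" format matches the Node::unique_id() helper added in this commit.

fn main() {
    // Two data sources in different workspaces can both contain a node with
    // the same node_id (e.g. the same Notion page synced into two workspaces).
    let node_id = "notion-page-abc123";
    let ds_internal_id_ws_1 = "ds_internal_4f2a";
    let ds_internal_id_ws_2 = "ds_internal_9c7b";

    // Using node_id alone as the Elasticsearch document id would make these
    // collide. The composite id introduced by this commit stays unique per
    // (data source, node) pair; it uses the same format as Node::unique_id().
    let unique_id_ws_1 = format!("{}__{}", ds_internal_id_ws_1, node_id);
    let unique_id_ws_2 = format!("{}__{}", ds_internal_id_ws_2, node_id);
    assert_ne!(unique_id_ws_1, unique_id_ws_2);
}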
23 changes: 18 additions & 5 deletions core/bin/core_api.rs
@@ -35,6 +35,7 @@ use dust::{
blocks::block::BlockType,
data_sources::{
data_source::{self, Section},
node::Node,
qdrant::QdrantClients,
},
databases::{
@@ -2240,7 +2241,11 @@ async fn tables_upsert(
)
.await
{
Ok(table) => match state.search_store.index_table(table.clone()).await {
Ok(table) => match state
.search_store
.index_node(Node::from(table.clone()))
.await
{
Ok(_) => (
StatusCode::OK,
Json(APIResponse {
@@ -2964,7 +2969,11 @@ async fn folders_upsert(
"Failed to upsert folder",
Some(e),
),
Ok(folder) => match state.search_store.index_folder(folder.clone()).await {
Ok(folder) => match state
.search_store
.index_node(Node::from(folder.clone()))
.await
{
Ok(_) => (
StatusCode::OK,
Json(APIResponse {
@@ -3103,15 +3112,19 @@ async fn folders_delete(
let project = project::Project::new_from_id(project_id);

let result = async {
state
let folder = match state
.store
.load_data_source_folder(&project, &data_source_id, &folder_id)
.await?;
.await?
{
Some(folder) => folder,
None => return Ok(()),
};
state
.store
.delete_data_source_folder(&project, &data_source_id, &folder_id)
.await?;
state.search_store.delete_node(folder_id).await?;
state.search_store.delete_node(Node::from(folder)).await?;
Ok(())
}
.await;
19 changes: 16 additions & 3 deletions core/src/data_sources/data_source.rs
@@ -75,7 +75,7 @@ pub struct Chunk {
pub expanded_offsets: Vec<usize>,
}

/// Document is used as a data-strucutre for insertion into the SQL store (no
/// Document is used as a data-structure for insertion into the SQL store (no
/// chunks, they are directly inserted in the vector search db). It is also used
/// as a result from search (only the retrieved chunks are provided in the
/// result). `hash` covers both the original document id and text and the
@@ -142,6 +142,7 @@ pub struct Chunk {
#[derive(Debug, Serialize, Clone)]
pub struct Document {
pub data_source_id: String,
pub data_source_internal_id: String,
pub created: u64,
pub document_id: String,
pub timestamp: u64,
@@ -163,6 +164,7 @@ pub struct Document {
impl Document {
pub fn new(
data_source_id: &str,
data_source_internal_id: &str,
document_id: &str,
timestamp: u64,
title: &str,
@@ -176,6 +178,7 @@ impl Document {
) -> Result<Self> {
Ok(Document {
data_source_id: data_source_id.to_string(),
data_source_internal_id: data_source_internal_id.to_string(),
created: utils::now(),
document_id: document_id.to_string(),
timestamp,
@@ -220,6 +223,7 @@ impl From<Document> for Node {
fn from(document: Document) -> Node {
Node::new(
&document.data_source_id,
&document.data_source_internal_id,
&document.document_id,
NodeType::Document,
document.timestamp,
@@ -692,6 +696,7 @@ impl DataSource {

let document = Document::new(
&self.data_source_id,
&self.internal_id,
document_id,
timestamp,
title.as_deref().unwrap_or(document_id),
@@ -766,7 +771,7 @@ impl DataSource {
.await?;

// Upsert document in search index.
search_store.index_document(document.clone()).await?;
search_store.index_node(Node::from(document)).await?;

// Clean-up old superseded versions.
self.scrub_document_superseded_versions(store, &document_id)
@@ -1725,6 +1730,14 @@ impl DataSource {
search_store: Box<dyn SearchStore + Sync + Send>,
document_id: &str,
) -> Result<()> {
let document = match store
.load_data_source_document(&self.project, &self.data_source_id, document_id, &None)
.await?
{
Some(document) => document,
None => return Ok(()),
};

// Delete the document in the main embedder collection.
self.delete_document_for_embedder(self.embedder_config(), &qdrant_clients, document_id)
.await?;
@@ -1748,7 +1761,7 @@ impl DataSource {
.await?;

// Delete document from search index.
search_store.delete_node(document_id.to_string()).await?;
search_store.delete_node(Node::from(document)).await?;

// We also scrub it directly. We used to scrub async but now that we store a GCS version
// for each data_source_documents entry we can scrub directly at the time of delete.
7 changes: 7 additions & 0 deletions core/src/data_sources/folder.rs
@@ -5,6 +5,7 @@ use super::node::{Node, NodeType};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Folder {
data_source_id: String,
data_source_internal_id: String,
folder_id: String,
timestamp: u64,
title: String,
@@ -16,6 +17,7 @@ pub struct Folder {
impl Folder {
pub fn new(
data_source_id: String,
data_source_internal_id: String,
folder_id: String,
timestamp: u64,
title: String,
@@ -25,6 +27,7 @@ impl Folder {
) -> Self {
Folder {
data_source_id,
data_source_internal_id,
folder_id,
timestamp,
title,
@@ -37,6 +40,9 @@ impl Folder {
pub fn data_source_id(&self) -> &str {
&self.data_source_id
}
pub fn data_source_internal_id(&self) -> &str {
&self.data_source_internal_id
}
pub fn timestamp(&self) -> u64 {
self.timestamp
}
@@ -61,6 +67,7 @@ impl From<Folder> for Node {
fn from(folder: Folder) -> Node {
Node::new(
&folder.data_source_id,
&folder.data_source_internal_id,
&folder.folder_id,
NodeType::Folder,
folder.timestamp,
12 changes: 12 additions & 0 deletions core/src/data_sources/node.rs
@@ -23,6 +23,7 @@ impl fmt::Display for NodeType {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
pub data_source_id: String,
pub data_source_internal_id: String,
pub node_id: String,
pub node_type: NodeType,
pub timestamp: u64,
@@ -35,6 +36,7 @@ pub struct Node {
impl Node {
pub fn new(
data_source_id: &str,
data_source_internal_id: &str,
node_id: &str,
node_type: NodeType,
timestamp: u64,
@@ -45,6 +47,7 @@ impl Node {
) -> Self {
Node {
data_source_id: data_source_id.to_string(),
data_source_internal_id: data_source_internal_id.to_string(),
node_id: node_id.to_string(),
node_type,
timestamp,
@@ -58,6 +61,9 @@ impl Node {
pub fn data_source_id(&self) -> &str {
&self.data_source_id
}
pub fn data_source_internal_id(&self) -> &str {
&self.data_source_internal_id
}
pub fn timestamp(&self) -> u64 {
self.timestamp
}
@@ -81,6 +87,7 @@ impl Node {
pub fn into_folder(self) -> Folder {
Folder::new(
self.data_source_id,
self.data_source_internal_id,
self.node_id,
self.timestamp,
self.title,
Expand All @@ -89,6 +96,11 @@ impl Node {
self.mime_type,
)
}

// Computes a globally unique id for the node.
pub fn unique_id(&self) -> String {
format!("{}__{}", self.data_source_internal_id, self.node_id)
}
}

impl From<serde_json::Value> for Node {
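To show how the new helper is meant to be consumed, here is a minimal sketch of a search store keying Elasticsearch documents on unique_id(). Only Node::unique_id() comes from this commit; the ElasticsearchSearchStore struct, its fields, and the elasticsearch crate calls are assumptions for illustration.

use anyhow::Result;
use elasticsearch::{Elasticsearch, IndexParts};

use dust::data_sources::node::Node;

// Hypothetical wrapper; the real SearchStore implementation in core is not
// part of this diff.
struct ElasticsearchSearchStore {
    client: Elasticsearch,
    // e.g. the index created from the data_sources_nodes_2 mapping below.
    index_name: String,
}

impl ElasticsearchSearchStore {
    // Index a node under its globally unique id so that identical node_ids
    // coming from different workspaces map to distinct Elasticsearch documents.
    async fn index_node(&self, node: Node) -> Result<()> {
        let unique_id = node.unique_id();
        self.client
            .index(IndexParts::IndexId(&self.index_name, &unique_id))
            .body(serde_json::to_value(&node)?)
            .send()
            .await?;
        Ok(())
    }
}

Deleting by the same composite id is also why the delete paths above now load the full document, folder, or table before removing it: the search store needs the node's data_source_internal_id to recompute the id it was indexed under.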
9 changes: 6 additions & 3 deletions core/src/databases/table.rs
@@ -52,6 +52,7 @@ pub fn get_table_type_for_tables(tables: Vec<&Table>) -> Result<TableType> {
pub struct Table {
project: Project,
data_source_id: String,
data_source_internal_id: String,
created: u64,

table_id: String,
@@ -75,6 +76,7 @@ impl Table {
pub fn new(
project: Project,
data_source_id: String,
data_source_internal_id: String,
created: u64,
table_id: String,
name: String,
@@ -93,6 +95,7 @@ impl Table {
Table {
project,
data_source_id,
data_source_internal_id,
created,
table_id,
name,
@@ -210,9 +213,7 @@ impl Table {

// Delete the table node from the search index.
if let Some(search_store) = search_store {
search_store
.delete_node(self.table_id().to_string())
.await?;
search_store.delete_node(Node::from(self.clone())).await?;
}

Ok(())
@@ -239,6 +240,7 @@ impl From<Table> for Node {
fn from(table: Table) -> Node {
Node::new(
&table.data_source_id,
&table.data_source_internal_id,
&table.table_id,
NodeType::Table,
table.timestamp,
@@ -583,6 +585,7 @@ mod tests {
let table = Table::new(
Project::new_from_id(42),
"data_source_id".to_string(),
"data_source_internal_id".to_string(),
utils::now(),
"table_id".to_string(),
"test_dbml".to_string(),
39 changes: 39 additions & 0 deletions core/src/search_stores/indices/data_sources_nodes_2.mappings.json
@@ -0,0 +1,39 @@
{
"dynamic": "strict",
"properties": {
"data_source_id": {
"type": "keyword"
},
"data_source_internal_id": {
"type": "keyword"
},
"timestamp": {
"type": "date"
},
"node_type": {
"type": "keyword"
},
"node_id": {
"type": "keyword"
},
"title": {
"type": "text",
"analyzer": "standard",
"fields": {
"edge": {
"type": "text",
"analyzer": "edge_analyzer"
}
}
},
"parents": {
"type": "keyword"
},
"parent_id": {
"type": "keyword"
},
"mime_type": {
"type": "keyword"
}
}
}
@@ -0,0 +1,36 @@
{
"number_of_shards": 1,
"number_of_replicas": 0,
"refresh_interval": "30s",
"analysis": {
"analyzer": {
"icu_analyzer": {
"type": "custom",
"tokenizer": "icu_tokenizer",
"filter": [
"icu_folding",
"lowercase",
"asciifolding",
"preserve_word_delimiter"
]
},
"edge_analyzer": {
"type": "custom",
"tokenizer": "icu_tokenizer",
"filter": ["lowercase", "edge_ngram_filter"]
}
},
"filter": {
"preserve_word_delimiter": {
"type": "word_delimiter",
"split_on_numerics": false,
"split_on_case_change": false
},
"edge_ngram_filter": {
"type": "edge_ngram",
"min_gram": 2,
"max_gram": 20
}
}
}
}
@@ -0,0 +1,36 @@
{
"number_of_shards": 2,
"number_of_replicas": 1,
"refresh_interval": "30s",
"analysis": {
"analyzer": {
"icu_analyzer": {
"type": "custom",
"tokenizer": "icu_tokenizer",
"filter": [
"icu_folding",
"lowercase",
"asciifolding",
"preserve_word_delimiter"
]
},
"edge_analyzer": {
"type": "custom",
"tokenizer": "icu_tokenizer",
"filter": ["lowercase", "edge_ngram_filter"]
}
},
"filter": {
"preserve_word_delimiter": {
"type": "word_delimiter",
"split_on_numerics": false,
"split_on_case_change": false
},
"edge_ngram_filter": {
"type": "edge_ngram",
"min_gram": 2,
"max_gram": 20
}
}
}
}