Add script to delete orphaned qdrant data points (#8069)
* Add script to delete orphaned qdrant data points

* ✨

* Add data_source_internal_id in document not found in store for core
flvndvd authored Oct 16, 2024
1 parent 92bb861 commit d4ca3b5
Showing 4 changed files with 164 additions and 0 deletions.
22 changes: 22 additions & 0 deletions core/Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions core/Cargo.toml
@@ -19,6 +19,10 @@ path = "bin/qdrant/migrate_embedder.rs"
name = "qdrant_create_collection"
path = "bin/qdrant/create_collection.rs"

[[bin]]
name = "qdrant_delete_orphaned_points"
path = "bin/qdrant/delete_orphaned_points.rs"

[[bin]]
name = "sqlite-worker"
path = "bin/sqlite_worker.rs"
@@ -95,3 +99,4 @@ jsonwebtoken = "9.3.0"
rslock = { version = "0.4.0", default-features = false, features = ["tokio-comp"] }
snowflake-connector-rs = { git = "https://github.com/dust-tt/snowflake-connector-rs", rev = "5a16356" }
axum-test = "16.0.0"
csv = "1.3.0"
136 changes: 136 additions & 0 deletions core/bin/qdrant/delete_orphaned_points.rs
@@ -0,0 +1,136 @@
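// Deletes "orphaned" Qdrant points: points whose parent document was removed
// from the store but whose vectors were left behind in Qdrant. Input is a
// CSV of (data_source_internal_id, document_id) pairs.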
use anyhow::{anyhow, Result};
use csv::Reader;
use dust::data_sources::data_source::{make_document_id_hash, DataSource};
use dust::data_sources::qdrant::{DustQdrantClient, QdrantClients};
use dust::stores::postgres;
use dust::stores::store::Store;
use qdrant_client::qdrant;
use std::collections::HashMap;
use std::env;
use std::fs::File;

async fn delete_orphaned_points_for_document_id(
store: &Box<dyn Store + Sync + Send>,
ds: &DataSource,
qdrant_client: &DustQdrantClient,
document_id: &str,
) -> Result<()> {
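    // Guard: only delete if the document is confirmed absent from the store.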
match ds
.retrieve(store.clone(), &document_id, &None, true, &None)
.await
{
Err(e) => Err(e),
Ok(None) => Ok(()),
Ok(Some(_)) => Err(anyhow!("Document still exists. Won't delete.")),
}?;

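    // Points are tagged with a hash of the document id, not the raw id.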
let document_id_hash = make_document_id_hash(document_id);

let filter = qdrant::Filter {
must: vec![qdrant::FieldCondition {
key: "document_id_hash".to_string(),
r#match: Some(qdrant::Match {
match_value: Some(qdrant::r#match::MatchValue::Keyword(
document_id_hash.to_string(),
)),
}),
..Default::default()
}
.into()],
..Default::default()
};

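    // Delete every point in this data source's collection matching the hash.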
qdrant_client
.delete_points(&ds.embedder_config(), &ds.internal_id().to_string(), filter)
.await?;

println!(
"deleted point for document_id_hash: {} in data_source_internal_id: {}",
document_id_hash,
ds.internal_id()
);

Ok(())
}

async fn delete_orphaned_points_for_data_source(
store: &Box<dyn Store + Sync + Send>,
qdrant_clients: &QdrantClients,
data_source_internal_id: &str,
document_ids: &[String],
) -> Result<()> {
println!(
"processing data_source_internal_id: {}",
data_source_internal_id
);

let ds = store
.load_data_source_by_internal_id(data_source_internal_id)
.await?
.ok_or_else(|| anyhow!("data source not found"))?;

let qdrant_client = ds.main_qdrant_client(qdrant_clients);

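    // Best-effort: log failures and continue with the remaining documents.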
for document_id in document_ids {
if let Err(e) =
delete_orphaned_points_for_document_id(store, &ds, &qdrant_client, document_id).await
{
eprintln!(
"error deleting point for document_id: {} in data_source_internal_id: {}: {}",
document_id, data_source_internal_id, e
);
}
}

println!(
"finished processing data_source_internal_id: {}",
data_source_internal_id
);
Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
let args: Vec<String> = env::args().collect();
if args.len() != 2 {
return Err(anyhow!("Usage: {} <csv_file>", args[0]));
}

let csv_file = &args[1];
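    // Note: csv::Reader treats the first row as a header by default.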
let mut rdr = Reader::from_reader(File::open(csv_file)?);

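    // Group document ids by data source so each data source is loaded once.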
let mut grouped_data: HashMap<String, Vec<String>> = HashMap::new();

for result in rdr.records() {
let record = result?;
let data_source_internal_id = record[0].to_string();
let document_id = record[1].to_string();

grouped_data
.entry(data_source_internal_id)
.or_insert_with(Vec::new)
.push(document_id);
}

let qdrant_clients = QdrantClients::build().await?;

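    // A Postgres store is required; fail fast if CORE_DATABASE_URI is unset.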
let store: Box<dyn Store + Sync + Send> = match std::env::var("CORE_DATABASE_URI") {
Ok(db_uri) => {
let store = postgres::PostgresStore::new(&db_uri).await?;
Box::new(store)
}
Err(_) => Err(anyhow!("CORE_DATABASE_URI is required (postgres)"))?,
};

    for (data_source_internal_id, document_ids) in grouped_data {
delete_orphaned_points_for_data_source(
&store,
&qdrant_clients,
&data_source_internal_id,
            &document_ids,
)
.await?;
}

Ok(())
}
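
The script takes a single CSV path. Since Reader::from_reader keeps the csv crate's has_headers default, the first row is read as a header row; a hypothetical input (ids invented for illustration) could look like:

data_source_internal_id,document_id
a1b2c3d4,user-upload-42
a1b2c3d4,user-upload-57

With CORE_DATABASE_URI pointing at the core Postgres database, a run via the bin target declared in Cargo.toml above might look like:

CORE_DATABASE_URI=postgres://... cargo run --bin qdrant_delete_orphaned_points -- orphans.csv

The data_source_internal_id field added to the "Document not found in store" log below presumably allows this CSV to be assembled directly from production logs.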
1 change: 1 addition & 0 deletions core/src/data_sources/data_source.rs
@@ -1194,6 +1194,7 @@ impl DataSource
data_source_id = %data_source_id,
document_id = %document_id,
document_id_hash = %document_id_hash,
data_source_internal_id = data_source.internal_id(),
panic = true,
"Document not found in store"
);
