Skip to content

Commit cb73699

Browse files
authored
QdrantDataSourceConfig, main read, shadow write (#2559)
* QdrantDataSourceConfig, main read, shadow write
* No failure sequential shadow write
* Migration for data_sources core table
1 parent 6aace59 commit cb73699

File tree

4 files changed

+288
-59
lines changed

4 files changed

+288
-59
lines changed

core/src/data_sources/data_source.rs

Lines changed: 165 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use tokio::try_join;
2727
use tokio_stream::{self as stream};
2828
use uuid::Uuid;
2929

30-
use super::qdrant::{QdrantClients, QdrantCluster};
30+
use super::qdrant::{QdrantClients, QdrantDataSourceConfig};
3131

3232
/// A filter to apply to the search query based on `tags`. All documents returned must have at least
3333
/// one tag in `is_in` and none of the tags in `is_not`.
@@ -188,7 +188,7 @@ pub struct DataSourceConfig {
188188
pub extras: Option<Value>,
189189
pub splitter_id: SplitterID,
190190
pub max_chunk_size: usize,
191-
pub use_cache: bool,
191+
pub qdrant_config: Option<QdrantDataSourceConfig>,
192192
}
193193

194194
/// The `data_source_id` is the unique identifier that allows routing to the right data in SQL store
@@ -305,7 +305,7 @@ impl DataSource {
305305
credentials: Credentials,
306306
qdrant_clients: QdrantClients,
307307
) -> Result<()> {
308-
let qdrant_client = qdrant_clients.get(QdrantCluster::Main0);
308+
let qdrant_client = qdrant_clients.main_client(&self.config.qdrant_config);
309309

310310
let mut embedder = provider(self.config.provider_id).embedder(self.config.model_id.clone());
311311
embedder.initialize(credentials).await?;
@@ -425,8 +425,6 @@ impl DataSource {
425425
document_id: String,
426426
parents: Vec<String>,
427427
) -> Result<()> {
428-
let qdrant_client = qdrant_clients.get(QdrantCluster::Main0);
429-
430428
store
431429
.update_data_source_document_parents(
432430
&self.project,
@@ -440,7 +438,7 @@ impl DataSource {
440438
hasher.update(document_id.as_bytes());
441439
let document_id_hash = format!("{}", hasher.finalize().to_hex());
442440

443-
self.update_document_payload(qdrant_client, document_id_hash, "parents", parents)
441+
self.update_document_payload(qdrant_clients, document_id_hash, "parents", parents)
444442
.await?;
445443
Ok(())
446444
}
@@ -453,8 +451,6 @@ impl DataSource {
453451
add_tags: Vec<String>,
454452
remove_tags: Vec<String>,
455453
) -> Result<Vec<String>> {
456-
let qdrant_client = qdrant_clients.get(QdrantCluster::Main0);
457-
458454
let new_tags = store
459455
.update_data_source_document_tags(
460456
&self.project,
@@ -469,18 +465,20 @@ impl DataSource {
469465
hasher.update(document_id.as_bytes());
470466
let document_id_hash = format!("{}", hasher.finalize().to_hex());
471467

472-
self.update_document_payload(qdrant_client, document_id_hash, "tags", new_tags.clone())
468+
self.update_document_payload(qdrant_clients, document_id_hash, "tags", new_tags.clone())
473469
.await?;
474470
Ok(new_tags)
475471
}
476472

477473
async fn update_document_payload(
478474
&self,
479-
qdrant_client: Arc<QdrantClient>,
475+
qdrant_clients: QdrantClients,
480476
document_id_hash: String,
481477
field_name: &str,
482478
field_value: impl Into<Value>,
483479
) -> Result<()> {
480+
let qdrant_client = qdrant_clients.main_client(&self.config.qdrant_config);
481+
484482
let mut payload = Payload::new();
485483
payload.insert(field_name, field_value.into());
486484

@@ -499,6 +497,37 @@ impl DataSource {
499497
})),
500498
};
501499

500+
match qdrant_clients.shadow_write_client(&self.config.qdrant_config) {
501+
Some(qdrant_client) => {
502+
match qdrant_client
503+
.set_payload(
504+
self.qdrant_collection().to_string(),
505+
&points_selector,
506+
payload.clone(),
507+
None,
508+
)
509+
.await
510+
{
511+
Ok(_) => {
512+
utils::done(&format!(
513+
"[SHADOW_WRITE_SUCCESS] Update payload: cluster={:?} collection={}",
514+
qdrant_clients.shadow_write_cluster(&self.config.qdrant_config),
515+
self.qdrant_collection(),
516+
));
517+
}
518+
Err(e) => {
519+
utils::error(&format!(
520+
"[SHADOW_WRITE_FAIL] Update payload: cluster={:?} collection={} error={}",
521+
qdrant_clients.shadow_write_cluster(&self.config.qdrant_config),
522+
self.qdrant_collection(),
523+
e
524+
));
525+
}
526+
}
527+
}
528+
None => (),
529+
}
530+
502531
qdrant_client
503532
.set_payload(
504533
self.qdrant_collection().to_string(),
@@ -507,6 +536,7 @@ impl DataSource {
507536
None,
508537
)
509538
.await?;
539+
510540
Ok(())
511541
}
512542

@@ -523,7 +553,7 @@ impl DataSource {
523553
text: &str,
524554
preserve_system_tags: bool,
525555
) -> Result<Document> {
526-
let qdrant_client = qdrant_clients.get(QdrantCluster::Main0);
556+
let qdrant_client = qdrant_clients.main_client(&self.config.qdrant_config);
527557

528558
// disallow preserve_system_tags=true if tags contains a string starting with the system tag prefix
529559
// prevents having duplicate system tags or have users accidentally add system tags (from UI/API)
@@ -830,27 +860,52 @@ impl DataSource {
830860
document.token_count = Some(document.chunks.len() * self.config.max_chunk_size);
831861

832862
let now = utils::now();
863+
833864
// Clean-up previous document chunks (vector search db).
834-
let _ = qdrant_client
835-
.delete_points(
836-
self.qdrant_collection(),
837-
&qdrant::Filter {
838-
must_not: vec![],
839-
should: vec![],
840-
must: vec![qdrant::FieldCondition {
841-
key: "document_id_hash".to_string(),
842-
r#match: Some(qdrant::Match {
843-
match_value: Some(qdrant::r#match::MatchValue::Keyword(
844-
document_id_hash.clone(),
845-
)),
846-
}),
847-
..Default::default()
865+
let filter: PointsSelector = qdrant::Filter {
866+
must_not: vec![],
867+
should: vec![],
868+
must: vec![qdrant::FieldCondition {
869+
key: "document_id_hash".to_string(),
870+
r#match: Some(qdrant::Match {
871+
match_value: Some(qdrant::r#match::MatchValue::Keyword(
872+
document_id_hash.clone(),
873+
)),
874+
}),
875+
..Default::default()
876+
}
877+
.into()],
878+
}
879+
.into();
880+
881+
match qdrant_clients.shadow_write_client(&self.config.qdrant_config) {
882+
Some(qdrant_client) => {
883+
match qdrant_client
884+
.delete_points(self.qdrant_collection(), &filter, None)
885+
.await
886+
{
887+
Ok(_) => {
888+
utils::done(&format!(
889+
"[SHADOW_WRITE_SUCCESS] Delete points: cluster={:?} collection={}",
890+
qdrant_clients.shadow_write_cluster(&self.config.qdrant_config),
891+
self.qdrant_collection(),
892+
));
893+
}
894+
Err(e) => {
895+
utils::error(&format!(
896+
"[SHADOW_WRITE_FAIL] Delete points: cluster={:?} collection={} error={}",
897+
qdrant_clients.shadow_write_cluster(&self.config.qdrant_config),
898+
self.qdrant_collection(),
899+
e
900+
));
848901
}
849-
.into()],
850902
}
851-
.into(),
852-
None,
853-
)
903+
}
904+
None => (),
905+
}
906+
907+
qdrant_client
908+
.delete_points(self.qdrant_collection(), &filter, None)
854909
.await?;
855910

856911
utils::done(&format!(
@@ -915,7 +970,33 @@ impl DataSource {
915970
let now = utils::now();
916971
let chunk_len = chunk.len();
917972

918-
let _ = qdrant_client
973+
match qdrant_clients.shadow_write_client(&self.config.qdrant_config) {
974+
Some(qdrant_client) => {
975+
match qdrant_client
976+
.upsert_points(self.qdrant_collection(), chunk.clone(), None)
977+
.await
978+
{
979+
Ok(_) => {
980+
utils::done(&format!(
981+
"[SHADOW_WRITE_SUCCESS] Upsert points: cluster={:?} collection={}",
982+
qdrant_clients.shadow_write_cluster(&self.config.qdrant_config),
983+
self.qdrant_collection(),
984+
));
985+
}
986+
Err(e) => {
987+
utils::error(&format!(
988+
"[SHADOW_WRITE_FAIL] Upsert points: cluster={:?} collection={} error={}",
989+
qdrant_clients.shadow_write_cluster(&self.config.qdrant_config),
990+
self.qdrant_collection(),
991+
e
992+
));
993+
}
994+
}
995+
}
996+
None => (),
997+
}
998+
999+
qdrant_client
9191000
.upsert_points(self.qdrant_collection(), chunk, None)
9201001
.await?;
9211002

@@ -953,7 +1034,7 @@ impl DataSource {
9531034
full_text: bool,
9541035
target_document_tokens: Option<usize>,
9551036
) -> Result<Vec<Document>> {
956-
let qdrant_client = qdrant_clients.get(QdrantCluster::Main0);
1037+
let qdrant_client = qdrant_clients.main_client(&self.config.qdrant_config);
9571038

9581039
if top_k > DataSource::MAX_TOP_K_SEARCH {
9591040
return Err(anyhow!("top_k must be <= {}", DataSource::MAX_TOP_K_SEARCH));
@@ -1530,34 +1611,58 @@ impl DataSource {
15301611
qdrant_clients: QdrantClients,
15311612
document_id: &str,
15321613
) -> Result<()> {
1533-
let qdrant_client = qdrant_clients.get(QdrantCluster::Main0);
1614+
let qdrant_client = qdrant_clients.main_client(&self.config.qdrant_config);
15341615
let store = store.clone();
15351616

15361617
let mut hasher = blake3::Hasher::new();
15371618
hasher.update(document_id.as_bytes());
15381619
let document_id_hash = format!("{}", hasher.finalize().to_hex());
15391620

15401621
// Clean-up document chunks (vector search db).
1541-
let _ = qdrant_client
1542-
.delete_points(
1543-
self.qdrant_collection(),
1544-
&qdrant::Filter {
1545-
must_not: vec![],
1546-
should: vec![],
1547-
must: vec![qdrant::FieldCondition {
1548-
key: "document_id_hash".to_string(),
1549-
r#match: Some(qdrant::Match {
1550-
match_value: Some(qdrant::r#match::MatchValue::Keyword(
1551-
document_id_hash.clone(),
1552-
)),
1553-
}),
1554-
..Default::default()
1622+
let filter: PointsSelector = qdrant::Filter {
1623+
must_not: vec![],
1624+
should: vec![],
1625+
must: vec![qdrant::FieldCondition {
1626+
key: "document_id_hash".to_string(),
1627+
r#match: Some(qdrant::Match {
1628+
match_value: Some(qdrant::r#match::MatchValue::Keyword(
1629+
document_id_hash.clone(),
1630+
)),
1631+
}),
1632+
..Default::default()
1633+
}
1634+
.into()],
1635+
}
1636+
.into();
1637+
1638+
match qdrant_clients.shadow_write_client(&self.config.qdrant_config) {
1639+
Some(qdrant_client) => {
1640+
match qdrant_client
1641+
.delete_points(self.qdrant_collection(), &filter, None)
1642+
.await
1643+
{
1644+
Ok(_) => {
1645+
utils::done(&format!(
1646+
"[SHADOW_WRITE_SUCCESS] Delete points: cluster={:?} collection={}",
1647+
qdrant_clients.shadow_write_cluster(&self.config.qdrant_config),
1648+
self.qdrant_collection(),
1649+
));
1650+
}
1651+
Err(e) => {
1652+
utils::error(&format!(
1653+
"[SHADOW_WRITE_FAIL] Delete points: cluster={:?} collection={} error={}",
1654+
qdrant_clients.shadow_write_cluster(&self.config.qdrant_config),
1655+
self.qdrant_collection(),
1656+
e
1657+
));
15551658
}
1556-
.into()],
15571659
}
1558-
.into(),
1559-
None,
1560-
)
1660+
}
1661+
None => (),
1662+
}
1663+
1664+
qdrant_client
1665+
.delete_points(self.qdrant_collection(), &filter, None)
15611666
.await?;
15621667

15631668
// Delete document (SQL)
@@ -1573,7 +1678,16 @@ impl DataSource {
15731678
store: Box<dyn Store + Sync + Send>,
15741679
qdrant_clients: QdrantClients,
15751680
) -> Result<()> {
1576-
let qdrant_client = qdrant_clients.get(QdrantCluster::Main0);
1681+
if qdrant_clients
1682+
.shadow_write_cluster(&self.config.qdrant_config)
1683+
.is_some()
1684+
{
1685+
Err(anyhow!(
1686+
"Cannot delete data source with a shadow_write_cluster set"
1687+
))?;
1688+
}
1689+
1690+
let qdrant_client = qdrant_clients.main_client(&self.config.qdrant_config);
15771691
let store = store.clone();
15781692

15791693
// Delete collection (vector search db).

0 commit comments

Comments
 (0)