From 3a46c22c6b5535f9ff7ea6e2f7efa9ce6200d79c Mon Sep 17 00:00:00 2001 From: Sujan Reddy Date: Fri, 14 Mar 2025 12:36:52 -0500 Subject: [PATCH 01/21] Added arrow as third party dependency --- .gitmodules | 3 +++ CMakeLists.txt | 1 + src/cpp/include/partition_manager.h | 1 + src/cpp/src/partition_manager.cpp | 1 + 4 files changed, 6 insertions(+) diff --git a/.gitmodules b/.gitmodules index 265bcb27..7a3d29ba 100644 --- a/.gitmodules +++ b/.gitmodules @@ -7,3 +7,6 @@ [submodule "src/cpp/third_party/concurrentqueue"] path = src/cpp/third_party/concurrentqueue url = https://github.com/cameron314/concurrentqueue.git +[submodule "src/cpp/third_party/arrow"] + path = src/cpp/third_party/arrow + url = https://github.com/apache/arrow.git diff --git a/CMakeLists.txt b/CMakeLists.txt index d54fa262..0b92d04c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -103,6 +103,7 @@ target_include_directories(${PROJECT_NAME} ${TORCH_INCLUDE_DIRS} ${project_INCLUDE_DIR} ${project_THIRD_PARTY_DIR}/concurrentqueue/ + ${project_THIRD_PARTY_DIR}/arrow/cpp/src faiss ) diff --git a/src/cpp/include/partition_manager.h b/src/cpp/include/partition_manager.h index 3c0da1e7..d1332621 100644 --- a/src/cpp/include/partition_manager.h +++ b/src/cpp/include/partition_manager.h @@ -9,6 +9,7 @@ #include #include +#include class QuakeIndex; diff --git a/src/cpp/src/partition_manager.cpp b/src/cpp/src/partition_manager.cpp index aac447dc..394f1909 100644 --- a/src/cpp/src/partition_manager.cpp +++ b/src/cpp/src/partition_manager.cpp @@ -11,6 +11,7 @@ #include #include #include "quake_index.h" +#include using std::runtime_error; From 2063b5d94f696bd5c112e83a1040dbddc5e78c22 Mon Sep 17 00:00:00 2001 From: Sujan Reddy Date: Sun, 16 Mar 2025 22:44:32 -0500 Subject: [PATCH 02/21] Added build with arrow tables functionality --- .gitmodules | 5 +---- CMakeLists.txt | 5 ++++- src/cpp/include/clustering.h | 4 +++- src/cpp/include/common.h | 7 +++++++ src/cpp/include/dynamic_inverted_list.h | 24 +++++++++++++++++++++++- src/cpp/include/index_partition.h | 3 ++- src/cpp/include/quake_index.h | 2 +- src/cpp/src/clustering.cpp | 15 +++++++++++++-- src/cpp/src/dynamic_inverted_list.cpp | 15 +++++++++++++-- src/cpp/src/index_partition.cpp | 5 ++++- src/cpp/src/partition_manager.cpp | 4 +++- src/cpp/src/quake_index.cpp | 6 ++++-- test/cpp/CMakeLists.txt | 9 ++++++++- 13 files changed, 86 insertions(+), 18 deletions(-) diff --git a/.gitmodules b/.gitmodules index 7a3d29ba..1147a27b 100644 --- a/.gitmodules +++ b/.gitmodules @@ -6,7 +6,4 @@ url = https://github.com/pybind/pybind11.git [submodule "src/cpp/third_party/concurrentqueue"] path = src/cpp/third_party/concurrentqueue - url = https://github.com/cameron314/concurrentqueue.git -[submodule "src/cpp/third_party/arrow"] - path = src/cpp/third_party/arrow - url = https://github.com/apache/arrow.git + url = https://github.com/cameron314/concurrentqueue.git \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 0b92d04c..32db4ded 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -81,11 +81,13 @@ add_compile_definitions(_GLIBCXX_USE_CXX11_ABI=0) # Find Required Packages # --------------------------------------------------------------- find_package(Torch REQUIRED) +find_package(Arrow REQUIRED) # Corrected to uppercase 'Arrow' find_package(Python3 COMPONENTS Development Interpreter REQUIRED) message(STATUS "Torch include dir: ${TORCH_INCLUDE_DIRS}") message(STATUS "Torch libraries: ${TORCH_LIBRARIES}") message(STATUS "Python include dir: ${Python3_INCLUDE_DIRS}") +message(STATUS "Arrow include dir: ${ARROW_INCLUDE_DIR}") set(PYTHON_INCLUDE_DIR ${Python3_INCLUDE_DIRS}) @@ -103,7 +105,7 @@ target_include_directories(${PROJECT_NAME} ${TORCH_INCLUDE_DIRS} ${project_INCLUDE_DIR} ${project_THIRD_PARTY_DIR}/concurrentqueue/ - ${project_THIRD_PARTY_DIR}/arrow/cpp/src + ${ARROW_INCLUDE_DIR} faiss ) @@ -139,6 +141,7 @@ else() endif() target_link_libraries(${PROJECT_NAME} PUBLIC ${LINK_LIBS}) +target_link_libraries(${PROJECT_NAME} PUBLIC Arrow::arrow_shared) IF(CMAKE_BUILD_TYPE MATCHES Debug AND QUAKE_USE_TSAN) message("Using thread sanitizer") diff --git a/src/cpp/include/clustering.h b/src/cpp/include/clustering.h index d0980d38..9c8926ae 100644 --- a/src/cpp/include/clustering.h +++ b/src/cpp/include/clustering.h @@ -14,6 +14,8 @@ shared_ptr kmeans(Tensor vectors, int n_clusters, MetricType metric_type, int niter = 5, - Tensor initial_centroids = Tensor()); + std::vector> data_frames = {}, + Tensor initial_centroids = Tensor() + ); #endif //CLUSTERING_H diff --git a/src/cpp/include/common.h b/src/cpp/include/common.h index efdbfa53..48267909 100644 --- a/src/cpp/include/common.h +++ b/src/cpp/include/common.h @@ -33,6 +33,12 @@ #include #include +#include +#include +#include +#include +#include + #ifdef QUAKE_USE_NUMA #include #include @@ -256,6 +262,7 @@ struct Clustering { Tensor partition_ids; vector vectors; vector vector_ids; + vector>> data_frames; int64_t ntotal() const { int64_t n = 0; diff --git a/src/cpp/include/dynamic_inverted_list.h b/src/cpp/include/dynamic_inverted_list.h index f38974e2..34b863e2 100644 --- a/src/cpp/include/dynamic_inverted_list.h +++ b/src/cpp/include/dynamic_inverted_list.h @@ -135,6 +135,7 @@ namespace faiss { * @param n_entry Number of entries to add. * @param ids Pointer to the vector IDs. * @param codes Pointer to the encoded vectors. + * @param data_frames Arrow data frames. * @return Number of entries added. * @throws std::runtime_error if the partition does not exist. */ @@ -142,7 +143,28 @@ namespace faiss { size_t list_no, size_t n_entry, const idx_t *ids, - const uint8_t *codes) override; + const uint8_t *codes, + std::vector> data_frames + ); + + /** + * @brief Append new entries (codes and IDs) to a partition. + * + * @param list_no Partition number. + * @param n_entry Number of entries to add. + * @param ids Pointer to the vector IDs. + * @param codes Pointer to the encoded vectors. + * @param data_frames Optional Arrow data frames. + * @return Number of entries added. + * @throws std::runtime_error if the partition does not exist. + */ + size_t add_entries( + size_t list_no, + size_t n_entry, + const idx_t *ids, + const uint8_t *codes + ) ; + /** * @brief Update existing entries in a partition. diff --git a/src/cpp/include/index_partition.h b/src/cpp/include/index_partition.h index 134282ec..b1bc730b 100644 --- a/src/cpp/include/index_partition.h +++ b/src/cpp/include/index_partition.h @@ -27,6 +27,7 @@ class IndexPartition { uint8_t* codes_ = nullptr; ///< Pointer to the encoded vectors (raw memory block) idx_t* ids_ = nullptr; ///< Pointer to the vector IDs + std::vector> data_frames_; /// Default constructor. IndexPartition() = default; @@ -86,7 +87,7 @@ class IndexPartition { * @param new_ids Pointer to the new vector IDs. * @param new_codes Pointer to the new encoded vectors. */ - void append(int64_t n_entry, const idx_t* new_ids, const uint8_t* new_codes); + void append(int64_t n_entry, const idx_t* new_ids, const uint8_t* new_codes, std::vector> data_frames={}); /** * @brief Update existing entries in place. diff --git a/src/cpp/include/quake_index.h b/src/cpp/include/quake_index.h index c0cccc21..7aa057aa 100644 --- a/src/cpp/include/quake_index.h +++ b/src/cpp/include/quake_index.h @@ -47,7 +47,7 @@ class QuakeIndex { * @param build_params Parameters for building the index. * @return Timing information for the build. */ - shared_ptr build(Tensor x, Tensor ids, shared_ptr build_params); + shared_ptr build(Tensor x, Tensor ids, shared_ptr build_params, std::vector> data_frame = {}); /** * @brief Search for vectors in the index. diff --git a/src/cpp/src/clustering.cpp b/src/cpp/src/clustering.cpp index b8825bc1..05070138 100644 --- a/src/cpp/src/clustering.cpp +++ b/src/cpp/src/clustering.cpp @@ -13,6 +13,7 @@ shared_ptr kmeans(Tensor vectors, int n_clusters, MetricType metric_type, int niter, + std::vector> data_frames, Tensor /* initial_centroids */) { // Ensure enough vectors are available and sizes match. assert(vectors.size(0) >= n_clusters * 2); @@ -52,10 +53,19 @@ shared_ptr kmeans(Tensor vectors, // Partition vectors and ids by cluster. vector cluster_vectors(n_clusters); vector cluster_ids(n_clusters); + vector>> cluster_data_frames(n_clusters); + + for (int i = 0; i < n_clusters; i++) { - cluster_vectors[i] = vectors.index({assignments == i}); - cluster_ids[i] = ids.index({assignments == i}); + auto mask = (assignments == i); + cluster_vectors[i] = vectors.index({mask}); + cluster_ids[i] = ids.index({mask}); + } + for(int j=0;j(); + cluster_data_frames[cluster_id].push_back(data_frames[j]); } + Tensor partition_ids = torch::arange(n_clusters, torch::kInt64); shared_ptr clustering = std::make_shared(); @@ -63,6 +73,7 @@ shared_ptr kmeans(Tensor vectors, clustering->partition_ids = partition_ids; clustering->vectors = cluster_vectors; clustering->vector_ids = cluster_ids; + clustering->data_frames = cluster_data_frames; delete index_ptr; return clustering; diff --git a/src/cpp/src/dynamic_inverted_list.cpp b/src/cpp/src/dynamic_inverted_list.cpp index a8b070fd..42af604a 100644 --- a/src/cpp/src/dynamic_inverted_list.cpp +++ b/src/cpp/src/dynamic_inverted_list.cpp @@ -152,7 +152,9 @@ namespace faiss { size_t list_no, size_t n_entry, const idx_t *ids, - const uint8_t *codes) { + const uint8_t *codes, + std::vector> data_frames + ) { if (n_entry == 0) { return 0; } @@ -168,10 +170,19 @@ namespace faiss { part->set_code_size(static_cast(code_size)); } - part->append((int64_t) n_entry, ids, codes); + part->append((int64_t) n_entry, ids, codes, data_frames); return n_entry; } + size_t DynamicInvertedLists::add_entries( + size_t list_no, + size_t n_entry, + const idx_t *ids, + const uint8_t *codes + ) { + return add_entries(list_no, n_entry, ids, codes, {}); + } + void DynamicInvertedLists::update_entries( size_t list_no, size_t offset, diff --git a/src/cpp/src/index_partition.cpp b/src/cpp/src/index_partition.cpp index 3a767f5f..5ba15241 100644 --- a/src/cpp/src/index_partition.cpp +++ b/src/cpp/src/index_partition.cpp @@ -49,12 +49,15 @@ void IndexPartition::set_code_size(int64_t code_size) { code_size_ = code_size; } -void IndexPartition::append(int64_t n_entry, const idx_t* new_ids, const uint8_t* new_codes) { +void IndexPartition::append(int64_t n_entry, const idx_t* new_ids, const uint8_t* new_codes, std::vector> data_frames) { if (n_entry <= 0) return; ensure_capacity(num_vectors_ + n_entry); const size_t code_bytes = static_cast(code_size_); std::memcpy(codes_ + num_vectors_ * code_bytes, new_codes, n_entry * code_bytes); std::memcpy(ids_ + num_vectors_, new_ids, n_entry * sizeof(idx_t)); + if(!data_frames.empty()) { + data_frames_.insert(data_frames_.end(), data_frames.begin(), data_frames.end()); + } num_vectors_ += n_entry; } diff --git a/src/cpp/src/partition_manager.cpp b/src/cpp/src/partition_manager.cpp index 394f1909..5358d808 100644 --- a/src/cpp/src/partition_manager.cpp +++ b/src/cpp/src/partition_manager.cpp @@ -78,6 +78,7 @@ void PartitionManager::init_partitions( for (int64_t i = 0; i < nlist; i++) { Tensor v = clustering->vectors[i]; Tensor id = clustering->vector_ids[i]; + std::vector> data_frames = clustering->data_frames[i]; if (v.size(0) != id.size(0)) { throw runtime_error("[PartitionManager] init_partitions: mismatch in v.size(0) vs id.size(0)."); } @@ -104,7 +105,8 @@ void PartitionManager::init_partitions( partition_ids_accessor[i], count, id.data_ptr(), - as_uint8_ptr(v) + as_uint8_ptr(v), + data_frames ); if (debug_) { std::cout << "[PartitionManager] init_partitions: Added " << count diff --git a/src/cpp/src/quake_index.cpp b/src/cpp/src/quake_index.cpp index 85e80e8b..fd0531da 100644 --- a/src/cpp/src/quake_index.cpp +++ b/src/cpp/src/quake_index.cpp @@ -25,7 +25,7 @@ QuakeIndex::~QuakeIndex() { maintenance_policy_params_ = nullptr; } -shared_ptr QuakeIndex::build(Tensor x, Tensor ids, shared_ptr build_params) { +shared_ptr QuakeIndex::build(Tensor x, Tensor ids, shared_ptr build_params, std::vector> data_frames) { build_params_ = build_params; metric_ = str_to_metric_type(build_params_->metric); @@ -45,7 +45,8 @@ shared_ptr QuakeIndex::build(Tensor x, Tensor ids, shared_ptrnlist, metric_, - build_params_->niter + build_params_->niter, + data_frames ); auto e1 = std::chrono::high_resolution_clock::now(); timing_info->train_time_us = std::chrono::duration_cast(e1 - s1).count(); @@ -71,6 +72,7 @@ shared_ptr QuakeIndex::build(Tensor x, Tensor ids, shared_ptrcentroids = x.mean(0, true); clustering->vectors = {x}; clustering->vector_ids = {ids}; + clustering->data_frames = {data_frames}; partition_manager_->init_partitions(parent_, clustering); } diff --git a/test/cpp/CMakeLists.txt b/test/cpp/CMakeLists.txt index e21a8a3f..a45605ec 100644 --- a/test/cpp/CMakeLists.txt +++ b/test/cpp/CMakeLists.txt @@ -6,7 +6,14 @@ ADD_EXECUTABLE(quake_tests ${SRCS}) TARGET_LINK_LIBRARIES(quake_tests ${PROJECT_NAME} - gtest gtest_main + gtest gtest_main + Arrow::arrow_shared +) + +# Include directories propagated from dependencies +target_include_directories(quake_tests + PRIVATE + ${ARROW_INCLUDE_DIR} # Arrow headers (if not already handled by Arrow::Arrow) ) add_test(NAME quake_tests COMMAND quake_tests WORKING_DIRECTORY ${QUAKE_TEST_HOME}) \ No newline at end of file From e1aa5ac5171856e99d509b633922e643d616d158 Mon Sep 17 00:00:00 2001 From: Sujan Reddy Date: Sun, 16 Mar 2025 22:47:38 -0500 Subject: [PATCH 03/21] updated stress tests to include attributes --- test/cpp/quake_index.cpp | 46 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/test/cpp/quake_index.cpp b/test/cpp/quake_index.cpp index e16a1d6b..e3fa5fad 100644 --- a/test/cpp/quake_index.cpp +++ b/test/cpp/quake_index.cpp @@ -7,6 +7,12 @@ #include #include "quake_index.h" #include +#include +#include +#include +#include +#include +#include // Helper functions for random data static torch::Tensor generate_random_data(int64_t num_vectors, int64_t dim) { @@ -17,6 +23,29 @@ static torch::Tensor generate_sequential_ids(int64_t count, int64_t start = 0) { return torch::arange(start, start + count, torch::kInt64); } +std::vector> generate_data_frame(int64_t num_vectors) { + arrow::MemoryPool* pool = arrow::default_memory_pool(); + std::vector> tables; + + for (int64_t i = 0; i < num_vectors; i++) { + arrow::DoubleBuilder price_builder(pool); + price_builder.Append(static_cast(i) * 1.5); + + std::shared_ptr price_array; + price_builder.Finish(&price_array); + + std::vector> schema_vector = { + arrow::field("price", arrow::float64()) + }; + + auto schema = std::make_shared(schema_vector); + auto table = arrow::Table::Make(schema, {price_array}); + tables.push_back(table); + } + + return tables; +} + class QuakeIndexTest : public ::testing::Test { protected: // Example parameters @@ -32,6 +61,9 @@ class QuakeIndexTest : public ::testing::Test { // Query vectors torch::Tensor query_vectors_; + // Arrow data + std::vector> data_frames_; + void SetUp() override { // Generate random data data_vectors_ = generate_random_data(num_vectors_, dimension_); @@ -40,6 +72,9 @@ class QuakeIndexTest : public ::testing::Test { // Queries query_vectors_ = generate_random_data(num_queries_, dimension_); + + // Arrow data + data_frames_ = generate_data_frame(num_vectors_); } }; @@ -64,7 +99,7 @@ TEST_F(QuakeIndexTest, BuildTest) { build_params->metric = "l2"; build_params->niter = 5; // small kmeans iteration - auto timing_info = index.build(data_vectors_, data_ids_, build_params); + auto timing_info = index.build(data_vectors_, data_ids_, build_params, data_frames_); // Check that we created partition_manager_, parent_, etc. EXPECT_NE(index.partition_manager_, nullptr); @@ -260,6 +295,7 @@ TEST(QuakeIndexStressTest, LargeBuildTest) { int64_t num_vectors = 1e6; // 1 million vectors auto data_vectors = generate_random_data(num_vectors, dimension); auto data_ids = generate_sequential_ids(num_vectors, 0); + auto data_frames = generate_data_frame(num_vectors); QuakeIndex index; @@ -270,7 +306,7 @@ TEST(QuakeIndexStressTest, LargeBuildTest) { build_params->niter = 5; auto t0 = std::chrono::high_resolution_clock::now(); - auto timing_info = index.build(data_vectors, data_ids, build_params); + auto timing_info = index.build(data_vectors, data_ids, build_params, data_frames); auto t1 = std::chrono::high_resolution_clock::now(); // Check that the build completed and that we didn't crash @@ -297,6 +333,7 @@ TEST(QuakeIndexStressTest, RepeatedBuildSearchTest) { // Pre-generate data auto data_vectors = generate_random_data(num_vectors, dimension); auto data_ids = generate_sequential_ids(num_vectors, 1000); + auto data_frames = generate_data_frame(num_vectors); auto query_vectors = generate_random_data(num_queries, dimension); for (int i = 0; i < iteration_count; i++) { @@ -307,7 +344,7 @@ TEST(QuakeIndexStressTest, RepeatedBuildSearchTest) { build_params->niter = 3; // Build index - index.build(data_vectors, data_ids, build_params); + index.build(data_vectors, data_ids, build_params, data_frames); // Query index auto search_params = std::make_shared(); @@ -421,6 +458,7 @@ TEST(QuakeIndexStressTest, HighDimensionTest) { int64_t num_vectors = 5000; auto data_vectors = generate_random_data(num_vectors, dimension); auto data_ids = generate_sequential_ids(num_vectors); + auto data_frames = generate_data_frame(num_vectors); QuakeIndex index; auto build_params = std::make_shared(); @@ -430,7 +468,7 @@ TEST(QuakeIndexStressTest, HighDimensionTest) { build_params->niter = 3; // If your system doesn’t have enough memory for bigger tests, reduce num_vectors or dimension. - auto timing_info = index.build(data_vectors, data_ids, build_params); + auto timing_info = index.build(data_vectors, data_ids, build_params, data_frames); ASSERT_NE(timing_info, nullptr); EXPECT_EQ(index.ntotal(), num_vectors); From c0aece380cf2d7df66d4089a0d3212f6efdecce2 Mon Sep 17 00:00:00 2001 From: Sujan Reddy Date: Sun, 16 Mar 2025 22:59:20 -0500 Subject: [PATCH 04/21] renamed duplicate function --- test/cpp/benchmark.cpp | 39 +++++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/test/cpp/benchmark.cpp b/test/cpp/benchmark.cpp index ee85e5b3..0bc86d65 100644 --- a/test/cpp/benchmark.cpp +++ b/test/cpp/benchmark.cpp @@ -42,6 +42,29 @@ static Tensor generate_ids(int64_t num, int64_t start = 0) { return torch::arange(start, start + num, torch::kInt64); } +static std::vector> generate_data_frame(int64_t num_vectors) { + arrow::MemoryPool* pool = arrow::default_memory_pool(); + std::vector> tables; + + for (int64_t i = 0; i < num_vectors; i++) { + arrow::DoubleBuilder price_builder(pool); + price_builder.Append(static_cast(i) * 1.5); + + std::shared_ptr price_array; + price_builder.Finish(&price_array); + + std::vector> schema_vector = { + arrow::field("price", arrow::float64()) + }; + + auto schema = std::make_shared(schema_vector); + auto table = arrow::Table::Make(schema, {price_array}); + tables.push_back(table); + } + + return tables; +} + // // ===== Quake BENCHMARK FIXTURES ===== // @@ -52,14 +75,16 @@ class QuakeSerialFlatBenchmark : public ::testing::Test { std::shared_ptr index_; Tensor data_; Tensor ids_; + std::vector> data_frame_; void SetUp() override { data_ = generate_data(NUM_VECTORS, DIM); ids_ = generate_ids(NUM_VECTORS); + data_frame_ = generate_data_frame(NUM_VECTORS); index_ = std::make_shared(); auto build_params = std::make_shared(); build_params->nlist = 1; // flat index build_params->metric = "l2"; - index_->build(data_, ids_, build_params); + index_->build(data_, ids_, build_params, data_frame_); } }; @@ -69,16 +94,18 @@ class QuakeWorkerFlatBenchmark : public ::testing::Test { std::shared_ptr index_; Tensor data_; Tensor ids_; + std::vector> data_frame_; void SetUp() override { data_ = generate_data(NUM_VECTORS, DIM); ids_ = generate_ids(NUM_VECTORS); + data_frame_ = generate_data_frame(NUM_VECTORS); index_ = std::make_shared(); auto build_params = std::make_shared(); build_params->nlist = 1; // flat index build_params->metric = "l2"; // Use as many workers as hardware concurrency build_params->num_workers = std::thread::hardware_concurrency(); - index_->build(data_, ids_, build_params); + index_->build(data_, ids_, build_params, data_frame_); } }; @@ -89,15 +116,17 @@ class QuakeSerialIVFBenchmark : public ::testing::Test { std::shared_ptr index_; Tensor data_; Tensor ids_; + std::vector> data_frame_; void SetUp() override { data_ = generate_data(NUM_VECTORS, DIM); ids_ = generate_ids(NUM_VECTORS); + data_frame_ = generate_data_frame(NUM_VECTORS); index_ = std::make_shared(); auto build_params = std::make_shared(); build_params->nlist = N_LIST; // IVF index build_params->metric = "l2"; build_params->niter = 3; - index_->build(data_, ids_, build_params); + index_->build(data_, ids_, build_params, data_frame_); } }; @@ -107,9 +136,11 @@ class QuakeWorkerIVFBenchmark : public ::testing::Test { std::shared_ptr index_; Tensor data_; Tensor ids_; + std::vector> data_frame_; void SetUp() override { data_ = generate_data(NUM_VECTORS, DIM); ids_ = generate_ids(NUM_VECTORS); + data_frame_ = generate_data_frame(NUM_VECTORS); index_ = std::make_shared(); auto build_params = std::make_shared(); build_params->nlist = N_LIST; // IVF index @@ -117,7 +148,7 @@ class QuakeWorkerIVFBenchmark : public ::testing::Test { build_params->niter = 3; // Use as many workers as hardware concurrency build_params->num_workers = std::thread::hardware_concurrency(); - index_->build(data_, ids_, build_params); + index_->build(data_, ids_, build_params, data_frame_); } }; From 147dbe127f3aa6dd6073582b70e22efef500b788 Mon Sep 17 00:00:00 2001 From: Sujan Reddy Date: Sun, 16 Mar 2025 23:03:13 -0500 Subject: [PATCH 05/21] added libarrow as conda dependency --- environments/ubuntu-latest/conda.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/environments/ubuntu-latest/conda.yaml b/environments/ubuntu-latest/conda.yaml index 24c3bdc5..1ab6789c 100644 --- a/environments/ubuntu-latest/conda.yaml +++ b/environments/ubuntu-latest/conda.yaml @@ -11,6 +11,7 @@ dependencies: - faiss-cpu - matplotlib - pytest + - libarrow-all=19.0.1 - pip - pip: - sphinx From 29e46145308a2589cc7b9fe42d537681d7d8efc1 Mon Sep 17 00:00:00 2001 From: Sujan Reddy Date: Tue, 18 Mar 2025 16:52:13 -0500 Subject: [PATCH 06/21] updated data structure for storing tables --- src/cpp/include/clustering.h | 2 +- src/cpp/include/common.h | 2 +- src/cpp/include/dynamic_inverted_list.h | 2 +- src/cpp/include/index_partition.h | 4 +- src/cpp/include/quake_index.h | 2 +- src/cpp/src/clustering.cpp | 57 ++++++++++++++++++---- src/cpp/src/dynamic_inverted_list.cpp | 4 +- src/cpp/src/index_partition.cpp | 7 +-- src/cpp/src/partition_manager.cpp | 4 +- src/cpp/src/quake_index.cpp | 6 +-- test/cpp/benchmark.cpp | 63 ++++++++++++++----------- test/cpp/quake_index.cpp | 52 +++++++++++--------- 12 files changed, 131 insertions(+), 74 deletions(-) diff --git a/src/cpp/include/clustering.h b/src/cpp/include/clustering.h index 9c8926ae..4b2552f3 100644 --- a/src/cpp/include/clustering.h +++ b/src/cpp/include/clustering.h @@ -14,7 +14,7 @@ shared_ptr kmeans(Tensor vectors, int n_clusters, MetricType metric_type, int niter = 5, - std::vector> data_frames = {}, + std::shared_ptr attributes_table = nullptr, Tensor initial_centroids = Tensor() ); diff --git a/src/cpp/include/common.h b/src/cpp/include/common.h index 48267909..ebd488b1 100644 --- a/src/cpp/include/common.h +++ b/src/cpp/include/common.h @@ -262,7 +262,7 @@ struct Clustering { Tensor partition_ids; vector vectors; vector vector_ids; - vector>> data_frames; + vector> attributes_tables; int64_t ntotal() const { int64_t n = 0; diff --git a/src/cpp/include/dynamic_inverted_list.h b/src/cpp/include/dynamic_inverted_list.h index 34b863e2..91c4bba9 100644 --- a/src/cpp/include/dynamic_inverted_list.h +++ b/src/cpp/include/dynamic_inverted_list.h @@ -144,7 +144,7 @@ namespace faiss { size_t n_entry, const idx_t *ids, const uint8_t *codes, - std::vector> data_frames + std::shared_ptr attributes_table ); /** diff --git a/src/cpp/include/index_partition.h b/src/cpp/include/index_partition.h index b1bc730b..c6789ef4 100644 --- a/src/cpp/include/index_partition.h +++ b/src/cpp/include/index_partition.h @@ -27,7 +27,7 @@ class IndexPartition { uint8_t* codes_ = nullptr; ///< Pointer to the encoded vectors (raw memory block) idx_t* ids_ = nullptr; ///< Pointer to the vector IDs - std::vector> data_frames_; + std::vector> attributes_tables_ = {}; /// Default constructor. IndexPartition() = default; @@ -87,7 +87,7 @@ class IndexPartition { * @param new_ids Pointer to the new vector IDs. * @param new_codes Pointer to the new encoded vectors. */ - void append(int64_t n_entry, const idx_t* new_ids, const uint8_t* new_codes, std::vector> data_frames={}); + void append(int64_t n_entry, const idx_t* new_ids, const uint8_t* new_codes, std::shared_ptr attributes_table=nullptr); /** * @brief Update existing entries in place. diff --git a/src/cpp/include/quake_index.h b/src/cpp/include/quake_index.h index 7aa057aa..9d776b6b 100644 --- a/src/cpp/include/quake_index.h +++ b/src/cpp/include/quake_index.h @@ -47,7 +47,7 @@ class QuakeIndex { * @param build_params Parameters for building the index. * @return Timing information for the build. */ - shared_ptr build(Tensor x, Tensor ids, shared_ptr build_params, std::vector> data_frame = {}); + shared_ptr build(Tensor x, Tensor ids, shared_ptr build_params, std::shared_ptr attributes_table = nullptr); /** * @brief Search for vectors in the index. diff --git a/src/cpp/src/clustering.cpp b/src/cpp/src/clustering.cpp index 05070138..0b091041 100644 --- a/src/cpp/src/clustering.cpp +++ b/src/cpp/src/clustering.cpp @@ -7,13 +7,16 @@ #include "clustering.h" #include #include "faiss/Clustering.h" +#include +#include +#include shared_ptr kmeans(Tensor vectors, Tensor ids, int n_clusters, MetricType metric_type, int niter, - std::vector> data_frames, + std::shared_ptr attributes_table, Tensor /* initial_centroids */) { // Ensure enough vectors are available and sizes match. assert(vectors.size(0) >= n_clusters * 2); @@ -53,18 +56,56 @@ shared_ptr kmeans(Tensor vectors, // Partition vectors and ids by cluster. vector cluster_vectors(n_clusters); vector cluster_ids(n_clusters); - vector>> cluster_data_frames(n_clusters); - + vector> cluster_attributes_tables(n_clusters); for (int i = 0; i < n_clusters; i++) { auto mask = (assignments == i); cluster_vectors[i] = vectors.index({mask}); cluster_ids[i] = ids.index({mask}); + + if(attributes_table == nullptr) { + cluster_attributes_tables[i] = nullptr; + continue; + } + + auto cluster_ids_tensor = cluster_ids[i]; // Assuming this is a tensor with IDs + std::vector cluster_ids_vec(cluster_ids_tensor.data(), + cluster_ids_tensor.data() + cluster_ids_tensor.numel()); + + // Convert to Arrow Array + arrow::Int64Builder id_builder; + id_builder.AppendValues(cluster_ids_vec); + std::shared_ptr cluster_ids_array; + id_builder.Finish(&cluster_ids_array); + + // Get the "id" column from the attributes table + std::shared_ptr id_column = attributes_table->GetColumnByName("id"); + + auto lookup_options = std::make_shared(cluster_ids_array); + // Apply set lookup to filter rows + auto result = arrow::compute::CallFunction( + "index_in", + {id_column->chunk(0)}, + lookup_options.get() + ); + + auto index_array = std::static_pointer_cast(result->make_array()); + + auto mask_result = arrow::compute::CallFunction( + "not_equal", + {index_array, arrow::MakeScalar(-1)} + ); + + // Convert result to a Boolean mask + auto mask_table = std::static_pointer_cast(mask_result->make_array()); + + // Filter the table using the mask + auto filtered_table_result = arrow::compute::Filter(attributes_table, mask_table); + + cluster_attributes_tables[i] = filtered_table_result->table(); + std::cout<ToString()<(); - cluster_data_frames[cluster_id].push_back(data_frames[j]); - } + Tensor partition_ids = torch::arange(n_clusters, torch::kInt64); @@ -73,7 +114,7 @@ shared_ptr kmeans(Tensor vectors, clustering->partition_ids = partition_ids; clustering->vectors = cluster_vectors; clustering->vector_ids = cluster_ids; - clustering->data_frames = cluster_data_frames; + clustering->attributes_tables = cluster_attributes_tables; delete index_ptr; return clustering; diff --git a/src/cpp/src/dynamic_inverted_list.cpp b/src/cpp/src/dynamic_inverted_list.cpp index 42af604a..4807a281 100644 --- a/src/cpp/src/dynamic_inverted_list.cpp +++ b/src/cpp/src/dynamic_inverted_list.cpp @@ -153,7 +153,7 @@ namespace faiss { size_t n_entry, const idx_t *ids, const uint8_t *codes, - std::vector> data_frames + shared_ptr attributes_table ) { if (n_entry == 0) { return 0; @@ -170,7 +170,7 @@ namespace faiss { part->set_code_size(static_cast(code_size)); } - part->append((int64_t) n_entry, ids, codes, data_frames); + part->append((int64_t) n_entry, ids, codes, attributes_table); return n_entry; } diff --git a/src/cpp/src/index_partition.cpp b/src/cpp/src/index_partition.cpp index 5ba15241..cac42430 100644 --- a/src/cpp/src/index_partition.cpp +++ b/src/cpp/src/index_partition.cpp @@ -49,15 +49,16 @@ void IndexPartition::set_code_size(int64_t code_size) { code_size_ = code_size; } -void IndexPartition::append(int64_t n_entry, const idx_t* new_ids, const uint8_t* new_codes, std::vector> data_frames) { +void IndexPartition::append(int64_t n_entry, const idx_t* new_ids, const uint8_t* new_codes, std::shared_ptr attributes_table) { if (n_entry <= 0) return; ensure_capacity(num_vectors_ + n_entry); const size_t code_bytes = static_cast(code_size_); std::memcpy(codes_ + num_vectors_ * code_bytes, new_codes, n_entry * code_bytes); std::memcpy(ids_ + num_vectors_, new_ids, n_entry * sizeof(idx_t)); - if(!data_frames.empty()) { - data_frames_.insert(data_frames_.end(), data_frames.begin(), data_frames.end()); + if(attributes_table!=nullptr){ + attributes_tables_.push_back(attributes_table); } + num_vectors_ += n_entry; } diff --git a/src/cpp/src/partition_manager.cpp b/src/cpp/src/partition_manager.cpp index 5358d808..211ddc06 100644 --- a/src/cpp/src/partition_manager.cpp +++ b/src/cpp/src/partition_manager.cpp @@ -78,7 +78,7 @@ void PartitionManager::init_partitions( for (int64_t i = 0; i < nlist; i++) { Tensor v = clustering->vectors[i]; Tensor id = clustering->vector_ids[i]; - std::vector> data_frames = clustering->data_frames[i]; + std::shared_ptr attributes_table = clustering->attributes_tables[i]; if (v.size(0) != id.size(0)) { throw runtime_error("[PartitionManager] init_partitions: mismatch in v.size(0) vs id.size(0)."); } @@ -106,7 +106,7 @@ void PartitionManager::init_partitions( count, id.data_ptr(), as_uint8_ptr(v), - data_frames + attributes_table ); if (debug_) { std::cout << "[PartitionManager] init_partitions: Added " << count diff --git a/src/cpp/src/quake_index.cpp b/src/cpp/src/quake_index.cpp index fd0531da..0895750a 100644 --- a/src/cpp/src/quake_index.cpp +++ b/src/cpp/src/quake_index.cpp @@ -25,7 +25,7 @@ QuakeIndex::~QuakeIndex() { maintenance_policy_params_ = nullptr; } -shared_ptr QuakeIndex::build(Tensor x, Tensor ids, shared_ptr build_params, std::vector> data_frames) { +shared_ptr QuakeIndex::build(Tensor x, Tensor ids, shared_ptr build_params, std::shared_ptr attributes_table) { build_params_ = build_params; metric_ = str_to_metric_type(build_params_->metric); @@ -46,7 +46,7 @@ shared_ptr QuakeIndex::build(Tensor x, Tensor ids, shared_ptrnlist, metric_, build_params_->niter, - data_frames + attributes_table ); auto e1 = std::chrono::high_resolution_clock::now(); timing_info->train_time_us = std::chrono::duration_cast(e1 - s1).count(); @@ -72,7 +72,7 @@ shared_ptr QuakeIndex::build(Tensor x, Tensor ids, shared_ptrcentroids = x.mean(0, true); clustering->vectors = {x}; clustering->vector_ids = {ids}; - clustering->data_frames = {data_frames}; + clustering->attributes_tables = {attributes_table}; partition_manager_->init_partitions(parent_, clustering); } diff --git a/test/cpp/benchmark.cpp b/test/cpp/benchmark.cpp index 0bc86d65..50501204 100644 --- a/test/cpp/benchmark.cpp +++ b/test/cpp/benchmark.cpp @@ -42,27 +42,34 @@ static Tensor generate_ids(int64_t num, int64_t start = 0) { return torch::arange(start, start + num, torch::kInt64); } -static std::vector> generate_data_frame(int64_t num_vectors) { +static std::shared_ptr generate_data_frame(int64_t num_vectors, torch::Tensor ids) { arrow::MemoryPool* pool = arrow::default_memory_pool(); - std::vector> tables; - for (int64_t i = 0; i < num_vectors; i++) { - arrow::DoubleBuilder price_builder(pool); - price_builder.Append(static_cast(i) * 1.5); - - std::shared_ptr price_array; - price_builder.Finish(&price_array); + // Builders for the "price" and "id" columns + arrow::DoubleBuilder price_builder(pool); + arrow::Int64Builder id_builder(pool); - std::vector> schema_vector = { - arrow::field("price", arrow::float64()) - }; - - auto schema = std::make_shared(schema_vector); - auto table = arrow::Table::Make(schema, {price_array}); - tables.push_back(table); + // Append values to the builders + for (int64_t i = 0; i < num_vectors; i++) { + price_builder.Append(static_cast(i) * 1.5); // Price column + id_builder.Append(ids[i].item()); // ID column from the input tensor } - return tables; + // Finalize the arrays + std::shared_ptr price_array; + std::shared_ptr id_array; + price_builder.Finish(&price_array); + id_builder.Finish(&id_array); + + // Define the schema with two fields: "price" and "id" + std::vector> schema_vector = { + arrow::field("id", arrow::int64()), + arrow::field("price", arrow::float64()), + }; + auto schema = std::make_shared(schema_vector); + + // Create and return the table with both columns + return arrow::Table::Make(schema, {id_array, price_array}); } // @@ -75,16 +82,16 @@ class QuakeSerialFlatBenchmark : public ::testing::Test { std::shared_ptr index_; Tensor data_; Tensor ids_; - std::vector> data_frame_; + std::shared_ptr attributes_table_; void SetUp() override { data_ = generate_data(NUM_VECTORS, DIM); ids_ = generate_ids(NUM_VECTORS); - data_frame_ = generate_data_frame(NUM_VECTORS); + attributes_table_ = generate_data_frame(NUM_VECTORS, ids_); index_ = std::make_shared(); auto build_params = std::make_shared(); build_params->nlist = 1; // flat index build_params->metric = "l2"; - index_->build(data_, ids_, build_params, data_frame_); + index_->build(data_, ids_, build_params, attributes_table_); } }; @@ -94,18 +101,18 @@ class QuakeWorkerFlatBenchmark : public ::testing::Test { std::shared_ptr index_; Tensor data_; Tensor ids_; - std::vector> data_frame_; + std::shared_ptr attributes_table_; void SetUp() override { data_ = generate_data(NUM_VECTORS, DIM); ids_ = generate_ids(NUM_VECTORS); - data_frame_ = generate_data_frame(NUM_VECTORS); + attributes_table_ = generate_data_frame(NUM_VECTORS, ids_); index_ = std::make_shared(); auto build_params = std::make_shared(); build_params->nlist = 1; // flat index build_params->metric = "l2"; // Use as many workers as hardware concurrency build_params->num_workers = std::thread::hardware_concurrency(); - index_->build(data_, ids_, build_params, data_frame_); + index_->build(data_, ids_, build_params, attributes_table_); } }; @@ -116,17 +123,17 @@ class QuakeSerialIVFBenchmark : public ::testing::Test { std::shared_ptr index_; Tensor data_; Tensor ids_; - std::vector> data_frame_; + std::shared_ptr attributes_table_; void SetUp() override { data_ = generate_data(NUM_VECTORS, DIM); ids_ = generate_ids(NUM_VECTORS); - data_frame_ = generate_data_frame(NUM_VECTORS); + attributes_table_ = generate_data_frame(NUM_VECTORS, ids_); index_ = std::make_shared(); auto build_params = std::make_shared(); build_params->nlist = N_LIST; // IVF index build_params->metric = "l2"; build_params->niter = 3; - index_->build(data_, ids_, build_params, data_frame_); + index_->build(data_, ids_, build_params, attributes_table_); } }; @@ -136,11 +143,11 @@ class QuakeWorkerIVFBenchmark : public ::testing::Test { std::shared_ptr index_; Tensor data_; Tensor ids_; - std::vector> data_frame_; + std::shared_ptr attributes_table_; void SetUp() override { data_ = generate_data(NUM_VECTORS, DIM); ids_ = generate_ids(NUM_VECTORS); - data_frame_ = generate_data_frame(NUM_VECTORS); + attributes_table_ = generate_data_frame(NUM_VECTORS, ids_); index_ = std::make_shared(); auto build_params = std::make_shared(); build_params->nlist = N_LIST; // IVF index @@ -148,7 +155,7 @@ class QuakeWorkerIVFBenchmark : public ::testing::Test { build_params->niter = 3; // Use as many workers as hardware concurrency build_params->num_workers = std::thread::hardware_concurrency(); - index_->build(data_, ids_, build_params, data_frame_); + index_->build(data_, ids_, build_params, attributes_table_); } }; diff --git a/test/cpp/quake_index.cpp b/test/cpp/quake_index.cpp index e3fa5fad..99223505 100644 --- a/test/cpp/quake_index.cpp +++ b/test/cpp/quake_index.cpp @@ -13,6 +13,7 @@ #include #include #include +#include // Helper functions for random data static torch::Tensor generate_random_data(int64_t num_vectors, int64_t dim) { @@ -23,27 +24,34 @@ static torch::Tensor generate_sequential_ids(int64_t count, int64_t start = 0) { return torch::arange(start, start + count, torch::kInt64); } -std::vector> generate_data_frame(int64_t num_vectors) { +static std::shared_ptr generate_data_frame(int64_t num_vectors, torch::Tensor ids) { arrow::MemoryPool* pool = arrow::default_memory_pool(); - std::vector> tables; - for (int64_t i = 0; i < num_vectors; i++) { - arrow::DoubleBuilder price_builder(pool); - price_builder.Append(static_cast(i) * 1.5); - - std::shared_ptr price_array; - price_builder.Finish(&price_array); + // Builders for the "price" and "id" columns + arrow::DoubleBuilder price_builder(pool); + arrow::Int64Builder id_builder(pool); - std::vector> schema_vector = { - arrow::field("price", arrow::float64()) - }; - - auto schema = std::make_shared(schema_vector); - auto table = arrow::Table::Make(schema, {price_array}); - tables.push_back(table); + // Append values to the builders + for (int64_t i = 0; i < num_vectors; i++) { + price_builder.Append(static_cast(i) * 1.5); // Price column + id_builder.Append(ids[i].item()); // ID column from the input tensor } - return tables; + // Finalize the arrays + std::shared_ptr price_array; + std::shared_ptr id_array; + price_builder.Finish(&price_array); + id_builder.Finish(&id_array); + + // Define the schema with two fields: "price" and "id" + std::vector> schema_vector = { + arrow::field("id", arrow::int64()), + arrow::field("price", arrow::float64()), + }; + auto schema = std::make_shared(schema_vector); + + // Create and return the table with both columns + return arrow::Table::Make(schema, {id_array, price_array}); } class QuakeIndexTest : public ::testing::Test { @@ -62,7 +70,7 @@ class QuakeIndexTest : public ::testing::Test { torch::Tensor query_vectors_; // Arrow data - std::vector> data_frames_; + std::shared_ptr attributes_table; void SetUp() override { // Generate random data @@ -74,7 +82,7 @@ class QuakeIndexTest : public ::testing::Test { query_vectors_ = generate_random_data(num_queries_, dimension_); // Arrow data - data_frames_ = generate_data_frame(num_vectors_); + attributes_table = generate_data_frame(num_vectors_, data_ids_); } }; @@ -99,7 +107,7 @@ TEST_F(QuakeIndexTest, BuildTest) { build_params->metric = "l2"; build_params->niter = 5; // small kmeans iteration - auto timing_info = index.build(data_vectors_, data_ids_, build_params, data_frames_); + auto timing_info = index.build(data_vectors_, data_ids_, build_params, attributes_table); // Check that we created partition_manager_, parent_, etc. EXPECT_NE(index.partition_manager_, nullptr); @@ -295,7 +303,7 @@ TEST(QuakeIndexStressTest, LargeBuildTest) { int64_t num_vectors = 1e6; // 1 million vectors auto data_vectors = generate_random_data(num_vectors, dimension); auto data_ids = generate_sequential_ids(num_vectors, 0); - auto data_frames = generate_data_frame(num_vectors); + auto data_frames = generate_data_frame(num_vectors, data_ids); QuakeIndex index; @@ -333,7 +341,7 @@ TEST(QuakeIndexStressTest, RepeatedBuildSearchTest) { // Pre-generate data auto data_vectors = generate_random_data(num_vectors, dimension); auto data_ids = generate_sequential_ids(num_vectors, 1000); - auto data_frames = generate_data_frame(num_vectors); + auto data_frames = generate_data_frame(num_vectors, data_ids); auto query_vectors = generate_random_data(num_queries, dimension); for (int i = 0; i < iteration_count; i++) { @@ -458,7 +466,7 @@ TEST(QuakeIndexStressTest, HighDimensionTest) { int64_t num_vectors = 5000; auto data_vectors = generate_random_data(num_vectors, dimension); auto data_ids = generate_sequential_ids(num_vectors); - auto data_frames = generate_data_frame(num_vectors); + auto data_frames = generate_data_frame(num_vectors, data_ids); QuakeIndex index; auto build_params = std::make_shared(); From b8012db637fd739dab5359d1e42a6d0d63845694 Mon Sep 17 00:00:00 2001 From: Patron Date: Sat, 22 Mar 2025 19:00:11 -0500 Subject: [PATCH 07/21] modified gitignore --- .gitignore | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index ed8ebf58..37bbc2a3 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ -__pycache__ \ No newline at end of file +__pycache__ +.vscode +build/ +quake.egg-info/ \ No newline at end of file From 8281295be668c1a7ff7f6c02d5907112c02c8b57 Mon Sep 17 00:00:00 2001 From: Sujan Reddy Date: Tue, 1 Apr 2025 11:43:36 -0500 Subject: [PATCH 08/21] index partition bug fix --- src/cpp/include/index_partition.h | 2 +- src/cpp/src/clustering.cpp | 1 - src/cpp/src/index_partition.cpp | 10 +++++++--- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/cpp/include/index_partition.h b/src/cpp/include/index_partition.h index c6789ef4..29d61626 100644 --- a/src/cpp/include/index_partition.h +++ b/src/cpp/include/index_partition.h @@ -27,7 +27,7 @@ class IndexPartition { uint8_t* codes_ = nullptr; ///< Pointer to the encoded vectors (raw memory block) idx_t* ids_ = nullptr; ///< Pointer to the vector IDs - std::vector> attributes_tables_ = {}; + std::shared_ptr attributes_table_ = {}; /// Default constructor. IndexPartition() = default; diff --git a/src/cpp/src/clustering.cpp b/src/cpp/src/clustering.cpp index 0b091041..3505de3b 100644 --- a/src/cpp/src/clustering.cpp +++ b/src/cpp/src/clustering.cpp @@ -103,7 +103,6 @@ shared_ptr kmeans(Tensor vectors, auto filtered_table_result = arrow::compute::Filter(attributes_table, mask_table); cluster_attributes_tables[i] = filtered_table_result->table(); - std::cout<ToString()<(code_size_); std::memcpy(codes_ + num_vectors_ * code_bytes, new_codes, n_entry * code_bytes); std::memcpy(ids_ + num_vectors_, new_ids, n_entry * sizeof(idx_t)); - if(attributes_table!=nullptr){ - attributes_tables_.push_back(attributes_table); + // append attributes_table to attributes_table_ + if (attributes_table_ == nullptr) { + attributes_table_ = attributes_table; + } else { + // Concatenate the new attributes table with the existing one + auto concatenated_table = arrow::ConcatenateTables({attributes_table_, attributes_table}); + attributes_table_ = concatenated_table.ValueOrDie(); } - num_vectors_ += n_entry; } From 3db5f45ca4c4bec19282e2a088cf2fa14b5cd38e Mon Sep 17 00:00:00 2001 From: Patron Date: Tue, 1 Apr 2025 11:51:40 -0500 Subject: [PATCH 09/21] wip --- src/cpp/include/index_partition.h | 9 +++++++ src/cpp/include/partition_manager.h | 2 +- src/cpp/src/index_partition.cpp | 37 +++++++++++++++++++++++++++++ src/cpp/src/partition_manager.cpp | 27 +++++++++++++++++++-- src/cpp/src/quake_index.cpp | 4 ++-- 5 files changed, 74 insertions(+), 5 deletions(-) diff --git a/src/cpp/include/index_partition.h b/src/cpp/include/index_partition.h index c6789ef4..f0da522f 100644 --- a/src/cpp/include/index_partition.h +++ b/src/cpp/include/index_partition.h @@ -110,6 +110,15 @@ class IndexPartition { */ void remove(int64_t index); + /** + * @brief Remove an attribute of an entry from the partition. Used in conjuntion with the + * + * Removes the attribute by performing masking & filtering + * + * @param index Index of the vector to remove. + */ + void removeAttribute(int64_t index); + /** * @brief Resize the partition. * diff --git a/src/cpp/include/partition_manager.h b/src/cpp/include/partition_manager.h index d1332621..3e22e895 100644 --- a/src/cpp/include/partition_manager.h +++ b/src/cpp/include/partition_manager.h @@ -57,7 +57,7 @@ class PartitionManager { * @param assignments Tensor of shape [num_vectors] containing partition IDs. If not provided, vectors are assigned using the parent index. * @return Timing information for the operation. */ - shared_ptr add(const Tensor &vectors, const Tensor &vector_ids, const Tensor &assignments = Tensor(), bool check_uniques = true); + shared_ptr add(const Tensor &vectors, const Tensor &vector_ids, const Tensor &assignments = Tensor(), bool check_uniques = true, std::shared_ptr attributes_table); /** * @brief Remove vectors by ID from the index. diff --git a/src/cpp/src/index_partition.cpp b/src/cpp/src/index_partition.cpp index cac42430..37275e85 100644 --- a/src/cpp/src/index_partition.cpp +++ b/src/cpp/src/index_partition.cpp @@ -55,6 +55,7 @@ void IndexPartition::append(int64_t n_entry, const idx_t* new_ids, const uint8_t const size_t code_bytes = static_cast(code_size_); std::memcpy(codes_ + num_vectors_ * code_bytes, new_codes, n_entry * code_bytes); std::memcpy(ids_ + num_vectors_, new_ids, n_entry * sizeof(idx_t)); + // TODO recheck where codes_, ids_ and attributes_tables_ are used if(attributes_table!=nullptr){ attributes_tables_.push_back(attributes_table); } @@ -89,6 +90,42 @@ void IndexPartition::remove(int64_t index) { ids_[index] = ids_[last_idx]; num_vectors_--; + + removeAttribute(index); +} + +void IndexPartition::removeAttribute(int64_t index) { + if (index < 0 || index >= static_cast(attributes_tables_.size())) { + throw std::runtime_error("Index out of range in remove"); + } + + for (size_t table_idx = 0; table_idx < attributes_tables_.size(); ++table_idx) { + auto& table = attributes_tables_[table_idx]; + + // Find "id" column + auto id_col_idx = table->schema()->GetFieldIndex("id"); + if (id_col_idx == -1) { + throw std::runtime_error("id column not present in this attribute table"); + } + + auto id_column = std::static_pointer_cast(table->column(id_col_idx)->chunk(0)); + + // Search for the row with matching id + for (int64_t row = 0; row < id_column->length(); ++row) { + if (!id_column->IsNull(row) && id_column->Value(row) == index) { + // Remove the row and update the table + arrow::compute::ExecContext ctx; + auto filter_mask = arrow::compute::NotEqual(arrow::compute::MakeScalar(index)).ValueOrDie(); + auto filtered_table = arrow::compute::Filter(table, filter_mask, &ctx).ValueOrDie(); + + // Replace the table in attributes_tables_ + attributes_tables_[table_idx] = filtered_table; + return; // Stop after removing the row + } + } + } + + throw std::runtime_error("Index not found in any table."); } void IndexPartition::resize(int64_t new_capacity) { diff --git a/src/cpp/src/partition_manager.cpp b/src/cpp/src/partition_manager.cpp index 211ddc06..fe415a73 100644 --- a/src/cpp/src/partition_manager.cpp +++ b/src/cpp/src/partition_manager.cpp @@ -127,7 +127,8 @@ shared_ptr PartitionManager::add( const Tensor &vectors, const Tensor &vector_ids, const Tensor &assignments, - bool check_uniques + bool check_uniques, + std::shared_ptr attributes_table ) { auto timing_info = std::make_shared(); @@ -145,12 +146,26 @@ shared_ptr PartitionManager::add( throw runtime_error("[PartitionManager] add: partitions_ is null. Did you call init_partitions?"); } + if(!attributes_table){ + throw runtime_error("[PartitionManager] add: attributes_table is null. Please add attributes for the vectors"); + } + if (!vectors.defined() || !vector_ids.defined()) { throw runtime_error("[PartitionManager] add: vectors or vector_ids is undefined."); } + if (vectors.size(0) != vector_ids.size(0)) { throw runtime_error("[PartitionManager] add: mismatch in vectors.size(0) and vector_ids.size(0)."); } + + if(attributes_table->num_rows!= vector_ids.size(0)){ + throw runtime_error("[PartitionManager] add: mismatch in attributes_table and vector_ids size."); + } + + if(!attributes_table->GetColumnByName("id")){ + throw runtime_error("[PartitionManager] add: No vector_id column in attributes_table"); + } + int64_t n = vectors.size(0); if (n == 0) { if (debug_) { @@ -186,6 +201,9 @@ shared_ptr PartitionManager::add( } } + // TODO: input validations for attributes table - is it null, size==0, check if vector_ids in table are unique? + + // checks assignments are less than partitions_->curr_list_id_ if (assignments.defined() && (assignments >= curr_partition_id_).any().item()) { throw runtime_error("[PartitionManager] add: assignments must be less than partitions_->curr_list_id_."); @@ -256,8 +274,10 @@ shared_ptr PartitionManager::add( pid, /*n_entry=*/1, id_ptr + i, - code_ptr + i * code_size_bytes + code_ptr + i * code_size_bytes, + attributes_table ); + } auto e3 = std::chrono::high_resolution_clock::now(); timing_info->modify_time_us = std::chrono::duration_cast(e3 - s3).count(); @@ -297,6 +317,8 @@ shared_ptr PartitionManager::remove(const Tensor &ids) { throw runtime_error("[PartitionManager] remove: vector ID does not exist in the index."); } resident_ids_.erase(id_val); + + // TODO: Remove associated attribute data as well } } auto e1 = std::chrono::high_resolution_clock::now(); @@ -313,6 +335,7 @@ shared_ptr PartitionManager::remove(const Tensor &ids) { auto s3 = std::chrono::high_resolution_clock::now(); partitions_->remove_vectors(to_remove); + // TODO: Remove associated attribute data as well??? if (debug_) { std::cout << "[PartitionManager] remove: Completed removal." << std::endl; } diff --git a/src/cpp/src/quake_index.cpp b/src/cpp/src/quake_index.cpp index 0895750a..b2473267 100644 --- a/src/cpp/src/quake_index.cpp +++ b/src/cpp/src/quake_index.cpp @@ -118,12 +118,12 @@ Tensor QuakeIndex::get(Tensor ids) { return partition_manager_->get(ids); } -shared_ptr QuakeIndex::add(Tensor x, Tensor ids) { +shared_ptr QuakeIndex::add(Tensor x, Tensor ids, std::shared_ptr attributes_table) { if (!partition_manager_) { throw std::runtime_error("[QuakeIndex::add()] No partition manager. Build the index first."); } - auto modify_info = partition_manager_->add(x, ids); + auto modify_info = partition_manager_->add(x, ids, attributes_table); modify_info->n_vectors = x.size(0); return modify_info; } From 318bb8c23db88fd8d9365054826e15485fec5953 Mon Sep 17 00:00:00 2001 From: Amith Date: Tue, 1 Apr 2025 21:49:07 -0500 Subject: [PATCH 10/21] written code for add & remove --- src/cpp/include/dynamic_inverted_list.h | 2 +- src/cpp/include/index_partition.h | 2 +- src/cpp/include/partition_manager.h | 8 +++ src/cpp/src/index_partition.cpp | 67 ++++++++++++++++--------- src/cpp/src/partition_manager.cpp | 53 ++++++++++++++++--- 5 files changed, 100 insertions(+), 32 deletions(-) diff --git a/src/cpp/include/dynamic_inverted_list.h b/src/cpp/include/dynamic_inverted_list.h index 91c4bba9..2490cc68 100644 --- a/src/cpp/include/dynamic_inverted_list.h +++ b/src/cpp/include/dynamic_inverted_list.h @@ -135,7 +135,7 @@ namespace faiss { * @param n_entry Number of entries to add. * @param ids Pointer to the vector IDs. * @param codes Pointer to the encoded vectors. - * @param data_frames Arrow data frames. + * @param data_frames Arrow data frames for the attributes. * @return Number of entries added. * @throws std::runtime_error if the partition does not exist. */ diff --git a/src/cpp/include/index_partition.h b/src/cpp/include/index_partition.h index bbf87fde..e67988a6 100644 --- a/src/cpp/include/index_partition.h +++ b/src/cpp/include/index_partition.h @@ -111,7 +111,7 @@ class IndexPartition { void remove(int64_t index); /** - * @brief Remove an attribute of an entry from the partition. Used in conjuntion with the + * @brief Remove the associated attribute of an entry from the partition. Used in conjuntion with the remove(index) function * * Removes the attribute by performing masking & filtering * diff --git a/src/cpp/include/partition_manager.h b/src/cpp/include/partition_manager.h index 3e22e895..becbc64d 100644 --- a/src/cpp/include/partition_manager.h +++ b/src/cpp/include/partition_manager.h @@ -59,6 +59,14 @@ class PartitionManager { */ shared_ptr add(const Tensor &vectors, const Tensor &vector_ids, const Tensor &assignments = Tensor(), bool check_uniques = true, std::shared_ptr attributes_table); + /** + * @brief Filter the appropriate row from the attribute table + * @param table Arrow table for the attributes. + * @param vector_id Vector_id by which we are filtering. + * @return Table containing only the row pertaining to the vector_id + */ + std::shared_ptr filterRowById(std::shared_ptr table, int64_t vector_id); + /** * @brief Remove vectors by ID from the index. * @param ids Tensor of shape [num_to_remove]. diff --git a/src/cpp/src/index_partition.cpp b/src/cpp/src/index_partition.cpp index b286297a..3a3d7bd6 100644 --- a/src/cpp/src/index_partition.cpp +++ b/src/cpp/src/index_partition.cpp @@ -97,38 +97,57 @@ void IndexPartition::remove(int64_t index) { removeAttribute(index); } +// https://github.com/apache/arrow/issues/44243 +// Arrow data is immutable. So you can't delete a row from existing Arrow data. +// You need to create a new Arrow data that doesn't have the target row. void IndexPartition::removeAttribute(int64_t index) { - if (index < 0 || index >= static_cast(attributes_tables_.size())) { - throw std::runtime_error("Index out of range in remove"); + assert(table && "Input table is null"); + + int64_t original_size = table->num_rows(); + if(original_size==0){ + std::cerr << "No attributes found in the table.\n"; + return; } - for (size_t table_idx = 0; table_idx < attributes_tables_.size(); ++table_idx) { - auto& table = attributes_tables_[table_idx]; - - // Find "id" column - auto id_col_idx = table->schema()->GetFieldIndex("id"); - if (id_col_idx == -1) { - throw std::runtime_error("id column not present in this attribute table"); - } + // Find the column index for "id" + auto schema = attributes_tables_->schema(); + int id_column_index = schema->GetFieldIndex("id"); - auto id_column = std::static_pointer_cast(table->column(id_col_idx)->chunk(0)); + if (id_column_index == -1) { + std::cerr << "Error: Column 'id' not found in table.\n"; + return; + } - // Search for the row with matching id - for (int64_t row = 0; row < id_column->length(); ++row) { - if (!id_column->IsNull(row) && id_column->Value(row) == index) { - // Remove the row and update the table - arrow::compute::ExecContext ctx; - auto filter_mask = arrow::compute::NotEqual(arrow::compute::MakeScalar(index)).ValueOrDie(); - auto filtered_table = arrow::compute::Filter(table, filter_mask, &ctx).ValueOrDie(); + // Get the column as a ChunkedArray + auto id_chunked_array = attributes_tables_->column(id_column_index); - // Replace the table in attributes_tables_ - attributes_tables_[table_idx] = filtered_table; - return; // Stop after removing the row - } - } + // Concatenate all chunks into a single array + auto concat_result = arrow::Concatenate(id_chunked_array->chunks()); + if (!concat_result.ok()) { + std::cerr << "Error concatenating chunks: " << concat_result.status().ToString() << std::endl; + return; } - throw std::runtime_error("Index not found in any table."); + std::shared_ptr id_array = concat_result.ValueOrDie(); + + // Create a boolean mask for filtering + auto filter_result = arrow::compute::NotEqual(arrow::Datum(id_array), arrow::Datum(target_id)); + if (!filter_result.ok()) { + std::cerr << "Error computing filter mask: " << filter_result.status().ToString() << std::endl; + return; + } + + // Apply the filter to create a new table + auto filtered_table_result = arrow::compute::Filter(table, filter_result.ValueOrDie()); + if (!filtered_table_result.ok()) { + std::cerr << "Error filtering table: " << filtered_table_result.status().ToString() << std::endl; + return; + } + + assert(filtered_table_result->num_rows() < original_size && "Table size did not decrease after filtering"); + + attributes_tables_ = filtered_table_result.ValueOrDie(); + } void IndexPartition::resize(int64_t new_capacity) { diff --git a/src/cpp/src/partition_manager.cpp b/src/cpp/src/partition_manager.cpp index fe415a73..f829ccd0 100644 --- a/src/cpp/src/partition_manager.cpp +++ b/src/cpp/src/partition_manager.cpp @@ -123,6 +123,50 @@ void PartitionManager::init_partitions( } } +std::shared_ptr filterRowById(std::shared_ptr table, int64_t idToFind) { + // Find the column index for "id" + auto schema = attributes_tables_->schema(); + int id_column_index = schema->GetFieldIndex("id"); + + if (id_column_index == -1) { + std::cerr << "Error: Column 'id' not found in table.\n"; + return nullptr; + } + + // Get the column as a ChunkedArray + auto id_chunked_array = attributes_tables_->column(id_column_index); + + // Concatenate all chunks into a single array + auto concat_result = arrow::Concatenate(id_chunked_array->chunks()); + if (!concat_result.ok()) { + std::cerr << "Error concatenating chunks: " << concat_result.status().ToString() << std::endl; + return nullptr; + } + + std::shared_ptr id_array = concat_result.ValueOrDie(); + + // Create a boolean mask for filtering + auto filter_result = arrow::compute::Equal(arrow::Datum(id_array), arrow::Datum(target_id)); + if (!filter_result.ok()) { + std::cerr << "Error computing filter mask: " << filter_result.status().ToString() << std::endl; + return nullptr; + } + + // Apply the filter to create a new table + auto filtered_table_result = arrow::compute::Filter(table, filter_result.ValueOrDie()); + if (!filtered_table_result.ok()) { + std::cerr << "Error filtering table: " << filtered_table_result.status().ToString() << std::endl; + return nullptr; + } + + std::shared_ptr filtered_table = filtered_table_result.ValueOrDie(); + + assert(filtered_table->num_rows() == 1 && "Table size is not 1 after the filter"); + + return filtered_table; +} + + shared_ptr PartitionManager::add( const Tensor &vectors, const Tensor &vector_ids, @@ -201,9 +245,6 @@ shared_ptr PartitionManager::add( } } - // TODO: input validations for attributes table - is it null, size==0, check if vector_ids in table are unique? - - // checks assignments are less than partitions_->curr_list_id_ if (assignments.defined() && (assignments >= curr_partition_id_).any().item()) { throw runtime_error("[PartitionManager] add: assignments must be less than partitions_->curr_list_id_."); @@ -270,12 +311,14 @@ shared_ptr PartitionManager::add( << " into partition " << pid << std::endl; } + std::shared_ptr filtered_table_result = filterRowById(attributes_table, search_id); + partitions_->add_entries( pid, /*n_entry=*/1, id_ptr + i, code_ptr + i * code_size_bytes, - attributes_table + filtered_table_result ); } @@ -317,8 +360,6 @@ shared_ptr PartitionManager::remove(const Tensor &ids) { throw runtime_error("[PartitionManager] remove: vector ID does not exist in the index."); } resident_ids_.erase(id_val); - - // TODO: Remove associated attribute data as well } } auto e1 = std::chrono::high_resolution_clock::now(); From 3bc3e5e9ce6470e5c94e584a26fc9a4be47694ad Mon Sep 17 00:00:00 2001 From: Amith Date: Wed, 2 Apr 2025 18:04:51 -0500 Subject: [PATCH 11/21] compile works --- src/cpp/include/partition_manager.h | 2 +- src/cpp/include/quake_index.h | 3 +- src/cpp/src/index_partition.cpp | 59 ++++++++++++--------------- src/cpp/src/partition_manager.cpp | 63 ++++++++++++----------------- src/cpp/src/quake_index.cpp | 2 +- test/cpp/quake_index.cpp | 4 +- 6 files changed, 57 insertions(+), 76 deletions(-) diff --git a/src/cpp/include/partition_manager.h b/src/cpp/include/partition_manager.h index becbc64d..f4ba0d80 100644 --- a/src/cpp/include/partition_manager.h +++ b/src/cpp/include/partition_manager.h @@ -57,7 +57,7 @@ class PartitionManager { * @param assignments Tensor of shape [num_vectors] containing partition IDs. If not provided, vectors are assigned using the parent index. * @return Timing information for the operation. */ - shared_ptr add(const Tensor &vectors, const Tensor &vector_ids, const Tensor &assignments = Tensor(), bool check_uniques = true, std::shared_ptr attributes_table); + shared_ptr add(const Tensor &vectors, const Tensor &vector_ids, const Tensor &assignments = Tensor(), bool check_uniques = true,std::shared_ptr attributes_table = {}); /** * @brief Filter the appropriate row from the attribute table diff --git a/src/cpp/include/quake_index.h b/src/cpp/include/quake_index.h index 9d776b6b..f39990b2 100644 --- a/src/cpp/include/quake_index.h +++ b/src/cpp/include/quake_index.h @@ -73,9 +73,10 @@ class QuakeIndex { * @brief Add vectors to the index. * @param x Tensor of shape [num_vectors, dimension]. * @param ids Tensor of shape [num_vectors]. + * * @param attributes_table Associated attribute_table for each vector_id. * @return Timing information for the add operation. */ - shared_ptr add(Tensor x, Tensor ids); + shared_ptr add(Tensor x, Tensor ids, std::shared_ptr attributes_table = {}); /** * @brief Remove vectors from the index. diff --git a/src/cpp/src/index_partition.cpp b/src/cpp/src/index_partition.cpp index 3a3d7bd6..a2f16092 100644 --- a/src/cpp/src/index_partition.cpp +++ b/src/cpp/src/index_partition.cpp @@ -5,6 +5,9 @@ // - Use descriptive variable names #include +#include +#include +#include IndexPartition::IndexPartition(int64_t num_vectors, uint8_t* codes, @@ -100,54 +103,42 @@ void IndexPartition::remove(int64_t index) { // https://github.com/apache/arrow/issues/44243 // Arrow data is immutable. So you can't delete a row from existing Arrow data. // You need to create a new Arrow data that doesn't have the target row. -void IndexPartition::removeAttribute(int64_t index) { - assert(table && "Input table is null"); +void IndexPartition::removeAttribute(int64_t target_id) { - int64_t original_size = table->num_rows(); + assert(attributes_table_ && "Input table is null"); + + int64_t original_size = attributes_table_->num_rows(); if(original_size==0){ std::cerr << "No attributes found in the table.\n"; return; } - // Find the column index for "id" - auto schema = attributes_tables_->schema(); - int id_column_index = schema->GetFieldIndex("id"); - - if (id_column_index == -1) { - std::cerr << "Error: Column 'id' not found in table.\n"; - return; - } - - // Get the column as a ChunkedArray - auto id_chunked_array = attributes_tables_->column(id_column_index); - - // Concatenate all chunks into a single array - auto concat_result = arrow::Concatenate(id_chunked_array->chunks()); - if (!concat_result.ok()) { - std::cerr << "Error concatenating chunks: " << concat_result.status().ToString() << std::endl; + + auto id_column = attributes_table_->GetColumnByName("id"); + if (!id_column) { + std::cerr << "Column 'id' not found in table." << std::endl; return; } + + // Create a filter expression (id != target_id) + auto column_data = id_column->chunk(0); + auto scalar_value = arrow::MakeScalar(target_id); + auto filter_expr = arrow::compute::CallFunction("not_equal", {column_data, scalar_value}); - std::shared_ptr id_array = concat_result.ValueOrDie(); - // Create a boolean mask for filtering - auto filter_result = arrow::compute::NotEqual(arrow::Datum(id_array), arrow::Datum(target_id)); - if (!filter_result.ok()) { - std::cerr << "Error computing filter mask: " << filter_result.status().ToString() << std::endl; + if (!filter_expr.ok()) { + std::cerr << "Error creating filter expression: " << filter_expr.status().ToString() << std::endl; return; } - - // Apply the filter to create a new table - auto filtered_table_result = arrow::compute::Filter(table, filter_result.ValueOrDie()); - if (!filtered_table_result.ok()) { - std::cerr << "Error filtering table: " << filtered_table_result.status().ToString() << std::endl; + + // Apply the filter + auto result = arrow::compute::Filter(attributes_table_, filter_expr.ValueOrDie()); + if (!result.ok()) { + std::cerr << "Error filtering table: " << result.status().ToString() << std::endl; return; } - - assert(filtered_table_result->num_rows() < original_size && "Table size did not decrease after filtering"); - - attributes_tables_ = filtered_table_result.ValueOrDie(); - + + attributes_table_ = result.ValueOrDie().table(); } void IndexPartition::resize(int64_t new_capacity) { diff --git a/src/cpp/src/partition_manager.cpp b/src/cpp/src/partition_manager.cpp index f829ccd0..62f39321 100644 --- a/src/cpp/src/partition_manager.cpp +++ b/src/cpp/src/partition_manager.cpp @@ -12,6 +12,8 @@ #include #include "quake_index.h" #include +#include +#include using std::runtime_error; @@ -123,47 +125,34 @@ void PartitionManager::init_partitions( } } -std::shared_ptr filterRowById(std::shared_ptr table, int64_t idToFind) { - // Find the column index for "id" - auto schema = attributes_tables_->schema(); - int id_column_index = schema->GetFieldIndex("id"); - - if (id_column_index == -1) { - std::cerr << "Error: Column 'id' not found in table.\n"; +std::shared_ptr PartitionManager::filterRowById( + std::shared_ptr table, + int64_t target_id +) { + auto id_column = table->GetColumnByName("id"); + if (!id_column) { + std::cerr << "Column 'id' not found in table." << std::endl; return nullptr; } - - // Get the column as a ChunkedArray - auto id_chunked_array = attributes_tables_->column(id_column_index); - - // Concatenate all chunks into a single array - auto concat_result = arrow::Concatenate(id_chunked_array->chunks()); - if (!concat_result.ok()) { - std::cerr << "Error concatenating chunks: " << concat_result.status().ToString() << std::endl; + + // Create a filter expression (id == target_id) + arrow::Datum column_data = id_column->chunk(0); + arrow::Datum scalar_value = arrow::MakeScalar(target_id); + auto filter_expr = arrow::compute::CallFunction("equal", {column_data, scalar_value}); + + if (!filter_expr.ok()) { + std::cerr << "Error creating filter expression: " << filter_expr.status().ToString() << std::endl; return nullptr; } - - std::shared_ptr id_array = concat_result.ValueOrDie(); - - // Create a boolean mask for filtering - auto filter_result = arrow::compute::Equal(arrow::Datum(id_array), arrow::Datum(target_id)); - if (!filter_result.ok()) { - std::cerr << "Error computing filter mask: " << filter_result.status().ToString() << std::endl; + + // Apply the filter + auto result = arrow::compute::Filter(table, filter_expr.ValueOrDie()); + if (!result.ok()) { + std::cerr << "Error filtering table: " << result.status().ToString() << std::endl; return nullptr; } - - // Apply the filter to create a new table - auto filtered_table_result = arrow::compute::Filter(table, filter_result.ValueOrDie()); - if (!filtered_table_result.ok()) { - std::cerr << "Error filtering table: " << filtered_table_result.status().ToString() << std::endl; - return nullptr; - } - - std::shared_ptr filtered_table = filtered_table_result.ValueOrDie(); - - assert(filtered_table->num_rows() == 1 && "Table size is not 1 after the filter"); - - return filtered_table; + + return result.ValueOrDie().table(); } @@ -202,7 +191,7 @@ shared_ptr PartitionManager::add( throw runtime_error("[PartitionManager] add: mismatch in vectors.size(0) and vector_ids.size(0)."); } - if(attributes_table->num_rows!= vector_ids.size(0)){ + if(attributes_table->num_rows()!= vector_ids.size(0)){ throw runtime_error("[PartitionManager] add: mismatch in attributes_table and vector_ids size."); } @@ -311,7 +300,7 @@ shared_ptr PartitionManager::add( << " into partition " << pid << std::endl; } - std::shared_ptr
filtered_table_result = filterRowById(attributes_table, search_id); + std::shared_ptr filtered_table_result = filterRowById(attributes_table, id_accessor[i]); partitions_->add_entries( pid, diff --git a/src/cpp/src/quake_index.cpp b/src/cpp/src/quake_index.cpp index b2473267..45b10099 100644 --- a/src/cpp/src/quake_index.cpp +++ b/src/cpp/src/quake_index.cpp @@ -123,7 +123,7 @@ shared_ptr QuakeIndex::add(Tensor x, Tensor ids, std::shared_p throw std::runtime_error("[QuakeIndex::add()] No partition manager. Build the index first."); } - auto modify_info = partition_manager_->add(x, ids, attributes_table); + auto modify_info = partition_manager_->add(x, ids, Tensor(), true, attributes_table); modify_info->n_vectors = x.size(0); return modify_info; } diff --git a/test/cpp/quake_index.cpp b/test/cpp/quake_index.cpp index 99223505..b4fae661 100644 --- a/test/cpp/quake_index.cpp +++ b/test/cpp/quake_index.cpp @@ -229,7 +229,7 @@ TEST_F(QuakeIndexTest, AddTest) { Tensor add_vectors = generate_random_data(10, dimension_); Tensor add_ids = generate_sequential_ids(10, 1000); - auto modify_info = index.add(add_vectors, add_ids); + auto modify_info = index.add(add_vectors, add_ids, attributes_table); EXPECT_EQ(modify_info->n_vectors, 10); EXPECT_GE(modify_info->modify_time_us, 0); } @@ -241,7 +241,7 @@ TEST_F(QuakeIndexTest, RemoveTest) { // Build auto build_params = std::make_shared(); build_params->nlist = nlist_; - index.build(data_vectors_, data_ids_, build_params); + index.build(data_vectors_, data_ids_, build_params, attributes_table); // remove half of them int64_t remove_count = num_vectors_ / 2; From bf953a275bab63f3cedb289b8014645172b3af28 Mon Sep 17 00:00:00 2001 From: Sujan Reddy Date: Fri, 4 Apr 2025 19:20:20 -0500 Subject: [PATCH 12/21] index partition bug fix --- src/cpp/src/dynamic_inverted_list.cpp | 2 +- src/cpp/src/index_partition.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cpp/src/dynamic_inverted_list.cpp b/src/cpp/src/dynamic_inverted_list.cpp index 4807a281..fc157f67 100644 --- a/src/cpp/src/dynamic_inverted_list.cpp +++ b/src/cpp/src/dynamic_inverted_list.cpp @@ -180,7 +180,7 @@ namespace faiss { const idx_t *ids, const uint8_t *codes ) { - return add_entries(list_no, n_entry, ids, codes, {}); + return add_entries(list_no, n_entry, ids, codes, nullptr); } void DynamicInvertedLists::update_entries( diff --git a/src/cpp/src/index_partition.cpp b/src/cpp/src/index_partition.cpp index a2f16092..7e31f55f 100644 --- a/src/cpp/src/index_partition.cpp +++ b/src/cpp/src/index_partition.cpp @@ -61,7 +61,7 @@ void IndexPartition::append(int64_t n_entry, const idx_t* new_ids, const uint8_t // append attributes_table to attributes_table_ if (attributes_table_ == nullptr) { attributes_table_ = attributes_table; - } else { + } else if (attributes_table != nullptr) { // Concatenate the new attributes table with the existing one auto concatenated_table = arrow::ConcatenateTables({attributes_table_, attributes_table}); attributes_table_ = concatenated_table.ValueOrDie(); From e70db43e0e08d82505c67602396a99c8d2b6ff6f Mon Sep 17 00:00:00 2001 From: Sujan Reddy Date: Fri, 4 Apr 2025 21:17:13 -0500 Subject: [PATCH 13/21] handled null attributes table --- src/cpp/src/partition_manager.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/cpp/src/partition_manager.cpp b/src/cpp/src/partition_manager.cpp index 62f39321..6aa0ab86 100644 --- a/src/cpp/src/partition_manager.cpp +++ b/src/cpp/src/partition_manager.cpp @@ -80,7 +80,10 @@ void PartitionManager::init_partitions( for (int64_t i = 0; i < nlist; i++) { Tensor v = clustering->vectors[i]; Tensor id = clustering->vector_ids[i]; - std::shared_ptr attributes_table = clustering->attributes_tables[i]; + std::shared_ptr attributes_table = nullptr; + if (clustering->attributes_tables.size() > i) { + attributes_table = clustering->attributes_tables[i]; + } if (v.size(0) != id.size(0)) { throw runtime_error("[PartitionManager] init_partitions: mismatch in v.size(0) vs id.size(0)."); } From 1d8a5b5cd9dea613a324ad289c34688e2b447f9c Mon Sep 17 00:00:00 2001 From: Kaustubh Dwivedi Date: Thu, 3 Apr 2025 23:59:22 -0500 Subject: [PATCH 14/21] Adding some search things --- src/cpp/include/common.h | 9 ++++ src/cpp/include/list_scanning.h | 13 +++++ src/cpp/src/clustering.cpp | 3 ++ src/cpp/src/quake_index.cpp | 1 + src/cpp/src/query_coordinator.cpp | 88 +++++++++++++++++++++++++++++++ test/cpp/query_coordinator.cpp | 31 +++++++++++ 6 files changed, 145 insertions(+) diff --git a/src/cpp/include/common.h b/src/cpp/include/common.h index ebd488b1..fbc448ad 100644 --- a/src/cpp/include/common.h +++ b/src/cpp/include/common.h @@ -83,6 +83,7 @@ constexpr bool DEFAULT_PRECOMPUTED = false; ///< Default flag to us constexpr float DEFAULT_INITIAL_SEARCH_FRACTION = 0.2f; ///< Default initial fraction of partitions to search. constexpr float DEFAULT_RECOMPUTE_THRESHOLD = 0.01f; ///< Default threshold to trigger recomputation of search parameters. constexpr int DEFAULT_APS_FLUSH_PERIOD_US = 100; ///< Default period (in microseconds) for flushing the APS buffer. +// constexpr int DEFAULT_PRICE_THRESHOLD = INT_MAX; // Default constants for maintenance policy parameters constexpr const char* DEFAULT_MAINTENANCE_POLICY = "query_cost"; ///< Default maintenance policy type. @@ -176,6 +177,12 @@ inline string metric_type_to_str(faiss::MetricType metric) { } } +enum class FilteringType { + PRE_FILTERING, + POST_FILTERING, + IN_FILTERING +}; + /** * @brief Parameters for the search operation */ @@ -190,6 +197,8 @@ struct SearchParams { float recompute_threshold = DEFAULT_RECOMPUTE_THRESHOLD; float initial_search_fraction = DEFAULT_INITIAL_SEARCH_FRACTION; int aps_flush_period_us = DEFAULT_APS_FLUSH_PERIOD_US; + // int price_threshold = DEFAULT_PRICE_THRESHOLD; + // FilteringType filteringType = FilteringType::PRE_FILTERING; SearchParams() = default; }; diff --git a/src/cpp/include/list_scanning.h b/src/cpp/include/list_scanning.h index cf3275db..9c89da38 100644 --- a/src/cpp/include/list_scanning.h +++ b/src/cpp/include/list_scanning.h @@ -285,6 +285,19 @@ inline void scan_list_with_ids_l2(const float *query_vec, buffer.add(sqrt(faiss::fvec_L2sqr(query_vec, vec, d)), list_ids[l]); vec += d; } + // if (bitmap == nullptr) { + // for (int l = 0; l < list_size; l++) { + // buffer.add(sqrt(faiss::fvec_L2sqr(query_vec, vec, d)), list_ids[l]); + // vec += d; + // } + // } else { + // for (int l = 0; l < list_size; l++) { + // if (bitmap[l]) { + // buffer.add(sqrt(faiss::fvec_L2sqr(query_vec, vec, d)), list_ids[l]); + // } + // vec += d; + // } + // } } // The main scan_list function that dispatches to one of the specialized functions. diff --git a/src/cpp/src/clustering.cpp b/src/cpp/src/clustering.cpp index 3505de3b..f4407c40 100644 --- a/src/cpp/src/clustering.cpp +++ b/src/cpp/src/clustering.cpp @@ -60,7 +60,10 @@ shared_ptr kmeans(Tensor vectors, for (int i = 0; i < n_clusters; i++) { auto mask = (assignments == i); + + // List of vectors present in the cluster i cluster_vectors[i] = vectors.index({mask}); + // List of vectorIds present in the cluster i cluster_ids[i] = ids.index({mask}); if(attributes_table == nullptr) { diff --git a/src/cpp/src/quake_index.cpp b/src/cpp/src/quake_index.cpp index 45b10099..55f2dbaa 100644 --- a/src/cpp/src/quake_index.cpp +++ b/src/cpp/src/quake_index.cpp @@ -91,6 +91,7 @@ shared_ptr QuakeIndex::build(Tensor x, Tensor ids, shared_ptr QuakeIndex::search(Tensor x, shared_ptr search_params) { + std::cout<<"HELLO"< #include #include +#include +#include +#include +#include // Constructor QueryCoordinator::QueryCoordinator(shared_ptr parent, @@ -473,6 +477,38 @@ shared_ptr QueryCoordinator::worker_scan( return search_result; } +// bool* create_bitmap(std::shared_ptr attributes_table, int64_t* list_ids, +// int64_t num_ids, shared_ptr search_params) { +// // Get 'id' and 'price' columns +// std::shared_ptr id_column = attributes_table->GetColumnByName("id"); +// std::shared_ptr price_column = attributes_table->GetColumnByName("price"); + +// if (!id_column || !price_column) { +// throw std::runtime_error("Columns not found in the table."); +// } + +// auto id_array = std::static_pointer_cast(id_column->chunk(0)); +// auto price_array = std::static_pointer_cast(price_column->chunk(0)); + +// bool* bitmap = new bool[num_ids]; + +// std::unordered_map id_to_price; +// for (int64_t i = 0; i < id_array->length(); i++) { +// id_to_price[id_array->Value(i)] = price_array->Value(i); +// } + +// for (int64_t i = 0; i < num_ids; i++) { +// int64_t id = list_ids[i]; +// if (id_to_price.count(id) && id_to_price[id] <= search_params->price_threshold) { +// bitmap[i] = 1; +// } else { +// bitmap[i] = 0; +// } +// } + +// return bitmap; +// } + shared_ptr QueryCoordinator::serial_scan(Tensor x, Tensor partition_ids_to_scan, shared_ptr search_params) { if (!partition_manager_) { @@ -552,7 +588,20 @@ shared_ptr QueryCoordinator::serial_scan(Tensor x, Tensor partitio start_time = std::chrono::high_resolution_clock::now(); float *list_vectors = (float *) partition_manager_->partitions_->get_codes(pi); int64_t *list_ids = (int64_t *) partition_manager_->partitions_->get_ids(pi); + // std::shared_ptr partition_attributes_table = + // partition_manager_->partitions_->partitions_[pi]->attributes_tables_.front(); + // int64_t list_size = partition_manager_->partitions_->list_size(pi); int64_t list_size = partition_manager_->partitions_->list_size(pi); + // bool* bitmap = nullptr; + // if (search_params->filteringType == FilteringType::PRE_FILTERING) { + // std::shared_ptr vector_id_column = partition_attributes_table->GetColumnByName("id"); + // std::shared_ptr price_column = partition_attributes_table->GetColumnByName("price"); + + // if (!vector_id_column || !price_column) { + // throw std::runtime_error("Columns not found in the table."); + // } + // bitmap = create_bitmap(partition_attributes_table, list_ids, list_size, search_params); + // } scan_list(query_vec, list_vectors, @@ -659,6 +708,45 @@ shared_ptr QueryCoordinator::search(Tensor x, shared_ptrfilteringType == FilteringType::POST_FILTERING and search_params->price_threshold != INT_MAX and parent_!=nullptr) { + // auto id_data = search_result->ids.data(); + // auto distance_data = search_result->distances.data(); + // int64_t num_results = search_result->ids.size(0); + + // std::vector filtered_ids; + // std::vector filtered_distances; + + // std::shared_ptr id_column = attributes_table->GetColumnByName("id"); + // std::shared_ptr price_column = attributes_table->GetColumnByName("price"); + + // for (int64_t i = 0; i < num_results; i++) { + // int64_t id = id_data[i]; + + // // Search for this ID in the attribute table + // std::shared_ptr found_row; + // auto id_scalar = arrow::MakeScalar(id); + // auto price_scalar = arrow::MakeScalar(search_params->price_threshold); + + // auto equal_condition = arrow::compute::CallFunction("equal", {id_column->chunk(0), id_scalar}); + // auto less_equal_condition = arrow::compute::CallFunction("less_equal", {price_column->chunk(0), price_scalar}); + + // auto combined_condition = arrow::compute::CallFunction("and", {equal_condition.ValueOrDie(), less_equal_condition.ValueOrDie()}); + + // auto mask_table = std::static_pointer_cast(combined_condition->make_array()); + + // auto filter_result = arrow::compute::Filter(attributes_table, combined_condition.ValueOrDie()); + + // if (filter_result.ok()) { + // filtered_ids.push_back(id); + // filtered_distances.push_back(distance_data[i]); + // } + // } + + // search_result->ids = torch::tensor(filtered_ids, torch::kInt64); + // search_result->distances = torch::tensor(filtered_distances, torch::kFloat); + // } + search_result->timing_info->parent_info = parent_timing_info; auto end = std::chrono::high_resolution_clock::now(); diff --git a/test/cpp/query_coordinator.cpp b/test/cpp/query_coordinator.cpp index d272f2d0..f94d9768 100644 --- a/test/cpp/query_coordinator.cpp +++ b/test/cpp/query_coordinator.cpp @@ -27,6 +27,37 @@ class QueryCoordinatorTest : public ::testing::Test { std::shared_ptr partition_manager_; MetricType metric_ = faiss::METRIC_L2; + + // static std::shared_ptr generate_data_frame(int64_t num_vectors, torch::Tensor ids) { + // arrow::MemoryPool* pool = arrow::default_memory_pool(); + + // // Builders for the "price" and "id" columns + // arrow::DoubleBuilder price_builder(pool); + // arrow::Int64Builder id_builder(pool); + + // // Append values to the builders + // for (int64_t i = 0; i < num_vectors; i++) { + // price_builder.Append(static_cast(i) * 1.5); // Price column + // id_builder.Append(ids[i].item()); // ID column from the input tensor + // } + + // // Finalize the arrays + // std::shared_ptr price_array; + // std::shared_ptr id_array; + // price_builder.Finish(&price_array); + // id_builder.Finish(&id_array); + + // // Define the schema with two fields: "price" and "id" + // std::vector> schema_vector = { + // arrow::field("id", arrow::int64()), + // arrow::field("price", arrow::float64()), + // }; + // auto schema = std::make_shared(schema_vector); + + // // Create and return the table with both columns + // return arrow::Table::Make(schema, {id_array, price_array}); + // } + void SetUp() override { // Create dummy vectors and IDs From f7eb486d3208fd2622cda2930c3a41f9820dc804 Mon Sep 17 00:00:00 2001 From: Kaustubh Dwivedi Date: Fri, 4 Apr 2025 19:09:15 -0500 Subject: [PATCH 15/21] Older tests working fine with filter search --- src/cpp/include/common.h | 6 +-- src/cpp/include/list_scanning.h | 45 +++++++++------- src/cpp/src/partition_manager.cpp | 1 - src/cpp/src/quake_index.cpp | 1 - src/cpp/src/query_coordinator.cpp | 86 ++++++++++++++++--------------- test/cpp/quake_index.cpp | 4 +- 6 files changed, 76 insertions(+), 67 deletions(-) diff --git a/src/cpp/include/common.h b/src/cpp/include/common.h index fbc448ad..c17b02a6 100644 --- a/src/cpp/include/common.h +++ b/src/cpp/include/common.h @@ -83,7 +83,7 @@ constexpr bool DEFAULT_PRECOMPUTED = false; ///< Default flag to us constexpr float DEFAULT_INITIAL_SEARCH_FRACTION = 0.2f; ///< Default initial fraction of partitions to search. constexpr float DEFAULT_RECOMPUTE_THRESHOLD = 0.01f; ///< Default threshold to trigger recomputation of search parameters. constexpr int DEFAULT_APS_FLUSH_PERIOD_US = 100; ///< Default period (in microseconds) for flushing the APS buffer. -// constexpr int DEFAULT_PRICE_THRESHOLD = INT_MAX; +constexpr int DEFAULT_PRICE_THRESHOLD = INT_MAX; // Default constants for maintenance policy parameters constexpr const char* DEFAULT_MAINTENANCE_POLICY = "query_cost"; ///< Default maintenance policy type. @@ -197,8 +197,8 @@ struct SearchParams { float recompute_threshold = DEFAULT_RECOMPUTE_THRESHOLD; float initial_search_fraction = DEFAULT_INITIAL_SEARCH_FRACTION; int aps_flush_period_us = DEFAULT_APS_FLUSH_PERIOD_US; - // int price_threshold = DEFAULT_PRICE_THRESHOLD; - // FilteringType filteringType = FilteringType::PRE_FILTERING; + int price_threshold = DEFAULT_PRICE_THRESHOLD; + FilteringType filteringType = FilteringType::IN_FILTERING; SearchParams() = default; }; diff --git a/src/cpp/include/list_scanning.h b/src/cpp/include/list_scanning.h index 9c89da38..4324b655 100644 --- a/src/cpp/include/list_scanning.h +++ b/src/cpp/include/list_scanning.h @@ -147,6 +147,10 @@ class TypedTopKBuffer { partitions_scanned_.fetch_add(1, std::memory_order_relaxed); } + void remove(int rejected_index) { + topk_[rejected_index] = topk_[--curr_offset_]; + } + DistanceType flush() { std::lock_guard buffer_lock(buffer_mutex_); if (curr_offset_ > k_) { @@ -279,25 +283,27 @@ inline void scan_list_with_ids_l2(const float *query_vec, const int64_t *list_ids, int list_size, int d, - TopkBuffer &buffer) { + TopkBuffer &buffer, + bool* bitmap = nullptr) { const float *vec = list_vecs; - for (int l = 0; l < list_size; l++) { - buffer.add(sqrt(faiss::fvec_L2sqr(query_vec, vec, d)), list_ids[l]); - vec += d; - } - // if (bitmap == nullptr) { - // for (int l = 0; l < list_size; l++) { - // buffer.add(sqrt(faiss::fvec_L2sqr(query_vec, vec, d)), list_ids[l]); - // vec += d; - // } - // } else { - // for (int l = 0; l < list_size; l++) { - // if (bitmap[l]) { - // buffer.add(sqrt(faiss::fvec_L2sqr(query_vec, vec, d)), list_ids[l]); - // } - // vec += d; - // } + // for (int l = 0; l < list_size; l++) { + // buffer.add(sqrt(faiss::fvec_L2sqr(query_vec, vec, d)), list_ids[l]); + // vec += d; // } + + if (bitmap == nullptr) { + for (int l = 0; l < list_size; l++) { + buffer.add(sqrt(faiss::fvec_L2sqr(query_vec, vec, d)), list_ids[l]); + vec += d; + } + } else { + for (int l = 0; l < list_size; l++) { + if (bitmap[l]) { + buffer.add(sqrt(faiss::fvec_L2sqr(query_vec, vec, d)), list_ids[l]); + } + vec += d; + } + } } // The main scan_list function that dispatches to one of the specialized functions. @@ -307,7 +313,8 @@ inline void scan_list(const float *query_vec, int list_size, int d, TopkBuffer &buffer, - faiss::MetricType metric = faiss::METRIC_L2) { + faiss::MetricType metric = faiss::METRIC_L2, + bool* bitmap = nullptr) { // Dispatch based on metric type and whether list_ids is provided. if (metric == faiss::METRIC_INNER_PRODUCT) { if (list_ids == nullptr) @@ -318,7 +325,7 @@ inline void scan_list(const float *query_vec, if (list_ids == nullptr) scan_list_no_ids_l2(query_vec, list_vecs, list_size, d, buffer); else - scan_list_with_ids_l2(query_vec, list_vecs, list_ids, list_size, d, buffer); + scan_list_with_ids_l2(query_vec, list_vecs, list_ids, list_size, d, buffer, bitmap); } } diff --git a/src/cpp/src/partition_manager.cpp b/src/cpp/src/partition_manager.cpp index 6aa0ab86..2098b174 100644 --- a/src/cpp/src/partition_manager.cpp +++ b/src/cpp/src/partition_manager.cpp @@ -166,7 +166,6 @@ shared_ptr PartitionManager::add( bool check_uniques, std::shared_ptr attributes_table ) { - auto timing_info = std::make_shared(); if (debug_) { diff --git a/src/cpp/src/quake_index.cpp b/src/cpp/src/quake_index.cpp index 55f2dbaa..45b10099 100644 --- a/src/cpp/src/quake_index.cpp +++ b/src/cpp/src/quake_index.cpp @@ -91,7 +91,6 @@ shared_ptr QuakeIndex::build(Tensor x, Tensor ids, shared_ptr QuakeIndex::search(Tensor x, shared_ptr search_params) { - std::cout<<"HELLO"< QueryCoordinator::worker_scan( return search_result; } -// bool* create_bitmap(std::shared_ptr attributes_table, int64_t* list_ids, -// int64_t num_ids, shared_ptr search_params) { -// // Get 'id' and 'price' columns -// std::shared_ptr id_column = attributes_table->GetColumnByName("id"); -// std::shared_ptr price_column = attributes_table->GetColumnByName("price"); - -// if (!id_column || !price_column) { -// throw std::runtime_error("Columns not found in the table."); -// } - -// auto id_array = std::static_pointer_cast(id_column->chunk(0)); -// auto price_array = std::static_pointer_cast(price_column->chunk(0)); +bool* create_bitmap(std::unordered_map id_to_price, int64_t* list_ids, + int64_t num_ids, shared_ptr search_params) { -// bool* bitmap = new bool[num_ids]; + bool* bitmap = new bool[num_ids]; -// std::unordered_map id_to_price; -// for (int64_t i = 0; i < id_array->length(); i++) { -// id_to_price[id_array->Value(i)] = price_array->Value(i); -// } - -// for (int64_t i = 0; i < num_ids; i++) { -// int64_t id = list_ids[i]; -// if (id_to_price.count(id) && id_to_price[id] <= search_params->price_threshold) { -// bitmap[i] = 1; -// } else { -// bitmap[i] = 0; -// } -// } + for (int64_t i = 0; i < num_ids; i++) { + int64_t id = list_ids[i]; + if (id_to_price.count(id) && id_to_price[id] <= search_params->price_threshold) { + bitmap[i] = 1; + } else { + bitmap[i] = 0; + } + } -// return bitmap; -// } + return bitmap; +} shared_ptr QueryCoordinator::serial_scan(Tensor x, Tensor partition_ids_to_scan, shared_ptr search_params) { @@ -588,20 +573,28 @@ shared_ptr QueryCoordinator::serial_scan(Tensor x, Tensor partitio start_time = std::chrono::high_resolution_clock::now(); float *list_vectors = (float *) partition_manager_->partitions_->get_codes(pi); int64_t *list_ids = (int64_t *) partition_manager_->partitions_->get_ids(pi); - // std::shared_ptr partition_attributes_table = - // partition_manager_->partitions_->partitions_[pi]->attributes_tables_.front(); - // int64_t list_size = partition_manager_->partitions_->list_size(pi); + std::shared_ptr partition_attributes_table = + partition_manager_->partitions_->partitions_[pi]->attributes_table_; int64_t list_size = partition_manager_->partitions_->list_size(pi); - // bool* bitmap = nullptr; - // if (search_params->filteringType == FilteringType::PRE_FILTERING) { - // std::shared_ptr vector_id_column = partition_attributes_table->GetColumnByName("id"); - // std::shared_ptr price_column = partition_attributes_table->GetColumnByName("price"); - // if (!vector_id_column || !price_column) { - // throw std::runtime_error("Columns not found in the table."); - // } - // bitmap = create_bitmap(partition_attributes_table, list_ids, list_size, search_params); - // } + std::shared_ptr id_array = nullptr; + std::shared_ptr price_array = nullptr; + + std::unordered_map id_to_price; + + if (partition_attributes_table != nullptr) { + id_array = std::static_pointer_cast(partition_attributes_table->GetColumnByName("id")->chunk(0)); + price_array = std::static_pointer_cast(partition_attributes_table->GetColumnByName("price")->chunk(0)); + for (int64_t i = 0; i < id_array->length(); i++) { + id_to_price[id_array->Value(i)] = price_array->Value(i); + } + } + + bool* bitmap = nullptr; + + if (search_params->filteringType == FilteringType::PRE_FILTERING) { + bitmap = create_bitmap(id_to_price, list_ids, list_size, search_params); + } scan_list(query_vec, list_vectors, @@ -609,7 +602,18 @@ shared_ptr QueryCoordinator::serial_scan(Tensor x, Tensor partitio partition_manager_->partitions_->list_size(pi), dimension, *topk_buf, - metric_); + metric_, + bitmap); + if (search_params->filteringType ==FilteringType::POST_FILTERING) { + auto scanned_vectors = topk_buf->topk_; + int buffer_size = topk_buf->curr_offset_; + for (int i = 0;i < buffer_size; i++) { + auto vector_id = scanned_vectors[i].second; + if (id_to_price.count(vector_id) and id_to_price[vector_id] > search_params->price_threshold) { + topk_buf->remove(i); + } + } + } float curr_radius = topk_buf->get_kth_distance(); float percent_change = abs(curr_radius - query_radius) / curr_radius; diff --git a/test/cpp/quake_index.cpp b/test/cpp/quake_index.cpp index b4fae661..32e1c9cb 100644 --- a/test/cpp/quake_index.cpp +++ b/test/cpp/quake_index.cpp @@ -149,7 +149,7 @@ TEST_F(QuakeIndexTest, SearchPartitionedTest) { auto build_params = std::make_shared(); build_params->nlist = nlist_; build_params->metric = "l2"; - index.build(data_vectors_, data_ids_, build_params); + index.build(data_vectors_, data_ids_, build_params, attributes_table); // Create a search_params object (if you need special fields, set them up) auto search_params = std::make_shared(); @@ -176,7 +176,7 @@ TEST_F(QuakeIndexTest, SearchFlatTest) { // Build auto build_params = std::make_shared(); build_params->metric = "l2"; - index.build(data_vectors_, data_ids_, build_params); + index.build(data_vectors_, data_ids_, build_params, attributes_table); // Create a search_params object (if you need special fields, set them up) auto search_params = std::make_shared(); From f12d21e4da5df3038d1badee1f29946c853d7440 Mon Sep 17 00:00:00 2001 From: Kaustubh Dwivedi Date: Sat, 5 Apr 2025 15:28:42 -0500 Subject: [PATCH 16/21] Tests for search added --- src/cpp/include/list_scanning.h | 4 -- src/cpp/src/query_coordinator.cpp | 5 +- test/cpp/query_coordinator.cpp | 116 +++++++++++++++++++++++------- 3 files changed, 95 insertions(+), 30 deletions(-) diff --git a/src/cpp/include/list_scanning.h b/src/cpp/include/list_scanning.h index 4324b655..53d54dfe 100644 --- a/src/cpp/include/list_scanning.h +++ b/src/cpp/include/list_scanning.h @@ -286,10 +286,6 @@ inline void scan_list_with_ids_l2(const float *query_vec, TopkBuffer &buffer, bool* bitmap = nullptr) { const float *vec = list_vecs; - // for (int l = 0; l < list_size; l++) { - // buffer.add(sqrt(faiss::fvec_L2sqr(query_vec, vec, d)), list_ids[l]); - // vec += d; - // } if (bitmap == nullptr) { for (int l = 0; l < list_size; l++) { diff --git a/src/cpp/src/query_coordinator.cpp b/src/cpp/src/query_coordinator.cpp index 0d0f9db6..1580bd65 100644 --- a/src/cpp/src/query_coordinator.cpp +++ b/src/cpp/src/query_coordinator.cpp @@ -578,13 +578,14 @@ shared_ptr QueryCoordinator::serial_scan(Tensor x, Tensor partitio int64_t list_size = partition_manager_->partitions_->list_size(pi); std::shared_ptr id_array = nullptr; - std::shared_ptr price_array = nullptr; + std::shared_ptr price_array = nullptr; std::unordered_map id_to_price; if (partition_attributes_table != nullptr) { id_array = std::static_pointer_cast(partition_attributes_table->GetColumnByName("id")->chunk(0)); - price_array = std::static_pointer_cast(partition_attributes_table->GetColumnByName("price")->chunk(0)); + price_array = std::static_pointer_cast(partition_attributes_table->GetColumnByName("price")->chunk(0)); + int64_t length = id_array->length(); for (int64_t i = 0; i < id_array->length(); i++) { id_to_price[id_array->Value(i)] = price_array->Value(i); } diff --git a/test/cpp/query_coordinator.cpp b/test/cpp/query_coordinator.cpp index f94d9768..33a468da 100644 --- a/test/cpp/query_coordinator.cpp +++ b/test/cpp/query_coordinator.cpp @@ -27,36 +27,44 @@ class QueryCoordinatorTest : public ::testing::Test { std::shared_ptr partition_manager_; MetricType metric_ = faiss::METRIC_L2; + static torch::Tensor generate_random_data(int64_t num_vectors, int64_t dim) { + return torch::randn({num_vectors, dim}, torch::kFloat32); + } - // static std::shared_ptr generate_data_frame(int64_t num_vectors, torch::Tensor ids) { - // arrow::MemoryPool* pool = arrow::default_memory_pool(); + static torch::Tensor generate_sequential_ids(int64_t count, int64_t start = 0) { + return torch::arange(start, start + count, torch::kInt64); + } - // // Builders for the "price" and "id" columns - // arrow::DoubleBuilder price_builder(pool); - // arrow::Int64Builder id_builder(pool); + static std::shared_ptr generate_data_frame(int64_t num_vectors, torch::Tensor ids) { + arrow::MemoryPool* pool = arrow::default_memory_pool(); - // // Append values to the builders - // for (int64_t i = 0; i < num_vectors; i++) { - // price_builder.Append(static_cast(i) * 1.5); // Price column - // id_builder.Append(ids[i].item()); // ID column from the input tensor - // } + // Builders for the "price" and "id" columns + arrow::DoubleBuilder price_builder(pool); + arrow::Int64Builder id_builder(pool); - // // Finalize the arrays - // std::shared_ptr price_array; - // std::shared_ptr id_array; - // price_builder.Finish(&price_array); - // id_builder.Finish(&id_array); + // Append values to the builders + for (int64_t i = 0; i < num_vectors; i++) { + price_builder.Append(i); // Price column + id_builder.Append(ids[i].item()); // ID column from the input tensor + } - // // Define the schema with two fields: "price" and "id" - // std::vector> schema_vector = { - // arrow::field("id", arrow::int64()), - // arrow::field("price", arrow::float64()), - // }; - // auto schema = std::make_shared(schema_vector); + // Finalize the arrays + std::shared_ptr price_array; + std::shared_ptr id_array; + price_builder.Finish(&price_array); + id_builder.Finish(&id_array); + + // Define the schema with two fields: "price" and "id" + std::vector> schema_vector = { + arrow::field("id", arrow::int64()), + arrow::field("price", arrow::float64()), + }; + auto schema = std::make_shared(schema_vector); + + // Create and return the table with both columns + return arrow::Table::Make(schema, {id_array, price_array}); + } - // // Create and return the table with both columns - // return arrow::Table::Make(schema, {id_array, price_array}); - // } void SetUp() override { @@ -200,6 +208,66 @@ TEST_F(QueryCoordinatorTest, WorkerInitializationTest) { ASSERT_TRUE(coordinator->workers_initialized_); } +TEST_F(QueryCoordinatorTest, PreFilteringTest) { + auto index = std::make_shared(); + auto build_params = std::make_shared(); + build_params->nlist = 1; + build_params->metric = "l2"; + int64_t num_vectors = 10; + auto data_vectors = generate_random_data(num_vectors, dimension_); + auto data_ids = generate_sequential_ids(num_vectors, 0); + auto attributes_table = generate_data_frame(num_vectors, data_ids); + index->build(data_vectors, data_ids, build_params, attributes_table); + auto coordinator = std::make_shared( + index->parent_, + index->partition_manager_, + nullptr, + faiss::METRIC_L2 + ); + auto search_params = std::make_shared(); + search_params->k = 2; + search_params->price_threshold = 1; + search_params->filteringType = FilteringType::PRE_FILTERING; + auto result_worker = coordinator->search(torch::randn({1, dimension_}, torch::kFloat32), search_params); + vector expected_result = {0, 1}; + ASSERT_TRUE(result_worker != nullptr); + ASSERT_EQ(result_worker->ids.sizes(), (std::vector{1, 2})); + ASSERT_EQ(result_worker->distances.sizes(), (std::vector{1, 2})); + std::vector result_worker_vector(result_worker->ids.data(), result_worker->ids.data() + result_worker->ids.numel()); + sort(result_worker_vector.begin(), result_worker_vector.end()); + ASSERT_EQ(expected_result, result_worker_vector); +} + +TEST_F(QueryCoordinatorTest, PostFilteringTest) { + auto index = std::make_shared(); + auto build_params = std::make_shared(); + build_params->nlist = 1; + build_params->metric = "l2"; + int64_t num_vectors = 10; + auto data_vectors = generate_random_data(num_vectors, dimension_); + auto data_ids = generate_sequential_ids(num_vectors, 0); + auto attributes_table = generate_data_frame(num_vectors, data_ids); + index->build(data_vectors, data_ids, build_params, attributes_table); + auto coordinator = std::make_shared( + index->parent_, + index->partition_manager_, + nullptr, + faiss::METRIC_L2 + ); + auto search_params = std::make_shared(); + search_params->k = 2; + search_params->price_threshold = 1; + search_params->filteringType = FilteringType::POST_FILTERING; + auto result_worker = coordinator->search(torch::randn({1, dimension_}, torch::kFloat32), search_params); + vector expected_result = {0, 1}; + ASSERT_TRUE(result_worker != nullptr); + ASSERT_EQ(result_worker->ids.sizes(), (std::vector{1, 2})); + ASSERT_EQ(result_worker->distances.sizes(), (std::vector{1, 2})); + std::vector result_worker_vector(result_worker->ids.data(), result_worker->ids.data() + result_worker->ids.numel()); + sort(result_worker_vector.begin(), result_worker_vector.end()); + ASSERT_EQ(expected_result, result_worker_vector); +} + TEST_F(QueryCoordinatorTest, FlatWorkerScan) { int num_workers = 4; From ac50b7fb4cce06a0c5cdd2dde74888e86da48fe0 Mon Sep 17 00:00:00 2001 From: Amith Bhat Nekkare <47982550+amithbhat1@users.noreply.github.com> Date: Sat, 5 Apr 2025 17:24:43 -0500 Subject: [PATCH 17/21] fix remove logical bug - attr table can be NULL --- src/cpp/src/index_partition.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/cpp/src/index_partition.cpp b/src/cpp/src/index_partition.cpp index 7e31f55f..bcbd02c7 100644 --- a/src/cpp/src/index_partition.cpp +++ b/src/cpp/src/index_partition.cpp @@ -105,7 +105,10 @@ void IndexPartition::remove(int64_t index) { // You need to create a new Arrow data that doesn't have the target row. void IndexPartition::removeAttribute(int64_t target_id) { - assert(attributes_table_ && "Input table is null"); + if(attributes_table_ == nullptr) { + // if there is no table, nothing to remove, so exit gracefully + return; + } int64_t original_size = attributes_table_->num_rows(); if(original_size==0){ @@ -298,4 +301,4 @@ T* IndexPartition::allocate_memory(size_t num_elements, int numa_node) { throw std::bad_alloc(); } return ptr; -} \ No newline at end of file +} From 12888634fc4dba3379405fcfd529d6ec8ee33ebd Mon Sep 17 00:00:00 2001 From: Amith Bhat Nekkare <47982550+amithbhat1@users.noreply.github.com> Date: Sat, 5 Apr 2025 17:29:48 -0500 Subject: [PATCH 18/21] allow attr_table to be null - add vector --- src/cpp/src/partition_manager.cpp | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/cpp/src/partition_manager.cpp b/src/cpp/src/partition_manager.cpp index 2098b174..e99deb2d 100644 --- a/src/cpp/src/partition_manager.cpp +++ b/src/cpp/src/partition_manager.cpp @@ -132,12 +132,17 @@ std::shared_ptr PartitionManager::filterRowById( std::shared_ptr table, int64_t target_id ) { + if(table==nullptr ) { + return nullptr; + } + auto id_column = table->GetColumnByName("id"); if (!id_column) { std::cerr << "Column 'id' not found in table." << std::endl; return nullptr; } + // Create a filter expression (id == target_id) arrow::Datum column_data = id_column->chunk(0); arrow::Datum scalar_value = arrow::MakeScalar(target_id); @@ -181,10 +186,6 @@ shared_ptr PartitionManager::add( throw runtime_error("[PartitionManager] add: partitions_ is null. Did you call init_partitions?"); } - if(!attributes_table){ - throw runtime_error("[PartitionManager] add: attributes_table is null. Please add attributes for the vectors"); - } - if (!vectors.defined() || !vector_ids.defined()) { throw runtime_error("[PartitionManager] add: vectors or vector_ids is undefined."); } @@ -193,11 +194,11 @@ shared_ptr PartitionManager::add( throw runtime_error("[PartitionManager] add: mismatch in vectors.size(0) and vector_ids.size(0)."); } - if(attributes_table->num_rows()!= vector_ids.size(0)){ + if(attributes_table!=nullptr && attributes_table->num_rows()!= vector_ids.size(0)){ throw runtime_error("[PartitionManager] add: mismatch in attributes_table and vector_ids size."); } - if(!attributes_table->GetColumnByName("id")){ + if(attributes_table!=nullptr && !attributes_table->GetColumnByName("id")){ throw runtime_error("[PartitionManager] add: No vector_id column in attributes_table"); } @@ -857,4 +858,4 @@ void PartitionManager::load(const string &path) { if (debug_) { std::cout << "[PartitionManager] load: Load complete." << std::endl; } -} \ No newline at end of file +} From 711e4146e00c2da1269ee56986ecd1d6663c8ff5 Mon Sep 17 00:00:00 2001 From: Amith Bhat Nekkare <47982550+amithbhat1@users.noreply.github.com> Date: Sat, 5 Apr 2025 17:33:05 -0500 Subject: [PATCH 19/21] fixed tests --- test/cpp/quake_index.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/cpp/quake_index.cpp b/test/cpp/quake_index.cpp index 32e1c9cb..a693ef13 100644 --- a/test/cpp/quake_index.cpp +++ b/test/cpp/quake_index.cpp @@ -228,8 +228,9 @@ TEST_F(QuakeIndexTest, AddTest) { Tensor add_vectors = generate_random_data(10, dimension_); Tensor add_ids = generate_sequential_ids(10, 1000); + auto attr_table = generate_data_frame(10,add_ids); - auto modify_info = index.add(add_vectors, add_ids, attributes_table); + auto modify_info = index.add(add_vectors, add_ids, attr_table); EXPECT_EQ(modify_info->n_vectors, 10); EXPECT_GE(modify_info->modify_time_us, 0); } @@ -537,4 +538,4 @@ TEST(QuakeIndexStressTest, SearchAddRemoveMaintenanceTest) { } SUCCEED(); -} \ No newline at end of file +} From 68706a6376fcf4e1e6c8be3927061db18231a457 Mon Sep 17 00:00:00 2001 From: Sujan Reddy Date: Sat, 5 Apr 2025 18:14:43 -0500 Subject: [PATCH 20/21] fixed conflicts --- src/cpp/src/query_coordinator.cpp | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/cpp/src/query_coordinator.cpp b/src/cpp/src/query_coordinator.cpp index 8bd4c90b..9c4fde0a 100644 --- a/src/cpp/src/query_coordinator.cpp +++ b/src/cpp/src/query_coordinator.cpp @@ -523,10 +523,10 @@ shared_ptr QueryCoordinator::serial_scan(Tensor x, Tensor partitio bool use_aps = (search_params->recall_target > 0.0 && parent_); // Ensure partition_ids is 2D. - if (partition_ids.dim() == 1) { - partition_ids = partition_ids.unsqueeze(0).expand({num_queries, partition_ids.size(0)}); + if (partition_ids_to_scan.dim() == 1) { + partition_ids_to_scan = partition_ids_to_scan.unsqueeze(0).expand({num_queries, partition_ids_to_scan.size(0)}); } - auto partition_ids_accessor = partition_ids.accessor(); + auto partition_ids_accessor = partition_ids_to_scan.accessor(); float *x_ptr = x.data_ptr(); // Allocate per-query result vectors. @@ -538,7 +538,7 @@ shared_ptr QueryCoordinator::serial_scan(Tensor x, Tensor partitio // Create a local TopK buffer for query q. auto topk_buf = std::make_shared(k, is_descending); const float* query_vec = x_ptr + q * dimension; - int num_parts = partition_ids.size(1); + int num_parts = partition_ids_to_scan.size(1); vector boundary_distances; vector partition_probs; @@ -547,10 +547,10 @@ shared_ptr QueryCoordinator::serial_scan(Tensor x, Tensor partitio query_radius = -1000000.0; } - Tensor partition_sizes = partition_manager_->get_partition_sizes(partition_ids[q]); + Tensor partition_sizes = partition_manager_->get_partition_sizes(partition_ids_to_scan[q]); if (use_aps) { - vector partition_ids_to_scan_vec = std::vector(partition_ids[q].data_ptr(), - partition_ids[q].data_ptr() + partition_ids[q].size(0)); + vector partition_ids_to_scan_vec = std::vector(partition_ids_to_scan[q].data_ptr(), + partition_ids_to_scan[q].data_ptr() + partition_ids_to_scan[q].size(0)); vector cluster_centroids = parent_->partition_manager_->get_vectors(partition_ids_to_scan_vec); boundary_distances = compute_boundary_distances(x[q], cluster_centroids, From f6305c903f6736c5daf42fff4aba6e94954e427b Mon Sep 17 00:00:00 2001 From: Sujan Reddy Date: Sat, 5 Apr 2025 18:23:41 -0500 Subject: [PATCH 21/21] added conda to quake_env --- .github/workflows/build_and_test.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/build_and_test.yaml b/.github/workflows/build_and_test.yaml index 412ca378..5f81f0d2 100644 --- a/.github/workflows/build_and_test.yaml +++ b/.github/workflows/build_and_test.yaml @@ -24,6 +24,7 @@ jobs: git config --global --add safe.directory '*' eval "$(conda shell.bash hook)" conda activate quake-env + conda install libarrow-all=19.0.1 -c conda-forge mkdir -p build cd build cmake -DCMAKE_BUILD_TYPE=${{ env.BUILD_TYPE }} \ @@ -46,6 +47,7 @@ jobs: git config --global --add safe.directory '*' eval "$(conda shell.bash hook)" conda activate quake-env + conda install libarrow-all=19.0.1 -c conda-forge pip install --no-use-pep517 . pip install pytest python -m pytest test/python \ No newline at end of file