From 2eea80b6dab1bd9eb8b3b20ccd1c627c03d46768 Mon Sep 17 00:00:00 2001 From: Jason Mohoney Date: Wed, 16 Apr 2025 14:37:55 +0000 Subject: [PATCH 01/15] add cuvs --- CMakeLists.txt | 18 +- src/cpp/include/cuv_cluster.h | 174 +++++++++++++++++++ src/cpp/third_party/cmake/fetch_rapids.cmake | 22 +++ src/cpp/third_party/cmake/get_cuvs.cmake | 56 ++++++ test/cpp/test_clustering_benchmark.cpp | 79 +++++++++ test/cpp/test_clustering_correctness.cpp | 157 +++++++++++++++++ test/cpp/test_cuvs_clustering.cpp | 88 ++++++++++ 7 files changed, 593 insertions(+), 1 deletion(-) create mode 100644 src/cpp/include/cuv_cluster.h create mode 100644 src/cpp/third_party/cmake/fetch_rapids.cmake create mode 100644 src/cpp/third_party/cmake/get_cuvs.cmake create mode 100644 test/cpp/test_clustering_benchmark.cpp create mode 100644 test/cpp/test_clustering_correctness.cpp create mode 100644 test/cpp/test_cuvs_clustering.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index a4d2c003..a337eb9d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -96,10 +96,25 @@ endif() # --------------------------------------------------------------- # Find Required Packages # --------------------------------------------------------------- +find_package(CuVS REQUIRED) find_package(Torch REQUIRED) find_package(Python3 COMPONENTS Development Interpreter REQUIRED) find_package(Python COMPONENTS Interpreter Development REQUIRED) +include(src/cpp/third_party/cmake/fetch_rapids.cmake) +include(rapids-cmake) +include(rapids-cpm) +include(rapids-cuda) +include(rapids-export) +include(rapids-find) + +rapids_cuda_init_architectures(quake_c) + +rapids_cpm_init() +set(BUILD_CUVS_C_LIBRARY OFF) +include(src/cpp/third_party/cmake/get_cuvs.cmake) + + message(STATUS "Torch include dir: ${TORCH_INCLUDE_DIRS}") message(STATUS "Torch libraries: ${TORCH_LIBRARIES}") message(STATUS "Python include dir: ${Python3_INCLUDE_DIRS}") @@ -131,6 +146,7 @@ target_compile_features(${PROJECT_NAME} PUBLIC cxx_std_17) # Base libraries 
common to all platforms set(LINK_LIBS ${TORCH_LIBRARIES} + cuvs::cuvs -ffast-math -lpthread -fPIC @@ -221,4 +237,4 @@ message(STATUS "MKL_LINK: ${MKL_LINK}") message(STATUS "MKL_INTERFACE_FULL: ${MKL_INTERFACE_FULL}") message(STATUS "MKL_THREADING: ${MKL_THREADING}") message(STATUS "MKL_MPI: ${MKL_MPI}") -message(STATUS "------------------------------------------------") \ No newline at end of file +message(STATUS "------------------------------------------------") diff --git a/src/cpp/include/cuv_cluster.h b/src/cpp/include/cuv_cluster.h new file mode 100644 index 00000000..6e2ab9b4 --- /dev/null +++ b/src/cpp/include/cuv_cluster.h @@ -0,0 +1,174 @@ +#include +#include // For at::cuda::getCurrentCUDAStream() +#include // RAFT resources (handle) +#include // RAFT device view (make_device_matrix_view, etc.) +#include // cuVS k-means API + +#include +#include +#include +#include +#include + +// Optionally, if you have your own enum, you can use that instead of faiss::METRIC_L2/INNER_PRODUCT. +struct ClusteringResult { + torch::Tensor centroids; + // Each pair is (cluster_vectors, cluster_ids) for one cluster. + std::vector> clusters; +}; + + +inline ClusteringResult ClusterWithCuVS(const torch::Tensor& vectors, + const torch::Tensor& ids, + int64_t num_clusters, + int metric = faiss::METRIC_L2) { + using clock = std::chrono::high_resolution_clock; + auto t0 = clock::now(); + + // Validate input shapes and sizes. + TORCH_CHECK(vectors.dim() == 2, "Input 'vectors' must be a 2D tensor"); + TORCH_CHECK(ids.dim() == 1 || (ids.dim() == 2 && ids.size(1) == 1), + "Input 'ids' must be a 1D tensor or 2D with shape (N,1)"); + TORCH_CHECK(vectors.size(0) == ids.size(0), "Number of ids must match number of vectors"); + TORCH_CHECK(vectors.size(0) >= num_clusters, "Number of clusters cannot exceed number of points"); + + // Move data and ids to GPU (if needed) and ensure contiguous memory. 
+ torch::Tensor data = vectors.to(torch::kCUDA, torch::kFloat32).contiguous(); + torch::Tensor id_dev = ids.to(torch::kCUDA).contiguous(); + int64_t n_samples = data.size(0); + int64_t n_features = data.size(1); + + auto t1 = clock::now(); + std::cout << "[DEBUG] Data transfer & contiguity: " + << std::chrono::duration_cast(t1 - t0).count() << " ms" << std::endl; + + // If using inner-product (cosine), normalize input vectors. + if (metric == faiss::METRIC_INNER_PRODUCT) { + torch::Tensor norms = torch::sqrt((data * data).sum(1, /*keepdim=*/true)); + data = data / norms; + } + + auto t2 = clock::now(); + std::cout << "[DEBUG] Data normalization: " + << std::chrono::duration_cast(t2 - t1).count() << " ms" << std::endl; + + // RAFT handle and stream setup. + raft::resources handle; + cudaStream_t cuda_stream = at::cuda::getCurrentCUDAStream().stream(); + raft::resource::set_cuda_stream(handle, cuda_stream); + + auto t3 = clock::now(); + std::cout << "[DEBUG] RAFT handle & stream setup: " + << std::chrono::duration_cast(t3 - t2).count() << " ms" << std::endl; + + // Wrap the input data in a RAFT device_matrix_view. + float* data_ptr = data.data_ptr(); + auto X_view = raft::make_device_matrix_view(data_ptr, (int)n_samples, (int)n_features); + + // Allocate output centroids on GPU. + torch::Tensor centroids_tensor = torch::empty({num_clusters, n_features}, + torch::TensorOptions().dtype(torch::kFloat32).device(torch::kCUDA)); + float* centroids_ptr = centroids_tensor.data_ptr(); + auto centroids_view = raft::make_device_matrix_view(centroids_ptr, (int)num_clusters, (int)n_features); + + auto t4 = clock::now(); + std::cout << "[DEBUG] Memory allocation for centroids & data wrapping: " + << std::chrono::duration_cast(t4 - t3).count() << " ms" << std::endl; + + // Set up k-means parameters. 
+ cuvs::cluster::kmeans::params params; + params.n_clusters = (int)num_clusters; + params.max_iter = 5; + params.init = cuvs::cluster::kmeans::params::InitMethod::Random; + + // Prepare host-side scalars to capture inertia and iterations. + float inertia = 0.0f; + int iterations = 0; + auto inertia_view = raft::make_host_scalar_view(&inertia); + auto iter_view = raft::make_host_scalar_view(&iterations); + + // Run k-means clustering (fit). + cuvs::cluster::kmeans::fit(handle, params, X_view, std::nullopt, + centroids_view, inertia_view, iter_view); + + auto t5 = clock::now(); + std::cout << "[DEBUG] cuVS fit (k-means clustering): " + << std::chrono::duration_cast(t5 - t4).count() << " ms" << std::endl; + + // If inner-product, renormalize centroids. + if (metric == faiss::METRIC_INNER_PRODUCT) { + torch::Tensor cent_norms = torch::sqrt((centroids_tensor * centroids_tensor).sum(1, /*keepdim=*/true)); + centroids_tensor.div_(cent_norms); + } + + auto t6 = clock::now(); + std::cout << "[DEBUG] Centroid renormalization (if applicable): " + << std::chrono::duration_cast(t6 - t5).count() << " ms" << std::endl; + + // Allocate memory for labels and run prediction. + torch::Tensor labels = torch::empty({n_samples}, torch::TensorOptions().dtype(torch::kInt32).device(torch::kCUDA)); + int* labels_ptr = labels.data_ptr(); + auto labels_view = raft::make_device_vector_view(labels_ptr, (int)n_samples); + + cuvs::cluster::kmeans::predict(handle, params, X_view, std::nullopt, + centroids_view, labels_view, false, + raft::make_host_scalar_view(&inertia)); + + auto t7 = clock::now(); + std::cout << "[DEBUG] cuVS predict: " + << std::chrono::duration_cast(t7 - t6).count() << " ms" << std::endl; + + // Synchronize the stream. 
+ raft::resource::sync_stream(handle); + auto t8 = clock::now(); + std::cout << "[DEBUG] CUDA stream synchronization: " + << std::chrono::duration_cast(t8 - t7).count() << " ms" << std::endl; + + // ----- Grouping (GPU vectorized) ----- + // Sort the labels and get the sorted indices. + torch::Tensor sorted_tuple = std::get<1>(torch::sort(labels)); // sorted indices only, since sorted labels are not needed + torch::Tensor sorted_labels = labels.index_select(0, sorted_tuple); + + // Reorder the data and ids using the sorted indices. + torch::Tensor sorted_data = data.index_select(0, sorted_tuple); + torch::Tensor sorted_ids = id_dev.index_select(0, sorted_tuple); + + // Compute per-cluster counts using torch::bincount. + torch::Tensor counts = torch::bincount(sorted_labels.to(torch::kInt64), /*weights=*/{}, num_clusters); + + // Transfer counts to CPU and build a vector for split sizes. + auto counts_cpu = counts.to(torch::kCPU); + std::vector split_sizes(counts_cpu.data_ptr(), counts_cpu.data_ptr() + counts_cpu.numel()); + + auto t9 = clock::now(); + std::cout << "[DEBUG] Sorting, counting, and preparing split sizes: " + << std::chrono::duration_cast(t9 - t8).count() << " ms" << std::endl; + + // Split the sorted data and ids into clusters. + std::vector cluster_vectors = torch::split(sorted_data, split_sizes, 0); + std::vector cluster_ids = torch::split(sorted_ids, split_sizes, 0); + + auto t10 = clock::now(); + std::cout << "[DEBUG] Splitting into clusters: " + << std::chrono::duration_cast(t10 - t9).count() << " ms" << std::endl; + + // Build the final clustering result. 
+ std::vector> clusters; + clusters.reserve(num_clusters); + for (int i = 0; i < num_clusters; ++i) { + clusters.emplace_back(cluster_vectors[i], cluster_ids[i]); + } + + auto t_end = clock::now(); + std::cout << "[DEBUG] Total grouping time: " + << std::chrono::duration_cast(t_end - t8).count() << " ms" << std::endl; + + ClusteringResult result; + result.centroids = centroids_tensor; // Shape: (num_clusters, n_features) + result.clusters = std::move(clusters); + + std::cout << "[DEBUG] Total cuVS clustering time: " + << std::chrono::duration_cast(t_end - t0).count() << " ms" << std::endl; + + return result; +} diff --git a/src/cpp/third_party/cmake/fetch_rapids.cmake b/src/cpp/third_party/cmake/fetch_rapids.cmake new file mode 100644 index 00000000..0ec5fa04 --- /dev/null +++ b/src/cpp/third_party/cmake/fetch_rapids.cmake @@ -0,0 +1,22 @@ +# ============================================================================= +# Copyright (c) 2023-2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. 
+ +# Use this variable to update RAPIDS and cuVS versions +set(RAPIDS_VERSION "25.06") +set(rapids-cmake-version ${RAPIDS_VERSION}) + +if(NOT EXISTS ${CMAKE_CURRENT_BINARY_DIR}/CUVS_RAPIDS.cmake) + file(DOWNLOAD https://raw.githubusercontent.com/rapidsai/rapids-cmake/branch-${RAPIDS_VERSION}/RAPIDS.cmake + ${CMAKE_CURRENT_BINARY_DIR}/CUVS_RAPIDS.cmake) +endif() +include(${CMAKE_CURRENT_BINARY_DIR}/CUVS_RAPIDS.cmake) diff --git a/src/cpp/third_party/cmake/get_cuvs.cmake b/src/cpp/third_party/cmake/get_cuvs.cmake new file mode 100644 index 00000000..a2a4e884 --- /dev/null +++ b/src/cpp/third_party/cmake/get_cuvs.cmake @@ -0,0 +1,56 @@ +# ============================================================================= +# Copyright (c) 2023-2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. 
+ +# Use RAPIDS_VERSION from cmake/thirdparty/fetch_rapids.cmake +set(CUVS_VERSION "${RAPIDS_VERSION}") +set(CUVS_FORK "rapidsai") +set(CUVS_PINNED_TAG "branch-${RAPIDS_VERSION}") + +function(find_and_configure_cuvs) + set(oneValueArgs VERSION FORK PINNED_TAG ENABLE_NVTX BUILD_CUVS_C_LIBRARY) + cmake_parse_arguments(PKG "${options}" "${oneValueArgs}" + "${multiValueArgs}" ${ARGN} ) + + + set(CUVS_COMPONENTS "") + if(PKG_BUILD_CUVS_C_LIBRARY) + string(APPEND CUVS_COMPONENTS " c_api") + endif() + #----------------------------------------------------- + # Invoke CPM find_package() + #----------------------------------------------------- + rapids_cpm_find(cuvs ${PKG_VERSION} + GLOBAL_TARGETS cuvs::cuvs + BUILD_EXPORT_SET cuvs-examples-exports + INSTALL_EXPORT_SET cuvs-examples-exports + COMPONENTS ${CUVS_COMPONENTS} + CPM_ARGS + GIT_REPOSITORY https://github.com/${PKG_FORK}/cuvs.git + GIT_TAG ${PKG_PINNED_TAG} + SOURCE_SUBDIR cpp + OPTIONS + "BUILD_C_LIBRARY ${PKG_BUILD_CUVS_C_LIBRARY}" + "BUILD_TESTS OFF" + "CUVS_NVTX ${PKG_ENABLE_NVTX}" + ) +endfunction() + +# Change pinned tag here to test a commit in CI +# To use a different CUVS locally, set the CMake variable +# CPM_cuvs_SOURCE=/path/to/local/cuvs +find_and_configure_cuvs(VERSION ${CUVS_VERSION}.00 + FORK ${CUVS_FORK} + PINNED_TAG ${CUVS_PINNED_TAG} + ENABLE_NVTX OFF + BUILD_CUVS_C_LIBRARY ${BUILD_CUVS_C_LIBRARY} +) diff --git a/test/cpp/test_clustering_benchmark.cpp b/test/cpp/test_clustering_benchmark.cpp new file mode 100644 index 00000000..733ca95f --- /dev/null +++ b/test/cpp/test_clustering_benchmark.cpp @@ -0,0 +1,79 @@ +// benchmark_clustering_gtest.cpp +// +// This Google Test benchmarks two clustering implementations: +// 1. FAISS-based CPU clustering (kmeans with use_gpu == false) +// 2. cuVS-based GPU clustering (ClusterWithCuVS) +// on a dataset of 4,000,000 vectors (of dimension 128) and 10,000 clusters. +// The test prints the elapsed time (milliseconds) for each method. 
+ +#include +#include +#include +#include +#include "clustering.h" // Declaration of kmeans() and ClusterWithCuVS() +#include "cuv_cluster.h" + +using namespace std::chrono; + +TEST(ClusteringBenchmark, CPU_vs_cuVS) { + // Verify CUDA is available. + if (!torch::cuda::is_available()) { + GTEST_SKIP() << "CUDA is not available; skipping benchmark."; + } + + // Define benchmark parameters. + const int64_t num_vectors = 10000000; // 10 million vectors + const int64_t dim = 128; + const int num_clusters = 10000; + const int niter = 5; // number of clustering iterations + + std::cout << "Benchmarking clustering with " << num_vectors << " vectors, dimension " + << dim << ", and " << num_clusters << " clusters." << std::endl; + + // Create CPU tensors. + auto options_float_cpu = torch::TensorOptions().dtype(torch::kFloat32).device(torch::kCPU); + auto options_int64_cpu = torch::TensorOptions().dtype(torch::kInt64).device(torch::kCPU); + torch::Tensor vectors_cpu = torch::randn({num_vectors, dim}, options_float_cpu).contiguous(); + torch::Tensor ids_cpu = torch::arange(0, num_vectors, options_int64_cpu).contiguous(); + + // Create CUDA tensors. + auto options_float_cuda = torch::TensorOptions().dtype(torch::kFloat32).device(torch::kCUDA); + auto options_int64_cuda = torch::TensorOptions().dtype(torch::kInt64).device(torch::kCUDA); + torch::Tensor vectors_cuda = vectors_cpu.to(options_float_cuda).contiguous(); + torch::Tensor ids_cuda = ids_cpu.to(options_int64_cuda).contiguous(); + + // Warm up the GPU using the cuVS clustering (to remove initialization overhead). + std::cout << "[DEBUG] Warming up GPU with cuVS clustering..." << std::endl; + { + auto dummy = ClusterWithCuVS(vectors_cuda, ids_cuda, 10, faiss::METRIC_L2); + torch::cuda::synchronize(); + } + std::cout << "[DEBUG] Warm-up complete." << std::endl; + + // Benchmark FAISS CPU clustering. + std::cout << "[DEBUG] Starting FAISS CPU clustering benchmark..." 
<< std::endl; + auto start_cpu = high_resolution_clock::now(); + auto clustering_cpu = kmeans(vectors_cpu, ids_cpu, num_clusters, faiss::METRIC_L2, niter, false, torch::Tensor()); + auto end_cpu = high_resolution_clock::now(); + auto duration_cpu = duration_cast(end_cpu - start_cpu).count(); + std::cout << "FAISS CPU clustering time: " << duration_cpu << " ms" << std::endl; + + // Benchmark cuVS GPU clustering. + std::cout << "[DEBUG] Starting cuVS clustering benchmark..." << std::endl; + auto start_cuvs = high_resolution_clock::now(); + auto clustering_cuvs = ClusterWithCuVS(vectors_cuda, ids_cuda, num_clusters, faiss::METRIC_L2); + torch::cuda::synchronize(); // ensure GPU work is complete + auto end_cuvs = high_resolution_clock::now(); + auto duration_cuvs = duration_cast(end_cuvs - start_cuvs).count(); + std::cout << "cuVS clustering time: " << duration_cuvs << " ms" << std::endl; + + // Print summary. + std::cout << "---------------------------" << std::endl; + std::cout << "FAISS CPU clustering took " << duration_cpu << " ms." << std::endl; + std::cout << "cuVS clustering took " << duration_cuvs << " ms." << std::endl; + std::cout << "---------------------------" << std::endl; + + // Basic checks. + EXPECT_GT(duration_cpu, 0); + EXPECT_GT(duration_cuvs, 0); +} diff --git a/test/cpp/test_clustering_correctness.cpp b/test/cpp/test_clustering_correctness.cpp new file mode 100644 index 00000000..01fff623 --- /dev/null +++ b/test/cpp/test_clustering_correctness.cpp @@ -0,0 +1,157 @@ +// test_clustering_correctness.cpp +// +// This file tests the correctness of two clustering implementations: +// 1. FAISS-based clustering on CPU: kmeans(..., use_gpu = false) +// 2. cuVS-based clustering: ClusterWithCuVS(...) +// The tests compute the mean squared error (MSE) between each input vector and its +// corresponding cluster centroid and require that the MSE from the two methods +// agree within a reasonable tolerance. 
Additionally, the tests verify that each method +// returns centroids of the correct shape and partitions the input set completely. + +#include +#include +#include +#include +#include "clustering.h" // Declaration of kmeans(), ClusterWithCuVS(), Clustering, ClusteringResult, and MetricType + +// Helper to compute MSE for FAISS-based clustering results. +// The 'vectors' field of Clustering is a vector of tensors (one per cluster). +double ComputeMSE_Faiss(const torch::Tensor& centroids, const std::vector& cluster_vectors) { + double total_error = 0.0; + int64_t total_count = 0; + // Ensure centroids are on CPU. + torch::Tensor centroids_cpu = centroids.to(torch::kCPU); + for (int i = 0; i < centroids_cpu.size(0); ++i) { + if (cluster_vectors[i].size(0) > 0) { + // Move the cluster vectors to CPU. + torch::Tensor cluster_cpu = cluster_vectors[i].to(torch::kCPU); + torch::Tensor diff = cluster_cpu - centroids_cpu[i].unsqueeze(0); + total_error += diff.pow(2).sum().item(); + total_count += cluster_cpu.size(0); + } + } + return total_count > 0 ? total_error / total_count : 0.0; +} + +// Helper to compute MSE for cuVS-based clustering results. +// Here, clusters are returned as a vector of pairs where the first tensor is the cluster vectors. +double ComputeMSE_cuVS(const torch::Tensor& centroids, + const std::vector>& clusters) { + double total_error = 0.0; + int64_t total_count = 0; + torch::Tensor centroids_cpu = centroids.to(torch::kCPU); + for (int i = 0; i < centroids_cpu.size(0); ++i) { + const torch::Tensor& cluster_vectors = clusters[i].first; + if (cluster_vectors.size(0) > 0) { + torch::Tensor cluster_cpu = cluster_vectors.to(torch::kCPU); + torch::Tensor diff = cluster_cpu - centroids_cpu[i].unsqueeze(0); + total_error += diff.pow(2).sum().item(); + total_count += cluster_cpu.size(0); + } + } + return total_count > 0 ? total_error / total_count : 0.0; +} + +// Test fixture for clustering correctness tests. 
+class ClusteringCorrectnessTest : public ::testing::Test { + protected: + // Use a moderate number of vectors for correctness testing. + const int64_t num_vectors_ = 5000; + const int64_t dim_ = 64; + const int num_clusters_ = 20; + // CPU tensors. + torch::Tensor vectors_cpu_; + torch::Tensor ids_cpu_; + // CUDA tensors. + torch::Tensor vectors_cuda_; + torch::Tensor ids_cuda_; + + void SetUp() override { + if (!torch::cuda::is_available()) { + GTEST_SKIP() << "CUDA is not available; skipping clustering correctness tests."; + } + // Create CPU tensors. + auto options_float_cpu = torch::TensorOptions().dtype(torch::kFloat32).device(torch::kCPU); + auto options_int64_cpu = torch::TensorOptions().dtype(torch::kInt64).device(torch::kCPU); + vectors_cpu_ = torch::randn({num_vectors_, dim_}, options_float_cpu).contiguous(); + ids_cpu_ = torch::arange(0, num_vectors_, options_int64_cpu).contiguous(); + // Create CUDA versions. + auto options_float_cuda = torch::TensorOptions().dtype(torch::kFloat32).device(torch::kCUDA); + auto options_int64_cuda = torch::TensorOptions().dtype(torch::kInt64).device(torch::kCUDA); + vectors_cuda_ = vectors_cpu_.to(options_float_cuda).contiguous(); + ids_cuda_ = ids_cpu_.to(options_int64_cuda).contiguous(); + + std::cout << "[DEBUG] SetUp complete: " + << "num_vectors = " << num_vectors_ + << ", dim = " << dim_ + << ", clusters = " << num_clusters_ << std::endl; + std::cout.flush(); + } +}; + +TEST_F(ClusteringCorrectnessTest, CompareClusteringMethods_L2) { + const int niter = 20; // number of iterations for clustering + + std::cout << "[DEBUG] Starting FAISS CPU clustering ..." << std::endl; + std::cout.flush(); + auto clustering_cpu = kmeans(vectors_cpu_, ids_cpu_, num_clusters_, faiss::METRIC_L2, niter, false, torch::Tensor()); + std::cout << "[DEBUG] Completed FAISS CPU clustering." 
<< std::endl; + std::cout << "[DEBUG] CPU centroids shape: "; + for (auto s : clustering_cpu->centroids.sizes()) { std::cout << s << " "; } + std::cout << std::endl; + std::cout.flush(); + + std::cout << "[DEBUG] Starting cuVS clustering ..." << std::endl; + std::cout.flush(); + ClusteringResult clustering_cuvs = ClusterWithCuVS(vectors_cuda_, ids_cuda_, num_clusters_, faiss::METRIC_L2); + std::cout << "[DEBUG] Completed cuVS clustering." << std::endl; + std::cout << "[DEBUG] cuVS centroids shape: "; + for (auto s : clustering_cuvs.centroids.sizes()) { std::cout << s << " "; } + std::cout << std::endl; + std::cout.flush(); + + // Compute MSE for each method. + double mse_cpu = ComputeMSE_Faiss(clustering_cpu->centroids, clustering_cpu->vectors); + double mse_cuvs = ComputeMSE_cuVS(clustering_cuvs.centroids, clustering_cuvs.clusters); + + std::cout << "[DEBUG] L2 Clustering MSEs:" << std::endl; + std::cout << " FAISS CPU: " << mse_cpu << std::endl; + std::cout << " cuVS : " << mse_cuvs << std::endl; + std::cout.flush(); + + // Assert that MSE values are similar (within 20% relative difference) + double tol = 0.20; + ASSERT_NEAR(mse_cpu, mse_cuvs, tol * mse_cpu); +} + +TEST_F(ClusteringCorrectnessTest, CompareClusteringMethods_InnerProduct) { + const int niter = 20; + std::cout << "[DEBUG] Starting FAISS CPU clustering (InnerProduct) ..." << std::endl; + std::cout.flush(); + auto clustering_cpu = kmeans(vectors_cpu_, ids_cpu_, num_clusters_, faiss::METRIC_INNER_PRODUCT, niter, false, torch::Tensor()); + std::cout << "[DEBUG] Completed FAISS CPU clustering (InnerProduct)." << std::endl; + std::cout << "[DEBUG] CPU centroids shape: "; + for (auto s : clustering_cpu->centroids.sizes()) { std::cout << s << " "; } + std::cout << std::endl; + std::cout.flush(); + + std::cout << "[DEBUG] Starting cuVS clustering (InnerProduct) ..." 
<< std::endl; + std::cout.flush(); + ClusteringResult clustering_cuvs = ClusterWithCuVS(vectors_cuda_, ids_cuda_, num_clusters_, faiss::METRIC_INNER_PRODUCT); + std::cout << "[DEBUG] Completed cuVS clustering (InnerProduct)." << std::endl; + std::cout << "[DEBUG] cuVS centroids shape: "; + for (auto s : clustering_cuvs.centroids.sizes()) { std::cout << s << " "; } + std::cout << std::endl; + std::cout.flush(); + + double mse_cpu = ComputeMSE_Faiss(clustering_cpu->centroids, clustering_cpu->vectors); + double mse_cuvs = ComputeMSE_cuVS(clustering_cuvs.centroids, clustering_cuvs.clusters); + + std::cout << "[DEBUG] Inner-product Clustering MSEs:" << std::endl; + std::cout << " FAISS CPU: " << mse_cpu << std::endl; + std::cout << " cuVS : " << mse_cuvs << std::endl; + std::cout.flush(); + + double tol = 0.25; + ASSERT_NEAR(mse_cpu, mse_cuvs, tol * mse_cpu); +} diff --git a/test/cpp/test_cuvs_clustering.cpp b/test/cpp/test_cuvs_clustering.cpp new file mode 100644 index 00000000..394cf4e7 --- /dev/null +++ b/test/cpp/test_cuvs_clustering.cpp @@ -0,0 +1,88 @@ +// test_cuvs_clustering.cpp +// +// This file tests the cuVS-based k-means clustering implementation. +// It verifies that the centroids are computed correctly and that the +// entire dataset is partitioned among the clusters. + +#include +#include +#include "cuv_cluster.h" // Contains ClusterWithCuVS, ClusteringResult, and MetricType + +// Test that clustering with L2 metric produces the expected output shapes, +// and that every input vector is assigned to one and only one cluster. +TEST(CUVSClusteringTest, ClusteringL2) { + // Skip the test if CUDA is not available. + if (!torch::cuda::is_available()) { + GTEST_SKIP() << "CUDA is not available, skipping cuVS clustering test."; + } + + const int64_t num_vectors = 1000; + const int64_t dim = 32; + const int64_t num_clusters = 10; + + // Generate random data and corresponding sequential IDs on the GPU. 
+ auto vectors = torch::randn({num_vectors, dim}, torch::kCUDA).to(torch::kFloat32).contiguous(); + auto ids = torch::arange(0, num_vectors, torch::kCUDA).contiguous(); + + // Call the cuVS-based clustering function using L2 metric. + ClusteringResult result = ClusterWithCuVS(vectors, ids, num_clusters, faiss::METRIC_L2); + + // Validate that the centroids tensor has shape [num_clusters, dim]. + ASSERT_EQ(result.centroids.dim(), 2); + ASSERT_EQ(result.centroids.size(0), num_clusters); + ASSERT_EQ(result.centroids.size(1), dim); + + // Verify that all input vectors are partitioned by summing counts from each cluster. + int64_t total_count = 0; + for (const auto& cluster : result.clusters) { + // Each cluster is returned as a pair: + torch::Tensor cluster_vectors = cluster.first; + torch::Tensor cluster_ids = cluster.second; + // The number of vectors in each cluster must match the number of IDs. + ASSERT_EQ(cluster_vectors.size(0), cluster_ids.size(0)); + total_count += cluster_vectors.size(0); + } + ASSERT_EQ(total_count, num_vectors); +} + +// Test that clustering with the inner product metric (which performs normalization) +// yields centroids that are unit-norm and partitions the dataset correctly. +TEST(CUVSClusteringTest, ClusteringInnerProduct) { + if (!torch::cuda::is_available()) { + GTEST_SKIP() << "CUDA is not available, skipping cuVS clustering test."; + } + + const int64_t num_vectors = 1000; + const int64_t dim = 64; + const int64_t num_clusters = 8; + + // Generate random input data and corresponding IDs. + auto vectors = torch::randn({num_vectors, dim}, torch::kCUDA).to(torch::kFloat32).contiguous(); + auto ids = torch::arange(0, num_vectors, torch::kCUDA).contiguous(); + + // Run clustering with inner product metric. The implementation should normalize the vectors. + ClusteringResult result = ClusterWithCuVS(vectors, ids, num_clusters, faiss::METRIC_INNER_PRODUCT); + + // Check that the centroids tensor has the expected shape. 
+ ASSERT_EQ(result.centroids.dim(), 2); + ASSERT_EQ(result.centroids.size(0), num_clusters); + ASSERT_EQ(result.centroids.size(1), dim); + + // Optionally, validate that each centroid is normalized (i.e. its L2 norm is ~1) + auto cent_norms = result.centroids.norm(2, /*dim=*/1); + for (int64_t i = 0; i < cent_norms.size(0); ++i) { + float norm_val = cent_norms[i].item(); + // Allow a small numerical tolerance. + ASSERT_NEAR(norm_val, 1.0, 1e-3); + } + + // Confirm that the total number of vectors across all clusters is equal to the input size. + int64_t total_count = 0; + for (const auto& cluster : result.clusters) { + torch::Tensor cluster_vectors = cluster.first; + torch::Tensor cluster_ids = cluster.second; + ASSERT_EQ(cluster_vectors.size(0), cluster_ids.size(0)); + total_count += cluster_vectors.size(0); + } + ASSERT_EQ(total_count, num_vectors); +} From d68423fd1e53d63d1e1451f761a51618bade733c Mon Sep 17 00:00:00 2001 From: Jason Date: Wed, 16 Apr 2025 10:14:43 -0500 Subject: [PATCH 02/15] cleanup cuvs cmake and clustering --- CMakeLists.txt | 50 +++++------ src/cpp/include/clustering.h | 45 +++++++++- src/cpp/include/common.h | 8 -- src/cpp/include/cuv_cluster.h | 160 ---------------------------------- src/cpp/src/clustering.cpp | 147 ++++++++++++++++++++++++++----- 5 files changed, 193 insertions(+), 217 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index a337eb9d..4cf50474 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,18 +12,6 @@ set(Torch_USE_CUDA OFF CACHE BOOL "Force disable CUDA in Torch") set(Torch_NO_CUDA ON CACHE BOOL "Force disable CUDA in Torch") set(USE_CUDA OFF CACHE BOOL "Force disable CUDA globally") -# QUAKE_ENABLE_GPU: Enable GPU support for Faiss -# Default: OFF -if(QUAKE_ENABLE_GPU) - set(FAISS_ENABLE_GPU ON) -else() - set(FAISS_ENABLE_GPU OFF) -endif() - -if(QUAKE_ENABLE_GPU) - add_compile_definitions(FAISS_ENABLE_GPU) -endif() - if(QUAKE_USE_NUMA) add_compile_definitions(QUAKE_USE_NUMA) endif() @@ -64,6 +52,24 @@ 
set(project_BINDINGS_DIR ${CPP_SOURCE}/bindings) set(project_THIRD_PARTY_DIR ${CPP_SOURCE}/third_party) set(project_TEST_DIR test/cpp) +if(QUAKE_ENABLE_GPU) + find_package(CuVS REQUIRED) + include(${project_THIRD_PARTY_DIR}cmake/fetch_rapids.cmake) + include(rapids-cmake) + include(rapids-cpm) + include(rapids-cuda) + include(rapids-export) + include(rapids-find) + + rapids_cuda_init_architectures(quake_c) + + rapids_cpm_init() + set(BUILD_CUVS_C_LIBRARY OFF) + include(${project_THIRD_PARTY_DIR}/cmake/get_cuvs.cmake) + + add_compile_definitions(QUAKE_ENABLE_GPU) +endif() + # --------------------------------------------------------------- # Print out Compiler and Path Information # --------------------------------------------------------------- @@ -96,25 +102,10 @@ endif() # --------------------------------------------------------------- # Find Required Packages # --------------------------------------------------------------- -find_package(CuVS REQUIRED) find_package(Torch REQUIRED) find_package(Python3 COMPONENTS Development Interpreter REQUIRED) find_package(Python COMPONENTS Interpreter Development REQUIRED) -include(src/cpp/third_party/cmake/fetch_rapids.cmake) -include(rapids-cmake) -include(rapids-cpm) -include(rapids-cuda) -include(rapids-export) -include(rapids-find) - -rapids_cuda_init_architectures(quake_c) - -rapids_cpm_init() -set(BUILD_CUVS_C_LIBRARY OFF) -include(src/cpp/third_party/cmake/get_cuvs.cmake) - - message(STATUS "Torch include dir: ${TORCH_INCLUDE_DIRS}") message(STATUS "Torch libraries: ${TORCH_LIBRARIES}") message(STATUS "Python include dir: ${Python3_INCLUDE_DIRS}") @@ -146,7 +137,6 @@ target_compile_features(${PROJECT_NAME} PUBLIC cxx_std_17) # Base libraries common to all platforms set(LINK_LIBS ${TORCH_LIBRARIES} - cuvs::cuvs -ffast-math -lpthread -fPIC @@ -165,6 +155,10 @@ elseif(UNIX) if(QUAKE_USE_NUMA) list(APPEND LINK_LIBS -lnuma) endif() + + if (QUAKE_ENABLE_GPU) + list(APPEND LINK_LIBS cuvs::cuvs) + endif() else() # unsupported 
platform message(FATAL_ERROR "Unsupported platform") diff --git a/src/cpp/include/clustering.h b/src/cpp/include/clustering.h index e4b00229..cc98bebc 100644 --- a/src/cpp/include/clustering.h +++ b/src/cpp/include/clustering.h @@ -9,12 +9,55 @@ #include +#ifdef QUAKE_ENABLE_GPU +#include // RAFT resources (handle) +#include // RAFT device view (make_device_matrix_view, etc.) +#include // cuVS k-means API +#endif + class IndexPartition; +/** + * @brief Clusters vectors into partitions using faiss::Clustering + * + * + * @param vectors The vectors to cluster. + * @param ids The IDs of the vectors. + * @param n_clusters The number of clusters to create. + * @param metric_type The metric type to use for clustering. + * @param niter The number of iterations to run k-means. + * @param initial_centroids The initial centroids to use for k-means. + */ +shared_ptr kmeans_cpu(Tensor vectors, + Tensor ids, + int n_clusters, + MetricType metric_type, + int niter = 5, + Tensor initial_centroids = Tensor()); + +/** + * @brief Clusters vectors into partitions using CuVS k-means. + * + * + * @param vectors The vectors to cluster. + * @param ids The IDs of the vectors. + * @param n_clusters The number of clusters to create. + * @param metric_type The metric type to use for clustering. + * @param niter The number of iterations to run k-means. + * @param initial_centroids The initial centroids to use for k-means. + */ +#ifdef QUAKE_ENABLE_GPU +shared_ptr kmeans_cuvs(Tensor vectors, + Tensor ids, + int n_clusters, + MetricType metric_type, + int niter = 5, + Tensor initial_centroids = Tensor()); +#endif + /** * @brief Clusters vectors into partitions using k-means. * - * Uses the faiss::Clustering class to cluster vectors into n_clusters partitions. * * @param vectors The vectors to cluster. * @param ids The IDs of the vectors. 
diff --git a/src/cpp/include/common.h b/src/cpp/include/common.h index 0938591c..ff95b2a2 100644 --- a/src/cpp/include/common.h +++ b/src/cpp/include/common.h @@ -38,14 +38,6 @@ #include #endif -#ifdef FAISS_ENABLE_GPU -#include -#include -#include -#include -#include -#endif - using torch::Tensor; using std::vector; using std::unordered_map; diff --git a/src/cpp/include/cuv_cluster.h b/src/cpp/include/cuv_cluster.h index 6e2ab9b4..32fe9315 100644 --- a/src/cpp/include/cuv_cluster.h +++ b/src/cpp/include/cuv_cluster.h @@ -1,15 +1,3 @@ -#include -#include // For at::cuda::getCurrentCUDAStream() -#include // RAFT resources (handle) -#include // RAFT device view (make_device_matrix_view, etc.) -#include // cuVS k-means API - -#include -#include -#include -#include -#include - // Optionally, if you have your own enum, you can use that instead of faiss::METRIC_L2/INNER_PRODUCT. struct ClusteringResult { torch::Tensor centroids; @@ -22,153 +10,5 @@ inline ClusteringResult ClusterWithCuVS(const torch::Tensor& vectors, const torch::Tensor& ids, int64_t num_clusters, int metric = faiss::METRIC_L2) { - using clock = std::chrono::high_resolution_clock; - auto t0 = clock::now(); - - // Validate input shapes and sizes. - TORCH_CHECK(vectors.dim() == 2, "Input 'vectors' must be a 2D tensor"); - TORCH_CHECK(ids.dim() == 1 || (ids.dim() == 2 && ids.size(1) == 1), - "Input 'ids' must be a 1D tensor or 2D with shape (N,1)"); - TORCH_CHECK(vectors.size(0) == ids.size(0), "Number of ids must match number of vectors"); - TORCH_CHECK(vectors.size(0) >= num_clusters, "Number of clusters cannot exceed number of points"); - - // Move data and ids to GPU (if needed) and ensure contiguous memory. 
- torch::Tensor data = vectors.to(torch::kCUDA, torch::kFloat32).contiguous(); - torch::Tensor id_dev = ids.to(torch::kCUDA).contiguous(); - int64_t n_samples = data.size(0); - int64_t n_features = data.size(1); - - auto t1 = clock::now(); - std::cout << "[DEBUG] Data transfer & contiguity: " - << std::chrono::duration_cast(t1 - t0).count() << " ms" << std::endl; - - // If using inner-product (cosine), normalize input vectors. - if (metric == faiss::METRIC_INNER_PRODUCT) { - torch::Tensor norms = torch::sqrt((data * data).sum(1, /*keepdim=*/true)); - data = data / norms; - } - - auto t2 = clock::now(); - std::cout << "[DEBUG] Data normalization: " - << std::chrono::duration_cast(t2 - t1).count() << " ms" << std::endl; - - // RAFT handle and stream setup. - raft::resources handle; - cudaStream_t cuda_stream = at::cuda::getCurrentCUDAStream().stream(); - raft::resource::set_cuda_stream(handle, cuda_stream); - - auto t3 = clock::now(); - std::cout << "[DEBUG] RAFT handle & stream setup: " - << std::chrono::duration_cast(t3 - t2).count() << " ms" << std::endl; - - // Wrap the input data in a RAFT device_matrix_view. - float* data_ptr = data.data_ptr(); - auto X_view = raft::make_device_matrix_view(data_ptr, (int)n_samples, (int)n_features); - - // Allocate output centroids on GPU. - torch::Tensor centroids_tensor = torch::empty({num_clusters, n_features}, - torch::TensorOptions().dtype(torch::kFloat32).device(torch::kCUDA)); - float* centroids_ptr = centroids_tensor.data_ptr(); - auto centroids_view = raft::make_device_matrix_view(centroids_ptr, (int)num_clusters, (int)n_features); - - auto t4 = clock::now(); - std::cout << "[DEBUG] Memory allocation for centroids & data wrapping: " - << std::chrono::duration_cast(t4 - t3).count() << " ms" << std::endl; - - // Set up k-means parameters. 
- cuvs::cluster::kmeans::params params; - params.n_clusters = (int)num_clusters; - params.max_iter = 5; - params.init = cuvs::cluster::kmeans::params::InitMethod::Random; - // Prepare host-side scalars to capture inertia and iterations. - float inertia = 0.0f; - int iterations = 0; - auto inertia_view = raft::make_host_scalar_view(&inertia); - auto iter_view = raft::make_host_scalar_view(&iterations); - - // Run k-means clustering (fit). - cuvs::cluster::kmeans::fit(handle, params, X_view, std::nullopt, - centroids_view, inertia_view, iter_view); - - auto t5 = clock::now(); - std::cout << "[DEBUG] cuVS fit (k-means clustering): " - << std::chrono::duration_cast(t5 - t4).count() << " ms" << std::endl; - - // If inner-product, renormalize centroids. - if (metric == faiss::METRIC_INNER_PRODUCT) { - torch::Tensor cent_norms = torch::sqrt((centroids_tensor * centroids_tensor).sum(1, /*keepdim=*/true)); - centroids_tensor.div_(cent_norms); - } - - auto t6 = clock::now(); - std::cout << "[DEBUG] Centroid renormalization (if applicable): " - << std::chrono::duration_cast(t6 - t5).count() << " ms" << std::endl; - - // Allocate memory for labels and run prediction. - torch::Tensor labels = torch::empty({n_samples}, torch::TensorOptions().dtype(torch::kInt32).device(torch::kCUDA)); - int* labels_ptr = labels.data_ptr(); - auto labels_view = raft::make_device_vector_view(labels_ptr, (int)n_samples); - - cuvs::cluster::kmeans::predict(handle, params, X_view, std::nullopt, - centroids_view, labels_view, false, - raft::make_host_scalar_view(&inertia)); - - auto t7 = clock::now(); - std::cout << "[DEBUG] cuVS predict: " - << std::chrono::duration_cast(t7 - t6).count() << " ms" << std::endl; - - // Synchronize the stream. 
- raft::resource::sync_stream(handle); - auto t8 = clock::now(); - std::cout << "[DEBUG] CUDA stream synchronization: " - << std::chrono::duration_cast(t8 - t7).count() << " ms" << std::endl; - - // ----- Grouping (GPU vectorized) ----- - // Sort the labels and get the sorted indices. - torch::Tensor sorted_tuple = std::get<1>(torch::sort(labels)); // sorted indices only, since sorted labels are not needed - torch::Tensor sorted_labels = labels.index_select(0, sorted_tuple); - - // Reorder the data and ids using the sorted indices. - torch::Tensor sorted_data = data.index_select(0, sorted_tuple); - torch::Tensor sorted_ids = id_dev.index_select(0, sorted_tuple); - - // Compute per-cluster counts using torch::bincount. - torch::Tensor counts = torch::bincount(sorted_labels.to(torch::kInt64), /*weights=*/{}, num_clusters); - - // Transfer counts to CPU and build a vector for split sizes. - auto counts_cpu = counts.to(torch::kCPU); - std::vector split_sizes(counts_cpu.data_ptr(), counts_cpu.data_ptr() + counts_cpu.numel()); - - auto t9 = clock::now(); - std::cout << "[DEBUG] Sorting, counting, and preparing split sizes: " - << std::chrono::duration_cast(t9 - t8).count() << " ms" << std::endl; - - // Split the sorted data and ids into clusters. - std::vector cluster_vectors = torch::split(sorted_data, split_sizes, 0); - std::vector cluster_ids = torch::split(sorted_ids, split_sizes, 0); - - auto t10 = clock::now(); - std::cout << "[DEBUG] Splitting into clusters: " - << std::chrono::duration_cast(t10 - t9).count() << " ms" << std::endl; - - // Build the final clustering result. 
- std::vector> clusters; - clusters.reserve(num_clusters); - for (int i = 0; i < num_clusters; ++i) { - clusters.emplace_back(cluster_vectors[i], cluster_ids[i]); - } - - auto t_end = clock::now(); - std::cout << "[DEBUG] Total grouping time: " - << std::chrono::duration_cast(t_end - t8).count() << " ms" << std::endl; - - ClusteringResult result; - result.centroids = centroids_tensor; // Shape: (num_clusters, n_features) - result.clusters = std::move(clusters); - - std::cout << "[DEBUG] Total cuVS clustering time: " - << std::chrono::duration_cast(t_end - t0).count() << " ms" << std::endl; - - return result; } diff --git a/src/cpp/src/clustering.cpp b/src/cpp/src/clustering.cpp index ebbc88e0..50052a72 100644 --- a/src/cpp/src/clustering.cpp +++ b/src/cpp/src/clustering.cpp @@ -10,12 +10,119 @@ #include "index_partition.h" #include -shared_ptr kmeans(Tensor vectors, +#ifdef QUAKE_ENABLE_GPU +shared_ptr kmeans_cuvs(Tensor vectors, + Tensor ids, + int num_clusters, + MetricType metric, + int niter, + Tensor initial_centroids) { + // Validate input shapes and sizes. + TORCH_CHECK(vectors.dim() == 2, "Input 'vectors' must be a 2D tensor"); + TORCH_CHECK(ids.dim() == 1 || (ids.dim() == 2 && ids.size(1) == 1), + "Input 'ids' must be a 1D tensor or 2D with shape (N,1)"); + TORCH_CHECK(vectors.size(0) == ids.size(0), "Number of ids must match number of vectors"); + TORCH_CHECK(vectors.size(0) >= num_clusters, "Number of clusters cannot exceed number of points"); + + // Move data and ids onto the GPU and ensure contiguous memory. + Tensor data = vectors.to(torch::kCUDA, torch::kFloat32).contiguous(); + Tensor id_dev = ids.to(torch::kCUDA).contiguous(); + + int64_t n_samples = data.size(0); + int64_t n_features = data.size(1); + + // If using inner-product (cosine), normalize input vectors. + if (metric == faiss::METRIC_INNER_PRODUCT) { + Tensor norms = torch::sqrt((data * data).sum(1, /*keepdim=*/true)); + data = data / norms; + } + + // RAFT handle and stream setup.
+ raft::resources handle; + cudaStream_t cuda_stream = at::cuda::getCurrentCUDAStream().stream(); + raft::resource::set_cuda_stream(handle, cuda_stream); + + // Wrap the input data in a RAFT device_matrix_view. + float* data_ptr = data.data_ptr(); + auto X_view = raft::make_device_matrix_view(data_ptr, (int)n_samples, (int)n_features); + + // Allocate output centroids on GPU. + Tensor centroids_tensor = torch::empty({num_clusters, n_features}, + torch::TensorOptions().dtype(torch::kFloat32).device(torch::kCUDA)); + float* centroids_ptr = centroids_tensor.data_ptr(); + auto centroids_view = raft::make_device_matrix_view(centroids_ptr, (int)num_clusters, (int)n_features); + + // Set up k-means parameters. + cuvs::cluster::kmeans::params params; + params.n_clusters = (int)num_clusters; + params.max_iter = niter; + params.init = cuvs::cluster::kmeans::params::InitMethod::Random; + + // Prepare host-side scalars to capture inertia and iterations. + float inertia = 0.0f; + int iterations = 0; + auto inertia_view = raft::make_host_scalar_view(&inertia); + auto iter_view = raft::make_host_scalar_view(&iterations); + + // Run k-means clustering (fit). + cuvs::cluster::kmeans::fit(handle, params, X_view, std::nullopt, + centroids_view, inertia_view, iter_view); + + // If inner-product, renormalize centroids. + if (metric == faiss::METRIC_INNER_PRODUCT) { + Tensor cent_norms = torch::sqrt((centroids_tensor * centroids_tensor).sum(1, /*keepdim=*/true)); + centroids_tensor.div_(cent_norms); + } + + // Allocate memory for labels and run prediction. + Tensor labels = torch::empty({n_samples}, torch::TensorOptions().dtype(torch::kInt32).device(torch::kCUDA)); + int* labels_ptr = labels.data_ptr(); + auto labels_view = raft::make_device_vector_view(labels_ptr, (int)n_samples); + + cuvs::cluster::kmeans::predict(handle, params, X_view, std::nullopt, + centroids_view, labels_view, false, + raft::make_host_scalar_view(&inertia)); + + // Synchronize the stream.
+ raft::resource::sync_stream(handle); + + // ----- Grouping (GPU vectorized) ----- + // Sort the labels and get the sorted indices. + Tensor sorted_tuple = std::get<1>(torch::sort(labels)); // sorted indices only, since sorted labels are not needed + Tensor sorted_labels = labels.index_select(0, sorted_tuple); + + // Reorder the data and ids using the sorted indices. + Tensor sorted_data = data.index_select(0, sorted_tuple); + Tensor sorted_ids = id_dev.index_select(0, sorted_tuple); + + // Compute per-cluster counts using torch::bincount. + Tensor counts = torch::bincount(sorted_labels.to(torch::kInt64), /*weights=*/{}, num_clusters); + + // Transfer counts to CPU and build a vector for split sizes. + auto counts_cpu = counts.to(torch::kCPU); + std::vector split_sizes(counts_cpu.data_ptr(), counts_cpu.data_ptr() + counts_cpu.numel()); + + // Split the sorted data and ids into clusters. + vector cluster_vectors = torch::split(sorted_data, split_sizes, 0); + vector cluster_ids = torch::split(sorted_ids, split_sizes, 0); + + Tensor partition_ids = torch::arange(num_clusters, torch::kInt64); + + shared_ptr clustering = std::make_shared(); + clustering->centroids = centroids_tensor; + clustering->partition_ids = partition_ids; + clustering->vectors = cluster_vectors; + clustering->vector_ids = cluster_ids; + + return clustering; +} +#endif + +shared_ptr kmeans_cpu(Tensor vectors, Tensor ids, int n_clusters, MetricType metric_type, int niter, - bool use_gpu /*=false*/, Tensor /* initial_centroids */) { // Ensure enough vectors are available and sizes match. assert(vectors.size(0) >= n_clusters * 2); @@ -29,24 +132,10 @@ shared_ptr kmeans(Tensor vectors, int d = vectors.size(1); faiss::Index* index_ptr = nullptr; - - if (use_gpu) { - // Check if GPU resources are available. 
- #ifdef FAISS_ENABLE_GPU - faiss::gpu::StandardGpuResources gpu_res; - if (metric_type == faiss::METRIC_INNER_PRODUCT) - index_ptr = new faiss::gpu::GpuIndexFlatIP(&gpu_res, d); - else - index_ptr = new faiss::gpu::GpuIndexFlatL2(&gpu_res, d); - #else - throw std::runtime_error("GPU resources are not available. Please compile with FAISS_ENABLE_GPU."); - #endif - } else { - if (metric_type == faiss::METRIC_INNER_PRODUCT) - index_ptr = new faiss::IndexFlatIP(d); - else - index_ptr = new faiss::IndexFlatL2(d); - } + if (metric_type == faiss::METRIC_INNER_PRODUCT) + index_ptr = new faiss::IndexFlatIP(d); + else + index_ptr = new faiss::IndexFlatL2(d); faiss::ClusteringParameters cp; cp.niter = niter; @@ -96,6 +185,24 @@ shared_ptr kmeans(Tensor vectors, return clustering; } +shared_ptr kmeans(Tensor vectors, + Tensor ids, + int n_clusters, + MetricType metric_type, + int niter, + bool use_gpu /*=false*/, + Tensor /* initial_centroids */) { + if (use_gpu) { + #ifdef QUAKE_ENABLE_GPU + return kmeans_cuvs(vectors, ids, n_clusters, metric_type, niter); + #else + throw std::runtime_error("GPU support is not enabled. Please compile with QUAKE_ENABLE_GPU."); + #endif + } else { + return kmeans_cpu(vectors, ids, n_clusters, metric_type, niter); + } +} + tuple >> kmeans_refine_partitions( Tensor centroids, vector> partitions, From 8bb1929185d72b3e467b60b0b08430d60ac3829a Mon Sep 17 00:00:00 2001 From: Jason Date: Wed, 16 Apr 2025 11:21:42 -0500 Subject: [PATCH 03/15] add clustering tests --- test/cpp/clustering.cpp | 179 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 179 insertions(+) create mode 100644 test/cpp/clustering.cpp diff --git a/test/cpp/clustering.cpp b/test/cpp/clustering.cpp new file mode 100644 index 00000000..9c0b76c3 --- /dev/null +++ b/test/cpp/clustering.cpp @@ -0,0 +1,179 @@ +#include +#include +#include "clustering.h" +#include + +// Helper functions to generate random test data.
+static Tensor generate_random_data(int64_t num_vectors, int64_t dim) { + return torch::randn({num_vectors, dim}, torch::kFloat32).contiguous(); +} + +static Tensor generate_sequential_ids(int64_t count, int64_t start = 0) { + return torch::arange(start, start + count, torch::kInt64).contiguous(); +} + +// Helpers to compute mean squared error (MSE) for clustering. +// For FAISS-based clustering: 'vectors' is a vector of tensors (one per cluster). +static double compute_mse(const Tensor& centroids, + const std::vector& clusters) { + double total_error = 0.0; + int64_t total_count = 0; + auto centroids_cpu = centroids.to(torch::kCPU); + for (size_t i = 0; i < clusters.size(); ++i) { + if (clusters[i].size(0) > 0) { + auto cluster = clusters[i].to(torch::kCPU); + auto diff = cluster - centroids_cpu[i].unsqueeze(0); + total_error += diff.pow(2).sum().item(); + total_count += cluster.size(0); + } + } + return total_count > 0 ? total_error / total_count : 0.0; +} + +// Test fixture for clustering tests. +class ClusteringTest : public ::testing::Test { + protected: + const int64_t num_vectors = 5000; + const int64_t dim = 64; + const int num_clusters = 20; + Tensor vectors_cpu, ids_cpu; + Tensor vectors_cuda, ids_cuda; + + void SetUp() override { + // Skip these tests if CUDA is not available. + if (!torch::cuda::is_available()) { + GTEST_SKIP() << "CUDA is not available; skipping clustering tests."; + } + vectors_cpu = generate_random_data(num_vectors, dim); + ids_cpu = generate_sequential_ids(num_vectors); +#ifdef QUAKE_ENABLE_GPU + vectors_cuda = vectors_cpu.to(torch::kCUDA).contiguous(); + ids_cuda = ids_cpu.to(torch::kCUDA).contiguous(); +#endif + } +}; + +// Compare clustering methods using the L2 (Euclidean) metric. +#ifdef QUAKE_ENABLE_GPU +TEST_F(ClusteringTest, CompareClustering_L2) { + const int niter = 20; + // FAISS-based clustering on CPU. 
+ auto clustering_cpu = kmeans_cpu(vectors_cpu, ids_cpu, num_clusters, faiss::METRIC_L2, niter, Tensor()); + // cuVS-based clustering on GPU. + auto clustering_cuvs = kmeans_cuvs(vectors_cuda, ids_cuda, num_clusters, faiss::METRIC_L2); + + // Verify centroid shapes. + ASSERT_EQ(clustering_cpu->centroids.dim(), 2); + ASSERT_EQ(clustering_cpu->centroids.size(0), num_clusters); + ASSERT_EQ(clustering_cpu->centroids.size(1), dim); + ASSERT_EQ(clustering_cuvs->centroids.dim(), 2); + ASSERT_EQ(clustering_cuvs->centroids.size(0), num_clusters); + ASSERT_EQ(clustering_cuvs->centroids.size(1), dim); + + // Compare the mean squared errors (MSEs) between the methods. + double mse_cpu = compute_mse(clustering_cpu->centroids, clustering_cpu->vectors); + double mse_cuvs = compute_mse(clustering_cuvs->centroids, clustering_cuvs->vectors); + // They should agree within roughly 20% relative difference. + ASSERT_NEAR(mse_cpu, mse_cuvs, mse_cpu * 0.20); +} + +// Compare clustering methods using the inner product metric. +TEST_F(ClusteringTest, CompareClustering_InnerProduct) { + const int niter = 20; + auto clustering_cpu = kmeans(vectors_cpu, ids_cpu, num_clusters, + faiss::METRIC_INNER_PRODUCT, niter, false, Tensor()); + auto clustering_cuvs = kmeans_cuvs(vectors_cuda, ids_cuda, num_clusters, faiss::METRIC_INNER_PRODUCT); + + // Verify centroid shapes. + ASSERT_EQ(clustering_cpu->centroids.dim(), 2); + ASSERT_EQ(clustering_cpu->centroids.size(0), num_clusters); + ASSERT_EQ(clustering_cpu->centroids.size(1), dim); + ASSERT_EQ(clustering_cuvs->centroids.dim(), 2); + ASSERT_EQ(clustering_cuvs->centroids.size(0), num_clusters); + ASSERT_EQ(clustering_cuvs->centroids.size(1), dim); + + // For inner product, check that cuVS centroids are normalized. 
+ auto norms = clustering_cuvs->centroids.norm(2, 1); + for (int i = 0; i < norms.size(0); ++i) { + ASSERT_NEAR(norms[i].item(), 1.0f, 1e-3); + } + + double mse_cpu = compute_mse(clustering_cpu->centroids, clustering_cpu->vectors); + double mse_cuvs = compute_mse(clustering_cuvs->centroids, clustering_cuvs->vectors); + // Allow a bit larger relative difference for inner product. + ASSERT_NEAR(mse_cpu, mse_cuvs, mse_cpu * 0.25); +} + +// Test that cuVS clustering partitions all vectors correctly (L2 metric). +TEST_F(ClusteringTest, CUVSClustering_Partitioning_L2) { + auto result = kmeans_cuvs(vectors_cuda, ids_cuda, num_clusters, faiss::METRIC_L2); + ASSERT_EQ(result->centroids.dim(), 2); + ASSERT_EQ(result->centroids.size(0), num_clusters); + ASSERT_EQ(result->centroids.size(1), dim); + + int64_t total_vectors = 0; + for (int i = 0; i < num_clusters; ++i) { + auto cluster = result->vectors[i]; + ASSERT_EQ(cluster.size(0), result->vector_ids[i].size(0)); + total_vectors += cluster.size(0); + } + ASSERT_EQ(total_vectors, num_vectors); +} + + +// Test that cuVS clustering (inner product) produces unit-norm centroids and correctly partitions vectors. +TEST_F(ClusteringTest, CUVSClustering_Partitioning_InnerProduct) { + auto result = kmeans_cuvs(vectors_cuda, ids_cuda, num_clusters, faiss::METRIC_INNER_PRODUCT); + ASSERT_EQ(result->centroids.dim(), 2); + ASSERT_EQ(result->centroids.size(0), num_clusters); + ASSERT_EQ(result->centroids.size(1), dim); + + // Verify centroids are normalized. + auto norms = result->centroids.norm(2, 1); + for (int i = 0; i < norms.size(0); ++i) { + ASSERT_NEAR(norms[i].item(), 1.0f, 1e-3); + } + + int64_t total_vectors = 0; + for (int i = 0; i < num_clusters; ++i) { + auto cluster = result->vectors[i]; + ASSERT_EQ(cluster.size(0), result->vector_ids[i].size(0)); + total_vectors += cluster.size(0); + } + ASSERT_EQ(total_vectors, num_vectors); +} +#endif + +TEST_F(ClusteringTest, KMeansCPU_L2) { + // Test CPU-based k-means clustering. 
+ int niter = 10; + auto clustering = kmeans_cpu(vectors_cpu, ids_cpu, num_clusters, faiss::METRIC_L2, niter); + ASSERT_EQ(clustering->centroids.dim(), 2); + ASSERT_EQ(clustering->centroids.size(0), num_clusters); + ASSERT_EQ(clustering->centroids.size(1), dim); + + int64_t total_vectors = 0; + for (int i = 0; i < num_clusters; ++i) { + auto cluster = clustering->vectors[i]; + ASSERT_EQ(cluster.size(0), clustering->vector_ids[i].size(0)); + total_vectors += cluster.size(0); + } + ASSERT_EQ(total_vectors, num_vectors); +} + +TEST_F(ClusteringTest, KMeansCPU_InnerProduct) { + // Test CPU-based k-means clustering with inner product metric. + int niter = 10; + auto clustering = kmeans_cpu(vectors_cpu, ids_cpu, num_clusters, faiss::METRIC_INNER_PRODUCT, niter); + ASSERT_EQ(clustering->centroids.dim(), 2); + ASSERT_EQ(clustering->centroids.size(0), num_clusters); + ASSERT_EQ(clustering->centroids.size(1), dim); + + int64_t total_vectors = 0; + for (int i = 0; i < num_clusters; ++i) { + auto cluster = clustering->vectors[i]; + ASSERT_EQ(cluster.size(0), clustering->vector_ids[i].size(0)); + total_vectors += cluster.size(0); + } + ASSERT_EQ(total_vectors, num_vectors); +} \ No newline at end of file From 13a316188e791effd07566f186074983b0d4a737 Mon Sep 17 00:00:00 2001 From: Jason Date: Wed, 16 Apr 2025 11:21:54 -0500 Subject: [PATCH 04/15] remove old tests --- test/cpp/test_clustering_benchmark.cpp | 79 ------------ test/cpp/test_clustering_correctness.cpp | 157 ----------------------- test/cpp/test_cuvs_clustering.cpp | 88 ------------- 3 files changed, 324 deletions(-) delete mode 100644 test/cpp/test_clustering_benchmark.cpp delete mode 100644 test/cpp/test_clustering_correctness.cpp delete mode 100644 test/cpp/test_cuvs_clustering.cpp diff --git a/test/cpp/test_clustering_benchmark.cpp b/test/cpp/test_clustering_benchmark.cpp deleted file mode 100644 index 733ca95f..00000000 --- a/test/cpp/test_clustering_benchmark.cpp +++ /dev/null @@ -1,79 +0,0 @@ -// 
benchmark_clustering_gtest.cpp -// -// This Google Test benchmarks two clustering implementations: -// 1. FAISS-based CPU clustering (kmeans with use_gpu == false) -// 2. cuVS-based GPU clustering (ClusterWithCuVS) -// on a dataset of 4,000,000 vectors (of dimension 128) and 10,000 clusters. -// The test prints the elapsed time (milliseconds) for each method. - -#include -#include -#include -#include -#include "clustering.h" // Declaration of kmeans() and ClusterWithCuVS() -#include "cuv_cluster.h" - -using namespace std::chrono; - -TEST(ClusteringBenchmark, CPU_vs_cuVS) { - // Verify CUDA is available. - if (!torch::cuda::is_available()) { - GTEST_SKIP() << "CUDA is not available; skipping benchmark."; - } - - // Define benchmark parameters. - const int64_t num_vectors = 10000000; // 10 million vectors - const int64_t dim = 128; - const int num_clusters = 10000; - const int niter = 5; // number of clustering iterations - - std::cout << "Benchmarking clustering with " << num_vectors << " vectors, dimension " - << dim << ", and " << num_clusters << " clusters." << std::endl; - - // Create CPU tensors. - auto options_float_cpu = torch::TensorOptions().dtype(torch::kFloat32).device(torch::kCPU); - auto options_int64_cpu = torch::TensorOptions().dtype(torch::kInt64).device(torch::kCPU); - torch::Tensor vectors_cpu = torch::randn({num_vectors, dim}, options_float_cpu).contiguous(); - torch::Tensor ids_cpu = torch::arange(0, num_vectors, options_int64_cpu).contiguous(); - - // Create CUDA tensors. - auto options_float_cuda = torch::TensorOptions().dtype(torch::kFloat32).device(torch::kCUDA); - auto options_int64_cuda = torch::TensorOptions().dtype(torch::kInt64).device(torch::kCUDA); - torch::Tensor vectors_cuda = vectors_cpu.to(options_float_cuda).contiguous(); - torch::Tensor ids_cuda = ids_cpu.to(options_int64_cuda).contiguous(); - - // Warm up the GPU using the cuVS clustering (to remove initialization overhead). 
- std::cout << "[DEBUG] Warming up GPU with cuVS clustering..." << std::endl; - { - auto dummy = ClusterWithCuVS(vectors_cuda, ids_cuda, 10, faiss::METRIC_L2); - torch::cuda::synchronize(); - } - std::cout << "[DEBUG] Warm-up complete." << std::endl; - - // Benchmark FAISS CPU clustering. - std::cout << "[DEBUG] Starting FAISS CPU clustering benchmark..." << std::endl; - auto start_cpu = high_resolution_clock::now(); - auto clustering_cpu = kmeans(vectors_cpu, ids_cpu, num_clusters, faiss::METRIC_L2, niter, false, torch::Tensor()); - auto end_cpu = high_resolution_clock::now(); - auto duration_cpu = duration_cast(end_cpu - start_cpu).count(); - std::cout << "FAISS CPU clustering time: " << duration_cpu << " ms" << std::endl; - - // Benchmark cuVS GPU clustering. - std::cout << "[DEBUG] Starting cuVS clustering benchmark..." << std::endl; - auto start_cuvs = high_resolution_clock::now(); - auto clustering_cuvs = ClusterWithCuVS(vectors_cuda, ids_cuda, num_clusters, faiss::METRIC_L2); - torch::cuda::synchronize(); // ensure GPU work is complete - auto end_cuvs = high_resolution_clock::now(); - auto duration_cuvs = duration_cast(end_cuvs - start_cuvs).count(); - std::cout << "cuVS clustering time: " << duration_cuvs << " ms" << std::endl; - - // Print summary. - std::cout << "---------------------------" << std::endl; - std::cout << "FAISS CPU clustering took " << duration_cpu << " ms." << std::endl; - std::cout << "cuVS clustering took " << duration_cuvs << " ms." << std::endl; - std::cout << "---------------------------" << std::endl; - - // Basic checks. - EXPECT_GT(duration_cpu, 0); - EXPECT_GT(duration_cuvs, 0); -} diff --git a/test/cpp/test_clustering_correctness.cpp b/test/cpp/test_clustering_correctness.cpp deleted file mode 100644 index 01fff623..00000000 --- a/test/cpp/test_clustering_correctness.cpp +++ /dev/null @@ -1,157 +0,0 @@ -// test_clustering_correctness.cpp -// -// This file tests the correctness of two clustering implementations: -// 1. 
FAISS-based clustering on CPU: kmeans(..., use_gpu = false) -// 2. cuVS-based clustering: ClusterWithCuVS(...) -// The tests compute the mean squared error (MSE) between each input vector and its -// corresponding cluster centroid and require that the MSE from the two methods -// agree within a reasonable tolerance. Additionally, the tests verify that each method -// returns centroids of the correct shape and partitions the input set completely. - -#include -#include -#include -#include -#include "clustering.h" // Declaration of kmeans(), ClusterWithCuVS(), Clustering, ClusteringResult, and MetricType - -// Helper to compute MSE for FAISS-based clustering results. -// The 'vectors' field of Clustering is a vector of tensors (one per cluster). -double ComputeMSE_Faiss(const torch::Tensor& centroids, const std::vector& cluster_vectors) { - double total_error = 0.0; - int64_t total_count = 0; - // Ensure centroids are on CPU. - torch::Tensor centroids_cpu = centroids.to(torch::kCPU); - for (int i = 0; i < centroids_cpu.size(0); ++i) { - if (cluster_vectors[i].size(0) > 0) { - // Move the cluster vectors to CPU. - torch::Tensor cluster_cpu = cluster_vectors[i].to(torch::kCPU); - torch::Tensor diff = cluster_cpu - centroids_cpu[i].unsqueeze(0); - total_error += diff.pow(2).sum().item(); - total_count += cluster_cpu.size(0); - } - } - return total_count > 0 ? total_error / total_count : 0.0; -} - -// Helper to compute MSE for cuVS-based clustering results. -// Here, clusters are returned as a vector of pairs where the first tensor is the cluster vectors. 
-double ComputeMSE_cuVS(const torch::Tensor& centroids, - const std::vector>& clusters) { - double total_error = 0.0; - int64_t total_count = 0; - torch::Tensor centroids_cpu = centroids.to(torch::kCPU); - for (int i = 0; i < centroids_cpu.size(0); ++i) { - const torch::Tensor& cluster_vectors = clusters[i].first; - if (cluster_vectors.size(0) > 0) { - torch::Tensor cluster_cpu = cluster_vectors.to(torch::kCPU); - torch::Tensor diff = cluster_cpu - centroids_cpu[i].unsqueeze(0); - total_error += diff.pow(2).sum().item(); - total_count += cluster_cpu.size(0); - } - } - return total_count > 0 ? total_error / total_count : 0.0; -} - -// Test fixture for clustering correctness tests. -class ClusteringCorrectnessTest : public ::testing::Test { - protected: - // Use a moderate number of vectors for correctness testing. - const int64_t num_vectors_ = 5000; - const int64_t dim_ = 64; - const int num_clusters_ = 20; - // CPU tensors. - torch::Tensor vectors_cpu_; - torch::Tensor ids_cpu_; - // CUDA tensors. - torch::Tensor vectors_cuda_; - torch::Tensor ids_cuda_; - - void SetUp() override { - if (!torch::cuda::is_available()) { - GTEST_SKIP() << "CUDA is not available; skipping clustering correctness tests."; - } - // Create CPU tensors. - auto options_float_cpu = torch::TensorOptions().dtype(torch::kFloat32).device(torch::kCPU); - auto options_int64_cpu = torch::TensorOptions().dtype(torch::kInt64).device(torch::kCPU); - vectors_cpu_ = torch::randn({num_vectors_, dim_}, options_float_cpu).contiguous(); - ids_cpu_ = torch::arange(0, num_vectors_, options_int64_cpu).contiguous(); - // Create CUDA versions. 
- auto options_float_cuda = torch::TensorOptions().dtype(torch::kFloat32).device(torch::kCUDA); - auto options_int64_cuda = torch::TensorOptions().dtype(torch::kInt64).device(torch::kCUDA); - vectors_cuda_ = vectors_cpu_.to(options_float_cuda).contiguous(); - ids_cuda_ = ids_cpu_.to(options_int64_cuda).contiguous(); - - std::cout << "[DEBUG] SetUp complete: " - << "num_vectors = " << num_vectors_ - << ", dim = " << dim_ - << ", clusters = " << num_clusters_ << std::endl; - std::cout.flush(); - } -}; - -TEST_F(ClusteringCorrectnessTest, CompareClusteringMethods_L2) { - const int niter = 20; // number of iterations for clustering - - std::cout << "[DEBUG] Starting FAISS CPU clustering ..." << std::endl; - std::cout.flush(); - auto clustering_cpu = kmeans(vectors_cpu_, ids_cpu_, num_clusters_, faiss::METRIC_L2, niter, false, torch::Tensor()); - std::cout << "[DEBUG] Completed FAISS CPU clustering." << std::endl; - std::cout << "[DEBUG] CPU centroids shape: "; - for (auto s : clustering_cpu->centroids.sizes()) { std::cout << s << " "; } - std::cout << std::endl; - std::cout.flush(); - - std::cout << "[DEBUG] Starting cuVS clustering ..." << std::endl; - std::cout.flush(); - ClusteringResult clustering_cuvs = ClusterWithCuVS(vectors_cuda_, ids_cuda_, num_clusters_, faiss::METRIC_L2); - std::cout << "[DEBUG] Completed cuVS clustering." << std::endl; - std::cout << "[DEBUG] cuVS centroids shape: "; - for (auto s : clustering_cuvs.centroids.sizes()) { std::cout << s << " "; } - std::cout << std::endl; - std::cout.flush(); - - // Compute MSE for each method. 
- double mse_cpu = ComputeMSE_Faiss(clustering_cpu->centroids, clustering_cpu->vectors); - double mse_cuvs = ComputeMSE_cuVS(clustering_cuvs.centroids, clustering_cuvs.clusters); - - std::cout << "[DEBUG] L2 Clustering MSEs:" << std::endl; - std::cout << " FAISS CPU: " << mse_cpu << std::endl; - std::cout << " cuVS : " << mse_cuvs << std::endl; - std::cout.flush(); - - // Assert that MSE values are similar (within 20% relative difference) - double tol = 0.20; - ASSERT_NEAR(mse_cpu, mse_cuvs, tol * mse_cpu); -} - -TEST_F(ClusteringCorrectnessTest, CompareClusteringMethods_InnerProduct) { - const int niter = 20; - std::cout << "[DEBUG] Starting FAISS CPU clustering (InnerProduct) ..." << std::endl; - std::cout.flush(); - auto clustering_cpu = kmeans(vectors_cpu_, ids_cpu_, num_clusters_, faiss::METRIC_INNER_PRODUCT, niter, false, torch::Tensor()); - std::cout << "[DEBUG] Completed FAISS CPU clustering (InnerProduct)." << std::endl; - std::cout << "[DEBUG] CPU centroids shape: "; - for (auto s : clustering_cpu->centroids.sizes()) { std::cout << s << " "; } - std::cout << std::endl; - std::cout.flush(); - - std::cout << "[DEBUG] Starting cuVS clustering (InnerProduct) ..." << std::endl; - std::cout.flush(); - ClusteringResult clustering_cuvs = ClusterWithCuVS(vectors_cuda_, ids_cuda_, num_clusters_, faiss::METRIC_INNER_PRODUCT); - std::cout << "[DEBUG] Completed cuVS clustering (InnerProduct)." 
<< std::endl; - std::cout << "[DEBUG] cuVS centroids shape: "; - for (auto s : clustering_cuvs.centroids.sizes()) { std::cout << s << " "; } - std::cout << std::endl; - std::cout.flush(); - - double mse_cpu = ComputeMSE_Faiss(clustering_cpu->centroids, clustering_cpu->vectors); - double mse_cuvs = ComputeMSE_cuVS(clustering_cuvs.centroids, clustering_cuvs.clusters); - - std::cout << "[DEBUG] Inner-product Clustering MSEs:" << std::endl; - std::cout << " FAISS CPU: " << mse_cpu << std::endl; - std::cout << " cuVS : " << mse_cuvs << std::endl; - std::cout.flush(); - - double tol = 0.25; - ASSERT_NEAR(mse_cpu, mse_cuvs, tol * mse_cpu); -} diff --git a/test/cpp/test_cuvs_clustering.cpp b/test/cpp/test_cuvs_clustering.cpp deleted file mode 100644 index 394cf4e7..00000000 --- a/test/cpp/test_cuvs_clustering.cpp +++ /dev/null @@ -1,88 +0,0 @@ -// test_cuvs_clustering.cpp -// -// This file tests the cuVS-based k-means clustering implementation. -// It verifies that the centroids are computed correctly and that the -// entire dataset is partitioned among the clusters. - -#include -#include -#include "cuv_cluster.h" // Contains ClusterWithCuVS, ClusteringResult, and MetricType - -// Test that clustering with L2 metric produces the expected output shapes, -// and that every input vector is assigned to one and only one cluster. -TEST(CUVSClusteringTest, ClusteringL2) { - // Skip the test if CUDA is not available. - if (!torch::cuda::is_available()) { - GTEST_SKIP() << "CUDA is not available, skipping cuVS clustering test."; - } - - const int64_t num_vectors = 1000; - const int64_t dim = 32; - const int64_t num_clusters = 10; - - // Generate random data and corresponding sequential IDs on the GPU. - auto vectors = torch::randn({num_vectors, dim}, torch::kCUDA).to(torch::kFloat32).contiguous(); - auto ids = torch::arange(0, num_vectors, torch::kCUDA).contiguous(); - - // Call the cuVS-based clustering function using L2 metric. 
- ClusteringResult result = ClusterWithCuVS(vectors, ids, num_clusters, faiss::METRIC_L2); - - // Validate that the centroids tensor has shape [num_clusters, dim]. - ASSERT_EQ(result.centroids.dim(), 2); - ASSERT_EQ(result.centroids.size(0), num_clusters); - ASSERT_EQ(result.centroids.size(1), dim); - - // Verify that all input vectors are partitioned by summing counts from each cluster. - int64_t total_count = 0; - for (const auto& cluster : result.clusters) { - // Each cluster is returned as a pair: - torch::Tensor cluster_vectors = cluster.first; - torch::Tensor cluster_ids = cluster.second; - // The number of vectors in each cluster must match the number of IDs. - ASSERT_EQ(cluster_vectors.size(0), cluster_ids.size(0)); - total_count += cluster_vectors.size(0); - } - ASSERT_EQ(total_count, num_vectors); -} - -// Test that clustering with the inner product metric (which performs normalization) -// yields centroids that are unit-norm and partitions the dataset correctly. -TEST(CUVSClusteringTest, ClusteringInnerProduct) { - if (!torch::cuda::is_available()) { - GTEST_SKIP() << "CUDA is not available, skipping cuVS clustering test."; - } - - const int64_t num_vectors = 1000; - const int64_t dim = 64; - const int64_t num_clusters = 8; - - // Generate random input data and corresponding IDs. - auto vectors = torch::randn({num_vectors, dim}, torch::kCUDA).to(torch::kFloat32).contiguous(); - auto ids = torch::arange(0, num_vectors, torch::kCUDA).contiguous(); - - // Run clustering with inner product metric. The implementation should normalize the vectors. - ClusteringResult result = ClusterWithCuVS(vectors, ids, num_clusters, faiss::METRIC_INNER_PRODUCT); - - // Check that the centroids tensor has the expected shape. - ASSERT_EQ(result.centroids.dim(), 2); - ASSERT_EQ(result.centroids.size(0), num_clusters); - ASSERT_EQ(result.centroids.size(1), dim); - - // Optionally, validate that each centroid is normalized (i.e. 
its L2 norm is ~1) - auto cent_norms = result.centroids.norm(2, /*dim=*/1); - for (int64_t i = 0; i < cent_norms.size(0); ++i) { - float norm_val = cent_norms[i].item(); - // Allow a small numerical tolerance. - ASSERT_NEAR(norm_val, 1.0, 1e-3); - } - - // Confirm that the total number of vectors across all clusters is equal to the input size. - int64_t total_count = 0; - for (const auto& cluster : result.clusters) { - torch::Tensor cluster_vectors = cluster.first; - torch::Tensor cluster_ids = cluster.second; - ASSERT_EQ(cluster_vectors.size(0), cluster_ids.size(0)); - total_count += cluster_vectors.size(0); - } - ASSERT_EQ(total_count, num_vectors); -} From b201997dda368bf162e35903c532362d6e0b1863 Mon Sep 17 00:00:00 2001 From: Jason Date: Wed, 16 Apr 2025 11:27:45 -0500 Subject: [PATCH 05/15] update cmake --- CMakeLists.txt | 4 ++-- test/cpp/quake_index.cpp | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4cf50474..5b6d8859 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -54,7 +54,7 @@ set(project_TEST_DIR test/cpp) if(QUAKE_ENABLE_GPU) find_package(CuVS REQUIRED) - include(${project_THIRD_PARTY_DIR}cmake/fetch_rapids.cmake) + include(${project_THIRD_PARTY_DIR}/cmake/fetch_rapids.cmake) include(rapids-cmake) include(rapids-cpm) include(rapids-cuda) @@ -221,7 +221,7 @@ endif() # --------------------------------------------------------------- message(STATUS "--------- Final Configuration Summary ---------") message(STATUS "Build type: ${CMAKE_BUILD_TYPE}") -message(STATUS "GPU Enabled: ${FAISS_ENABLE_GPU}") +message(STATUS "GPU Enabled: ${QUAKE_ENABLE_GPU}") message(STATUS "NUMA Enabled: ${QUAKE_USE_NUMA}") message(STATUS "Python used: ${Python3_EXECUTABLE}") message(STATUS "Torch Path: ${TorchPath}") diff --git a/test/cpp/quake_index.cpp b/test/cpp/quake_index.cpp index 0521ea02..242c16df 100644 --- a/test/cpp/quake_index.cpp +++ b/test/cpp/quake_index.cpp @@ -283,7 +283,7 @@ 
TEST(QuakeIndexStressTest, LargeBuildTest) { << " vectors took " << build_duration_ms << " ms.\n"; } -#ifdef FAISS_ENABLE_GPU +#ifdef QUAKE_ENABLE_GPU TEST(QuakeIndexStressTestGPU, LargeBuildTest) { // Attempt to build an index with a large number of vectors. // Adjust these numbers based on your available memory/compute. @@ -527,7 +527,7 @@ TEST(QuakeIndexStressTest, SearchAddRemoveMaintenanceTest) { } // Define the GPU related test only if FAISS GPU support is enabled -#ifdef FAISS_ENABLE_GPU +#ifdef QUAKE_ENABLE_GPU // Test build with GPU enabled TEST(QuakeIndexGPUTest, BuildWithGPUTest) { int64_t dimension = 32; From e39c79628a51b7888e1ebc895f40bc6a5ee08cf9 Mon Sep 17 00:00:00 2001 From: Jason Date: Wed, 16 Apr 2025 11:49:04 -0500 Subject: [PATCH 06/15] update clustering --- src/cpp/include/clustering.h | 6 ------ src/cpp/include/cuv_cluster.h | 14 -------------- src/cpp/src/clustering.cpp | 19 ++++++++++++------- 3 files changed, 12 insertions(+), 27 deletions(-) delete mode 100644 src/cpp/include/cuv_cluster.h diff --git a/src/cpp/include/clustering.h b/src/cpp/include/clustering.h index cc98bebc..e069527c 100644 --- a/src/cpp/include/clustering.h +++ b/src/cpp/include/clustering.h @@ -9,12 +9,6 @@ #include -#ifdef QUAKE_ENABLE_GPU -#include // RAFT resources (handle) -#include // RAFT device view (make_device_matrix_view, etc.) -#include // cuVS k-means API -#endif - class IndexPartition; /** diff --git a/src/cpp/include/cuv_cluster.h b/src/cpp/include/cuv_cluster.h deleted file mode 100644 index 32fe9315..00000000 --- a/src/cpp/include/cuv_cluster.h +++ /dev/null @@ -1,14 +0,0 @@ -// Optionally, if you have your own enum, you can use that instead of faiss::METRIC_L2/INNER_PRODUCT. -struct ClusteringResult { - torch::Tensor centroids; - // Each pair is (cluster_vectors, cluster_ids) for one cluster. 
- std::vector> clusters; -}; - - -inline ClusteringResult ClusterWithCuVS(const torch::Tensor& vectors, - const torch::Tensor& ids, - int64_t num_clusters, - int metric = faiss::METRIC_L2) { - -} diff --git a/src/cpp/src/clustering.cpp b/src/cpp/src/clustering.cpp index 50052a72..36f9fdae 100644 --- a/src/cpp/src/clustering.cpp +++ b/src/cpp/src/clustering.cpp @@ -11,6 +11,11 @@ #include #ifdef QUAKE_ENABLE_GPU +#include // RAFT resources (handle) +#include // RAFT device view (make_device_matrix_view, etc.) +#include // cuVS k-means API +#endif + shared_ptr kmeans_cuvs(Tensor vectors, Tensor ids, int num_clusters, @@ -24,13 +29,13 @@ shared_ptr kmeans_cuvs(Tensor vectors, TORCH_CHECK(vectors.size(0) == ids.size(0), "Number of ids must match number of vectors"); TORCH_CHECK(vectors.size(0) >= num_clusters, "Number of clusters cannot exceed number of points"); - int64_t n_samples = data.size(0); - int64_t n_features = data.size(1); + int64_t n_samples = vectors.size(0); + int64_t n_features = vectors.size(1); // If using inner-product (cosine), normalize input vectors. if (metric == faiss::METRIC_INNER_PRODUCT) { - Tensor norms = torch::sqrt((data * data).sum(1, /*keepdim=*/true)); - data = data / norms; + Tensor norms = torch::sqrt((vectors * vectors).sum(1, /*keepdim=*/true)); + vectors = vectors / norms; } // RAFT handle and stream setup. @@ -39,7 +44,7 @@ shared_ptr kmeans_cuvs(Tensor vectors, raft::resource::set_cuda_stream(handle, cuda_stream); // Wrap the input data in a RAFT device_matrix_view. - float* data_ptr = data.data_ptr(); + float* data_ptr = vectors.data_ptr(); auto X_view = raft::make_host_matrix_view(data_ptr, (int)n_samples, (int)n_features); // Allocate output centroids on GPU. @@ -88,8 +93,8 @@ shared_ptr kmeans_cuvs(Tensor vectors, Tensor sorted_labels = labels.index_select(0, sorted_tuple); // Reorder the data and ids using the sorted indices. 
- Tensor sorted_data = data.index_select(0, sorted_tuple); - Tensor sorted_ids = id_dev.index_select(0, sorted_tuple); + Tensor sorted_data = vectors.index_select(0, sorted_tuple); + Tensor sorted_ids = ids.index_select(0, sorted_tuple); // Compute per-cluster counts using torch::bincount. Tensor counts = torch::bincount(sorted_labels.to(torch::kInt64), /*weights=*/{}, num_clusters); From 174e8d44539faff03baf8bf9ecafd0be2084cd77 Mon Sep 17 00:00:00 2001 From: Jason Date: Wed, 16 Apr 2025 11:49:23 -0500 Subject: [PATCH 07/15] update clustering --- src/cpp/src/clustering.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/cpp/src/clustering.cpp b/src/cpp/src/clustering.cpp index 36f9fdae..01f9f0c7 100644 --- a/src/cpp/src/clustering.cpp +++ b/src/cpp/src/clustering.cpp @@ -14,7 +14,6 @@ #include // RAFT resources (handle) #include // RAFT device view (make_device_matrix_view, etc.) #include // cuVS k-means API -#endif shared_ptr kmeans_cuvs(Tensor vectors, Tensor ids, From 1c57d974c822cb7fbfbc83520ee45868f0ce5d56 Mon Sep 17 00:00:00 2001 From: Jason Date: Wed, 16 Apr 2025 14:22:23 -0500 Subject: [PATCH 08/15] update clustering --- src/cpp/src/clustering.cpp | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/src/cpp/src/clustering.cpp b/src/cpp/src/clustering.cpp index 01f9f0c7..c4535662 100644 --- a/src/cpp/src/clustering.cpp +++ b/src/cpp/src/clustering.cpp @@ -11,6 +11,7 @@ #include #ifdef QUAKE_ENABLE_GPU +#include #include // RAFT resources (handle) #include // RAFT device view (make_device_matrix_view, etc.) #include // cuVS k-means API @@ -21,6 +22,7 @@ shared_ptr kmeans_cuvs(Tensor vectors, MetricType metric, int niter, Tensor initial_centroids) { + // Validate input shapes and sizes. 
TORCH_CHECK(vectors.dim() == 2, "Input 'vectors' must be a 2D tensor"); TORCH_CHECK(ids.dim() == 1 || (ids.dim() == 2 && ids.size(1) == 1), @@ -39,22 +41,23 @@ shared_ptr kmeans_cuvs(Tensor vectors, // RAFT handle and stream setup. raft::resources handle; - cudaStream_t cuda_stream = at::cuda::getCurrentCUDAStream().stream(); + // Replace the at::cuda call with c10's current CUDA stream. + cudaStream_t cuda_stream = c10::cuda::getCurrentCUDAStream(); raft::resource::set_cuda_stream(handle, cuda_stream); - // Wrap the input data in a RAFT device_matrix_view. + // Wrap the input data in a RAFT *device* matrix view. float* data_ptr = vectors.data_ptr(); - auto X_view = raft::make_host_matrix_view(data_ptr, (int)n_samples, (int)n_features); + auto X_view = raft::make_device_matrix_view(data_ptr, (int)n_samples, (int)n_features); // Allocate output centroids on GPU. Tensor centroids_tensor = torch::empty({num_clusters, n_features}, - torch::TensorOptions().dtype(torch::kFloat32).device(torch::kCUDA)); + torch::TensorOptions().dtype(torch::kFloat32).device(torch::kCUDA)); float* centroids_ptr = centroids_tensor.data_ptr(); auto centroids_view = raft::make_device_matrix_view(centroids_ptr, (int)num_clusters, (int)n_features); // Set up k-means parameters. cuvs::cluster::kmeans::params params; - params.n_clusters = (int)num_clusters; + params.n_clusters = num_clusters; params.max_iter = niter; params.init = cuvs::cluster::kmeans::params::InitMethod::Random; @@ -66,12 +69,12 @@ shared_ptr kmeans_cuvs(Tensor vectors, // Run k-means clustering (fit). cuvs::cluster::kmeans::fit(handle, params, X_view, std::nullopt, - centroids_view, inertia_view, iter_view); + centroids_view, inertia_view, iter_view); // If inner-product, renormalize centroids. 
if (metric == faiss::METRIC_INNER_PRODUCT) { - Tensor cent_norms = torch::sqrt((centroids_tensor * centroids_tensor).sum(1, /*keepdim=*/true)); - centroids_tensor.div_(cent_norms); + Tensor cent_norms = torch::sqrt((centroids_tensor * centroids_tensor).sum(1, /*keepdim=*/true)); + centroids_tensor.div_(cent_norms); } // Allocate memory for labels and run prediction. @@ -80,15 +83,15 @@ shared_ptr kmeans_cuvs(Tensor vectors, auto labels_view = raft::make_device_vector_view(labels_ptr, (int)n_samples); cuvs::cluster::kmeans::predict(handle, params, X_view, std::nullopt, - centroids_view, labels_view, false, - raft::make_host_scalar_view(&inertia)); + centroids_view, labels_view, false, + raft::make_host_scalar_view(&inertia)); // Synchronize the stream. raft::resource::sync_stream(handle); // ----- Grouping (GPU vectorized) ----- // Sort the labels and get the sorted indices. - Tensor sorted_tuple = std::get<1>(torch::sort(labels)); // sorted indices only, since sorted labels are not needed + Tensor sorted_tuple = std::get<1>(torch::sort(labels)); // sorted indices only Tensor sorted_labels = labels.index_select(0, sorted_tuple); // Reorder the data and ids using the sorted indices. @@ -97,8 +100,6 @@ shared_ptr kmeans_cuvs(Tensor vectors, // Compute per-cluster counts using torch::bincount. Tensor counts = torch::bincount(sorted_labels.to(torch::kInt64), /*weights=*/{}, num_clusters); - - // Transfer counts to CPU and build a vector for split sizes. 
auto counts_cpu = counts.to(torch::kCPU); std::vector split_sizes(counts_cpu.data_ptr(), counts_cpu.data_ptr() + counts_cpu.numel()); From 2b2e96104d5a7e9d00b60a2d45cdd36237f3b623 Mon Sep 17 00:00:00 2001 From: Jason Date: Wed, 16 Apr 2025 22:28:48 -0500 Subject: [PATCH 09/15] add CPU-GPU clustering --- src/cpp/src/clustering.cpp | 249 ++++++++++++++++++++++--------------- 1 file changed, 152 insertions(+), 97 deletions(-) diff --git a/src/cpp/src/clustering.cpp b/src/cpp/src/clustering.cpp index c4535662..f6954c3b 100644 --- a/src/cpp/src/clustering.cpp +++ b/src/cpp/src/clustering.cpp @@ -16,106 +16,155 @@ #include // RAFT device view (make_device_matrix_view, etc.) #include // cuVS k-means API -shared_ptr kmeans_cuvs(Tensor vectors, - Tensor ids, +shared_ptr kmeans_cuvs_sample_and_predict( + Tensor vectors, Tensor ids, int num_clusters, MetricType metric, + int sample_size, int niter, - Tensor initial_centroids) { - - // Validate input shapes and sizes. - TORCH_CHECK(vectors.dim() == 2, "Input 'vectors' must be a 2D tensor"); - TORCH_CHECK(ids.dim() == 1 || (ids.dim() == 2 && ids.size(1) == 1), - "Input 'ids' must be a 1D tensor or 2D with shape (N,1)"); - TORCH_CHECK(vectors.size(0) == ids.size(0), "Number of ids must match number of vectors"); - TORCH_CHECK(vectors.size(0) >= num_clusters, "Number of clusters cannot exceed number of points"); - - int64_t n_samples = vectors.size(0); - int64_t n_features = vectors.size(1); - - // If using inner-product (cosine), normalize input vectors. 
- if (metric == faiss::METRIC_INNER_PRODUCT) { - Tensor norms = torch::sqrt((vectors * vectors).sum(1, /*keepdim=*/true)); - vectors = vectors / norms; + int gpu_batch_size) { + + TORCH_CHECK(vectors.dim()==2, "vectors must be [N,D]"); + TORCH_CHECK(ids.dim()==1, "ids must be [N]"); + int64_t N = vectors.size(0), D = vectors.size(1); + TORCH_CHECK(sample_size > 0 && sample_size <= N, + "invalid sample_size"); + + // 1) pin + normalize if needed + Tensor cpu_pts = vectors.contiguous().pin_memory(); + if (metric == faiss::METRIC_INNER_PRODUCT) { + auto norms = cpu_pts.norm(2,1,true); + cpu_pts = cpu_pts.div(norms); + } + + // 2) choose a random sample of indices + auto perm = torch::randperm(N, torch::kLong); + auto samp_idx = perm.slice(0, 0, sample_size); + Tensor samp_pts = cpu_pts.index_select(0, samp_idx); + Tensor samp_ids = ids.index_select(0, samp_idx); + + // 3) move sample to GPU + Tensor samp_gpu = samp_pts.to(torch::kCUDA, /*non_blocking=*/true) + .contiguous(); + + // 4) prepare RAFT handle & cuVS params + raft::resources handle; + cudaStream_t stream = c10::cuda::getCurrentCUDAStream(); + raft::resource::set_cuda_stream(handle, stream); + + cuvs::cluster::kmeans::params params; + params.n_clusters = num_clusters; + params.init = cuvs::cluster::kmeans::params::InitMethod::Random; + params.max_iter = niter; + + // 5) allocate centroids on GPU + Tensor cent_gpu = torch::empty({num_clusters, D}, + torch::kFloat32, + torch::TensorOptions().device(torch::kCUDA)) + .contiguous(); + + // 6) run fit on just the sample + { + auto X_view = raft::make_device_matrix_view( + samp_gpu.data_ptr(), + (int)sample_size, (int)D); + auto C_view = raft::make_device_matrix_view( + cent_gpu.data_ptr(), + num_clusters, (int)D); + + cuvs::cluster::kmeans::fit( + handle, params, + X_view, + std::nullopt, + C_view, + raft::make_host_scalar_view(nullptr), + raft::make_host_scalar_view(nullptr) + ); + } + + // 7) now predict labels for all N in batches + Tensor all_labels = 
torch::empty({N}, torch::kLong); + Tensor labels32 = torch::empty({gpu_batch_size}, + torch::kInt32, + torch::TensorOptions().device(torch::kCUDA)); + auto predict_fn = [&](Tensor batch_cpu, int64_t off) { + int64_t bs = batch_cpu.size(0); + Tensor batch_gpu = batch_cpu.to(torch::kCUDA, /*NB=*/true) + .contiguous(); + auto Xv = raft::make_device_matrix_view( + batch_gpu.data_ptr(), + (int)bs, (int)D); + auto Lv = raft::make_device_vector_view( + labels32.data_ptr(), (int)bs); + + cuvs::cluster::kmeans::predict( + handle, params, + Xv, + std::nullopt, + raft::make_device_matrix_view( + cent_gpu.data_ptr(), + num_clusters, (int)D), + Lv, + false, + raft::make_host_scalar_view(nullptr) + ); + + // copy back + all_labels.narrow(0, off, bs) + .copy_(labels32.slice(0,0,bs) + .to(torch::kLong) + .to(torch::kCPU)); + }; + + // 7a) predict for the sample slice + predict_fn(samp_pts, /*off=*/0); + // 7b) predict for the rest + int64_t written = sample_size; + // we’ll write the rest starting at index sample_size + for (int64_t off = 0; off < N; off += gpu_batch_size) { + int64_t bs = std::min(gpu_batch_size, N - off); + // skip the sample zone + if (off < sample_size) { + // overlap: part sample / part rest + if (off + bs <= sample_size) { + // whole chunk was sample: already done + continue; + } else { + // split chunk + int64_t s_end = sample_size - off; + int64_t r_bs = bs - s_end; + Tensor rest_chunk = cpu_pts.slice(0, off + s_end, off + bs); + predict_fn(rest_chunk, /*off=*/off + s_end); + continue; + } } - - // RAFT handle and stream setup. - raft::resources handle; - // Replace the at::cuda call with c10's current CUDA stream. - cudaStream_t cuda_stream = c10::cuda::getCurrentCUDAStream(); - raft::resource::set_cuda_stream(handle, cuda_stream); - - // Wrap the input data in a RAFT *device* matrix view. 
- float* data_ptr = vectors.data_ptr(); - auto X_view = raft::make_device_matrix_view(data_ptr, (int)n_samples, (int)n_features); - - // Allocate output centroids on GPU. - Tensor centroids_tensor = torch::empty({num_clusters, n_features}, - torch::TensorOptions().dtype(torch::kFloat32).device(torch::kCUDA)); - float* centroids_ptr = centroids_tensor.data_ptr(); - auto centroids_view = raft::make_device_matrix_view(centroids_ptr, (int)num_clusters, (int)n_features); - - // Set up k-means parameters. - cuvs::cluster::kmeans::params params; - params.n_clusters = num_clusters; - params.max_iter = niter; - params.init = cuvs::cluster::kmeans::params::InitMethod::Random; - - // Prepare host-side scalars to capture inertia and iterations. - float inertia = 0.0f; - int iterations = 0; - auto inertia_view = raft::make_host_scalar_view(&inertia); - auto iter_view = raft::make_host_scalar_view(&iterations); - - // Run k-means clustering (fit). - cuvs::cluster::kmeans::fit(handle, params, X_view, std::nullopt, - centroids_view, inertia_view, iter_view); - - // If inner-product, renormalize centroids. - if (metric == faiss::METRIC_INNER_PRODUCT) { - Tensor cent_norms = torch::sqrt((centroids_tensor * centroids_tensor).sum(1, /*keepdim=*/true)); - centroids_tensor.div_(cent_norms); - } - - // Allocate memory for labels and run prediction. - Tensor labels = torch::empty({n_samples}, torch::TensorOptions().dtype(torch::kInt32).device(torch::kCUDA)); - int* labels_ptr = labels.data_ptr(); - auto labels_view = raft::make_device_vector_view(labels_ptr, (int)n_samples); - - cuvs::cluster::kmeans::predict(handle, params, X_view, std::nullopt, - centroids_view, labels_view, false, - raft::make_host_scalar_view(&inertia)); - - // Synchronize the stream. - raft::resource::sync_stream(handle); - - // ----- Grouping (GPU vectorized) ----- - // Sort the labels and get the sorted indices. 
- Tensor sorted_tuple = std::get<1>(torch::sort(labels)); // sorted indices only - Tensor sorted_labels = labels.index_select(0, sorted_tuple); - - // Reorder the data and ids using the sorted indices. - Tensor sorted_data = vectors.index_select(0, sorted_tuple); - Tensor sorted_ids = ids.index_select(0, sorted_tuple); - - // Compute per-cluster counts using torch::bincount. - Tensor counts = torch::bincount(sorted_labels.to(torch::kInt64), /*weights=*/{}, num_clusters); - auto counts_cpu = counts.to(torch::kCPU); - std::vector split_sizes(counts_cpu.data_ptr(), counts_cpu.data_ptr() + counts_cpu.numel()); - - // Split the sorted data and ids into clusters. - vector cluster_vectors = torch::split(sorted_data, split_sizes, 0); - vector cluster_ids = torch::split(sorted_ids, split_sizes, 0); - - Tensor partition_ids = torch::arange(num_clusters, torch::kInt64); - - shared_ptr clustering = std::make_shared(); - clustering->centroids = centroids_tensor; - clustering->partition_ids = partition_ids; - clustering->vectors = cluster_vectors; - clustering->vector_ids = cluster_ids; - - return clustering; + // pure rest + Tensor rest_chunk = cpu_pts.slice(0, off, off + bs); + predict_fn(rest_chunk, /*off=*/off); + } + + // 8) group on CPU + Tensor sorted_lbl, sorted_idx; + std::tie(sorted_lbl, sorted_idx) = torch::sort(all_labels); + Tensor sorted_vecs = vectors.index_select(0, sorted_idx); + Tensor sorted_ids = ids.index_select(0, sorted_idx); + + Tensor counts = torch::bincount(sorted_lbl, /*weights=*/{}, num_clusters); + auto cnt_cpu = counts.to(torch::kCPU); + std::vector split_sizes( + cnt_cpu.data_ptr(), + cnt_cpu.data_ptr() + num_clusters + ); + + auto cluster_vecs = torch::split(sorted_vecs, split_sizes, 0); + auto cluster_ids = torch::split(sorted_ids, split_sizes, 0); + + auto out = std::make_shared(); + out->centroids = cent_gpu.cpu().contiguous(); + out->partition_ids = torch::arange(num_clusters, torch::kLong); + out->vectors = std::move(cluster_vecs); + 
out->vector_ids = std::move(cluster_ids); + return out; } #endif @@ -199,7 +248,13 @@ shared_ptr kmeans(Tensor vectors, Tensor /* initial_centroids */) { if (use_gpu) { #ifdef QUAKE_ENABLE_GPU - return kmeans_cuvs(vectors, ids, n_clusters, metric_type, niter); + const int sample_size = std::min(1000000, vectors.size(0)); + const int gpu_batch_size = 100000; // or from build_params + return kmeans_cuvs_sample_and_predict( + vectors, ids, + n_clusters, metric, + sample_size, niter, + gpu_batch_size); #elif throw std::runtime_error("GPU support is not enabled. Please compile with QUAKE_ENABLE_GPU."); #endif From 19d5c30dd3a4c915511d22291eb86bf2933cb9c8 Mon Sep 17 00:00:00 2001 From: Jason Mohoney Date: Thu, 17 Apr 2025 17:22:46 +0000 Subject: [PATCH 10/15] cpu-gpu k-means bug fix --- CMakeLists.txt | 2 +- src/cpp/include/clustering.h | 13 +- src/cpp/src/clustering.cpp | 104 ++++++++++----- test/cpp/clustering.cpp | 245 ++++++++++++++--------------------- test/cpp/quake_index.cpp | 16 ++- 5 files changed, 184 insertions(+), 196 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 5b6d8859..fa71a403 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -70,6 +70,7 @@ if(QUAKE_ENABLE_GPU) add_compile_definitions(QUAKE_ENABLE_GPU) endif() +set(FAISS_ENABLE_GPU OFF) # --------------------------------------------------------------- # Print out Compiler and Path Information # --------------------------------------------------------------- @@ -87,7 +88,6 @@ message(STATUS "QUAKE_USE_NUMA: ${QUAKE_USE_NUMA}") # Apple-specific adjustments if(APPLE) include_directories("/opt/homebrew/opt/openblas/include") - set(FAISS_ENABLE_GPU OFF) endif() # Compiler options and definitions diff --git a/src/cpp/include/clustering.h b/src/cpp/include/clustering.h index e069527c..130b5a1d 100644 --- a/src/cpp/include/clustering.h +++ b/src/cpp/include/clustering.h @@ -41,12 +41,13 @@ shared_ptr kmeans_cpu(Tensor vectors, * @param initial_centroids The initial centroids to use for 
k-means. */ #ifdef QUAKE_ENABLE_GPU -shared_ptr kmeans_cuvs(Tensor vectors, - Tensor ids, - int n_clusters, - MetricType metric_type, - int niter = 5, - Tensor initial_centroids = Tensor()); +shared_ptr kmeans_cuvs_sample_and_predict( + Tensor vectors, Tensor ids, + int num_clusters, + MetricType metric, + int sample_size, + int niter, + int gpu_batch_size); #endif /** diff --git a/src/cpp/src/clustering.cpp b/src/cpp/src/clustering.cpp index f6954c3b..d5520320 100644 --- a/src/cpp/src/clustering.cpp +++ b/src/cpp/src/clustering.cpp @@ -24,8 +24,15 @@ shared_ptr kmeans_cuvs_sample_and_predict( int niter, int gpu_batch_size) { - TORCH_CHECK(vectors.dim()==2, "vectors must be [N,D]"); - TORCH_CHECK(ids.dim()==1, "ids must be [N]"); + std::cout << "[kmeans] Starting sample_and_predict: N=" + << vectors.size(0) << ", D=" << vectors.size(1) + << ", clusters=" << num_clusters + << ", sample_size=" << sample_size + << ", niter=" << niter + << ", batch_size=" << gpu_batch_size << std::endl; + + TORCH_CHECK(vectors.dim() == 2, "vectors must be [N,D]"); + TORCH_CHECK(ids.dim() == 1, "ids must be [N]"); int64_t N = vectors.size(0), D = vectors.size(1); TORCH_CHECK(sample_size > 0 && sample_size <= N, "invalid sample_size"); @@ -33,19 +40,20 @@ shared_ptr kmeans_cuvs_sample_and_predict( // 1) pin + normalize if needed Tensor cpu_pts = vectors.contiguous().pin_memory(); if (metric == faiss::METRIC_INNER_PRODUCT) { - auto norms = cpu_pts.norm(2,1,true); + auto norms = cpu_pts.norm(2, 1, true); cpu_pts = cpu_pts.div(norms); + std::cout << "[kmeans] Normalized for inner product metric" << std::endl; } // 2) choose a random sample of indices - auto perm = torch::randperm(N, torch::kLong); + std::cout << "[kmeans] Sampling " << sample_size << " points" << std::endl; + auto perm = torch::randperm(N, torch::kLong); auto samp_idx = perm.slice(0, 0, sample_size); Tensor samp_pts = cpu_pts.index_select(0, samp_idx); - Tensor samp_ids = ids.index_select(0, samp_idx); // 3) move sample to 
GPU - Tensor samp_gpu = samp_pts.to(torch::kCUDA, /*non_blocking=*/true) - .contiguous(); + std::cout << "[kmeans] Uploading sample to GPU" << std::endl; + Tensor samp_gpu = samp_pts.to(torch::kCUDA, /*non_blocking=*/true).contiguous(); // 4) prepare RAFT handle & cuVS params raft::resources handle; @@ -59,12 +67,20 @@ shared_ptr kmeans_cuvs_sample_and_predict( // 5) allocate centroids on GPU Tensor cent_gpu = torch::empty({num_clusters, D}, - torch::kFloat32, - torch::TensorOptions().device(torch::kCUDA)) + torch::TensorOptions() + .dtype(torch::kFloat32) + .device(torch::kCUDA)) .contiguous(); // 6) run fit on just the sample { + std::cout << "[kmeans] Running cuVS fit on sample" << std::endl; + // host scalars + float inertia = 0.0f; + int actual_iter= 0; + auto host_inertia = raft::make_host_scalar_view(&inertia); + auto host_iter = raft::make_host_scalar_view(&actual_iter); + auto X_view = raft::make_device_matrix_view( samp_gpu.data_ptr(), (int)sample_size, (int)D); @@ -77,26 +93,39 @@ shared_ptr kmeans_cuvs_sample_and_predict( X_view, std::nullopt, C_view, - raft::make_host_scalar_view(nullptr), - raft::make_host_scalar_view(nullptr) + host_inertia, + host_iter ); + + std::cout << "[kmeans] Fit complete: inertia=" << inertia + << ", iterations=" << actual_iter << std::endl; } - // 7) now predict labels for all N in batches + std::cout << "[kmeans] Beginning predict over all " << N << " points" << std::endl; Tensor all_labels = torch::empty({N}, torch::kLong); - Tensor labels32 = torch::empty({gpu_batch_size}, - torch::kInt32, - torch::TensorOptions().device(torch::kCUDA)); + auto predict_fn = [&](Tensor batch_cpu, int64_t off) { int64_t bs = batch_cpu.size(0); + + // allocate exactly bs labels on the GPU + Tensor labels32 = torch::empty( + {bs}, + torch::TensorOptions() + .dtype(torch::kInt32) + .device(torch::kCUDA)); + Tensor batch_gpu = batch_cpu.to(torch::kCUDA, /*NB=*/true) - .contiguous(); + .contiguous(); + auto Xv = raft::make_device_matrix_view( 
batch_gpu.data_ptr(), (int)bs, (int)D); auto Lv = raft::make_device_vector_view( labels32.data_ptr(), (int)bs); + float pred_inertia = 0.0f; + auto host_pred = raft::make_host_scalar_view(&pred_inertia); + cuvs::cluster::kmeans::predict( handle, params, Xv, @@ -106,44 +135,44 @@ shared_ptr kmeans_cuvs_sample_and_predict( num_clusters, (int)D), Lv, false, - raft::make_host_scalar_view(nullptr) + host_pred ); - // copy back + std::cout << "[kmeans] Predict batch off=" << off + << ", bs=" << bs + << ", inertia=" << pred_inertia << std::endl; + + // now safe to copy back exactly bs elements all_labels.narrow(0, off, bs) - .copy_(labels32.slice(0,0,bs) - .to(torch::kLong) - .to(torch::kCPU)); + .copy_( + labels32.to(torch::kLong) + .to(torch::kCPU) + ); }; - // 7a) predict for the sample slice + // predict the sample slice + std::cout << "[kmeans] Predicting sample slice [0," << sample_size << ")" << std::endl; predict_fn(samp_pts, /*off=*/0); - // 7b) predict for the rest - int64_t written = sample_size; - // we’ll write the rest starting at index sample_size + + // predict the rest + std::cout << "[kmeans] Predicting remaining chunks" << std::endl; for (int64_t off = 0; off < N; off += gpu_batch_size) { int64_t bs = std::min(gpu_batch_size, N - off); - // skip the sample zone if (off < sample_size) { - // overlap: part sample / part rest if (off + bs <= sample_size) { - // whole chunk was sample: already done continue; } else { - // split chunk - int64_t s_end = sample_size - off; - int64_t r_bs = bs - s_end; - Tensor rest_chunk = cpu_pts.slice(0, off + s_end, off + bs); - predict_fn(rest_chunk, /*off=*/off + s_end); + int64_t overlap = sample_size - off; + Tensor rest_chunk = cpu_pts.slice(0, off + overlap, off + bs); + predict_fn(rest_chunk, /*off=*/off + overlap); continue; } } - // pure rest Tensor rest_chunk = cpu_pts.slice(0, off, off + bs); predict_fn(rest_chunk, /*off=*/off); } - // 8) group on CPU + std::cout << "[kmeans] Grouping on CPU" << std::endl; Tensor 
sorted_lbl, sorted_idx; std::tie(sorted_lbl, sorted_idx) = torch::sort(all_labels); Tensor sorted_vecs = vectors.index_select(0, sorted_idx); @@ -164,8 +193,11 @@ shared_ptr kmeans_cuvs_sample_and_predict( out->partition_ids = torch::arange(num_clusters, torch::kLong); out->vectors = std::move(cluster_vecs); out->vector_ids = std::move(cluster_ids); + + std::cout << "[kmeans] Completed sample_and_predict\n"; return out; } + #endif shared_ptr kmeans_cpu(Tensor vectors, @@ -252,7 +284,7 @@ shared_ptr kmeans(Tensor vectors, const int gpu_batch_size = 100000; // or from build_params return kmeans_cuvs_sample_and_predict( vectors, ids, - n_clusters, metric, + n_clusters, metric_type, sample_size, niter, gpu_batch_size); #elif diff --git a/test/cpp/clustering.cpp b/test/cpp/clustering.cpp index 9c0b76c3..c94cb44a 100644 --- a/test/cpp/clustering.cpp +++ b/test/cpp/clustering.cpp @@ -1,179 +1,132 @@ #include #include #include "clustering.h" -#include -// Helper functions to generate random test data. -static Tensor generate_random_data(int64_t num_vectors, int64_t dim) { - return torch::randn({num_vectors, dim}, torch::kFloat32).contiguous(); +// Helpers to generate random data and sequential ids +static torch::Tensor generate_random_data(int64_t N, int64_t D) { + return torch::randn({N, D}, torch::kFloat32).contiguous(); } - -static Tensor generate_sequential_ids(int64_t count, int64_t start = 0) { - return torch::arange(start, start + count, torch::kInt64).contiguous(); +static torch::Tensor generate_sequential_ids(int64_t N, int64_t start = 0) { + return torch::arange(start, start + N, torch::kInt64).contiguous(); } -// Helpers to compute mean squared error (MSE) for clustering. -// For FAISS-based clustering: 'vectors' is a vector of tensors (one per cluster). 
-static double compute_mse(const Tensor& centroids, - const std::vector& clusters) { - double total_error = 0.0; - int64_t total_count = 0; - auto centroids_cpu = centroids.to(torch::kCPU); +// Compute mean squared error for clustering (for CPU sanity) +static double compute_mse(const torch::Tensor& centroids, + const std::vector& clusters) { + double total_err = 0.0; + int64_t count = 0; + auto C = centroids.to(torch::kCPU); for (size_t i = 0; i < clusters.size(); ++i) { - if (clusters[i].size(0) > 0) { - auto cluster = clusters[i].to(torch::kCPU); - auto diff = cluster - centroids_cpu[i].unsqueeze(0); - total_error += diff.pow(2).sum().item(); - total_count += cluster.size(0); - } + auto cl = clusters[i].to(torch::kCPU); + if (cl.size(0) == 0) continue; + auto diff = cl - C[i].unsqueeze(0); + total_err += diff.pow(2).sum().item(); + count += cl.size(0); } - return total_count > 0 ? total_error / total_count : 0.0; + return count>0 ? total_err / count : 0.0; } -// Test fixture for clustering tests. +// Fixture class ClusteringTest : public ::testing::Test { protected: const int64_t num_vectors = 5000; - const int64_t dim = 64; - const int num_clusters = 20; - Tensor vectors_cpu, ids_cpu; - Tensor vectors_cuda, ids_cuda; + const int64_t dim = 64; + const int num_clusters= 20; + + torch::Tensor vectors_cpu, ids_cpu; +#ifdef QUAKE_ENABLE_GPU + torch::Tensor vectors_cuda, ids_cuda; +#endif void SetUp() override { - // Skip these tests if CUDA is not available. 
- if (!torch::cuda::is_available()) { - GTEST_SKIP() << "CUDA is not available; skipping clustering tests."; - } vectors_cpu = generate_random_data(num_vectors, dim); - ids_cpu = generate_sequential_ids(num_vectors); + ids_cpu = generate_sequential_ids(num_vectors); + #ifdef QUAKE_ENABLE_GPU + if (!torch::cuda::is_available()) { + GTEST_SKIP() << "CUDA not available"; + } vectors_cuda = vectors_cpu.to(torch::kCUDA).contiguous(); - ids_cuda = ids_cpu.to(torch::kCUDA).contiguous(); + ids_cuda = ids_cpu.to(torch::kCUDA).contiguous(); #endif } }; -// Compare clustering methods using the L2 (Euclidean) metric. -#ifdef QUAKE_ENABLE_GPU -TEST_F(ClusteringTest, CompareClustering_L2) { - const int niter = 20; - // FAISS-based clustering on CPU. - auto clustering_cpu = kmeans_cpu(vectors_cpu, ids_cpu, num_clusters, faiss::METRIC_L2, niter, Tensor()); - // cuVS-based clustering on GPU. - auto clustering_cuvs = kmeans_cuvs(vectors_cuda, ids_cuda, num_clusters, faiss::METRIC_L2); - - // Verify centroid shapes. - ASSERT_EQ(clustering_cpu->centroids.dim(), 2); - ASSERT_EQ(clustering_cpu->centroids.size(0), num_clusters); - ASSERT_EQ(clustering_cpu->centroids.size(1), dim); - ASSERT_EQ(clustering_cuvs->centroids.dim(), 2); - ASSERT_EQ(clustering_cuvs->centroids.size(0), num_clusters); - ASSERT_EQ(clustering_cuvs->centroids.size(1), dim); - - // Compare the mean squared errors (MSEs) between the methods. - double mse_cpu = compute_mse(clustering_cpu->centroids, clustering_cpu->vectors); - double mse_cuvs = compute_mse(clustering_cuvs->centroids, clustering_cuvs->vectors); - // They should agree within roughly 20% relative difference. - ASSERT_NEAR(mse_cpu, mse_cuvs, mse_cpu * 0.20); -} - -// Compare clustering methods using the inner product metric. 
-TEST_F(ClusteringTest, CompareClustering_InnerProduct) { - const int niter = 20; - auto clustering_cpu = kmeans(vectors_cpu, ids_cpu, num_clusters, - faiss::METRIC_INNER_PRODUCT, niter, false, Tensor()); - auto clustering_cuvs = kmeans_cuvs(vectors_cuda, ids_cuda, num_clusters, faiss::METRIC_INNER_PRODUCT); - - // Verify centroid shapes. - ASSERT_EQ(clustering_cpu->centroids.dim(), 2); - ASSERT_EQ(clustering_cpu->centroids.size(0), num_clusters); - ASSERT_EQ(clustering_cpu->centroids.size(1), dim); - ASSERT_EQ(clustering_cuvs->centroids.dim(), 2); - ASSERT_EQ(clustering_cuvs->centroids.size(0), num_clusters); - ASSERT_EQ(clustering_cuvs->centroids.size(1), dim); - - // For inner product, check that cuVS centroids are normalized. - auto norms = clustering_cuvs->centroids.norm(2, 1); - for (int i = 0; i < norms.size(0); ++i) { - ASSERT_NEAR(norms[i].item(), 1.0f, 1e-3); +// Test existing CPU kmeans +TEST_F(ClusteringTest, KMeansCPU_L2) { + auto cl = kmeans_cpu(vectors_cpu, ids_cpu, num_clusters, + faiss::METRIC_L2, /*niter=*/10, torch::Tensor()); + ASSERT_EQ(cl->centroids.sizes(), (std::vector{num_clusters, dim})); + int64_t tot=0; + for (int i=0;ivectors[i].size(0), cl->vector_ids[i].size(0)); + tot += cl->vectors[i].size(0); } - - double mse_cpu = compute_mse(clustering_cpu->centroids, clustering_cpu->vectors); - double mse_cuvs = compute_mse(clustering_cuvs->centroids, clustering_cuvs->vectors); - // Allow a bit larger relative difference for inner product. - ASSERT_NEAR(mse_cpu, mse_cuvs, mse_cpu * 0.25); + ASSERT_EQ(tot, num_vectors); } -// Test that cuVS clustering partitions all vectors correctly (L2 metric). 
-TEST_F(ClusteringTest, CUVSClustering_Partitioning_L2) { - auto result = kmeans_cuvs(vectors_cuda, ids_cuda, num_clusters, faiss::METRIC_L2); - ASSERT_EQ(result->centroids.dim(), 2); - ASSERT_EQ(result->centroids.size(0), num_clusters); - ASSERT_EQ(result->centroids.size(1), dim); - - int64_t total_vectors = 0; - for (int i = 0; i < num_clusters; ++i) { - auto cluster = result->vectors[i]; - ASSERT_EQ(cluster.size(0), result->vector_ids[i].size(0)); - total_vectors += cluster.size(0); +// Compare CPU vs CPU wrapper +TEST_F(ClusteringTest, KMeansWrapper_CPU) { + auto cl = kmeans(vectors_cpu, ids_cpu, num_clusters, + faiss::METRIC_L2, /*niter=*/10, /*use_gpu=*/false, torch::Tensor()); + ASSERT_EQ(cl->centroids.sizes(), (std::vector{num_clusters, dim})); + int64_t tot=0; + for (int i=0;ivectors[i].size(0); } - ASSERT_EQ(total_vectors, num_vectors); + ASSERT_EQ(tot, num_vectors); } - -// Test that cuVS clustering (inner product) produces unit-norm centroids and correctly partitions vectors. -TEST_F(ClusteringTest, CUVSClustering_Partitioning_InnerProduct) { - auto result = kmeans_cuvs(vectors_cuda, ids_cuda, num_clusters, faiss::METRIC_INNER_PRODUCT); - ASSERT_EQ(result->centroids.dim(), 2); - ASSERT_EQ(result->centroids.size(0), num_clusters); - ASSERT_EQ(result->centroids.size(1), dim); - - // Verify centroids are normalized. 
- auto norms = result->centroids.norm(2, 1); - for (int i = 0; i < norms.size(0); ++i) { - ASSERT_NEAR(norms[i].item(), 1.0f, 1e-3); - } - - int64_t total_vectors = 0; - for (int i = 0; i < num_clusters; ++i) { - auto cluster = result->vectors[i]; - ASSERT_EQ(cluster.size(0), result->vector_ids[i].size(0)); - total_vectors += cluster.size(0); +#ifdef QUAKE_ENABLE_GPU +TEST_F(ClusteringTest, SampleAndPredict_GPU_L2) { + const int sample_size = 1000; + const int niter = 5; + const int gpu_batch_size = 512; + + auto cl = kmeans_cuvs_sample_and_predict( + vectors_cpu, ids_cpu, + num_clusters, + faiss::METRIC_L2, + sample_size, + niter, + gpu_batch_size); + + // centroids must live on CPU and have correct shape + ASSERT_EQ(cl->centroids.device().type(), torch::kCPU); + ASSERT_EQ(cl->centroids.sizes(), (std::vector{num_clusters, dim})); + + // all vectors accounted for + int64_t tot=0; + for (int i=0;ivectors[i]; + ASSERT_EQ(part.device().type(), torch::kCPU); + ASSERT_EQ(part.size(0), cl->vector_ids[i].size(0)); + tot += part.size(0); } - ASSERT_EQ(total_vectors, num_vectors); + ASSERT_EQ(tot, num_vectors); + + // Optional quality check: rough MSE vs CPU run + auto cl_cpu = kmeans_cpu(vectors_cpu, ids_cpu, num_clusters, + faiss::METRIC_L2, niter, torch::Tensor()); + double mse_cpu = compute_mse(cl_cpu->centroids, cl_cpu->vectors); + double mse_gpu = compute_mse(cl->centroids, cl->vectors); + ASSERT_NEAR(mse_cpu, mse_gpu, mse_cpu * 0.30); } -#endif - -TEST_F(ClusteringTest, KMeansCPU_L2) { - // Test CPU-based k-means clustering. 
- int niter = 10; - auto clustering = kmeans_cpu(vectors_cpu, ids_cpu, num_clusters, faiss::METRIC_L2, niter); - ASSERT_EQ(clustering->centroids.dim(), 2); - ASSERT_EQ(clustering->centroids.size(0), num_clusters); - ASSERT_EQ(clustering->centroids.size(1), dim); - int64_t total_vectors = 0; - for (int i = 0; i < num_clusters; ++i) { - auto cluster = clustering->vectors[i]; - ASSERT_EQ(cluster.size(0), clustering->vector_ids[i].size(0)); - total_vectors += cluster.size(0); - } - ASSERT_EQ(total_vectors, num_vectors); +// Full wrapper test for GPU +TEST_F(ClusteringTest, KMeansWrapper_GPU) { + const int gpu_sample = 2000; + const int gpu_iter = 3; + const int gpu_batch = 1024; + + auto cl = kmeans(vectors_cpu, ids_cpu, num_clusters, + faiss::METRIC_L2, gpu_iter, + /*use_gpu=*/true, + torch::Tensor()); + ASSERT_EQ(cl->centroids.device().type(), torch::kCPU); + ASSERT_EQ(cl->vectors.size(), size_t(num_clusters)); + int64_t tot=0; + for (auto &p : cl->vectors) tot += p.size(0); + ASSERT_EQ(tot, num_vectors); } - -TEST_F(ClusteringTest, KMeansCPU_InnerProduct) { - // Test CPU-based k-means clustering with inner product metric. 
- int niter = 10; - auto clustering = kmeans_cpu(vectors_cpu, ids_cpu, num_clusters, faiss::METRIC_INNER_PRODUCT, niter); - ASSERT_EQ(clustering->centroids.dim(), 2); - ASSERT_EQ(clustering->centroids.size(0), num_clusters); - ASSERT_EQ(clustering->centroids.size(1), dim); - - int64_t total_vectors = 0; - for (int i = 0; i < num_clusters; ++i) { - auto cluster = clustering->vectors[i]; - ASSERT_EQ(cluster.size(0), clustering->vector_ids[i].size(0)); - total_vectors += cluster.size(0); - } - ASSERT_EQ(total_vectors, num_vectors); -} \ No newline at end of file +#endif // QUAKE_ENABLE_GPU diff --git a/test/cpp/quake_index.cpp b/test/cpp/quake_index.cpp index 242c16df..12dd3bf0 100644 --- a/test/cpp/quake_index.cpp +++ b/test/cpp/quake_index.cpp @@ -257,14 +257,14 @@ TEST(QuakeIndexStressTest, LargeBuildTest) { // Attempt to build an index with a large number of vectors. // Adjust these numbers based on your available memory/compute. int64_t dimension = 128; // Medium-high dimension - int64_t num_vectors = 1e6; // 1 million vectors + int64_t num_vectors = 10e6; // 1 million vectors auto data_vectors = generate_random_data(num_vectors, dimension); auto data_ids = generate_sequential_ids(num_vectors, 0); QuakeIndex index; auto build_params = std::make_shared(); - build_params->nlist = 512; + build_params->nlist = 5120; build_params->metric = "l2"; // Keep the iteration count modest to avoid overly long tests build_params->niter = 5; @@ -288,14 +288,16 @@ TEST(QuakeIndexStressTestGPU, LargeBuildTest) { // Attempt to build an index with a large number of vectors. // Adjust these numbers based on your available memory/compute. 
int64_t dimension = 128; // Medium-high dimension - int64_t num_vectors = 1e6; // 1 million vectors - auto data_vectors = generate_random_data(num_vectors, dimension); - auto data_ids = generate_sequential_ids(num_vectors, 0); + int64_t num_vectors = 10e6; // 1 million vectors + auto data_vectors = generate_random_data(num_vectors, dimension).contiguous(); + auto data_ids = generate_sequential_ids(num_vectors, 0).contiguous(); QuakeIndex index; + std::cout << "generated\n"; + auto build_params = std::make_shared(); - build_params->nlist = 512; + build_params->nlist = 5120; build_params->metric = "l2"; build_params->use_gpu = true; // Keep the iteration count modest to avoid overly long tests @@ -554,4 +556,4 @@ TEST(QuakeIndexGPUTest, BuildWithGPUTest) { EXPECT_EQ(timing_info->n_vectors, data_vectors.size(0)); EXPECT_EQ(timing_info->d, data_vectors.size(1)); } -#endif \ No newline at end of file +#endif From 62650c942d15f97ab8766b7e1485d4e9fc0f874f Mon Sep 17 00:00:00 2001 From: Jason Date: Thu, 17 Apr 2025 12:24:37 -0500 Subject: [PATCH 11/15] remove logs --- src/cpp/src/clustering.cpp | 28 ++-------------------------- 1 file changed, 2 insertions(+), 26 deletions(-) diff --git a/src/cpp/src/clustering.cpp b/src/cpp/src/clustering.cpp index d5520320..b2a3f15a 100644 --- a/src/cpp/src/clustering.cpp +++ b/src/cpp/src/clustering.cpp @@ -10,7 +10,7 @@ #include "index_partition.h" #include -#ifdef QUAKE_ENABLE_GPU +// #ifdef QUAKE_ENABLE_GPU #include #include // RAFT resources (handle) #include // RAFT device view (make_device_matrix_view, etc.) 
@@ -24,13 +24,6 @@ shared_ptr kmeans_cuvs_sample_and_predict( int niter, int gpu_batch_size) { - std::cout << "[kmeans] Starting sample_and_predict: N=" - << vectors.size(0) << ", D=" << vectors.size(1) - << ", clusters=" << num_clusters - << ", sample_size=" << sample_size - << ", niter=" << niter - << ", batch_size=" << gpu_batch_size << std::endl; - TORCH_CHECK(vectors.dim() == 2, "vectors must be [N,D]"); TORCH_CHECK(ids.dim() == 1, "ids must be [N]"); int64_t N = vectors.size(0), D = vectors.size(1); @@ -42,17 +35,14 @@ shared_ptr kmeans_cuvs_sample_and_predict( if (metric == faiss::METRIC_INNER_PRODUCT) { auto norms = cpu_pts.norm(2, 1, true); cpu_pts = cpu_pts.div(norms); - std::cout << "[kmeans] Normalized for inner product metric" << std::endl; } // 2) choose a random sample of indices - std::cout << "[kmeans] Sampling " << sample_size << " points" << std::endl; auto perm = torch::randperm(N, torch::kLong); auto samp_idx = perm.slice(0, 0, sample_size); Tensor samp_pts = cpu_pts.index_select(0, samp_idx); // 3) move sample to GPU - std::cout << "[kmeans] Uploading sample to GPU" << std::endl; Tensor samp_gpu = samp_pts.to(torch::kCUDA, /*non_blocking=*/true).contiguous(); // 4) prepare RAFT handle & cuVS params @@ -74,7 +64,6 @@ shared_ptr kmeans_cuvs_sample_and_predict( // 6) run fit on just the sample { - std::cout << "[kmeans] Running cuVS fit on sample" << std::endl; // host scalars float inertia = 0.0f; int actual_iter= 0; @@ -96,12 +85,8 @@ shared_ptr kmeans_cuvs_sample_and_predict( host_inertia, host_iter ); - - std::cout << "[kmeans] Fit complete: inertia=" << inertia - << ", iterations=" << actual_iter << std::endl; } - std::cout << "[kmeans] Beginning predict over all " << N << " points" << std::endl; Tensor all_labels = torch::empty({N}, torch::kLong); auto predict_fn = [&](Tensor batch_cpu, int64_t off) { @@ -138,10 +123,6 @@ shared_ptr kmeans_cuvs_sample_and_predict( host_pred ); - std::cout << "[kmeans] Predict batch off=" << off - << ", bs=" 
<< bs - << ", inertia=" << pred_inertia << std::endl; - // now safe to copy back exactly bs elements all_labels.narrow(0, off, bs) .copy_( @@ -151,11 +132,9 @@ shared_ptr kmeans_cuvs_sample_and_predict( }; // predict the sample slice - std::cout << "[kmeans] Predicting sample slice [0," << sample_size << ")" << std::endl; predict_fn(samp_pts, /*off=*/0); // predict the rest - std::cout << "[kmeans] Predicting remaining chunks" << std::endl; for (int64_t off = 0; off < N; off += gpu_batch_size) { int64_t bs = std::min(gpu_batch_size, N - off); if (off < sample_size) { @@ -172,7 +151,6 @@ shared_ptr kmeans_cuvs_sample_and_predict( predict_fn(rest_chunk, /*off=*/off); } // 8) group on CPU - std::cout << "[kmeans] Grouping on CPU" << std::endl; Tensor sorted_lbl, sorted_idx; std::tie(sorted_lbl, sorted_idx) = torch::sort(all_labels); Tensor sorted_vecs = vectors.index_select(0, sorted_idx); @@ -194,11 +172,9 @@ shared_ptr kmeans_cuvs_sample_and_predict( out->vectors = std::move(cluster_vecs); out->vector_ids = std::move(cluster_ids); - std::cout << "[kmeans] Completed sample_and_predict\n"; return out; } - -#endif +// #endif shared_ptr kmeans_cpu(Tensor vectors, Tensor ids, From c71326616ded0de9efed1f5bea14e028bce24c61 Mon Sep 17 00:00:00 2001 From: Jason Date: Thu, 17 Apr 2025 16:01:09 -0500 Subject: [PATCH 12/15] pass build params directly to kmeans --- src/cpp/include/clustering.h | 16 +-- src/cpp/include/common.h | 8 +- src/cpp/src/clustering.cpp | 174 +++++++++++++++--------------- src/cpp/src/partition_manager.cpp | 6 +- src/cpp/src/quake_index.cpp | 5 +- test/cpp/clustering.cpp | 51 +++++---- 6 files changed, 132 insertions(+), 128 deletions(-) diff --git a/src/cpp/include/clustering.h b/src/cpp/include/clustering.h index 130b5a1d..d83d563b 100644 --- a/src/cpp/include/clustering.h +++ b/src/cpp/include/clustering.h @@ -24,9 +24,7 @@ class IndexPartition; */ shared_ptr kmeans_cpu(Tensor vectors, Tensor ids, - int n_clusters, - MetricType metric_type, - int niter 
= 5, + shared_ptr build_params, Tensor initial_centroids = Tensor()); /** @@ -42,12 +40,7 @@ shared_ptr kmeans_cpu(Tensor vectors, */ #ifdef QUAKE_ENABLE_GPU shared_ptr kmeans_cuvs_sample_and_predict( - Tensor vectors, Tensor ids, - int num_clusters, - MetricType metric, - int sample_size, - int niter, - int gpu_batch_size); + Tensor vectors, Tensor ids, shared_ptr build_params); #endif /** @@ -63,10 +56,7 @@ shared_ptr kmeans_cuvs_sample_and_predict( */ shared_ptr kmeans(Tensor vectors, Tensor ids, - int n_clusters, - MetricType metric_type, - int niter = 5, - bool use_gpu = false, + shared_ptr build_params, Tensor initial_centroids = Tensor()); diff --git a/src/cpp/include/common.h b/src/cpp/include/common.h index ff95b2a2..b9ac90ed 100644 --- a/src/cpp/include/common.h +++ b/src/cpp/include/common.h @@ -63,6 +63,8 @@ constexpr int DEFAULT_NLIST = 0; ///< Default number of cluste constexpr int DEFAULT_NITER = 5; ///< Default number of k-means iterations used during clustering. constexpr const char* DEFAULT_METRIC = "l2"; ///< Default distance metric (either "l2" for Euclidean or "ip" for inner product). constexpr int DEFAULT_NUM_WORKERS = 0; ///< Default number of workers (0 means single-threaded). +constexpr int DEFAULT_GPU_BATCH_SIZE = 100000; ///< Default batch size for GPU index building. +constexpr int DEFAULT_GPU_SAMPLE_SIZE = 1000000; ///< Default sample size for GPU index building. // Default constants for search parameters constexpr int DEFAULT_K = 1; ///< Default number of neighbors to return. 
@@ -124,11 +126,15 @@ struct IndexBuildParams { bool use_adaptive_nprobe = false; bool use_numa = false; - bool use_gpu = false; bool verify_numa = false; bool same_core = true; bool verbose = false; + // gpu index build params + bool use_gpu = false; + int gpu_batch_size = DEFAULT_GPU_BATCH_SIZE; + int gpu_sample_size = DEFAULT_GPU_SAMPLE_SIZE; + shared_ptr parent_params = nullptr; IndexBuildParams() = default; diff --git a/src/cpp/src/clustering.cpp b/src/cpp/src/clustering.cpp index b2a3f15a..b5f82682 100644 --- a/src/cpp/src/clustering.cpp +++ b/src/cpp/src/clustering.cpp @@ -10,7 +10,7 @@ #include "index_partition.h" #include -// #ifdef QUAKE_ENABLE_GPU +#ifdef QUAKE_ENABLE_GPU #include #include // RAFT resources (handle) #include // RAFT device view (make_device_matrix_view, etc.) @@ -18,52 +18,54 @@ shared_ptr kmeans_cuvs_sample_and_predict( Tensor vectors, Tensor ids, - int num_clusters, - MetricType metric, - int sample_size, - int niter, - int gpu_batch_size) { - - TORCH_CHECK(vectors.dim() == 2, "vectors must be [N,D]"); - TORCH_CHECK(ids.dim() == 1, "ids must be [N]"); - int64_t N = vectors.size(0), D = vectors.size(1); - TORCH_CHECK(sample_size > 0 && sample_size <= N, + shared_ptr build_params) { + + int num_clusters = build_params->nlist; + int niter = build_params->niter; + int gpu_batch_size = build_params->gpu_batch_size; + int gpu_sample_size = build_params->gpu_sample_size; + MetricType metric = str_to_metric_type(build_params->metric); + + TORCH_CHECK(vectors.dim() == 2, "vectors must be [N,D]"); + TORCH_CHECK(ids.dim() == 1, "ids must be [N]"); + int64_t N = vectors.size(0), D = vectors.size(1); + TORCH_CHECK(gpu_sample_size > 0 && gpu_sample_size <= N, "invalid sample_size"); - // 1) pin + normalize if needed - Tensor cpu_pts = vectors.contiguous().pin_memory(); - if (metric == faiss::METRIC_INNER_PRODUCT) { + // 1) pin + normalize if needed + Tensor cpu_pts = vectors.contiguous().pin_memory(); + if (metric == faiss::METRIC_INNER_PRODUCT) { 
auto norms = cpu_pts.norm(2, 1, true); cpu_pts = cpu_pts.div(norms); - } + } - // 2) choose a random sample of indices - auto perm = torch::randperm(N, torch::kLong); - auto samp_idx = perm.slice(0, 0, sample_size); - Tensor samp_pts = cpu_pts.index_select(0, samp_idx); + // 2) choose a random sample of indices + auto perm = torch::randperm(N, torch::kLong); + auto samp_idx = perm.slice(0, 0, gpu_sample_size); + Tensor samp_pts = cpu_pts.index_select(0, samp_idx); - // 3) move sample to GPU - Tensor samp_gpu = samp_pts.to(torch::kCUDA, /*non_blocking=*/true).contiguous(); + // 3) move sample to GPU + Tensor samp_gpu = samp_pts.to(torch::kCUDA, /*non_blocking=*/true).contiguous(); - // 4) prepare RAFT handle & cuVS params - raft::resources handle; - cudaStream_t stream = c10::cuda::getCurrentCUDAStream(); - raft::resource::set_cuda_stream(handle, stream); + // 4) prepare RAFT handle & cuVS params + raft::resources handle; + cudaStream_t stream = c10::cuda::getCurrentCUDAStream(); + raft::resource::set_cuda_stream(handle, stream); - cuvs::cluster::kmeans::params params; - params.n_clusters = num_clusters; - params.init = cuvs::cluster::kmeans::params::InitMethod::Random; - params.max_iter = niter; + cuvs::cluster::kmeans::params params; + params.n_clusters = num_clusters; + params.init = cuvs::cluster::kmeans::params::InitMethod::Random; + params.max_iter = niter; - // 5) allocate centroids on GPU - Tensor cent_gpu = torch::empty({num_clusters, D}, + // 5) allocate centroids on GPU + Tensor cent_gpu = torch::empty({num_clusters, D}, torch::TensorOptions() .dtype(torch::kFloat32) .device(torch::kCUDA)) .contiguous(); - // 6) run fit on just the sample - { + // 6) run fit on just the sample + { // host scalars float inertia = 0.0f; int actual_iter= 0; @@ -72,7 +74,7 @@ shared_ptr kmeans_cuvs_sample_and_predict( auto X_view = raft::make_device_matrix_view( samp_gpu.data_ptr(), - (int)sample_size, (int)D); + (int) gpu_sample_size, (int)D); auto C_view = 
raft::make_device_matrix_view( cent_gpu.data_ptr(), num_clusters, (int)D); @@ -85,11 +87,11 @@ shared_ptr kmeans_cuvs_sample_and_predict( host_inertia, host_iter ); - } + } - Tensor all_labels = torch::empty({N}, torch::kLong); + Tensor all_labels = torch::empty({N}, torch::kLong); - auto predict_fn = [&](Tensor batch_cpu, int64_t off) { + auto predict_fn = [&](Tensor batch_cpu, int64_t off) { int64_t bs = batch_cpu.size(0); // allocate exactly bs labels on the GPU @@ -129,19 +131,19 @@ shared_ptr kmeans_cuvs_sample_and_predict( labels32.to(torch::kLong) .to(torch::kCPU) ); - }; + }; - // predict the sample slice - predict_fn(samp_pts, /*off=*/0); + // predict the sample slice + predict_fn(samp_pts, /*off=*/0); - // predict the rest - for (int64_t off = 0; off < N; off += gpu_batch_size) { + // predict the rest + for (int64_t off = 0; off < N; off += gpu_batch_size) { int64_t bs = std::min(gpu_batch_size, N - off); - if (off < sample_size) { - if (off + bs <= sample_size) { + if (off < gpu_sample_size) { + if (off + bs <= gpu_sample_size) { continue; } else { - int64_t overlap = sample_size - off; + int64_t overlap = gpu_sample_size - off; Tensor rest_chunk = cpu_pts.slice(0, off + overlap, off + bs); predict_fn(rest_chunk, /*off=*/off + overlap); continue; @@ -149,43 +151,43 @@ shared_ptr kmeans_cuvs_sample_and_predict( } Tensor rest_chunk = cpu_pts.slice(0, off, off + bs); predict_fn(rest_chunk, /*off=*/off); - } - // 8) group on CPU - Tensor sorted_lbl, sorted_idx; - std::tie(sorted_lbl, sorted_idx) = torch::sort(all_labels); - Tensor sorted_vecs = vectors.index_select(0, sorted_idx); - Tensor sorted_ids = ids.index_select(0, sorted_idx); - - Tensor counts = torch::bincount(sorted_lbl, /*weights=*/{}, num_clusters); - auto cnt_cpu = counts.to(torch::kCPU); - std::vector split_sizes( + } + // 8) group on CPU + Tensor sorted_lbl, sorted_idx; + std::tie(sorted_lbl, sorted_idx) = torch::sort(all_labels); + Tensor sorted_vecs = vectors.index_select(0, sorted_idx); + 
Tensor sorted_ids = ids.index_select(0, sorted_idx); + + Tensor counts = torch::bincount(sorted_lbl, /*weights=*/{}, num_clusters); + auto cnt_cpu = counts.to(torch::kCPU); + std::vector split_sizes( cnt_cpu.data_ptr(), cnt_cpu.data_ptr() + num_clusters - ); + ); - auto cluster_vecs = torch::split(sorted_vecs, split_sizes, 0); - auto cluster_ids = torch::split(sorted_ids, split_sizes, 0); + auto cluster_vecs = torch::split(sorted_vecs, split_sizes, 0); + auto cluster_ids = torch::split(sorted_ids, split_sizes, 0); - auto out = std::make_shared(); - out->centroids = cent_gpu.cpu().contiguous(); - out->partition_ids = torch::arange(num_clusters, torch::kLong); - out->vectors = std::move(cluster_vecs); - out->vector_ids = std::move(cluster_ids); + auto out = std::make_shared(); + out->centroids = cent_gpu.cpu().contiguous(); + out->partition_ids = torch::arange(num_clusters, torch::kLong); + out->vectors = std::move(cluster_vecs); + out->vector_ids = std::move(cluster_ids); - return out; + return out; } -// #endif +#endif shared_ptr kmeans_cpu(Tensor vectors, Tensor ids, - int n_clusters, - MetricType metric_type, - int niter, + shared_ptr build_params, Tensor /* initial_centroids */) { // Ensure enough vectors are available and sizes match. - assert(vectors.size(0) >= n_clusters * 2); + assert(vectors.size(0) >= build_params->nlist * 2); assert(vectors.size(0) == ids.size(0)); + MetricType metric_type = str_to_metric_type(build_params->metric); + // Normalize vectors for inner product if (metric_type == faiss::METRIC_INNER_PRODUCT) vectors = vectors / vectors.norm(2, 1).unsqueeze(1); @@ -200,13 +202,13 @@ shared_ptr kmeans_cpu(Tensor vectors, index_ptr = new faiss::IndexFlatL2(d); faiss::ClusteringParameters cp; - cp.niter = niter; + cp.niter = build_params->niter; - faiss::Clustering clus(d, n_clusters, cp); + faiss::Clustering clus(d, build_params->nlist, cp); clus.train(n, vectors.data_ptr(), *index_ptr); // Retrieve centroids as a torch Tensor. 
- Tensor centroids = torch::from_blob(clus.centroids.data(), {n_clusters, d}, torch::kFloat32).clone(); + Tensor centroids = torch::from_blob(clus.centroids.data(), {build_params->nlist, d}, torch::kFloat32).clone(); if (metric_type == faiss::METRIC_INNER_PRODUCT) centroids = centroids / centroids.norm(2, 1).unsqueeze(1); @@ -223,7 +225,7 @@ shared_ptr kmeans_cpu(Tensor vectors, Tensor sorted_ids = ids.index_select(0, sorted_indices); // Compute counts per cluster using bincount. - Tensor counts_tensor = torch::bincount(sorted_assignments, /*weights=*/{}, n_clusters); + Tensor counts_tensor = torch::bincount(sorted_assignments, /*weights=*/{}, build_params->nlist); // Ensure counts are on CPU to extract split sizes. counts_tensor = counts_tensor.to(torch::kCPU); // Convert counts tensor to std::vector @@ -234,7 +236,7 @@ shared_ptr kmeans_cpu(Tensor vectors, vector cluster_vectors = torch::split(sorted_vectors, counts_vector, 0); vector cluster_ids = torch::split(sorted_ids, counts_vector, 0); - Tensor partition_ids = torch::arange(n_clusters, torch::kInt64); + Tensor partition_ids = torch::arange(build_params->nlist, torch::kInt64); shared_ptr clustering = std::make_shared(); clustering->centroids = centroids; @@ -249,25 +251,23 @@ shared_ptr kmeans_cpu(Tensor vectors, shared_ptr kmeans(Tensor vectors, Tensor ids, - int n_clusters, - MetricType metric_type, - int niter, - bool use_gpu /*=false*/, + shared_ptr build_params, Tensor /* initial_centroids */) { - if (use_gpu) { + if (build_params->use_gpu) { #ifdef QUAKE_ENABLE_GPU - const int sample_size = std::min(1000000, vectors.size(0)); - const int gpu_batch_size = 100000; // or from build_params return kmeans_cuvs_sample_and_predict( - vectors, ids, - n_clusters, metric_type, - sample_size, niter, - gpu_batch_size); - #elif + vectors, + ids, + n_clusters, + metric_type, + build_params->gpu-sample_size, + niter, + build_params->gpu_batch_size); + #else throw std::runtime_error("GPU support is not enabled. 
Please compile with QUAKE_ENABLE_GPU."); #endif } else { - return kmeans_cpu(vectors, ids, n_clusters, metric_type, niter); + return kmeans_cpu(vectors, ids, build_params); } } diff --git a/src/cpp/src/partition_manager.cpp b/src/cpp/src/partition_manager.cpp index 3b738856..9e7695fd 100644 --- a/src/cpp/src/partition_manager.cpp +++ b/src/cpp/src/partition_manager.cpp @@ -412,11 +412,13 @@ shared_ptr PartitionManager::split_partitions(const Tensor &partitio for (int64_t i = 0; i < partition_ids.size(0); ++i) { // Ensure enough vectors to split assert(clustering->cluster_size(i) >= 4 && "Partition must have at least 8 vectors to split."); + shared_ptr build_params = make_shared(); + build_params->nlist = num_splits; + build_params->metric = parent_->metric_; shared_ptr curr_split_clustering = kmeans( clustering->vectors[i], clustering->vector_ids[i], - num_splits, - parent_->metric_ + build_params ); for (size_t j = 0; j < curr_split_clustering->nlist(); ++j) { diff --git a/src/cpp/src/quake_index.cpp b/src/cpp/src/quake_index.cpp index 1f7cc853..9fb6f5ff 100644 --- a/src/cpp/src/quake_index.cpp +++ b/src/cpp/src/quake_index.cpp @@ -44,10 +44,7 @@ shared_ptr QuakeIndex::build(Tensor x, Tensor ids, shared_ptr clustering = kmeans( x, ids, - build_params_->nlist, - metric_, - build_params_->niter, - build_params_->use_gpu + build_params_ ); auto e1 = std::chrono::high_resolution_clock::now(); timing_info->train_time_us = std::chrono::duration_cast(e1 - s1).count(); diff --git a/test/cpp/clustering.cpp b/test/cpp/clustering.cpp index c94cb44a..bd373ba0 100644 --- a/test/cpp/clustering.cpp +++ b/test/cpp/clustering.cpp @@ -54,8 +54,11 @@ class ClusteringTest : public ::testing::Test { // Test existing CPU kmeans TEST_F(ClusteringTest, KMeansCPU_L2) { - auto cl = kmeans_cpu(vectors_cpu, ids_cpu, num_clusters, - faiss::METRIC_L2, /*niter=*/10, torch::Tensor()); + shared_ptr build_params = std::make_shared(); + build_params->nlist = num_clusters; + build_params->metric = 
"l2"; + build_params->niter = 10; + auto cl = kmeans_cpu(vectors_cpu, ids_cpu, build_params, torch::Tensor()); ASSERT_EQ(cl->centroids.sizes(), (std::vector{num_clusters, dim})); int64_t tot=0; for (int i=0;i build_params = std::make_shared(); + build_params->nlist = num_clusters; + build_params->metric = "l2"; + build_params->niter = 10; + build_params->use_gpu = false; + auto cl = kmeans(vectors_cpu, ids_cpu, build_params, torch::Tensor()); ASSERT_EQ(cl->centroids.sizes(), (std::vector{num_clusters, dim})); int64_t tot=0; for (int i=0;i build_params = std::make_shared(); + build_params->nlist = num_clusters; + build_params->metric = "l2"; + build_params->niter = 10; + build_params->use_gpu = true; + build_params->gpu_sample_size = 2000; + build_params->gpu_batch_size = 100; auto cl = kmeans_cuvs_sample_and_predict( - vectors_cpu, ids_cpu, - num_clusters, - faiss::METRIC_L2, - sample_size, - niter, - gpu_batch_size); + vectors_cpu, ids_cpu, build_params); // centroids must live on CPU and have correct shape ASSERT_EQ(cl->centroids.device().type(), torch::kCPU); @@ -105,9 +111,10 @@ TEST_F(ClusteringTest, SampleAndPredict_GPU_L2) { } ASSERT_EQ(tot, num_vectors); + build_params->use_gpu = false; + // Optional quality check: rough MSE vs CPU run - auto cl_cpu = kmeans_cpu(vectors_cpu, ids_cpu, num_clusters, - faiss::METRIC_L2, niter, torch::Tensor()); + auto cl_cpu = kmeans_cpu(vectors_cpu, ids_cpu, build_params, torch::Tensor()); double mse_cpu = compute_mse(cl_cpu->centroids, cl_cpu->vectors); double mse_gpu = compute_mse(cl->centroids, cl->vectors); ASSERT_NEAR(mse_cpu, mse_gpu, mse_cpu * 0.30); @@ -115,13 +122,15 @@ TEST_F(ClusteringTest, SampleAndPredict_GPU_L2) { // Full wrapper test for GPU TEST_F(ClusteringTest, KMeansWrapper_GPU) { - const int gpu_sample = 2000; - const int gpu_iter = 3; - const int gpu_batch = 1024; + shared_ptr build_params = std::make_shared(); + build_params->nlist = num_clusters; + build_params->metric = "l2"; + build_params->niter = 
10; + build_params->use_gpu = true; + build_params->gpu_sample_size = 2000; + build_params->gpu_batch_size = 100; - auto cl = kmeans(vectors_cpu, ids_cpu, num_clusters, - faiss::METRIC_L2, gpu_iter, - /*use_gpu=*/true, + auto cl = kmeans(vectors_cpu, ids_cpu, build_params, torch::Tensor()); ASSERT_EQ(cl->centroids.device().type(), torch::kCPU); ASSERT_EQ(cl->vectors.size(), size_t(num_clusters)); From 65eb128c60f8f82595e11100950b7c014b16a4ce Mon Sep 17 00:00:00 2001 From: Jason Date: Fri, 18 Apr 2025 13:21:54 -0500 Subject: [PATCH 13/15] update conda --- environments/ubuntu-cuda/conda.yaml | 4 ++++ src/cpp/src/partition_manager.cpp | 6 +++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/environments/ubuntu-cuda/conda.yaml b/environments/ubuntu-cuda/conda.yaml index 715744f2..99265b48 100644 --- a/environments/ubuntu-cuda/conda.yaml +++ b/environments/ubuntu-cuda/conda.yaml @@ -3,6 +3,8 @@ channels: - pytorch - defaults - conda-forge + - nvidia + - libcuvs dependencies: - python=3.11 - numpy @@ -10,6 +12,8 @@ dependencies: - faiss-gpu - matplotlib - pytest + - libcuvs + - cuda-version=12.8 - pip - pip: - sphinx diff --git a/src/cpp/src/partition_manager.cpp b/src/cpp/src/partition_manager.cpp index 9e7695fd..c838541b 100644 --- a/src/cpp/src/partition_manager.cpp +++ b/src/cpp/src/partition_manager.cpp @@ -409,12 +409,12 @@ shared_ptr PartitionManager::split_partitions(const Tensor &partitio shared_ptr clustering = select_partitions(partition_ids); + shared_ptr build_params = make_shared(); + build_params->nlist = num_splits; + build_params->metric = metric_type_to_str(parent_->metric_); for (int64_t i = 0; i < partition_ids.size(0); ++i) { // Ensure enough vectors to split assert(clustering->cluster_size(i) >= 4 && "Partition must have at least 8 vectors to split."); - shared_ptr build_params = make_shared(); - build_params->nlist = num_splits; - build_params->metric = parent_->metric_; shared_ptr curr_split_clustering = kmeans( 
clustering->vectors[i], clustering->vector_ids[i], From d10cd64918db57164438f54c5e27ff5a0d9e3890 Mon Sep 17 00:00:00 2001 From: Jason Mohoney Date: Fri, 18 Apr 2025 20:12:47 +0000 Subject: [PATCH 14/15] fix gpu clustering --- src/cpp/src/clustering.cpp | 11 ++++++----- test/cpp/quake_index.cpp | 12 ++++++------ 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/cpp/src/clustering.cpp b/src/cpp/src/clustering.cpp index b5f82682..903996ea 100644 --- a/src/cpp/src/clustering.cpp +++ b/src/cpp/src/clustering.cpp @@ -26,9 +26,14 @@ shared_ptr kmeans_cuvs_sample_and_predict( int gpu_sample_size = build_params->gpu_sample_size; MetricType metric = str_to_metric_type(build_params->metric); + TORCH_CHECK(vectors.dim() == 2, "vectors must be [N,D]"); TORCH_CHECK(ids.dim() == 1, "ids must be [N]"); int64_t N = vectors.size(0), D = vectors.size(1); + + gpu_sample_size = std::min(gpu_sample_size, (int) N); + gpu_batch_size = std::min(gpu_batch_size, (int) N); + TORCH_CHECK(gpu_sample_size > 0 && gpu_sample_size <= N, "invalid sample_size"); @@ -258,11 +263,7 @@ shared_ptr kmeans(Tensor vectors, return kmeans_cuvs_sample_and_predict( vectors, ids, - n_clusters, - metric_type, - build_params->gpu-sample_size, - niter, - build_params->gpu_batch_size); + build_params); #else throw std::runtime_error("GPU support is not enabled. Please compile with QUAKE_ENABLE_GPU."); #endif diff --git a/test/cpp/quake_index.cpp b/test/cpp/quake_index.cpp index 12dd3bf0..b8f0199c 100644 --- a/test/cpp/quake_index.cpp +++ b/test/cpp/quake_index.cpp @@ -257,14 +257,14 @@ TEST(QuakeIndexStressTest, LargeBuildTest) { // Attempt to build an index with a large number of vectors. // Adjust these numbers based on your available memory/compute. 
int64_t dimension = 128; // Medium-high dimension - int64_t num_vectors = 10e6; // 1 million vectors + int64_t num_vectors = 1e6; // 1 million vectors auto data_vectors = generate_random_data(num_vectors, dimension); auto data_ids = generate_sequential_ids(num_vectors, 0); QuakeIndex index; auto build_params = std::make_shared(); - build_params->nlist = 5120; + build_params->nlist = 512; build_params->metric = "l2"; // Keep the iteration count modest to avoid overly long tests build_params->niter = 5; @@ -288,7 +288,7 @@ TEST(QuakeIndexStressTestGPU, LargeBuildTest) { // Attempt to build an index with a large number of vectors. // Adjust these numbers based on your available memory/compute. int64_t dimension = 128; // Medium-high dimension - int64_t num_vectors = 10e6; // 1 million vectors + int64_t num_vectors = 1e6; // 1 million vectors auto data_vectors = generate_random_data(num_vectors, dimension).contiguous(); auto data_ids = generate_sequential_ids(num_vectors, 0).contiguous(); @@ -297,7 +297,7 @@ TEST(QuakeIndexStressTestGPU, LargeBuildTest) { std::cout << "generated\n"; auto build_params = std::make_shared(); - build_params->nlist = 5120; + build_params->nlist = 512; build_params->metric = "l2"; build_params->use_gpu = true; // Keep the iteration count modest to avoid overly long tests @@ -533,8 +533,8 @@ TEST(QuakeIndexStressTest, SearchAddRemoveMaintenanceTest) { // Test build with GPU enabled TEST(QuakeIndexGPUTest, BuildWithGPUTest) { int64_t dimension = 32; - int64_t num_vectors = 200; - int64_t nlist = 5; + int64_t num_vectors = 10000; + int64_t nlist = 10; torch::Tensor data_vectors = generate_random_data(num_vectors, dimension); torch::Tensor data_ids = generate_sequential_ids(num_vectors, 0); From b8b714e2cf17f30a4f245b1c0808c22f38705646 Mon Sep 17 00:00:00 2001 From: Kefan Zheng Date: Sun, 20 Apr 2025 18:22:33 -0500 Subject: [PATCH 15/15] adapt cuvs to cuda 12.4 (#61) * adapt cuvs to cuda 12.4 * separate compilation scripts for cuda118 and 
cuda124 --- .../Dockerfile | 0 .../conda.yaml | 0 environments/ubuntu-cuda124/Dockerfile | 64 +++++++++++++++++++ environments/ubuntu-cuda124/conda.yaml | 24 +++++++ 4 files changed, 88 insertions(+) rename environments/{ubuntu-cuda => ubuntu-cuda118}/Dockerfile (100%) rename environments/{ubuntu-cuda => ubuntu-cuda118}/conda.yaml (100%) create mode 100644 environments/ubuntu-cuda124/Dockerfile create mode 100644 environments/ubuntu-cuda124/conda.yaml diff --git a/environments/ubuntu-cuda/Dockerfile b/environments/ubuntu-cuda118/Dockerfile similarity index 100% rename from environments/ubuntu-cuda/Dockerfile rename to environments/ubuntu-cuda118/Dockerfile diff --git a/environments/ubuntu-cuda/conda.yaml b/environments/ubuntu-cuda118/conda.yaml similarity index 100% rename from environments/ubuntu-cuda/conda.yaml rename to environments/ubuntu-cuda118/conda.yaml diff --git a/environments/ubuntu-cuda124/Dockerfile b/environments/ubuntu-cuda124/Dockerfile new file mode 100644 index 00000000..0462c5c9 --- /dev/null +++ b/environments/ubuntu-cuda124/Dockerfile @@ -0,0 +1,64 @@ +# Use a CUDA-enabled Ubuntu base image +FROM nvidia/cuda:12.4.1-cudnn-devel-ubuntu22.04 + +# ----------------------------- +# Set up environment variables +# ----------------------------- +ENV CONDA_DIR=/opt/miniconda +ENV PATH="${CONDA_DIR}/bin:${PATH}" +ENV DEBIAN_FRONTEND=noninteractive + +# ----------------------------- +# Install system dependencies +# ----------------------------- +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + wget \ + curl \ + build-essential \ + ca-certificates \ + swig \ + git \ + libomp5 \ + libomp-dev \ + graphviz \ + libnuma-dev \ + && rm -rf /var/lib/apt/lists/* + + +# Install CMake 3.30.4 +RUN wget -qO /tmp/cmake.sh https://github.com/Kitware/CMake/releases/download/v3.30.4/cmake-3.30.4-linux-x86_64.sh && \ + chmod +x /tmp/cmake.sh && \ + /tmp/cmake.sh --skip-license --prefix=/usr/local && \ + rm /tmp/cmake.sh + + +# 
----------------------------- +# Install Miniconda +# ----------------------------- +RUN wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O /tmp/miniconda.sh && \ + bash /tmp/miniconda.sh -b -p $CONDA_DIR && \ + rm /tmp/miniconda.sh + +# ----------------------------- +# Copy in your conda environment YAML +# ----------------------------- +COPY environments/ubuntu-cuda124/conda.yaml /tmp/conda.yaml + +# Create quake-env +RUN conda env create -f /tmp/conda.yaml && conda clean -afy + +# ----------------------------- +# Install GPU-enabled PyTorch +# ----------------------------- +RUN conda run -n quake-env pip install --no-cache-dir torch --index-url https://download.pytorch.org/whl/cu124 + +# ----------------------------- +# Debug: show conda information +# ----------------------------- +RUN echo "===== DEBUG: which conda =====" && which conda +RUN echo "===== DEBUG: conda info =====" && conda info +RUN echo "===== DEBUG: conda env list =====" && conda env list +RUN echo "===== DEBUG: quake-env check =====" && conda run -n quake-env python -c "import sys; print('OK in quake-env; python:', sys.executable)" + +CMD ["/bin/bash"] \ No newline at end of file diff --git a/environments/ubuntu-cuda124/conda.yaml b/environments/ubuntu-cuda124/conda.yaml new file mode 100644 index 00000000..f895341b --- /dev/null +++ b/environments/ubuntu-cuda124/conda.yaml @@ -0,0 +1,24 @@ +name: quake-env +channels: + - rapidsai + - pytorch + - defaults + - conda-forge + - nvidia +dependencies: + - python=3.11 + - numpy + - pandas + - faiss-gpu + - matplotlib + - pytest + - libcuvs + - cuda-nvrtc-dev=12.4 + - cuda-version=12.4 + - pip + - pip: + - sphinx + - sphinx_rtd_theme + - sphinxcontrib-mermaid + - graphviz + - pyyaml \ No newline at end of file