diff --git a/src/cpp/bindings/wrap.cpp b/src/cpp/bindings/wrap.cpp index 8a987a94..6cc06f20 100644 --- a/src/cpp/bindings/wrap.cpp +++ b/src/cpp/bindings/wrap.cpp @@ -170,6 +170,8 @@ PYBIND11_MODULE(_bindings, m) { (std::string("Threshold to trigger recomputation of APS. default = ") + std::to_string(DEFAULT_RECOMPUTE_THRESHOLD)).c_str()) .def_readwrite("aps_flush_period_us", &SearchParams::aps_flush_period_us, (std::string("APS flush period in microseconds. default = ") + std::to_string(DEFAULT_APS_FLUSH_PERIOD_US)).c_str()) + .def_readwrite("buffer_size", &SearchParams::buffer_size, + (std::string("Buffer size in number of partitons. default") + std::to_string(1)).c_str()) .def("__repr__", [](const SearchParams &s) { std::ostringstream oss; oss << "{"; @@ -181,6 +183,7 @@ PYBIND11_MODULE(_bindings, m) { oss << "\"initial_search_fraction\": " << s.initial_search_fraction << ", "; oss << "\"recompute_threshold\": " << s.recompute_threshold << ", "; oss << "\"aps_flush_period_us\": " << s.aps_flush_period_us; + oss << "\"buffer_size\": " << s.buffer_size; oss << "}"; return oss.str(); }); diff --git a/src/cpp/include/buffer_manager.h b/src/cpp/include/buffer_manager.h new file mode 100644 index 00000000..8205a669 --- /dev/null +++ b/src/cpp/include/buffer_manager.h @@ -0,0 +1,40 @@ +#ifndef BUFFER_MANAGER_H +#define BUFFER_MANAGER_H + +#include +#include +#include +#include +#include + +// decide who to evict +// virtual class Policy { +// public: +// std::queue q; // who to evict +// std::vector findVictims(); +// void insert(int pid); +// private: +// void remove(int pid); // called by findVictim +// } + +// actually putting and removing things in buffer +class BufferManager { + public: + std::unordered_set u; // who is in the buffer (needs a better name - Atharva) + shared_ptr policy; + int bufSize; // number of partitions in the memory + int curSize; + bool debug_ = true; + + + BufferManager(); + ~BufferManager(); + + void put(int pid, shared_ptr fip, shared_ptr partition_manager_); // load the vectors and ids of a partition from disk + void flush(int pid, shared_ptr fip); // flush the vectors and ids of a partition to disk, while still keeping in memory + + private: + void evict(); // evict a partition based on the eviction policy, this will be called by putBuf if the buffer is full +}; + +#endif \ No newline at end of file diff --git a/src/cpp/include/common.h b/src/cpp/include/common.h index efdbfa53..78ae70ee 100644 --- a/src/cpp/include/common.h +++ b/src/cpp/include/common.h @@ -184,6 +184,7 @@ struct SearchParams { float recompute_threshold = DEFAULT_RECOMPUTE_THRESHOLD; float initial_search_fraction = DEFAULT_INITIAL_SEARCH_FRACTION; int aps_flush_period_us = DEFAULT_APS_FLUSH_PERIOD_US; + int buffer_size = 1; SearchParams() = default; }; diff --git a/src/cpp/include/dynamic_inverted_list.h b/src/cpp/include/dynamic_inverted_list.h index f38974e2..ebb18bfe 100644 --- a/src/cpp/include/dynamic_inverted_list.h +++ b/src/cpp/include/dynamic_inverted_list.h @@ -13,6 +13,7 @@ #include #include #include +#include namespace faiss { /** @@ -196,6 +197,15 @@ namespace faiss { */ void add_list(size_t list_no); + + /** + * @brief Add a new, empty disk-based partition (need to decide if we create a file for it also here) + * + * @param list_no The partition number to add. + * @throws std::runtime_error if the partition already exists. + */ + void add_list_file(size_t list_no); + /** * @brief Check if a given ID exists in a partition. * diff --git a/src/cpp/include/fifo_policy.h b/src/cpp/include/fifo_policy.h new file mode 100644 index 00000000..eba461f7 --- /dev/null +++ b/src/cpp/include/fifo_policy.h @@ -0,0 +1,22 @@ +#ifndef FIFO_POLICY_H +#define FIFO_POLICY_H + +#include +#include + +class FIFOPolicy : public Policy { + public: + std::queue fifo_q; // who to evict + bool debug_ = true; + + + FIFOPolicy(); + ~FIFOPolicy(); + std::vector findVictims(); + void insert(int pid); + + private: + void remove(int pid); // called by findVictim +}; + +#endif \ No newline at end of file diff --git a/src/cpp/include/file_index_partition.h b/src/cpp/include/file_index_partition.h new file mode 100644 index 00000000..648f8044 --- /dev/null +++ b/src/cpp/include/file_index_partition.h @@ -0,0 +1,88 @@ +#ifndef FILE_INDEX_PARTITION_H +#define FILE_INDEX_PARTITION_H + +#include +#include + + +class FileIndexPartition : public IndexPartition { +public: + std::string file_path_; + int fd_ = -1; // file descriptor + bool is_in_memory = false; // indicate whether the partition is in memory + bool is_dirty = false; // indicate whether the partition is dirty (changes haven't been synced to disk) + std::mutex ref_mutex; + int ref_cnt = 0; + + /// Default constructor. + FileIndexPartition() = default; + + /** + * @brief Parameterized constructor. + * + * Initializes the partition with a given number of vectors and copies in the provided codes and IDs. + * + * @param num_vectors The initial number of vectors. + * @param codes Pointer to the buffer holding the encoded vectors. + * @param ids Pointer to the vector IDs. + * @param code_size Size of each code in bytes. + */ + FileIndexPartition(int64_t num_vectors, + uint8_t* codes, + idx_t* ids, + int64_t code_size); + + /** + * @brief Move constructor. + * + * Transfers the contents from another partition into this one. + * + * @param other The partition to move from. + */ + FileIndexPartition(FileIndexPartition&& other) noexcept; + + /** + * @brief Move assignment operator. + * + * Transfers the contents from another partition into this one, clearing existing data. + * + * @param other The partition to move from. + * @return Reference to this partition. + */ + FileIndexPartition& operator=(FileIndexPartition&& other) noexcept; + + /// Destructor. Frees all allocated memory. + ~FileIndexPartition(); + + void append(int64_t n_entry, const idx_t* new_ids, const uint8_t* new_codes); + void update(int64_t offset, int64_t n_entry, const idx_t* new_ids, const uint8_t* new_codes); + void remove(int64_t index); + void resize(int64_t new_capacity); + void clear(); + int64_t find_id(idx_t id) const; + void reallocate(int64_t new_capacity); + + + + // disk specific method + void load(); // load the partition to memory from disk + void save(); // store the vectors on disk + void set_file_path(std::string file_path); + + + +#ifdef QUAKE_USE_NUMA + /** + * @brief Set the NUMA node for the partition. + * + * Moves the memory to the specified NUMA node if necessary. + * + * @param new_numa_node The target NUMA node. + */ + void set_numa_node(int new_numa_node); +#endif + + +}; + +#endif // FILE_INDEX_PARTITION_H \ No newline at end of file diff --git a/src/cpp/include/index_partition.h b/src/cpp/include/index_partition.h index 134282ec..9e2f7bd3 100644 --- a/src/cpp/include/index_partition.h +++ b/src/cpp/include/index_partition.h @@ -66,7 +66,7 @@ class IndexPartition { IndexPartition& operator=(IndexPartition&& other) noexcept; /// Destructor. Frees all allocated memory. - ~IndexPartition(); + virtual ~IndexPartition(); /** * @brief Set the code size. @@ -144,6 +144,8 @@ class IndexPartition { * @param new_capacity The new capacity (number of vectors). */ void reallocate_memory(int64_t new_capacity); + + void free_memory(); #ifdef QUAKE_USE_NUMA /** @@ -171,7 +173,7 @@ class IndexPartition { * * Releases the codes and IDs buffers. */ - void free_memory(); + // void free_memory(); /** * @brief Ensure capacity. @@ -181,7 +183,7 @@ class IndexPartition { * * @param required The minimum required number of vectors. */ - void ensure_capacity(int64_t required); + // void ensure_capacity(int64_t required); /** * @brief Allocate memory for a given type. @@ -195,5 +197,17 @@ class IndexPartition { */ template T* allocate_memory(size_t num_elements, int numa_node); + +protected: + /** + * @brief Ensure capacity. + * + * Checks that the internal buffer can hold at least the required number of vectors, + * and resizes if necessary. + * + * @param required The minimum required number of vectors. + */ + void ensure_capacity(int64_t required); + }; #endif //INDEX_PARTITION_H diff --git a/src/cpp/include/lru_policy.h b/src/cpp/include/lru_policy.h new file mode 100644 index 00000000..446ef1c7 --- /dev/null +++ b/src/cpp/include/lru_policy.h @@ -0,0 +1,26 @@ +#ifndef LRU_POLICY_H +#define LRU_POLICY_H + +#include +#include +#include +#include +#include + +class LRUPolicy : public Policy { +public: + LRUPolicy(); + ~LRUPolicy(); + + std::vector findVictims() override; + void insert(int pid) override; + +private: + void remove(int pid) override; + + std::list lru_list_; // most recent at back, least recent at front + std::unordered_map::iterator> lru_map_; + bool debug_ = true; +}; + +#endif // LRU_POLICY_H diff --git a/src/cpp/include/partition_manager.h b/src/cpp/include/partition_manager.h index 3c0da1e7..0c6a4019 100644 --- a/src/cpp/include/partition_manager.h +++ b/src/cpp/include/partition_manager.h @@ -27,7 +27,7 @@ class PartitionManager { std::shared_ptr partitions_ = nullptr; ///< Pointer to the inverted lists. int64_t curr_partition_id_ = 0; ///< Current partition ID. - bool debug_ = false; ///< If true, print debug information. + bool debug_ = true; ///< If true, print debug information. bool check_uniques_ = false; ///< If true, check that vector IDs are unique and don't already exist in the index. std::set resident_ids_; ///< Set of partition IDs. diff --git a/src/cpp/include/policy.h b/src/cpp/include/policy.h new file mode 100644 index 00000000..284c37cc --- /dev/null +++ b/src/cpp/include/policy.h @@ -0,0 +1,15 @@ +#ifndef POLICY_H +#define POLICY_H + +#include + +class Policy { + public: + virtual std::vector findVictims() = 0; + virtual void insert(int pid) = 0; + virtual ~Policy() {}; + private: + virtual void remove(int pid) = 0; // called by findVictim +}; + +#endif \ No newline at end of file diff --git a/src/cpp/include/quake_index.h b/src/cpp/include/quake_index.h index c0cccc21..eae8bd0a 100644 --- a/src/cpp/include/quake_index.h +++ b/src/cpp/include/quake_index.h @@ -27,7 +27,7 @@ class QuakeIndex { shared_ptr maintenance_policy_params_; ///< Parameters for the maintenance policy. int current_level_ = 0; ///< Current level of the index. - bool debug_ = false; ///< If true, print debug information. + bool debug_ = true; ///< If true, print debug information. /** * @brief Constructor for QuakeIndex. diff --git a/src/cpp/include/query_coordinator.h b/src/cpp/include/query_coordinator.h index ebe6ee02..dd82fe12 100644 --- a/src/cpp/include/query_coordinator.h +++ b/src/cpp/include/query_coordinator.h @@ -11,6 +11,7 @@ #include #include #include +#include class QuakeIndex; class PartitionManager; @@ -30,6 +31,7 @@ class QueryCoordinator { shared_ptr partition_manager_; shared_ptr maintenance_policy_; shared_ptr parent_; + shared_ptr buffer; MetricType metric_; vector worker_threads_; @@ -45,7 +47,7 @@ class QueryCoordinator { std::mutex result_mutex_; std::atomic stop_workers_; - bool debug_ = false; + bool debug_ = true; QueryCoordinator(shared_ptr parent, shared_ptr partition_manager, diff --git a/src/cpp/src/buffer_manager.cpp b/src/cpp/src/buffer_manager.cpp new file mode 100644 index 00000000..36608eb2 --- /dev/null +++ b/src/cpp/src/buffer_manager.cpp @@ -0,0 +1,60 @@ +#include +#include +#include +#include + +// for destructor, flush all pages + + +BufferManager::BufferManager() { + std::cout << "Constructing buffer manager" << std::endl; + bufSize = 1; + policy = make_shared(); + curSize = 0; +} + +BufferManager::~BufferManager() +{ + std::cout << "Flush contents? " << std::endl; +} + +void BufferManager::put(int pid, shared_ptr fip, shared_ptr partition_manager_) { + std::cout << "[BufferManager::put] Called with partition id: " << pid << std::endl; + if(curSize == bufSize) { // Buffer is full + auto victims = policy->findVictims(); + for(auto victim_pid : victims) { + if(debug_) { + std::cout << "Evicting partition ID: " << victim_pid << std::endl; + } + u.erase(victim_pid); + shared_ptr victim_fip = nullptr; + auto it = partition_manager_->partitions_->partitions_.find(victim_pid); + if (it == partition_manager_->partitions_->partitions_.end()) { + throw std::runtime_error("victim pid does not exist"); + } + victim_fip = std::dynamic_pointer_cast(partition_manager_->partitions_->partitions_[victim_pid]); + if(victim_fip) victim_fip->save(); + } + } else { + curSize ++; // Probably need locks around all these variables + } + policy->insert(pid); + std::cout << "Inserted partion ID: " << pid << " into the buffer" << std::endl; + u.insert(pid); + fip->load(); + if(debug_) { + std::cout << "Curent buffer state (not ordered): { "; + for (const int& element : u) { + std::cout << element << " "; + } + std::cout << " }" << std::endl; + } +} + +void BufferManager::flush(int pid, shared_ptr fip) { + std::cout << "[BufferManager] flush implementation goes here" << std::endl; +} + +void BufferManager::evict() { + std::cout << "[BufferManager] evict implementation goes here" << std::endl; +} \ No newline at end of file diff --git a/src/cpp/src/dynamic_inverted_list.cpp b/src/cpp/src/dynamic_inverted_list.cpp index a8b070fd..15d6c851 100644 --- a/src/cpp/src/dynamic_inverted_list.cpp +++ b/src/cpp/src/dynamic_inverted_list.cpp @@ -168,7 +168,12 @@ namespace faiss { part->set_code_size(static_cast(code_size)); } - part->append((int64_t) n_entry, ids, codes); + if (auto file_part = std::dynamic_pointer_cast(part)) { + file_part->append((int64_t) n_entry, ids, codes); + } else { + part->append((int64_t) n_entry, ids, codes); + } + return n_entry; } @@ -280,6 +285,35 @@ namespace faiss { nlist++; } + void DynamicInvertedLists::add_list_file(size_t list_no) { + if (partitions_.find(list_no) != partitions_.end()) { + throw std::runtime_error("List already exists in add_list"); + } + shared_ptr ip = std::make_shared(); + ip->set_code_size((int64_t) code_size); + partitions_[list_no] = ip; + + // create /data/quake_vectors directory if not exists + namespace fs = std::filesystem; + std::string dir_path = "data/quake_vectors"; + if (!fs::exists(dir_path)) { + fs::create_directories(dir_path); + } + + // create an empty file + std::ostringstream oss; + oss << "data/quake_vectors/p_" << list_no << ".qvecs"; + std::string filename = oss.str(); + ip->set_file_path(filename); + std::ofstream ofs(filename, std::ios::binary); + if (!ofs) { + throw std::runtime_error("Failed to create file: " + filename); + } + ofs.close(); + + nlist++; + } + bool DynamicInvertedLists::id_in_list(size_t list_no, idx_t id) const { auto it = partitions_.find(list_no); if (it == partitions_.end()) { diff --git a/src/cpp/src/fifo_policy.cpp b/src/cpp/src/fifo_policy.cpp new file mode 100644 index 00000000..7a3c896e --- /dev/null +++ b/src/cpp/src/fifo_policy.cpp @@ -0,0 +1,24 @@ +#include "fifo_policy.h" +#include + +FIFOPolicy::FIFOPolicy() { + std::cout << "FIFO Policy constructor" << std::endl; +}; + +std::vector FIFOPolicy::findVictims() { + auto q_front = fifo_q.front(); + fifo_q.pop(); + return std::vector(1, q_front); +} + +void FIFOPolicy::insert(int pid) { + fifo_q.push(pid); +} + +FIFOPolicy::~FIFOPolicy() { +} + +void FIFOPolicy::remove(int pid) +{ + std::cout << "Remove implementation goes here" << std::endl; +} \ No newline at end of file diff --git a/src/cpp/src/file_index_partition.cpp b/src/cpp/src/file_index_partition.cpp new file mode 100644 index 00000000..41c41f07 --- /dev/null +++ b/src/cpp/src/file_index_partition.cpp @@ -0,0 +1,185 @@ +#include "file_index_partition.h" +#include + +// Parameterized constructor +FileIndexPartition::FileIndexPartition(int64_t num_vectors, + uint8_t* codes, + idx_t* ids, + int64_t code_size) { + // Implementation here +} + +// Move constructor +FileIndexPartition::FileIndexPartition(FileIndexPartition&& other) noexcept { + // Implementation here +} + +// Move assignment operator +FileIndexPartition& FileIndexPartition::operator=(FileIndexPartition&& other) noexcept { + // Implementation here + return *this; +} + +// Destructor +FileIndexPartition::~FileIndexPartition() { + // munmap stuff needs to happen here? +} + +void FileIndexPartition::append(int64_t n_entry, const idx_t* new_ids, const uint8_t* new_codes) { + // std::cout << "Appending to FileIndexPartition file (" << file_path_ << ") goes here" << std::endl; + if (n_entry <= 0) return; + + std::ofstream ofs(file_path_, std::ios::binary | std::ios::app); // append mode + if (!ofs) { + throw std::runtime_error("Failed to open file for appending: " + file_path_); + } + + const size_t code_bytes = static_cast(code_size_); + for (int64_t i = 0; i < n_entry; ++i) { + ofs.write(reinterpret_cast(new_codes + i * code_bytes), code_bytes); + ofs.write(reinterpret_cast(&new_ids[i]), sizeof(idx_t)); + } + num_vectors_ += n_entry; + + ofs.close(); +} + +void FileIndexPartition::update(int64_t offset, int64_t n_entry, const idx_t* new_ids, const uint8_t* new_codes) { + // Implementation here +} + +void FileIndexPartition::remove(int64_t index) { + // Implementation here + // if (index < 0 || index >= num_vectors_) { + // throw std::runtime_error("Index out of range in remove"); + // } + // if (index == num_vectors_ - 1) { + // num_vectors_--; + // return; + // } + + // int64_t last_idx = num_vectors_ - 1; + // const size_t code_bytes = static_cast(code_size_); + + // if (is_in_memory) { + // std::lock_guard lock(ref_mutex); + // std::cout << "[File_index_partition] remove : Removing index " << index << " of partition ID (in memory) " << file_path_ << std::endl; + // std::memcpy(codes_ + index * code_bytes, codes_ + last_idx * code_bytes, code_bytes); + // ids_[index] = ids_[last_idx]; + // is_dirty = true; + // num_vectors_--; + // } + // else { + // std::cout << "[File_index_partition] remove : Removing index " << index << " of file path " << file_path_ << std::endl; + // std::fstream file(file_path_, std::ios::in | std::ios::out | std::ios::binary); + // if (!file) { + // throw std::runtime_error("Failed to open file for remove: " + file_path_); + // } + + // std::vector last_code(code_bytes); + // idx_t last_id; + + // // seek the last entry + // file.seekg(last_idx * (code_bytes + sizeof(idx_t)), std::ios::beg); + // file.read(reinterpret_cast(last_code.data()), code_bytes); + // file.read(reinterpret_cast(&last_id), sizeof(idx_t)); + + // // seek the deleted entry and replace with the last entry + // file.seekp(index * (code_bytes + sizeof(idx_t)), std::ios::beg); + // file.write(reinterpret_cast(last_code.data()), code_bytes); + // file.write(reinterpret_cast(&last_id), sizeof(idx_t)); + + // file.close(); + // num_vectors_--; + // } + +} + +void FileIndexPartition::resize(int64_t new_capacity) { + // Implementation here +} + +void FileIndexPartition::clear() { + // Implementation here +} + +int64_t FileIndexPartition::find_id(idx_t id) const { + // Implementation here + return -1; // Placeholder return +} + +void FileIndexPartition::reallocate(int64_t new_capacity) { + // Implementation here +} + +// for testing +void FileIndexPartition::load() { + // std::cout << "[FileIndexPartition] load" << std::endl; + std::ifstream in(file_path_, std::ios::binary); + if (!in) { + throw std::runtime_error("Unable to open file for reading: " + file_path_); + } + + std::lock_guard lock(ref_mutex); + + ref_cnt ++; + if (is_in_memory) return; + + ensure_capacity(num_vectors_); // allocate memory for codes_ and ids_ + + for (int64_t i = 0; i < num_vectors_; ++i) { + in.read(reinterpret_cast(codes_ + i * code_size_), code_size_); + in.read(reinterpret_cast(ids_ + i), sizeof(idx_t)); + } + + in.close(); + is_in_memory = true; +} + +// decrement the reference bit, if its zero called the buffer manager to flush the file +// the buffer manager should loop over the buffer pool and evict all buffers that belongs to the file +void FileIndexPartition::save() { + std::lock_guard lock(ref_mutex); + ref_cnt --; + if (ref_cnt == 0) { + buffer_size_ = 0; + free_memory(); // for now, we don't have a dedicated buffer pool so just free the memory + is_in_memory = false; + } + + if (is_dirty) { + std::ofstream out(file_path_, std::ios::binary); // if num_vectors_ is maintained correctly, no need to clear all vectors before writing back + if (!out) { + throw std::runtime_error("Unable to open file for writing"); + } + + const size_t code_bytes = static_cast(code_size_); + for (int64_t i = 0; i < num_vectors_; ++i) { + out.write(reinterpret_cast(codes_ + i * code_bytes), code_bytes); + out.write(reinterpret_cast(&ids_[i]), sizeof(idx_t)); + } + + out.close(); + } +} + +void FileIndexPartition::set_file_path(std::string file_path) { + file_path_ = file_path; +} + +// void FileIndexPartition::free_memory() { +// if (codes_ == nullptr && ids_ == nullptr) { +// return; +// } +// std::free(codes_); +// std::free(ids_); + +// codes_ = nullptr; +// ids_ = nullptr; +// } + +#ifdef QUAKE_USE_NUMA +void FileIndexPartition::set_numa_node(int new_numa_node) { + // Implementation here +} +#endif diff --git a/src/cpp/src/index_partition.cpp b/src/cpp/src/index_partition.cpp index 3a767f5f..d8993494 100644 --- a/src/cpp/src/index_partition.cpp +++ b/src/cpp/src/index_partition.cpp @@ -174,6 +174,7 @@ void IndexPartition::move_from(IndexPartition&& other) { } void IndexPartition::free_memory() { + // std::cout << "Entered free_memory() (parent class)" << std::endl; if (codes_ == nullptr && ids_ == nullptr) { return; } @@ -187,6 +188,7 @@ void IndexPartition::free_memory() { numa_free(ids_, buffer_size_ * sizeof(idx_t)); } #else + // std::cout << "About to free codes_ and ids_ (parent class)" << std::endl; std::free(codes_); std::free(ids_); #endif diff --git a/src/cpp/src/lru_policy.cpp b/src/cpp/src/lru_policy.cpp new file mode 100644 index 00000000..d9e43c04 --- /dev/null +++ b/src/cpp/src/lru_policy.cpp @@ -0,0 +1,40 @@ +#include "lru_policy.h" + +LRUPolicy::LRUPolicy() { + std::cout << "LRU Policy constructor" << std::endl; +} + +LRUPolicy::~LRUPolicy() = default; + +void LRUPolicy::insert(int pid) { + // Remove if already present + auto it = lru_map_.find(pid); + if (it != lru_map_.end()) { + lru_list_.erase(it->second); + } + + // Insert to back (most recently used) + lru_list_.push_back(pid); + lru_map_[pid] = std::prev(lru_list_.end()); + + if (debug_) { + std::cout << "[LRUPolicy::insert] Current buffer: "; + for (int id : lru_list_) std::cout << id << " "; + std::cout << std::endl; + } +} + +std::vector LRUPolicy::findVictims() { + if (lru_list_.empty()) return {}; + int victim = lru_list_.front(); // Least recently used + remove(victim); + return {victim}; +} + +void LRUPolicy::remove(int pid) { + auto it = lru_map_.find(pid); + if (it != lru_map_.end()) { + lru_list_.erase(it->second); + lru_map_.erase(it); + } +} diff --git a/src/cpp/src/partition_manager.cpp b/src/cpp/src/partition_manager.cpp index aac447dc..af8c77b5 100644 --- a/src/cpp/src/partition_manager.cpp +++ b/src/cpp/src/partition_manager.cpp @@ -22,6 +22,7 @@ static inline const uint8_t *as_uint8_ptr(const Tensor &float_tensor) { } PartitionManager::PartitionManager() { + std::cout << "inited partition manager" << std::endl; parent_ = nullptr; partitions_ = nullptr; } @@ -36,12 +37,21 @@ void PartitionManager::init_partitions( bool check_uniques ) { if (debug_) { - std::cout << "[PartitionManager] init_partitions: Entered." << std::endl; + // std::cout << "[PartitionManager] init_partitions: Entered." << std::endl; } parent_ = parent; int64_t nlist = clustering->nlist(); int64_t ntotal = clustering->ntotal(); int64_t dim = clustering->dim(); + int64_t curr_level = -1; + + std::cout << "nlist, ntotal, dim : " << nlist << " " << ntotal << " " << dim << std::endl; + + if(parent_ != nullptr) { + curr_level = parent_->current_level_ - 1; + } + + // std::cout << "Initing partitions for level: " << current_level << std::endl; if (nlist <= 0 && ntotal <= 0) { throw runtime_error("[PartitionManager] init_partitions: nlist and ntotal is <= 0."); @@ -67,13 +77,21 @@ void PartitionManager::init_partitions( // Add an empty list for each partition ID auto partition_ids_accessor = clustering->partition_ids.accessor(); for (int64_t i = 0; i < nlist; i++) { - partitions_->add_list(partition_ids_accessor[i]); + // Might have to change the function in here to init the correct disk version + if(curr_level == 0) { + // std::cout << "[PartitionManager] added empty file partition for " << partition_ids_accessor[i] << std::endl; + partitions_->add_list_file(partition_ids_accessor[i]); + } + else { + partitions_->add_list(partition_ids_accessor[i]); + } if (debug_) { - std::cout << "[PartitionManager] init_partitions: Added empty list for partition " << i << std::endl; + // std::cout << "[PartitionManager] init_partitions: Added empty list for partition " << i << std::endl; } } // Now insert the vectors into each partition + // std::cout << "Right before vector insertion, nlist is " << nlist << std::endl; for (int64_t i = 0; i < nlist; i++) { Tensor v = clustering->vectors[i]; Tensor id = clustering->vector_ids[i]; @@ -82,9 +100,10 @@ void PartitionManager::init_partitions( } size_t count = v.size(0); + // std::cout << "Partition with ID " << i << " for level " << current_level << " has " << count << " vectors" << std::endl; if (count == 0) { if (debug_) { - std::cout << "[PartitionManager] init_partitions: Partition " << i << " is empty." << std::endl; + // std::cout << "[PartitionManager] init_partitions: Partition " << i << " is empty." << std::endl; } continue; } else { @@ -100,14 +119,14 @@ void PartitionManager::init_partitions( } } partitions_->add_entries( - partition_ids_accessor[i], - count, - id.data_ptr(), - as_uint8_ptr(v) + partition_ids_accessor[i], + count, + id.data_ptr(), + as_uint8_ptr(v) ); if (debug_) { - std::cout << "[PartitionManager] init_partitions: Added " << count - << " entries to partition " << partition_ids_accessor[i] << std::endl; + // std::cout << "[PartitionManager] init_partitions: Added " << count + // << " entries to partition " << partition_ids_accessor[i] << std::endl; } } } @@ -730,7 +749,7 @@ Tensor PartitionManager::get_ids() { Tensor PartitionManager::get_partition_sizes(Tensor partition_ids) { if (debug_) { - std::cout << "[PartitionManager] get_partition_sizes: Getting sizes for partitions." << std::endl; + // std::cout << "[PartitionManager] get_partition_sizes: Getting sizes for partitions." << std::endl; } if (!partitions_) { throw runtime_error("[PartitionManager] get_partition_sizes: partitions_ is null."); @@ -746,8 +765,8 @@ Tensor PartitionManager::get_partition_sizes(Tensor partition_ids) { int64_t list_no = partition_ids_accessor[i]; partition_sizes_accessor[i] = partitions_->list_size(list_no); if (debug_) { - std::cout << "[PartitionManager] get_partition_sizes: Partition " << list_no - << " size: " << partition_sizes_accessor[i] << std::endl; + // std::cout << "[PartitionManager] get_partition_sizes: Partition " << list_no + // << " size: " << partition_sizes_accessor[i] << std::endl; } } return partition_sizes; diff --git a/src/cpp/src/quake_index.cpp b/src/cpp/src/quake_index.cpp index 85e80e8b..08294182 100644 --- a/src/cpp/src/quake_index.cpp +++ b/src/cpp/src/quake_index.cpp @@ -26,6 +26,7 @@ QuakeIndex::~QuakeIndex() { } shared_ptr QuakeIndex::build(Tensor x, Tensor ids, shared_ptr build_params) { + std::cout << "[QuakeIndex::build] called with tensor size = " << x.size(0) << "," << x.size(1) << " nlist = " << build_params->nlist << std::endl; build_params_ = build_params; metric_ = str_to_metric_type(build_params_->metric); @@ -55,6 +56,7 @@ shared_ptr QuakeIndex::build(Tensor x, Tensor ids, shared_ptr(current_level_ + 1); auto parent_build_params = make_shared(); parent_build_params->metric = build_params_->metric; + // std::cout << "About to pass nlist = " << parent_build_params->nlist << " to parent build call" << std::endl; parent_->build(clustering->centroids, clustering->partition_ids, parent_build_params); // initialize the partition manager @@ -92,6 +94,12 @@ QuakeIndex::search(Tensor x, shared_ptr search_params) { if (!query_coordinator_) { throw std::runtime_error("[QuakeIndex::search()] No query coordinator. Did you build the index?"); } + std::cout << "Search params has buffer size set to: " << search_params->buffer_size << std::endl; + if(search_params->buffer_size != query_coordinator_->buffer->bufSize) { + std::cout << "Changing buffer size from " << query_coordinator_->buffer->bufSize << " to " << search_params->buffer_size << std::endl; + query_coordinator_->buffer->bufSize = search_params->buffer_size; + } + return query_coordinator_->search(x, search_params); } diff --git a/src/cpp/src/query_coordinator.cpp b/src/cpp/src/query_coordinator.cpp index 2754d773..8401562c 100644 --- a/src/cpp/src/query_coordinator.cpp +++ b/src/cpp/src/query_coordinator.cpp @@ -23,6 +23,7 @@ QueryCoordinator::QueryCoordinator(shared_ptr parent, metric_(metric), num_workers_(num_workers), stop_workers_(false) { + buffer = make_shared(); if (num_workers_ > 0) { initialize_workers(num_workers_); } @@ -549,6 +550,21 @@ shared_ptr QueryCoordinator::serial_scan(Tensor x, Tensor partitio continue; // Skip invalid partitions } + shared_ptr fip = nullptr; + // Get the partition’s data. + if (partition_manager_->parent_ && partition_manager_->parent_->current_level_ == 1) { + auto it = partition_manager_->partitions_->partitions_.find(pi); + if (it == partition_manager_->partitions_->partitions_.end()) { + throw std::runtime_error("pid does not exist"); + } + fip = std::dynamic_pointer_cast(partition_manager_->partitions_->partitions_[pi]); + if(fip) { + // std::cout << "[QueryCoordinator] serial_scan: Loading level " << partition_manager_->parent_->current_level_ - 1 << " partition ID: " << pi << std::endl; + // dip->load(); + buffer->put(pi, fip, partition_manager_); + } + } + start_time = std::chrono::high_resolution_clock::now(); float *list_vectors = (float *) partition_manager_->partitions_->get_codes(pi); int64_t *list_ids = (int64_t *) partition_manager_->partitions_->get_ids(pi); @@ -580,6 +596,9 @@ shared_ptr QueryCoordinator::serial_scan(Tensor x, Tensor partitio break; } } + + // release the buffer + // if(dip) dip->save(); } // Retrieve the top-k results for query q. all_topk_dists[q] = topk_buf->get_topk(); @@ -617,6 +636,13 @@ shared_ptr QueryCoordinator::search(Tensor x, shared_ptrparent_) { + std::cout << "Entered QueryCoordinator::search for level : " << partition_manager_->parent_->current_level_ << std::endl; + } + else { + std::cout << "Entered QueryCoordinator::search for level : 0" << std::endl; + } + x = x.contiguous(); auto parent_timing_info = std::make_shared(); @@ -736,8 +762,17 @@ shared_ptr QueryCoordinator::batched_serial_scan( Tensor indices_tensor = torch::tensor(query_indices, torch::kInt64); Tensor x_subset = x.index_select(0, indices_tensor); int64_t batch_size = x_subset.size(0); - + // Get the partition’s data. + // if (partition_manager_->parent_ && partition_manager_->parent_->current_level_ == 1) { + // auto it = partition_manager_->partitions_->partitions_.find(pid); + // if (it == partition_manager_->partitions_->partitions_.end()) { + // throw std::runtime_error("pid does not exist"); + // } + // auto dip = std::dynamic_pointer_cast(partition_manager_->partitions_->partitions_[pid]); + // std::cout << "[QueryCoordinator] batched_serial_scan: Loading level " << partition_manager_->parent_->current_level_ - 1 << " partition ID: " << pid << std::endl; + // dip->load(); + // } const float *list_codes = (float *) partition_manager_->partitions_->get_codes(pid); const int64_t *list_ids = partition_manager_->partitions_->get_ids(pid); int64_t list_size = partition_manager_->partitions_->list_size(pid); @@ -764,6 +799,10 @@ shared_ptr QueryCoordinator::batched_serial_scan( // Merge: global buffer adds the new candidate distances/ids. global_buffers[global_q]->batch_add(local_dists.data(), local_ids.data(), local_ids.size()); } + + // Free memory here? + // dip->save() + // Or do this in the destructor? } // Aggregate the final results into output tensors. diff --git a/test/cpp/quake_index.cpp b/test/cpp/quake_index.cpp index e16a1d6b..aee8100f 100644 --- a/test/cpp/quake_index.cpp +++ b/test/cpp/quake_index.cpp @@ -6,6 +6,7 @@ #include #include "quake_index.h" +#include "testing.h" #include // Helper functions for random data @@ -79,6 +80,33 @@ TEST_F(QuakeIndexTest, BuildTest) { EXPECT_EQ(timing_info->d, data_vectors_.size(1)); } +TEST_F(QuakeIndexTest, VizBuildTest) { + QuakeIndex index; + + // create build_params + auto build_params = std::make_shared(); + build_params->nlist = nlist_; // Use multi-partition + build_params->metric = "l2"; + build_params->niter = 5; // small kmeans iteration + + auto timing_info = index.build(data_vectors_, data_ids_, build_params); + + // std::cout << "Level: " << index.current_level_ << std::endl; + print_index_hierarchy(index); + // auto temp_index = index; + // while(temp_index) { + // std::cout << "Level: " << temp_index.current_level_ << std::endl; + // temp_index = temp_index.parent_; + // } + index.partition_manager_->partitions_->print_stats(); + + EXPECT_NE(index.partition_manager_, nullptr); + EXPECT_NE(index.query_coordinator_, nullptr); + EXPECT_NE(index.build_params_, nullptr); + + +} + // Building the index with nlist <= 1 => a "flat" scenario TEST_F(QuakeIndexTest, BuildFlatTest) { QuakeIndex index;