Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/cpp/bindings/wrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ PYBIND11_MODULE(_bindings, m) {
(std::string("Threshold to trigger recomputation of APS. default = ") + std::to_string(DEFAULT_RECOMPUTE_THRESHOLD)).c_str())
.def_readwrite("aps_flush_period_us", &SearchParams::aps_flush_period_us,
(std::string("APS flush period in microseconds. default = ") + std::to_string(DEFAULT_APS_FLUSH_PERIOD_US)).c_str())
// Expose SearchParams::buffer_size; the docstring follows the "<description>. default = <value>" form of the sibling fields.
.def_readwrite("buffer_size", &SearchParams::buffer_size,
(std::string("Buffer size in number of partitions. default = ") + std::to_string(1)).c_str())
.def("__repr__", [](const SearchParams &s) {
std::ostringstream oss;
oss << "{";
Expand All @@ -181,6 +183,7 @@ PYBIND11_MODULE(_bindings, m) {
oss << "\"initial_search_fraction\": " << s.initial_search_fraction << ", ";
oss << "\"recompute_threshold\": " << s.recompute_threshold << ", ";
// Every field except the last is followed by ", " so consecutive fields stay separated in the output.
oss << "\"aps_flush_period_us\": " << s.aps_flush_period_us << ", ";
oss << "\"buffer_size\": " << s.buffer_size;
oss << "}";
return oss.str();
});
Expand Down
40 changes: 40 additions & 0 deletions src/cpp/include/buffer_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#ifndef BUFFER_MANAGER_H
#define BUFFER_MANAGER_H

#include <unordered_set>
#include <queue>
#include <partition_manager.h>
#include <file_index_partition.h>
#include <policy.h>

// decide who to evict
// virtual class Policy {
// public:
// std::queue<int> q; // who to evict
// std::vector findVictims();
// void insert(int pid);
// private:
// void remove(int pid); // called by findVictim
// }

/**
 * @brief Puts partitions into, and removes them from, the in-memory buffer.
 *
 * Holds the set of partition IDs that are in the buffer together with a Policy
 * object; the class itself does the actual loading, flushing and eviction.
 */
class BufferManager {
public:
    std::unordered_set<int> u; ///< Who is in the buffer (needs a better name - Atharva).
    shared_ptr<Policy> policy; ///< Policy object; presumably consulted to pick eviction victims - confirm in the implementation.
    int bufSize = 0;           ///< Number of partitions in the memory. Defaults to 0 so it is never read uninitialized.
    int curSize = 0;           ///< Presumably the number of partitions currently held - confirm. Defaults to 0 so it is never read uninitialized.
    bool debug_ = true;        ///< Debug flag; enabled by default.

    BufferManager();
    ~BufferManager();

    /**
     * @brief Load the vectors and ids of a partition from disk.
     *
     * @param pid ID of the partition.
     * @param fip The file-backed partition object for @p pid.
     * @param partition_manager_ Partition manager handle; its use is not visible from this header.
     */
    void put(int pid, shared_ptr<FileIndexPartition> fip, shared_ptr<PartitionManager> partition_manager_);

    /**
     * @brief Flush the vectors and ids of a partition to disk, while still keeping it in memory.
     *
     * @param pid ID of the partition.
     * @param fip The file-backed partition object for @p pid.
     */
    void flush(int pid, shared_ptr<FileIndexPartition> fip);

private:
    /**
     * @brief Evict a partition based on the eviction policy.
     *
     * Per the original design note this is called by put() when the buffer is full.
     */
    void evict();
};

#endif
1 change: 1 addition & 0 deletions src/cpp/include/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ struct SearchParams {
float recompute_threshold = DEFAULT_RECOMPUTE_THRESHOLD;
float initial_search_fraction = DEFAULT_INITIAL_SEARCH_FRACTION;
int aps_flush_period_us = DEFAULT_APS_FLUSH_PERIOD_US;
int buffer_size = 1;

SearchParams() = default;
};
Expand Down
10 changes: 10 additions & 0 deletions src/cpp/include/dynamic_inverted_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <common.h>
#include <faiss/invlists/InvertedLists.h>
#include <index_partition.h>
#include <file_index_partition.h>

namespace faiss {
/**
Expand Down Expand Up @@ -196,6 +197,15 @@ namespace faiss {
*/
void add_list(size_t list_no);


/**
* @brief Add a new, empty disk-based partition.
*
* TODO: decide whether the backing file for the partition should also be created here.
*
* @param list_no The partition number to add.
* @throws std::runtime_error if the partition already exists.
*/
void add_list_file(size_t list_no);

/**
* @brief Check if a given ID exists in a partition.
*
Expand Down
22 changes: 22 additions & 0 deletions src/cpp/include/fifo_policy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#ifndef FIFO_POLICY_H
#define FIFO_POLICY_H

#include <policy.h>
#include <queue>

class FIFOPolicy : public Policy {
public:
std::queue<int> fifo_q; // who to evict
bool debug_ = true;


FIFOPolicy();
~FIFOPolicy();
std::vector<int> findVictims();
void insert(int pid);

private:
void remove(int pid); // called by findVictim
};

#endif
88 changes: 88 additions & 0 deletions src/cpp/include/file_index_partition.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#ifndef FILE_INDEX_PARTITION_H
#define FILE_INDEX_PARTITION_H

#include <index_partition.h>
#include <string>


class FileIndexPartition : public IndexPartition {
public:
std::string file_path_;
int fd_ = -1; // file descriptor
bool is_in_memory = false; // indicate whether the partition is in memory
bool is_dirty = false; // indicate whether the partition is dirty (changes haven't been synced to disk)
std::mutex ref_mutex;
int ref_cnt = 0;

/// Default constructor.
FileIndexPartition() = default;

/**
* @brief Parameterized constructor.
*
* Initializes the partition with a given number of vectors and copies in the provided codes and IDs.
*
* @param num_vectors The initial number of vectors.
* @param codes Pointer to the buffer holding the encoded vectors.
* @param ids Pointer to the vector IDs.
* @param code_size Size of each code in bytes.
*/
FileIndexPartition(int64_t num_vectors,
uint8_t* codes,
idx_t* ids,
int64_t code_size);

/**
* @brief Move constructor.
*
* Transfers the contents from another partition into this one.
*
* @param other The partition to move from.
*/
FileIndexPartition(FileIndexPartition&& other) noexcept;

/**
* @brief Move assignment operator.
*
* Transfers the contents from another partition into this one, clearing existing data.
*
* @param other The partition to move from.
* @return Reference to this partition.
*/
FileIndexPartition& operator=(FileIndexPartition&& other) noexcept;

/// Destructor. Frees all allocated memory.
~FileIndexPartition();

void append(int64_t n_entry, const idx_t* new_ids, const uint8_t* new_codes);
void update(int64_t offset, int64_t n_entry, const idx_t* new_ids, const uint8_t* new_codes);
void remove(int64_t index);
void resize(int64_t new_capacity);
void clear();
int64_t find_id(idx_t id) const;
void reallocate(int64_t new_capacity);



// disk specific method
void load(); // load the partition to memory from disk
void save(); // store the vectors on disk
void set_file_path(std::string file_path);



#ifdef QUAKE_USE_NUMA
/**
* @brief Set the NUMA node for the partition.
*
* Moves the memory to the specified NUMA node if necessary.
*
* @param new_numa_node The target NUMA node.
*/
void set_numa_node(int new_numa_node);
#endif


};

#endif // FILE_INDEX_PARTITION_H
20 changes: 17 additions & 3 deletions src/cpp/include/index_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class IndexPartition {
IndexPartition& operator=(IndexPartition&& other) noexcept;

/// Destructor. Frees all allocated memory.
~IndexPartition();
virtual ~IndexPartition();

/**
* @brief Set the code size.
Expand Down Expand Up @@ -144,6 +144,8 @@ class IndexPartition {
* @param new_capacity The new capacity (number of vectors).
*/
void reallocate_memory(int64_t new_capacity);

void free_memory();

#ifdef QUAKE_USE_NUMA
/**
Expand Down Expand Up @@ -171,7 +173,7 @@ class IndexPartition {
*
* Releases the codes and IDs buffers.
*/
void free_memory();
// void free_memory();

/**
* @brief Ensure capacity.
Expand All @@ -181,7 +183,7 @@ class IndexPartition {
*
* @param required The minimum required number of vectors.
*/
void ensure_capacity(int64_t required);
// void ensure_capacity(int64_t required);

/**
* @brief Allocate memory for a given type.
Expand All @@ -195,5 +197,17 @@ class IndexPartition {
*/
template <typename T>
T* allocate_memory(size_t num_elements, int numa_node);

protected:
/**
* @brief Ensure capacity.
*
* Checks that the internal buffer can hold at least the required number of vectors,
* and resizes if necessary.
*
* @param required The minimum required number of vectors.
*/
void ensure_capacity(int64_t required);

};
#endif //INDEX_PARTITION_H
26 changes: 26 additions & 0 deletions src/cpp/include/lru_policy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#ifndef LRU_POLICY_H
#define LRU_POLICY_H

#include <policy.h>
#include <unordered_map>
#include <list>
#include <vector>
#include <iostream>

/**
 * @brief Policy implementation that tracks partition IDs in recency order.
 *
 * IDs live in a list ordered from least recent (front) to most recent (back),
 * with a map from ID to list iterator kept alongside it.
 */
class LRUPolicy : public Policy {
    using RecencyList = std::list<int>;
    using RecencyIndex = std::unordered_map<int, RecencyList::iterator>;

public:
    LRUPolicy();
    ~LRUPolicy();

    /// @param pid Partition ID to register with the policy.
    void insert(int pid) override;

    /// @return The partition IDs chosen as victims.
    std::vector<int> findVictims() override;

private:
    /// @param pid Partition ID to drop from the policy.
    void remove(int pid) override;

    RecencyList lru_list_;  ///< Most recent at back, least recent at front.
    RecencyIndex lru_map_;  ///< Partition ID -> list iterator (presumably into lru_list_).
    bool debug_ = true;     ///< Debug flag; enabled by default.
};

#endif // LRU_POLICY_H
2 changes: 1 addition & 1 deletion src/cpp/include/partition_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class PartitionManager {
std::shared_ptr<faiss::DynamicInvertedLists> partitions_ = nullptr; ///< Pointer to the inverted lists.
int64_t curr_partition_id_ = 0; ///< Current partition ID.

bool debug_ = false; ///< If true, print debug information.
bool debug_ = true; ///< If true, print debug information.
bool check_uniques_ = false; ///< If true, check that vector IDs are unique and don't already exist in the index.

std::set<int64_t> resident_ids_; ///< Set of partition IDs.
Expand Down
15 changes: 15 additions & 0 deletions src/cpp/include/policy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#ifndef POLICY_H
#define POLICY_H

#include <vector>

/**
 * @brief Abstract interface for policies that decide which partitions to evict.
 *
 * Implementations are told about partition IDs through insert() and report
 * victims through findVictims(). remove() is a private customization point
 * that implementations override for their own bookkeeping.
 */
class Policy {
public:
    /// @return The partition IDs chosen as victims.
    virtual std::vector<int> findVictims() = 0;

    /// @param pid Partition ID to register with the policy.
    virtual void insert(int pid) = 0;

    /// Virtual so implementations can be destroyed through a Policy pointer.
    virtual ~Policy() = default;

private:
    /// @param pid Partition ID to drop from the policy. Called by findVictims.
    virtual void remove(int pid) = 0;
};

#endif
2 changes: 1 addition & 1 deletion src/cpp/include/quake_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class QuakeIndex {
shared_ptr<MaintenancePolicyParams> maintenance_policy_params_; ///< Parameters for the maintenance policy.
int current_level_ = 0; ///< Current level of the index.

bool debug_ = false; ///< If true, print debug information.
bool debug_ = true; ///< If true, print debug information.

/**
* @brief Constructor for QuakeIndex.
Expand Down
4 changes: 3 additions & 1 deletion src/cpp/include/query_coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <list_scanning.h>
#include <maintenance_policies.h>
#include <blockingconcurrentqueue.h>
#include <buffer_manager.h>

class QuakeIndex;
class PartitionManager;
Expand All @@ -30,6 +31,7 @@ class QueryCoordinator {
shared_ptr<PartitionManager> partition_manager_;
shared_ptr<MaintenancePolicy> maintenance_policy_;
shared_ptr<QuakeIndex> parent_;
shared_ptr<BufferManager> buffer;
MetricType metric_;

vector<std::thread> worker_threads_;
Expand All @@ -45,7 +47,7 @@ class QueryCoordinator {
std::mutex result_mutex_;
std::atomic<bool> stop_workers_;

bool debug_ = false;
bool debug_ = true;

QueryCoordinator(shared_ptr<QuakeIndex> parent,
shared_ptr<PartitionManager> partition_manager,
Expand Down
Loading