diff --git a/src/algorithm/hnswlib/algorithm_interface.h b/src/algorithm/hnswlib/algorithm_interface.h index 75860146..6d6e5b7e 100644 --- a/src/algorithm/hnswlib/algorithm_interface.h +++ b/src/algorithm/hnswlib/algorithm_interface.h @@ -54,9 +54,6 @@ class AlgorithmInterface { size_t ef, vsag::BaseFilterFunctor* isIdAllowed = nullptr) const; - virtual void - saveIndex(const std::string& location) = 0; - virtual void saveIndex(void* d) = 0; diff --git a/src/algorithm/hnswlib/block_manager.cpp b/src/algorithm/hnswlib/block_manager.cpp index a95cf2e0..60df5863 100644 --- a/src/algorithm/hnswlib/block_manager.cpp +++ b/src/algorithm/hnswlib/block_manager.cpp @@ -99,14 +99,6 @@ BlockManager::Serialize(std::ostream& ofs, size_t cur_element_count) { return this->SerializeImpl(writer, cur_element_count); } -bool -BlockManager::Deserialize(std::function read_func, - uint64_t cursor, - size_t cur_element_count) { - ReadFuncStreamReader reader(read_func, cursor); - return this->DeserializeImpl(reader, cur_element_count); -} - bool BlockManager::Deserialize(std::istream& ifs, size_t cur_element_count) { IOStreamReader reader(ifs); diff --git a/src/algorithm/hnswlib/block_manager.h b/src/algorithm/hnswlib/block_manager.h index 30c0e4f5..d76f617c 100644 --- a/src/algorithm/hnswlib/block_manager.h +++ b/src/algorithm/hnswlib/block_manager.h @@ -44,11 +44,6 @@ class BlockManager { bool Serialize(std::ostream& ofs, size_t cur_element_count); - bool - Deserialize(std::function read_func, - uint64_t cursor, - size_t cur_element_count); - bool Deserialize(std::istream& ifs, size_t cur_element_count); diff --git a/src/algorithm/hnswlib/hnswalg.cpp b/src/algorithm/hnswlib/hnswalg.cpp index e473fa51..578951a2 100644 --- a/src/algorithm/hnswlib/hnswalg.cpp +++ b/src/algorithm/hnswlib/hnswalg.cpp @@ -789,14 +789,6 @@ HierarchicalNSW::saveIndex(std::ostream& out_stream) { SerializeImpl(writer); } -void -HierarchicalNSW::saveIndex(const std::string& location) { - std::ofstream output(location, std::ios::binary); - IOStreamWriter writer(output); - SerializeImpl(writer); - output.close(); -} - template static void WriteOne(StreamWriter& writer, T& value) { @@ -979,44 +971,6 @@ HierarchicalNSW::markDeletedInternal(InnerIdType internalId) { } } -/* - * Removes the deleted mark of the node, does NOT really change the current graph. - * - * Note: the method is not safe to use when replacement of deleted elements is enabled, - * because elements marked as deleted can be completely removed by addPoint - */ -void -HierarchicalNSW::unmarkDelete(LabelType label) { - // lock all operations with element by label - std::unique_lock lock_table(label_lookup_lock_); - auto search = label_lookup_.find(label); - if (search == label_lookup_.end()) { - throw std::runtime_error("Label not found"); - } - InnerIdType internalId = search->second; - unmarkDeletedInternal(internalId); -} - -/* - * Remove the deleted mark of the node. - */ -void -HierarchicalNSW::unmarkDeletedInternal(InnerIdType internalId) { - assert(internalId < cur_element_count_); - if (isMarkedDeleted(internalId)) { - unsigned char* ll_cur = - (unsigned char*)data_level0_memory_->GetElementPtr(internalId, offsetLevel0_) + 2; - *ll_cur &= ~DELETE_MARK; - num_deleted_ -= 1; - if (allow_replace_deleted_) { - std::unique_lock lock_deleted_elements(deleted_elements_lock_); - deleted_elements_.erase(internalId); - } - } else { - throw std::runtime_error("The requested to undelete element is not deleted"); - } -} - /* * Adds point. */ @@ -1441,8 +1395,13 @@ HierarchicalNSW::searchKnn(const void* query_data, MaxHeap top_candidates(allocator_); - top_candidates = - searchBaseLayerST(currObj, query_data, std::max(ef, k), isIdAllowed); + if (num_deleted_ == 0) { + top_candidates = + searchBaseLayerST(currObj, query_data, std::max(ef, k), isIdAllowed); + } else { + top_candidates = + searchBaseLayerST(currObj, query_data, std::max(ef, k), isIdAllowed); + } while (top_candidates.size() > k) { top_candidates.pop(); @@ -1501,8 +1460,13 @@ HierarchicalNSW::searchRange(const void* query_data, } MaxHeap top_candidates(allocator_); - - top_candidates = searchBaseLayerST(currObj, query_data, radius, ef, isIdAllowed); + if (num_deleted_ == 0) { + top_candidates = + searchBaseLayerST(currObj, query_data, radius, ef, isIdAllowed); + } else { + top_candidates = + searchBaseLayerST(currObj, query_data, radius, ef, isIdAllowed); + } while (not top_candidates.empty()) { std::pair rez = top_candidates.top(); @@ -1513,39 +1477,4 @@ HierarchicalNSW::searchRange(const void* query_data, // std::cout << "hnswalg::result.size(): " << result.size() << std::endl; return result; } - -void -HierarchicalNSW::checkIntegrity() { - int connections_checked = 0; - vsag::Vector inbound_connections_num(cur_element_count_, 0, allocator_); - for (int i = 0; i < cur_element_count_; i++) { - for (int l = 0; l <= element_levels_[i]; l++) { - auto data_ll_cur = getLinklistAtLevelWithLock(i, l); - linklistsizeint* ll_cur = (linklistsizeint*)data_ll_cur.get(); - int size = getListCount(ll_cur); - auto* data = (InnerIdType*)(ll_cur + 1); - vsag::UnorderedSet s(allocator_); - for (int j = 0; j < size; j++) { - assert(data[j] > 0); - assert(data[j] < cur_element_count_); - assert(data[j] != i); - inbound_connections_num[data[j]]++; - s.insert(data[j]); - connections_checked++; - } - assert(s.size() == size); - } - } - if (cur_element_count_ > 1) { - int min1 = inbound_connections_num[0], max1 = inbound_connections_num[0]; - for (int i = 0; i < cur_element_count_; i++) { - assert(inbound_connections_num[i] > 0); - min1 = std::min(inbound_connections_num[i], min1); - max1 = std::max(inbound_connections_num[i], max1); - } - std::cout << "Min inbound: " << min1 << ", Max inbound:" << max1 << "\n"; - } - std::cout << "integrity ok, checked " << connections_checked << " connections\n"; -} - } // namespace hnswlib diff --git a/src/algorithm/hnswlib/hnswalg.h b/src/algorithm/hnswlib/hnswalg.h index f880e746..440bd6d6 100644 --- a/src/algorithm/hnswlib/hnswalg.h +++ b/src/algorithm/hnswlib/hnswalg.h @@ -302,9 +302,6 @@ class HierarchicalNSW : public AlgorithmInterface { void saveIndex(std::ostream& out_stream) override; - void - saveIndex(const std::string& location) override; - void SerializeImpl(StreamWriter& writer); @@ -329,20 +326,6 @@ class HierarchicalNSW : public AlgorithmInterface { void markDeletedInternal(InnerIdType internalId); - /* - * Removes the deleted mark of the node, does NOT really change the current graph. - * - * Note: the method is not safe to use when replacement of deleted elements is enabled, - * because elements marked as deleted can be completely removed by addPoint - */ - void - unmarkDelete(LabelType label); - /* - * Remove the deleted mark of the node. - */ - void - unmarkDeletedInternal(InnerIdType internalId); - /* * Checks the first 16 bits of the memory to see if the element is marked deleted. */ @@ -405,9 +388,6 @@ class HierarchicalNSW : public AlgorithmInterface { uint64_t ef, vsag::BaseFilterFunctor* isIdAllowed = nullptr) const override; - void - checkIntegrity(); - void reset(); diff --git a/src/algorithm/hnswlib/hnswalg_static.h b/src/algorithm/hnswlib/hnswalg_static.h index 4d37eb38..0e51d551 100644 --- a/src/algorithm/hnswlib/hnswalg_static.h +++ b/src/algorithm/hnswlib/hnswalg_static.h @@ -1144,39 +1144,6 @@ class StaticHierarchicalNSW : public AlgorithmInterface { out_stream.write((char*)node_cluster_dist_, max_elements_ * sizeof(float)); } - void - saveIndex(const std::string& location) override { - throw std::runtime_error("static hnsw does not support save index"); - // std::ofstream output(location, std::ios::binary); - // std::streampos position; - // - // writeBinaryPOD(output, offsetLevel0_); - // writeBinaryPOD(output, max_elements_); - // writeBinaryPOD(output, cur_element_count_); - // writeBinaryPOD(output, size_data_per_element_); - // writeBinaryPOD(output, label_offset_); - // writeBinaryPOD(output, offsetData_); - // writeBinaryPOD(output, maxlevel_); - // writeBinaryPOD(output, enterpoint_node_); - // writeBinaryPOD(output, maxM_); - // - // writeBinaryPOD(output, maxM0_); - // writeBinaryPOD(output, M_); - // writeBinaryPOD(output, mult_); - // writeBinaryPOD(output, ef_construction_); - // - // output.write(data_level0_memory_, cur_element_count_ * size_data_per_element_); - // - // for (size_t i = 0; i < cur_element_count_; i++) { - // unsigned int linkListSize = - // element_levels_[i] > 0 ? size_links_per_element_ * element_levels_[i] : 0; - // writeBinaryPOD(output, linkListSize); - // if (linkListSize) - // output.write(linkLists_[i], linkListSize); - // } - // output.close(); - } - // load index from a file stream void loadIndex(StreamReader& in_stream, SpaceInterface* s, size_t max_elements_i = 0) override { diff --git a/src/algorithm/hnswlib/hnswlib.h b/src/algorithm/hnswlib/hnswlib.h index f65408f3..3ac8974a 100644 --- a/src/algorithm/hnswlib/hnswlib.h +++ b/src/algorithm/hnswlib/hnswlib.h @@ -26,37 +26,6 @@ #endif #endif -#if defined(USE_AVX) || defined(USE_SSE) -#ifdef _MSC_VER -#include - -#include -void -cpuid(int32_t out[4], int32_t eax, int32_t ecx) { - __cpuidex(out, eax, ecx); -} -static __int64 -xgetbv(unsigned int x) { - return _xgetbv(x); -} -#else -#include -#include - -#include -#include -static void -cpuid(int32_t cpuInfo[4], int32_t eax, int32_t ecx) { - __cpuid_count(eax, ecx, cpuInfo[0], cpuInfo[1], cpuInfo[2], cpuInfo[3]); -} -static uint64_t -xgetbv(unsigned int index) { - uint32_t eax, edx; - __asm__ __volatile__("xgetbv" : "=a"(eax), "=d"(edx) : "c"(index)); - return ((uint64_t)edx << 32) | eax; -} -#endif - #if defined(USE_AVX512) #include #endif @@ -69,69 +38,6 @@ xgetbv(unsigned int index) { #define PORTABLE_ALIGN64 __declspec(align(64)) #endif -// Adapted from https://github.com/Mysticial/FeatureDetector -#define _XCR_XFEATURE_ENABLED_MASK 0 - -static bool -AVXCapable() { - int cpuInfo[4]; - - // CPU support - cpuid(cpuInfo, 0, 0); - int nIds = cpuInfo[0]; - - bool HW_AVX = false; - if (nIds >= 0x00000001) { - cpuid(cpuInfo, 0x00000001, 0); - HW_AVX = (cpuInfo[2] & ((int)1 << 28)) != 0; - } - - // OS support - cpuid(cpuInfo, 1, 0); - - bool osUsesXSAVE_XRSTORE = (cpuInfo[2] & (1 << 27)) != 0; - bool cpuAVXSuport = (cpuInfo[2] & (1 << 28)) != 0; - - bool avxSupported = false; - if (osUsesXSAVE_XRSTORE && cpuAVXSuport) { - uint64_t xcrFeatureMask = xgetbv(_XCR_XFEATURE_ENABLED_MASK); - avxSupported = (xcrFeatureMask & 0x6) == 0x6; - } - return HW_AVX && avxSupported; -} - -static bool -AVX512Capable() { - if (!AVXCapable()) - return false; - - int cpuInfo[4]; - - // CPU support - cpuid(cpuInfo, 0, 0); - int nIds = cpuInfo[0]; - - bool HW_AVX512F = false; - if (nIds >= 0x00000007) { // AVX512 Foundation - cpuid(cpuInfo, 0x00000007, 0); - HW_AVX512F = (cpuInfo[1] & ((int)1 << 16)) != 0; - } - - // OS support - cpuid(cpuInfo, 1, 0); - - bool osUsesXSAVE_XRSTORE = (cpuInfo[2] & (1 << 27)) != 0; - bool cpuAVXSuport = (cpuInfo[2] & (1 << 28)) != 0; - - bool avx512Supported = false; - if (osUsesXSAVE_XRSTORE && cpuAVXSuport) { - uint64_t xcrFeatureMask = xgetbv(_XCR_XFEATURE_ENABLED_MASK); - avx512Supported = (xcrFeatureMask & 0xe6) == 0xe6; - } - return HW_AVX512F && avx512Supported; -} -#endif - #include "hnswalg.h" #include "hnswalg_static.h" #include "space_ip.h" diff --git a/src/default_thread_pool.cpp b/src/default_thread_pool.cpp index b5bfb9af..f74c8937 100644 --- a/src/default_thread_pool.cpp +++ b/src/default_thread_pool.cpp @@ -28,7 +28,8 @@ DefaultThreadPool::Enqueue(std::function func) { void DefaultThreadPool::WaitUntilEmpty() { - pool_->wait_until_empty(); + // In progschj::ThreadPool, wait_until_nothing_in_flight indicates that all tasks have been completed, while wait_until_empty means that there are no tasks waiting. Therefore, what we actually need here is the semantics of wait_until_nothing_in_flight. + pool_->wait_until_nothing_in_flight(); } void diff --git a/src/index/hnsw_test.cpp b/src/index/hnsw_test.cpp index cdd0ce60..7f745bc3 100644 --- a/src/index/hnsw_test.cpp +++ b/src/index/hnsw_test.cpp @@ -418,6 +418,20 @@ TEST_CASE("static hnsw", "[ut][hnsw]") { REQUIRE_FALSE(result.has_value()); REQUIRE(result.error().type == ErrorType::UNSUPPORTED_INDEX_OPERATION); + SECTION("serialize to binaryset") { + auto binary_set = index->Serialize(); + REQUIRE(binary_set.has_value()); + + auto voidresult = index->Deserialize(binary_set.value()); + REQUIRE_FALSE(voidresult.has_value()); + REQUIRE(voidresult.error().type == ErrorType::INDEX_NOT_EMPTY); + auto another_index = std::make_shared(hnsw_obj, commom_param); + another_index->InitMemorySpace(); + auto deserialize_result = another_index->Deserialize(binary_set.value()); + REQUIRE(deserialize_result.has_value()); + index = another_index; + } + JsonType params{ {"hnsw", {{"ef_search", 100}}}, }; @@ -444,6 +458,20 @@ TEST_CASE("static hnsw", "[ut][hnsw]") { auto remove_result = index->Remove(ids[0]); REQUIRE_FALSE(remove_result.has_value()); REQUIRE(remove_result.error().type == ErrorType::UNSUPPORTED_INDEX_OPERATION); + + SECTION("serialize to fstream") { + fixtures::TempDir dir("hnsw_test_deserialize_on_not_empty_index"); + std::fstream out_stream(dir.path + "index.bin", std::ios::out | std::ios::binary); + auto serialize_result = index->Serialize(out_stream); + REQUIRE(serialize_result.has_value()); + out_stream.close(); + + std::fstream in_stream(dir.path + "index.bin", std::ios::in | std::ios::binary); + auto voidresult = index->Deserialize(in_stream); + REQUIRE_FALSE(voidresult.has_value()); + REQUIRE(voidresult.error().type == ErrorType::INDEX_NOT_EMPTY); + in_stream.close(); + } } TEST_CASE("hnsw add vector with duplicated id", "[ut][hnsw]") { diff --git a/src/safe_thread_pool_test.cpp b/src/safe_thread_pool_test.cpp new file mode 100644 index 00000000..f85a4478 --- /dev/null +++ b/src/safe_thread_pool_test.cpp @@ -0,0 +1,41 @@ + +// Copyright 2024-present the vsag project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "safe_thread_pool.h" + +#include + +TEST_CASE("test safe thread pool", "[ut][thread_pool]") { + auto thread_pool = vsag::SafeThreadPool::FactoryDefaultThreadPool(); + int data = 0; + std::vector> results; + std::mutex m; + thread_pool->SetPoolSize(4); + thread_pool->SetQueueSizeLimit(6); + int round = 10; + for (int i = 0; i < round; ++i) { + results.emplace_back(thread_pool->GeneralEnqueue( + [&data, &m](int i) -> int { + std::this_thread::sleep_for(std::chrono::seconds(1)); + std::lock_guard lock(m); + vsag::logger::info("current data:{}", data); + data++; + return i * i; + }, + i)); + } + thread_pool->WaitUntilEmpty(); + REQUIRE(data == round); +} diff --git a/tests/test_index_old.cpp b/tests/test_index_old.cpp index 0598db82..0950468a 100644 --- a/tests/test_index_old.cpp +++ b/tests/test_index_old.cpp @@ -524,7 +524,7 @@ TEST_CASE("remove vectors from the index", "[ft][index]") { vsag::Options::Instance().logger()->SetLevel(vsag::Logger::Level::kDEBUG); int64_t num_vectors = 1000; int64_t dim = 64; - auto index_name = GENERATE("fresh_hnsw", "diskann"); + auto index_name = GENERATE("fresh_hnsw", "diskann", "hnsw"); auto metric_type = GENERATE("cosine", "ip", "l2"); bool need_normalize = metric_type != std::string("cosine");