From 5b0df9588deda672b9aba83bd6ca94dbae96be92 Mon Sep 17 00:00:00 2001 From: Mariano Tepper Date: Fri, 3 Jan 2025 09:01:38 -0800 Subject: [PATCH] Make ravv usage thread-safe (#381) It solves the issue that @jkni caught when reviewing #374. It renders the parallel vector accesses during "encodeAll" thread safe. --- .../jvector/quantization/BinaryQuantization.java | 10 ++++++---- .../github/jbellis/jvector/quantization/PQVectors.java | 4 +++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/quantization/BinaryQuantization.java b/jvector-base/src/main/java/io/github/jbellis/jvector/quantization/BinaryQuantization.java index 5e956dd8..35635360 100644 --- a/jvector-base/src/main/java/io/github/jbellis/jvector/quantization/BinaryQuantization.java +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/quantization/BinaryQuantization.java @@ -64,14 +64,16 @@ public CompressedVectors createCompressedVectors(Object[] compressedVectors) { @Override public CompressedVectors encodeAll(RandomAccessVectorValues ravv, ForkJoinPool simdExecutor) { + var ravvCopy = ravv.threadLocalSupplier(); var cv = simdExecutor.submit(() -> IntStream.range(0, ravv.size()) .parallel() .mapToObj(i -> { - var vector = ravv.getVector(i); - return vector == null + var localRavv = ravvCopy.get(); + VectorFloat v = localRavv.getVector(i); + return v == null ? new long[compressedVectorSize() / Long.BYTES] - : encode(vector); - }) + : encode(v); + }) .toArray(long[][]::new)) .join(); return new ImmutableBQVectors(this, cv); diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/quantization/PQVectors.java b/jvector-base/src/main/java/io/github/jbellis/jvector/quantization/PQVectors.java index 1dfa554d..897bc610 100644 --- a/jvector-base/src/main/java/io/github/jbellis/jvector/quantization/PQVectors.java +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/quantization/PQVectors.java @@ -137,12 +137,14 @@ public static ImmutablePQVectors encodeAndBuild(ProductQuantization pq, int vect // Encode the vectors in parallel into the compressed data chunks // The changes are concurrent, but because they are coordinated and do not overlap, we can use parallel streams // and then we are guaranteed safe publication because we join the thread after completion. + var ravvCopy = ravv.threadLocalSupplier(); simdExecutor.submit(() -> IntStream.range(0, ravv.size()) .parallel() .forEach(ordinal -> { // Retrieve the slice and mutate it. + var localRavv = ravvCopy.get(); var slice = PQVectors.get(chunks, ordinal, vectorsPerChunk, pq.getSubspaceCount()); - var vector = ravv.getVector(ordinal); + var vector = localRavv.getVector(ordinal); if (vector != null) pq.encodeTo(vector, slice); else