diff --git a/plugins/engine-datafusion/jni/src/lib.rs b/plugins/engine-datafusion/jni/src/lib.rs index dcd4fd9a1c796..788a5e8651cca 100644 --- a/plugins/engine-datafusion/jni/src/lib.rs +++ b/plugins/engine-datafusion/jni/src/lib.rs @@ -210,7 +210,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createD } #[no_mangle] -pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_destroyReader( +pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_closeDatafusionReader( mut env: JNIEnv, _class: JClass, ptr: jlong diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionQueryJNI.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionQueryJNI.java index 8294bbeb9cf33..3431760695bf5 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionQueryJNI.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionQueryJNI.java @@ -8,6 +8,10 @@ package org.opensearch.datafusion; +import org.opensearch.index.engine.exec.FileMetadata; + +import java.util.Collection; + /** * JNI wrapper for DataFusion operations */ diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java index dfbdcb0667efa..1f32127486455 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java @@ -110,7 +110,7 @@ protected DatafusionSearcher acquireSearcherInternal(String source) { @Override protected void doClose() { try { - reader.decRef(); + datafusionReaderManager.release(reader); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReader.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReader.java index ec01a01b57720..352df8f21540b 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReader.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReader.java @@ -35,7 +35,7 @@ public class DatafusionReader implements Closeable { * The cache pointer. */ public long cachePtr; - private AtomicInteger refCount = new AtomicInteger(0); + private AtomicInteger refCount = new AtomicInteger(1); /** * Constructor @@ -56,7 +56,6 @@ public DatafusionReader(String directoryPath, Collection files) { System.out.println("Directory path: " + directoryPath); this.cachePtr = DataFusionQueryJNI.createDatafusionReader(directoryPath, fileNames); - incRef(); } /** @@ -95,8 +94,7 @@ public void close() throws IOException { if(cachePtr == -1L) { throw new IllegalStateException("Listing table has been already closed"); } - -// closeDatafusionReader(this.cachePtr); + closeDatafusionReader(cachePtr); this.cachePtr = -1; } } diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java index ba14055170dad..5040d58073616 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java @@ -64,7 +64,6 @@ public void afterRefresh(boolean didRefresh, CatalogSnapshot catalogSnapshot) th release(old); } this.current = new DatafusionReader(this.path, catalogSnapshot.getSearchableFiles(dataFormat)); - this.current.incRef(); } } } diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcher.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcher.java index c08bf0ca10f67..0437deb8fbf24 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcher.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcher.java @@ -8,8 +8,11 @@ package org.opensearch.datafusion.search; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.datafusion.DataFusionQueryJNI; +import org.opensearch.datafusion.DataFusionService; import org.opensearch.datafusion.core.DefaultRecordBatchStream; import org.opensearch.index.engine.EngineSearcher; import org.opensearch.search.aggregations.SearchResultsCollector; @@ -25,9 +28,11 @@ public class DatafusionSearcher implements EngineSearcher 1 for latest catalogSnapshot holder, 1 for search + assertEquals(2,getRefCount(reader)); + + assertEquals(2, reader.files.stream().toList().get(0).getFiles().size()); + assertNotEquals(-1, reader.cachePtr); + + searcher.close(); + // Assert RefCount 1 -> 1 for latest catalogSnapshot holder + assertEquals(1, getRefCount(reader)); + reader.close(); + assertEquals(-1, reader.getCachePtr()); + } + + /** Test that multiple searchers share the same reader instance for efficiency */ + public void testMultipleSearchersShareSameReader() throws IOException { + ShardPath shardPath = createShardPathWithResourceFiles("test-index", 0, "parquet_file_generation_0.parquet"); + DatafusionEngine engine = new DatafusionEngine(DataFormat.PARQUET, Collections.emptyList(), service, shardPath); + DatafusionReaderManager readerManager = engine.getReferenceManager(INTERNAL); + + RefreshResult refreshResult = new RefreshResult(); + WriterFileSet writerFileSet = new WriterFileSet(shardPath.getDataPath(), 1); + writerFileSet.add(shardPath.getDataPath() + "/parquet_file_generation_0.parquet"); + + refreshResult.add(getMockDataFormat(), List.of(writerFileSet)); + readerManager.afterRefresh(true, new CatalogSnapshot(refreshResult, 1)); + + DatafusionSearcher searcher1 = engine.acquireSearcher("test1"); + DatafusionSearcher searcher2 = engine.acquireSearcher("test2"); + + DatafusionReader reader = searcher1.getReader(); + // Both searchers should share the same reader instance + assertSame(searcher1.getReader(), searcher2.getReader()); + + searcher1.close(); + assertEquals(2, getRefCount(reader)); + searcher2.close(); + assertEquals(1, getRefCount(reader)); + reader.decRef(); + assertEquals(0,getRefCount(reader)); + assertEquals(-1, reader.getCachePtr()); + } + + /** Test that reader stays alive when only some searchers are closed (reference counting) */ + public void testReaderSurvivesPartialSearcherClose() throws IOException { + ShardPath shardPath = createShardPathWithResourceFiles("test-index", 0, "parquet_file_generation_0.parquet"); + DatafusionEngine engine = new DatafusionEngine(DataFormat.PARQUET, Collections.emptyList(), service, shardPath); + DatafusionReaderManager readerManager = engine.getReferenceManager(INTERNAL); + + RefreshResult refreshResult = new RefreshResult(); + WriterFileSet writerFileSet = new WriterFileSet(shardPath.getDataPath(), 1); + writerFileSet.add(shardPath.getDataPath() + "/parquet_file_generation_0.parquet"); + + refreshResult.add(getMockDataFormat(), List.of(writerFileSet)); + readerManager.afterRefresh(true, new CatalogSnapshot(refreshResult, 1)); + + DatafusionSearcher searcher1 = engine.acquireSearcher("test1"); + DatafusionSearcher searcher2 = engine.acquireSearcher("test2"); + DatafusionReader reader = searcher1.getReader(); + + // Close first searcher - reader should stay alive + searcher1.close(); + assertEquals(2,getRefCount(reader)); + assertNotEquals(-1, reader.cachePtr); + + // Close second searcher - reader should not be closed + searcher2.close(); + assertEquals(1,getRefCount(reader)); + assertNotEquals(-1, reader.cachePtr); + } + + /** Test that refresh creates a new reader with updated file list */ + public void testRefreshCreatesNewReader() throws IOException { + ShardPath shardPath = createShardPathWithResourceFiles("test-index", 0, "parquet_file_generation_2.parquet"); + DatafusionEngine engine = new DatafusionEngine(DataFormat.PARQUET, Collections.emptyList(), service, shardPath); + DatafusionReaderManager readerManager = engine.getReferenceManager(INTERNAL); + + // Initial refresh + RefreshResult refreshResult1 = new RefreshResult(); + WriterFileSet writerFileSet1 = new WriterFileSet(shardPath.getDataPath(), 1); + addFilesToShardPath(shardPath, "parquet_file_generation_0.parquet"); + writerFileSet1.add(shardPath.getDataPath() + "/parquet_file_generation_0.parquet"); + refreshResult1.add(getMockDataFormat(), List.of(writerFileSet1)); + readerManager.afterRefresh(true, new CatalogSnapshot(refreshResult1, 1)); + + DatafusionSearcher searcher1 = engine.acquireSearcher("test1"); + DatafusionReader reader1 = searcher1.getReader(); + assertEquals(2, getRefCount(reader1)); + + // Add new file and refresh + addFilesToShardPath(shardPath, "parquet_file_generation_1.parquet"); + RefreshResult refreshResult2 = new RefreshResult(); + WriterFileSet writerFileSet2 = new WriterFileSet(shardPath.getDataPath(), 2); + writerFileSet2.add(shardPath.getDataPath() + "/parquet_file_generation_0.parquet"); + writerFileSet2.add(shardPath.getDataPath() + "/parquet_file_generation_1.parquet"); + refreshResult2.add(getMockDataFormat(), List.of(writerFileSet2)); + readerManager.afterRefresh(true, new CatalogSnapshot(refreshResult2, 2)); + + DatafusionSearcher searcher2 = engine.acquireSearcher("test2"); + DatafusionReader reader2 = searcher2.getReader(); + + // Check refCount of initial Reader + assertEquals(1, getRefCount(reader1)); + assertEquals(2, getRefCount(reader2)); + + // Should have different readers + assertNotSame(reader1, reader2); + assertEquals(1, reader1.files.stream().toList().getFirst().getFiles().size()); + assertEquals(2, reader2.files.stream().toList().getFirst().getFiles().size()); + + searcher1.close(); + assertEquals(0, getRefCount(reader1)); + searcher2.close(); + assertEquals(1, getRefCount(reader2)); + } + + /** Test that calling decRef on an already closed reader throws IllegalStateException */ + public void testDecRefAfterCloseThrowsException() throws IOException { + ShardPath shardPath = createShardPathWithResourceFiles("test-index", 0, "parquet_file_generation_2.parquet"); + DatafusionEngine engine = new DatafusionEngine(DataFormat.PARQUET, Collections.emptyList(), service, shardPath); + DatafusionReaderManager readerManager = engine.getReferenceManager(INTERNAL); + + RefreshResult refreshResult = new RefreshResult(); + WriterFileSet writerFileSet = new WriterFileSet(shardPath.getDataPath(), 1); + writerFileSet.add(shardPath.getDataPath() + "/parquet_file_generation_2.parquet"); + refreshResult.add(getMockDataFormat(), List.of(writerFileSet)); + readerManager.afterRefresh(true, new CatalogSnapshot(refreshResult, 1)); + + DatafusionSearcher searcher = engine.acquireSearcher("test"); + DatafusionReader reader = searcher.getReader(); + + searcher.close(); + reader.decRef(); + assertEquals(-1, reader.cachePtr); + + // Calling decRef on closed reader should throw + assertThrows(IllegalStateException.class, reader::decRef); + } + +// R1 -> f1,f2,f3 + // S1 -> f1, f2, f3 + // S2 -> f1, f2, f3 + //R2 -> f2,f3 + //S3 -> f2,f3 + + public void testReaderClosesAfterSearchRelease() throws IOException { + Map finalRes = new HashMap<>(); + DatafusionSearcher datafusionSearcher = null; + + ShardPath shardPath = createShardPathWithResourceFiles("test-index", 0, "parquet_file_generation_2.parquet", "parquet_file_generation_1.parquet"); + + try { + DatafusionEngine engine = new DatafusionEngine(DataFormat.PARQUET, Collections.emptyList(), service, shardPath); + DatafusionReaderManager readerManager = engine.getReferenceManager(INTERNAL); + + RefreshResult refreshResult = new RefreshResult(); + WriterFileSet writerFileSet = new WriterFileSet(shardPath.getDataPath(), 1); + writerFileSet.add(shardPath.getDataPath() + "/parquet_file_generation_2.parquet"); + writerFileSet.add(shardPath.getDataPath() + "/parquet_file_generation_1.parquet"); + List writerFiles = List.of(writerFileSet); + + refreshResult.add(getMockDataFormat(), writerFiles); + readerManager.afterRefresh(true, new CatalogSnapshot(refreshResult, 1)); + + // DatafusionReader readerR1 = readerManager.acquire(); + DatafusionSearcher datafusionSearcherS1 = engine.acquireSearcher("Search"); + DatafusionReader readerR1 = datafusionSearcherS1.getReader(); + assertEquals(readerR1.files.size(), datafusionSearcherS1.getReader().files.size()); + + DatafusionSearcher datafusionSearcher1v2 = engine.acquireSearcher("Search"); + DatafusionReader readerR1v2 = datafusionSearcher1v2.getReader(); + assertEquals(readerR1v2.files.size(), datafusionSearcher1v2.getReader().files.size()); + + // Check if same reader is referenced by both Searches + assertEquals(readerR1v2, readerR1); + + addFilesToShardPath(shardPath, "parquet_file_generation_0.parquet"); + // now trigger refresh to have new Reader with F2, F3 + RefreshResult refreshResultR2 = new RefreshResult(); + WriterFileSet writerFileSet2 = new WriterFileSet(shardPath.getDataPath(), 2); + writerFileSet2.add(shardPath.getDataPath() + "/parquet_file_generation_1.parquet"); + writerFileSet2.add(shardPath.getDataPath() + "/parquet_file_generation_0.parquet"); + List writerFiles2 = List.of(writerFileSet2); + + refreshResultR2.add(getMockDataFormat(), writerFiles2); + readerManager.afterRefresh(true, new CatalogSnapshot(refreshResultR2, 2)); + + // now check if new Reader is created with F2, F3 + // DatafusionReader readerR2 = readerManager.acquire(); + DatafusionSearcher datafusionSearcherS2 = engine.acquireSearcher("Search"); + DatafusionReader readerR2 = datafusionSearcherS2.getReader(); + assertEquals(readerR2.files.size(), datafusionSearcherS2.getReader().files.size()); + + //now we close S1 and automatically R1 will be closed + datafusionSearcherS1.close(); + // 1 for SearcherS1v2 + assertEquals(1, getRefCount(readerR1)); + // 1 for SearcherS2 and 1 for CatalogSnapshot + assertEquals(2, getRefCount(readerR2)); + assertNotEquals(-1, readerR1.cachePtr); + datafusionSearcher1v2.close(); + assertEquals(-1, readerR1v2.cachePtr); + + assertThrows(IllegalStateException.class, () -> readerR1.decRef()); + datafusionSearcherS2.close(); + assertEquals(1, getRefCount(readerR2)); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + if (datafusionSearcher != null) { + datafusionSearcher.close(); + } + } + } + + /** Test end-to-end search functionality with substrait plan execution and result verification */ + public void testSearch() throws IOException { + + ShardPath shardPath = createShardPathWithResourceFiles("index-7", 0, "parquet_file_generation_0.parquet"); + DatafusionEngine engine = new DatafusionEngine(DataFormat.PARQUET, Collections.emptyList(), service, shardPath); + DatafusionReaderManager readerManager = engine.getReferenceManager(INTERNAL); + + // Initial refresh + RefreshResult refreshResult1 = new RefreshResult(); + WriterFileSet writerFileSet1 = new WriterFileSet(shardPath.getDataPath(), 1); + writerFileSet1.add(shardPath.getDataPath() + "/parquet_file_generation_0.parquet"); + refreshResult1.add(getMockDataFormat(), List.of(writerFileSet1)); + readerManager.afterRefresh(true, new CatalogSnapshot(refreshResult1, 1)); + + DatafusionSearcher searcher1 = engine.acquireSearcher("search"); + DatafusionReader reader1 = searcher1.getReader(); + + byte[] protoContent; + + try (InputStream is = getClass().getResourceAsStream("/substrait_plan_test.pb")) { + protoContent = is.readAllBytes(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + DatafusionQuery datafusionQuery = new DatafusionQuery("index-7", protoContent, new java.util.ArrayList<>()); + Map expectedResults = new HashMap<>(); + expectedResults.put("min", 2L); + expectedResults.put("max", 4L); + expectedResults.put("count()", 2L); + + verifySearchResults(searcher1,datafusionQuery,expectedResults); + + logger.info("AFTER REFRESH"); + + addFilesToShardPath(shardPath, "parquet_file_generation_1.parquet"); + RefreshResult refreshResult2 = new RefreshResult(); + WriterFileSet writerFileSet2 = new WriterFileSet(shardPath.getDataPath(), 2); + writerFileSet2.add(shardPath.getDataPath() + "/parquet_file_generation_1.parquet"); + refreshResult2.add(getMockDataFormat(), List.of(writerFileSet2)); + readerManager.afterRefresh(true, new CatalogSnapshot(refreshResult2, 2)); + + expectedResults = new HashMap<>(); + expectedResults.put("min", 4L); + expectedResults.put("max", 8L); + expectedResults.put("count()", 2L); + + DatafusionSearcher searcher2 = engine.acquireSearcher("test2"); + verifySearchResults(searcher2,datafusionQuery,expectedResults); + + DatafusionReader reader2 = searcher2.getReader(); + + // Should have different readers + assertNotSame(reader1, reader2); + assertEquals(1, reader1.files.stream().toList().getFirst().getFiles().size()); + assertEquals(1, reader2.files.stream().toList().getFirst().getFiles().size()); + + searcher1.close(); + assertEquals(-1,reader1.cachePtr); + searcher2.close(); + } + + // ========== Helper Methods ========== + + private int getRefCount(DatafusionReader reader) { + try { + java.lang.reflect.Field refCountField = DatafusionReader.class.getDeclaredField("refCount"); + refCountField.setAccessible(true); + return ((AtomicInteger) refCountField.get(reader)).get(); + } catch (Exception e) { + throw new RuntimeException("Failed to get ref count", e); + } + } + + private org.opensearch.index.engine.exec.DataFormat getMockDataFormat() { + return new org.opensearch.index.engine.exec.DataFormat() { + @Override + public Setting dataFormatSettings() { return null; } + + @Override + public Setting clusterLeveldataFormatSettings() { return null; } + + @Override + public String name() { return "parquet"; } + + @Override + public void configureStore() {} + }; + } + + private ShardPath createCustomShardPath(String indexName, int shardId) { + Index index = new Index(indexName, UUID.randomUUID().toString()); + ShardId shId = new ShardId(index, shardId); + Path dataPath = createTempDir().resolve("indices").resolve(index.getUUID()).resolve(String.valueOf(shardId)); + return new ShardPath(false, dataPath, dataPath, shId); + } + + private void addFilesToShardPath(ShardPath shardPath, String... fileNames) throws IOException { + for (String resourceFileName : fileNames) { + try (InputStream is = getClass().getResourceAsStream("/" + resourceFileName)) { + Path targetPath = shardPath.getDataPath().resolve(resourceFileName); + java.nio.file.Files.createDirectories(targetPath.getParent()); + if (is != null) { + java.nio.file.Files.copy(is, targetPath); + } else { + java.nio.file.Files.createFile(targetPath); + } + } + } + } + + private ShardPath createShardPathWithResourceFiles(String indexName, int shardId, String... resourceFileNames) throws IOException { + ShardPath shardPath = createCustomShardPath(indexName, shardId); + + for (String resourceFileName : resourceFileNames) { + try (InputStream is = getClass().getResourceAsStream("/" + resourceFileName)) { + Path targetPath = shardPath.getDataPath().resolve(resourceFileName); + java.nio.file.Files.createDirectories(targetPath.getParent()); + if (is != null) { + java.nio.file.Files.copy(is, targetPath); + } else { + java.nio.file.Files.createFile(targetPath); + } + } + } + + return shardPath; + } + + private void verifySearchResults(DatafusionSearcher searcher, DatafusionQuery datafusionQuery, Map expectedResults) throws IOException { + Map finalRes = new HashMap<>(); + Long streamPointer = searcher.search(datafusionQuery, service.getTokioRuntimePointer()); + + RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); + RecordBatchStream stream = new RecordBatchStream(streamPointer, service.getTokioRuntimePointer(), allocator); + + SearchResultsCollector collector = new SearchResultsCollector() { + @Override + public void collect(RecordBatchStream value) { + VectorSchemaRoot root = value.getVectorSchemaRoot(); + for (Field field : root.getSchema().getFields()) { + String filedName = field.getName(); + FieldVector fieldVector = root.getVector(filedName); + Object[] fieldValues = new Object[fieldVector.getValueCount()]; + for (int i = 0; i < fieldVector.getValueCount(); i++) { + fieldValues[i] = fieldVector.getObject(i); + } + finalRes.put(filedName, fieldValues); + } + } + }; + + while (stream.loadNextBatch().join()) { + collector.collect(stream); + } + + for (Map.Entry entry : finalRes.entrySet()) { + logger.info("{}: {}", entry.getKey(), java.util.Arrays.toString(entry.getValue())); + assertEquals(Long.valueOf(entry.getValue()[0].toString()), expectedResults.get(entry.getKey())); + } + } + + private byte[] readSubstraitPlanFromResources(String fileName) throws IOException { + try (InputStream is = getClass().getResourceAsStream("/" + fileName)) { + if (is == null) { + throw new IOException("Substrait plan file not found: " + fileName); + } + return is.readAllBytes(); + } + } +} + + + + + + diff --git a/plugins/engine-datafusion/src/test/resources/parquet_file_generation_0.parquet b/plugins/engine-datafusion/src/test/resources/parquet_file_generation_0.parquet new file mode 100644 index 0000000000000..ba936ae7b7de4 Binary files /dev/null and b/plugins/engine-datafusion/src/test/resources/parquet_file_generation_0.parquet differ diff --git a/plugins/engine-datafusion/src/test/resources/parquet_file_generation_1.parquet b/plugins/engine-datafusion/src/test/resources/parquet_file_generation_1.parquet new file mode 100644 index 0000000000000..2b4a7ac2e8bef Binary files /dev/null and b/plugins/engine-datafusion/src/test/resources/parquet_file_generation_1.parquet differ diff --git a/plugins/engine-datafusion/src/test/resources/parquet_file_generation_2.parquet b/plugins/engine-datafusion/src/test/resources/parquet_file_generation_2.parquet new file mode 100644 index 0000000000000..ba936ae7b7de4 Binary files /dev/null and b/plugins/engine-datafusion/src/test/resources/parquet_file_generation_2.parquet differ diff --git a/plugins/engine-datafusion/src/test/resources/substrait_plan_test.pb b/plugins/engine-datafusion/src/test/resources/substrait_plan_test.pb new file mode 100644 index 0000000000000..61e5597b10b04 Binary files /dev/null and b/plugins/engine-datafusion/src/test/resources/substrait_plan_test.pb differ