plugins/engine-datafusion/jni/src/lib.rs (1 addition, 1 deletion)

@@ -167,7 +167,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createD
 }

 #[no_mangle]
-pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_destroyReader(
+pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_closeDatafusionReader(
     mut env: JNIEnv,
     _class: JClass,
     ptr: jlong
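The rename keeps the exported symbol in step with JNI's name-mangling contract, where the function name encodes the Java package, class, and method. A minimal sketch of the Java-side declaration this implies (the actual DataFusionQueryJNI class body and return type are not shown in this diff, so treat the shape as an assumption):

    package org.opensearch.datafusion;

    public final class DataFusionQueryJNI {
        // Resolves to the native symbol
        // Java_org_opensearch_datafusion_DataFusionQueryJNI_closeDatafusionReader
        public static native void closeDatafusionReader(long ptr);
    }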
@@ -105,7 +105,7 @@ protected DatafusionSearcher acquireSearcherInternal(String source) {
 @Override
 protected void doClose() {
     try {
-        reader.decRef();
+        datafusionReaderManager.release(reader);
     } catch (IOException e) {
         throw new UncheckedIOException(e);
     }
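Routing the release through the manager appears to follow Lucene's ReferenceManager contract: the manager pairs every acquire with a release, and the reader's own reference counting decides when native resources are freed. A minimal sketch of what release would do under that contract (hypothetical method body; the real DatafusionReaderManager is not shown here):

    public void release(DatafusionReader reader) throws IOException {
        assert reader != null;
        // Drop the reference taken by acquire(); the reader closes itself
        // once its count reaches zero.
        reader.decRef();
    }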
@@ -56,7 +56,6 @@ public DatafusionReader(String directoryPath, Collection<WriterFileSet> files) {
     System.out.println("Directory path: " + directoryPath);

     this.cachePtr = DataFusionQueryJNI.createDatafusionReader(directoryPath, fileNames);
-    incRef();
Contributor:

Why are we removing this? When we create the reader, we want its ref count to start at 1, right?

Contributor Author (@abhita, Nov 7, 2025):

We are already incrementing it from the ReaderManager, where the call to this constructor is made. We can increment either:

  1. when the ReaderManager creates the DatafusionReader (assuming the Reader is only ever initialized by the ReaderManager), so the count is incremented only after the ReaderManager holds the Reader reference; or
  2. within the Reader's constructor.

My assumption was that the Reader's increment belongs with the ReaderManager. Any thoughts?

Contributor:

    this.current = new DatafusionReader(this.path, catalogSnapshot.getSearchableFiles(dataFormat));
    this.current.incRef();

I don't think this is right: it does not enforce that the current ref count is 1, and it is more error-prone. The manager can incRef and decRef on acquire/release, but initializing the count to 1 should be handled by the Reader itself, so callers don't have to worry about it.

Contributor:

Or we can do what IndexReader does:

    public abstract sealed class IndexReader implements Closeable permits CompositeReader, LeafReader {
        private boolean closed = false;
        private boolean closedByChild = false;
        private final AtomicInteger refCount = new AtomicInteger(1);
        private final Set<IndexReader> parentReaders =
            Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));

Let's always initialize it to 1.

Contributor Author:

Noted. Let me initialize the refCount to 1.
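The pattern the thread converges on, with the reader owning its lifecycle from construction, would look roughly like the following. This is a sketch assuming the names used in the discussion (refCount, incRef, decRef, AlreadyClosedException); it is not the exact PR code:

    // Inside DatafusionReader (imports assumed:
    // java.util.concurrent.atomic.AtomicInteger, org.apache.lucene.store.AlreadyClosedException)
    private final AtomicInteger refCount = new AtomicInteger(1); // alive on construction

    public void incRef() {
        if (refCount.getAndIncrement() <= 0) {
            refCount.decrementAndGet(); // undo; the reader was already closed
            throw new AlreadyClosedException("DatafusionReader is already closed");
        }
    }

    public void decRef() throws IOException {
        int count = refCount.decrementAndGet();
        if (count == 0) {
            close(); // last reference dropped: free native resources
        } else if (count < 0) {
            refCount.incrementAndGet(); // undo; decRef() was called too many times
            throw new AlreadyClosedException("DatafusionReader ref count is negative");
        }
    }

With this shape, the ReaderManager's acquire/release only ever calls incRef/decRef, and no caller needs to remember the initial reference.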

     }

     /**
@@ -95,8 +94,7 @@ public void close() throws IOException {
         if(cachePtr == -1L) {
             throw new IllegalStateException("Listing table has been already closed");
         }

-        // closeDatafusionReader(this.cachePtr);
Contributor:

Why did we remove this? Shouldn't we close the DataFusion reader and clear the listing-table cache?

Contributor Author:

Right. Let me add it back.

Contributor Author:

Done.

+        closeDatafusionReader(cachePtr);
         this.cachePtr = -1;
     }
 }
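The restored native call plus the -1 sentinel give close() a double-free guard. A hypothetical usage check (path and files stand in for the real constructor arguments):

    DatafusionReader reader = new DatafusionReader(path, files); // path/files: placeholder arguments
    reader.close();     // frees the native listing-table cache; cachePtr becomes -1
    try {
        reader.close(); // second close must not reach the native layer again
    } catch (IllegalStateException e) {
        // expected: "Listing table has been already closed"
    }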
@@ -10,7 +10,6 @@

 import org.apache.lucene.store.AlreadyClosedException;
 import org.opensearch.datafusion.DataFusionQueryJNI;
-import org.opensearch.datafusion.DataFusionService;
 import org.opensearch.datafusion.core.DefaultRecordBatchStream;
 import org.opensearch.index.engine.EngineSearcher;
 import org.opensearch.search.aggregations.SearchResultsCollector;
@@ -20,15 +19,16 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;

 public class DatafusionSearcher implements EngineSearcher<DatafusionQuery, RecordBatchStream> {
     private final String source;
     private DatafusionReader reader;
     private Closeable closeable;

     public DatafusionSearcher(String source, DatafusionReader reader, Closeable close) {
         this.source = source;
         this.reader = reader;
         this.closeable = close;
     }

     @Override
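The new Closeable field suggests the searcher's close path delegates to whatever the engine hands it, typically a callback that releases the reader through the manager. A sketch under that assumption (the truncated diff does not show DatafusionSearcher.close()):

    @Override
    public void close() throws IOException {
        // e.g. the engine passes () -> datafusionReaderManager.release(reader)
        closeable.close();
    }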

Large diffs are not rendered by default.

@@ -94,64 +94,4 @@ public void testGetVersion() {
// // Verify context is gone
// assertNull(service.getContext(defaultContext.getContext()));
// }

    // To run, update the directory path for the generation-1-optimized.parquet file in
    // this.datafusionReaderManager = new DatafusionReaderManager("TODO://FigureOutPath", formatCatalogSnapshot);
    public void testQueryPhaseExecutor() throws IOException {
        Map<String, Object[]> finalRes = new HashMap<>();
        DatafusionSearcher datafusionSearcher = null;
        try {
            DatafusionEngine engine = new DatafusionEngine(DataFormat.CSV, List.of(new FileMetadata(new TextDF(), "hits_data.parquet")), service);
            datafusionSearcher = engine.acquireSearcher("Search");

            byte[] protoContent;
            try (InputStream is = getClass().getResourceAsStream("/substrait_plan.pb")) {
                protoContent = is.readAllBytes();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }

            long streamPointer = datafusionSearcher.search(new DatafusionQuery("test-index", protoContent, new ArrayList<>()), service.getTokioRuntimePointer());
            RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
            RecordBatchStream stream = new RecordBatchStream(streamPointer, service.getTokioRuntimePointer(), allocator);

            // We could have collectors passed in like this, which collect the results and
            // convert them to InternalAggregation. Is that possible? Need to check.
            SearchResultsCollector<RecordBatchStream> collector = new SearchResultsCollector<RecordBatchStream>() {
                @Override
                public void collect(RecordBatchStream value) {
                    VectorSchemaRoot root = value.getVectorSchemaRoot();
                    for (Field field : root.getSchema().getFields()) {
                        String fieldName = field.getName();
                        FieldVector fieldVector = root.getVector(fieldName);
                        Object[] fieldValues = new Object[fieldVector.getValueCount()];
                        for (int i = 0; i < fieldVector.getValueCount(); i++) {
                            fieldValues[i] = fieldVector.getObject(i);
                        }
                        finalRes.put(fieldName, fieldValues);
                    }
                }
            };

            while (stream.loadNextBatch().join()) {
                collector.collect(stream);
            }

            logger.info("Final Results:");
            for (Map.Entry<String, Object[]> entry : finalRes.entrySet()) {
                logger.info("{}: {}", entry.getKey(), java.util.Arrays.toString(entry.getValue()));
            }
        } catch (Exception exception) {
            logger.error("Failed to execute Substrait query plan", exception);
        } finally {
            if (datafusionSearcher != null) {
                datafusionSearcher.close();
            }
        }
    }
}
4 binary files not shown.