-
Notifications
You must be signed in to change notification settings - Fork 2.3k
DataFusionReaderManager UTs #19910
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/datafusion
Are you sure you want to change the base?
DataFusionReaderManager UTs #19910
Conversation
|
❌ Gradle check result for 7d1dada: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
| assertEquals(2, reader.files.stream().toList().get(0).getFiles().size()); | ||
| assertNotEquals(-1, reader.cachePtr); | ||
|
|
||
| searcher.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we assert the refcount please in all tests ? I do see that in some tests its added
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also can we assert for -1 when we expect cache ptr to be cleared ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Added this check in the test testDecRefAfterCloseThrowsException.
Let me add it to all other tests as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| throw new IllegalStateException("Listing table has been already closed"); | ||
| } | ||
|
|
||
| // closeDatafusionReader(this.cachePtr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did we remove this, shouldn't we close the datafusion reader / clear the listing table cache ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rightt. Let me add it back
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
|
||
| logger.info("AFTER REFRESH"); | ||
|
|
||
| addFilesToShardPath(shardPath, "parquet_file_generation_1.parquet"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test in general is awesome - this covers complete flow.
Can we also add tests where the refresh removes files too and can we check cases where the files removed are part of search / their ref count once search completes etc ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. This test case does the similar thing.
Initial Refresh -> F1
We check for the search results
Second Refresh -> has a new Catalog Snapshot with file F2
We verify if Search Result shows the stats only for the new file
Im assuming CatalogSnapshot refers to latest checkpoint of files and not the diff. Correct me if I am wrong here.
Let me add the refCount checks as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| System.out.println("Directory path: " + directoryPath); | ||
|
|
||
| this.cachePtr = DataFusionQueryJNI.createDatafusionReader(directoryPath, fileNames); | ||
| incRef(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we are removing it? When we are creating it, we will want to keep it 1 right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are already incrementing it from ReaderManager where the call is made to this constructor.
We can either increment when
- ReaderManager calls to create DataFusionReader (assuming Reader is only initialized by readerManager)
-> this way it is only incremented after ReaderManager is updated with Reader reference - Within constructor of Reader.
My assumption was more around Reader increment being associated with ReaderManager
Any thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.current = new DatafusionReader(this.path, catalogSnapshot.getSearchableFiles(dataFormat)); this.current.incRef();
I think this is not right, since this kind of does not enforce current ref count to be 1 and is more error prone. The manager can incRef and decRef based on acquire/release but the initialisation with 1 ref count should be handled by the Reader itself. This way user don't have to worry about it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or we can do something like this which is being done in IndexReader:
public abstract sealed class IndexReader implements Closeable permits CompositeReader, LeafReader {
private boolean closed = false;
private boolean closedByChild = false;
private final AtomicInteger refCount = new AtomicInteger(1);
private final Set<IndexReader> parentReaders = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap()));
Let's initialise it with 1 always.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noted. Let me initialize the refCount from 1
|
❌ Gradle check result for 400f808: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
| // ========== Test Cases ========== | ||
|
|
||
| /** Test that a reader is created with correct file count and cache pointer after initial refresh */ | ||
| public void testInitialReaderCreation() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to add an case where:
- Search started with searcher1
- Refresh happened --> new searcher is created
- Make sure searcher 1 is cleared post the search is completed
We can validate the ref counting during all of it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now as we are relying on UTs ,
Assuming Searcher to be closed when search is completed -> we are manually calling close() on the Searcher.
I have covered the case in testRefreshCreatesNewReader.
If we also want such case, Im thinking below would be the flow which would internally acquire and release searcherSuppliers. Search API triggered -> Searcher Initialization
I believe we can test that scenario in a better way with an IT. Let me know if we need to cover it as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, let's cover it in IT. we will need some IT as well anyways
Description
Adding UTs for :
Related Issues
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.