Skip to content

Commit f813cbc

Browse files
committed
Adding unit tests for VSR management, bugfix for VSR close hierarchy
Signed-off-by: Raghuvansh Raj <raghraaj@amazon.com>
1 parent 4c13cf5 commit f813cbc

File tree

7 files changed: 1561 additions (+1561) and 64 deletions (-64).

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/ManagedVSR.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -59,11 +59,16 @@ public void setRowCount(int rowCount) {
5959

6060
/**
6161
* Gets a field vector by name.
62+
* Only allowed when VSR is in ACTIVE state.
6263
*
6364
* @param fieldName Name of the field
6465
* @return FieldVector for the field, or null if not found
66+
* @throws IllegalStateException if VSR is not in ACTIVE state
6567
*/
6668
public FieldVector getVector(String fieldName) {
69+
if (state != VSRState.ACTIVE) {
70+
throw new IllegalStateException("Cannot access vector in VSR state: " + state + ". VSR must be ACTIVE to access vectors.");
71+
}
6772
return vsr.getVector(fieldName);
6873
}
6974

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRManager.java

Lines changed: 28 additions & 57 deletions
Original file line number | Diff line number | Diff line change
@@ -12,17 +12,13 @@
1212
import com.parquet.parquetdataformat.bridge.RustBridge;
1313
import com.parquet.parquetdataformat.memory.ArrowBufferPool;
1414
import com.parquet.parquetdataformat.writer.ParquetDocumentInput;
15-
import org.apache.arrow.vector.FieldVector;
16-
import org.apache.arrow.vector.types.pojo.Field;
1715
import org.apache.arrow.vector.types.pojo.Schema;
1816
import org.apache.logging.log4j.LogManager;
1917
import org.apache.logging.log4j.Logger;
2018
import org.opensearch.index.engine.exec.FlushIn;
2119
import org.opensearch.index.engine.exec.WriteResult;
2220

2321
import java.io.IOException;
24-
import java.util.HashMap;
25-
import java.util.Map;
2622

2723
/**
2824
* Manages VectorSchemaRoot lifecycle with integrated memory management and native call wrappers.
@@ -35,12 +31,11 @@
3531
* <li>{@link RustBridge} - Direct JNI calls to Rust backend</li>
3632
* </ul>
3733
*/
38-
public class VSRManager {
34+
public class VSRManager implements AutoCloseable {
3935

4036
private static final Logger logger = LogManager.getLogger(VSRManager.class);
4137

4238
private ManagedVSR managedVSR;
43-
private Map<String, FieldVector> fieldVectorMap;
4439
private final Schema schema;
4540
private final String fileName;
4641
private final VSRPool vsrPool;
@@ -54,8 +49,7 @@ public VSRManager(String fileName, Schema schema, ArrowBufferPool arrowBufferPoo
5449

5550
// Get active VSR from pool
5651
this.managedVSR = vsrPool.getActiveVSR();
57-
initializeFieldVectorMap();
58-
// Initialize writer lazily to avoid crashes
52+
5953
initializeWriter();
6054
}
6155

@@ -80,7 +74,6 @@ public WriteResult addToManagedVSR(ParquetDocumentInput document) throws IOExcep
8074
if (managedVSR == null) {
8175
throw new IOException("No active VSR available");
8276
}
83-
reinitializeFieldVectorMap();
8477
}
8578

8679
// Ensure VSR is in ACTIVE state for modifications
@@ -136,6 +129,7 @@ public String flush(FlushIn flushIn) throws IOException {
136129
}
137130
}
138131

132+
@Override
139133
public void close() {
140134
try {
141135
// Direct native calls
@@ -146,12 +140,28 @@ public void close() {
146140
logger.warn("Failed to close/flush writer for {}: {}", fileName, e.getMessage(), e);
147141
}
148142

149-
// Close VSR Pool
143+
// Close VSR Pool - handle IllegalStateException specially
150144
vsrPool.close();
151145
managedVSR = null;
152146

147+
} catch (IllegalStateException e) {
148+
// Direct IllegalStateException - re-throw for business logic validation
149+
logger.error("Error during close for {}: {}", fileName, e.getMessage(), e);
150+
throw e;
151+
} catch (RuntimeException e) {
152+
// Check if this is a wrapped IllegalStateException from defensive cleanup
153+
Throwable cause = e.getCause();
154+
if (cause instanceof IllegalStateException) {
155+
// Re-throw the original IllegalStateException for business logic validation
156+
logger.error("Error during close for {}: {}", fileName, cause.getMessage(), cause);
157+
throw (IllegalStateException) cause;
158+
}
159+
// For other RuntimeExceptions, log and re-throw
160+
logger.error("Error during close for {}: {}", fileName, e.getMessage(), e);
161+
throw new RuntimeException("Failed to close VSRManager: " + e.getMessage(), e);
153162
} catch (Exception e) {
154163
logger.error("Error during close for {}: {}", fileName, e.getMessage(), e);
164+
throw new RuntimeException("Failed to close VSRManager: " + e.getMessage(), e);
155165
}
156166
}
157167

@@ -198,9 +208,6 @@ public void maybeRotateActiveVSR() throws IOException {
198208
throw new IOException("No active VSR available after rotation");
199209
}
200210

201-
// Reinitialize field vector map with new VSR
202-
reinitializeFieldVectorMap();
203-
204211
logger.debug("VSR rotation completed for {}, new active VSR: {}, row count: {}",
205212
fileName, managedVSR.getId(), managedVSR.getRowCount());
206213
}
@@ -211,56 +218,20 @@ public void maybeRotateActiveVSR() throws IOException {
211218
}
212219

213220
/**
214-
* Checks if VSR rotation is needed based on row count and memory pressure.
215-
* If rotation occurs, updates the managed VSR reference and reinitializes field vectors.
221+
* Gets the current active ManagedVSR for document input creation.
216222
*
217-
* @deprecated Use handleVSRRotationAfterAddToManagedVSR() instead for safer rotation after document processing
218-
*/
219-
@Deprecated
220-
private void checkAndHandleVSRRotation() throws IOException {
221-
// Get active VSR from pool - this will trigger rotation if needed
222-
ManagedVSR currentActive = vsrPool.getActiveVSR();
223-
224-
// Check if we got a different VSR (rotation occurred)
225-
if (currentActive != managedVSR) {
226-
logger.debug("VSR rotation detected for {}, updating references", fileName);
227-
228-
// Update the managed VSR reference
229-
managedVSR = currentActive;
230-
231-
// Reinitialize field vector map with new VSR
232-
reinitializeFieldVectorMap();
233-
234-
// Note: Writer initialization is not needed per VSR as it's per file
235-
logger.debug("VSR rotation completed for {}, new row count: {}", fileName, managedVSR.getRowCount());
236-
}
237-
}
238-
239-
/**
240-
* Reinitializes the field vector map with the current managed VSR.
241-
* Called after VSR rotation to update vector references.
223+
* @return The current managed VSR instance
242224
*/
243-
private void reinitializeFieldVectorMap() {
244-
fieldVectorMap.clear();
245-
initializeFieldVectorMap();
246-
}
247-
248-
private void initializeFieldVectorMap() {
249-
fieldVectorMap = new HashMap<>();
250-
for (Field field : schema.getFields()) {
251-
String fieldName = field.getName();
252-
FieldVector fieldVector = managedVSR.getVector(fieldName);
253-
// Vector is already properly typed from ManagedVSR.getVector()
254-
fieldVectorMap.put(fieldName, fieldVector);
255-
}
225+
public ManagedVSR getActiveManagedVSR() {
226+
return managedVSR;
256227
}
257228

258229
/**
259-
* Gets the current active ManagedVSR for document input creation.
230+
* Gets the current frozen VSR for testing purposes.
260231
*
261-
* @return The current managed VSR instance
232+
* @return The current frozen VSR instance, or null if none exists
262233
*/
263-
public ManagedVSR getActiveManagedVSR() {
264-
return managedVSR;
234+
public ManagedVSR getFrozenVSR() {
235+
return vsrPool.getFrozenVSR();
265236
}
266237
}

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRPool.java

Lines changed: 34 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@
1717
* in the Project Mustang design. Each ParquetWriter maintains a single ACTIVE VSR
1818
* for writing and a single FROZEN VSR for Rust handoff.
1919
*/
20-
public class VSRPool {
20+
public class VSRPool implements AutoCloseable {
2121

2222
private static final Logger logger = LogManager.getLogger(VSRPool.class);
2323

@@ -181,18 +181,45 @@ public void freezeAll() {
181181

182182
/**
183183
* Closes the pool and cleans up all resources.
184+
* Uses defensive cleanup to ensure resources are not orphaned if close operations fail.
184185
*/
186+
@Override
185187
public void close() {
186-
// Close active VSR
187-
ManagedVSR active = activeVSR.getAndSet(null);
188+
// Get references without clearing them yet - defensive cleanup approach
189+
ManagedVSR active = activeVSR.get();
190+
ManagedVSR frozen = frozenVSR.get();
191+
192+
Exception firstException = null;
193+
194+
// Try to close active VSR
188195
if (active != null) {
189-
active.close();
196+
try {
197+
active.close();
198+
activeVSR.set(null); // Only clear if successful
199+
} catch (Exception e) {
200+
firstException = e;
201+
// Don't set to null - leave reference so subsequent close attempts can retry
202+
}
190203
}
191204

192-
// Close frozen VSR
193-
ManagedVSR frozen = frozenVSR.getAndSet(null);
205+
// Try to close frozen VSR regardless of active VSR result
194206
if (frozen != null) {
195-
frozen.close();
207+
try {
208+
frozen.close();
209+
frozenVSR.set(null); // Only clear if successful
210+
} catch (Exception e) {
211+
if (firstException != null) {
212+
firstException.addSuppressed(e);
213+
} else {
214+
firstException = e;
215+
}
216+
// Don't set to null - leave reference so subsequent close attempts can retry
217+
}
218+
}
219+
220+
// Throw the most relevant exception after attempting all cleanup
221+
if (firstException != null) {
222+
throw new RuntimeException("VSRPool cleanup failed", firstException);
196223
}
197224
}
198225

0 commit comments

Comments
 (0)