Skip to content

Commit 5acf690

Browse files
committed
Adding unit tests for VSR management, bugfix for VSR close hierarchy
Signed-off-by: Raghuvansh Raj <raghraaj@amazon.com>
1 parent 928f358 commit 5acf690

File tree

7 files changed

+1568
-43
lines changed

7 files changed

+1568
-43
lines changed

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/ManagedVSR.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,16 @@ public void setRowCount(int rowCount) {
5959

6060
/**
6161
* Gets a field vector by name.
62+
* Only allowed when VSR is in ACTIVE state.
6263
*
6364
* @param fieldName Name of the field
6465
* @return FieldVector for the field, or null if not found
66+
* @throws IllegalStateException if VSR is not in ACTIVE state
6567
*/
6668
public FieldVector getVector(String fieldName) {
69+
if (state != VSRState.ACTIVE) {
70+
throw new IllegalStateException("Cannot access vector in VSR state: " + state + ". VSR must be ACTIVE to access vectors.");
71+
}
6772
return vsr.getVector(fieldName);
6873
}
6974

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRManager.java

Lines changed: 35 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import com.parquet.parquetdataformat.memory.ArrowBufferPool;
1414
import com.parquet.parquetdataformat.writer.ParquetDocumentInput;
1515
import org.apache.arrow.vector.FieldVector;
16-
import org.apache.arrow.vector.types.pojo.Field;
1716
import org.apache.arrow.vector.types.pojo.Schema;
1817
import org.apache.logging.log4j.LogManager;
1918
import org.apache.logging.log4j.Logger;
@@ -37,16 +36,16 @@
3736
* <li>{@link com.parquet.parquetdataformat.bridge.RustBridge} - Direct JNI calls to Rust backend</li>
3837
* </ul>
3938
*/
40-
public class VSRManager implements Closeable {
39+
public class VSRManager implements AutoCloseable {
40+
41+
private static final Logger logger = LogManager.getLogger(VSRManager.class);
42+
4143
private final AtomicReference<ManagedVSR> managedVSR = new AtomicReference<>();
42-
private Map<String, FieldVector> fieldVectorMap;
4344
private final Schema schema;
4445
private final String fileName;
4546
private final VSRPool vsrPool;
4647
private NativeParquetWriter writer;
4748

48-
private static final Logger logger = LogManager.getLogger(VSRManager.class);
49-
5049

5150
public VSRManager(String fileName, Schema schema, ArrowBufferPool arrowBufferPool) {
5251
this.fileName = fileName;
@@ -57,7 +56,7 @@ public VSRManager(String fileName, Schema schema, ArrowBufferPool arrowBufferPoo
5756

5857
// Get active VSR from pool
5958
this.managedVSR.set(vsrPool.getActiveVSR());
60-
initializeFieldVectorMap();
59+
6160
// Initialize writer lazily to avoid crashes
6261
initializeWriter();
6362
}
@@ -75,11 +74,7 @@ private void initializeWriter() {
7574
public WriteResult addToManagedVSR(ParquetDocumentInput document) throws IOException {
7675
ManagedVSR currentVSR = managedVSR.updateAndGet(vsr -> {
7776
if (vsr == null) {
78-
ManagedVSR newVSR = vsrPool.getActiveVSR();
79-
if (newVSR != null) {
80-
reinitializeFieldVectorMap();
81-
}
82-
return newVSR;
77+
return vsrPool.getActiveVSR();
8378
}
8479
return vsr;
8580
});
@@ -147,10 +142,29 @@ public void close() {
147142
writer.flush();
148143
writer.close();
149144
}
145+
146+
// Close VSR Pool - handle IllegalStateException specially
150147
vsrPool.close();
151148
managedVSR.set(null);
149+
150+
} catch (IllegalStateException e) {
151+
// Direct IllegalStateException - re-throw for business logic validation
152+
logger.error("Error during close for {}: {}", fileName, e.getMessage(), e);
153+
throw e;
154+
} catch (RuntimeException e) {
155+
// Check if this is a wrapped IllegalStateException from defensive cleanup
156+
Throwable cause = e.getCause();
157+
if (cause instanceof IllegalStateException) {
158+
// Re-throw the original IllegalStateException for business logic validation
159+
logger.error("Error during close for {}: {}", fileName, cause.getMessage(), cause);
160+
throw (IllegalStateException) cause;
161+
}
162+
// For other RuntimeExceptions, log and re-throw
163+
logger.error("Error during close for {}: {}", fileName, e.getMessage(), e);
164+
throw new RuntimeException("Failed to close VSRManager: " + e.getMessage(), e);
152165
} catch (Exception e) {
153166
logger.error("Error during close for {}: {}", fileName, e.getMessage(), e);
167+
throw new RuntimeException("Failed to close VSRManager: " + e.getMessage(), e);
154168
}
155169
}
156170

@@ -199,9 +213,6 @@ public void maybeRotateActiveVSR() throws IOException {
199213
}
200214
updateVSRAndReinitialize(oldVSR, newVSR);
201215

202-
// Reinitialize field vector map with new VSR
203-
reinitializeFieldVectorMap();
204-
205216
logger.debug("VSR rotation completed for {}, new active VSR: {}, row count: {}",
206217
fileName, newVSR.getId(), newVSR.getRowCount());
207218
}
@@ -239,28 +250,7 @@ private void checkAndHandleVSRRotation() throws IOException {
239250
* Atomically updates managedVSR and reinitializes field vector map.
240251
*/
241252
private void updateVSRAndReinitialize(ManagedVSR oldVSR, ManagedVSR newVSR) {
242-
if (managedVSR.compareAndSet(oldVSR, newVSR)) {
243-
reinitializeFieldVectorMap();
244-
}
245-
}
246-
247-
/**
248-
* Reinitializes the field vector map with the current managed VSR.
249-
* Called after VSR rotation to update vector references.
250-
*/
251-
private void reinitializeFieldVectorMap() {
252-
fieldVectorMap.clear();
253-
initializeFieldVectorMap();
254-
}
255-
256-
private void initializeFieldVectorMap() {
257-
fieldVectorMap = new HashMap<>();
258-
for (Field field : schema.getFields()) {
259-
String fieldName = field.getName();
260-
FieldVector fieldVector = managedVSR.get().getVector(fieldName);
261-
// Vector is already properly typed from ManagedVSR.getVector()
262-
fieldVectorMap.put(fieldName, fieldVector);
263-
}
253+
managedVSR.compareAndSet(oldVSR, newVSR);
264254
}
265255

266256
/**
@@ -271,4 +261,13 @@ private void initializeFieldVectorMap() {
271261
public ManagedVSR getActiveManagedVSR() {
272262
return managedVSR.get();
273263
}
264+
265+
/**
266+
* Gets the current frozen VSR for testing purposes.
267+
*
268+
* @return The current frozen VSR instance, or null if none exists
269+
*/
270+
public ManagedVSR getFrozenVSR() {
271+
return vsrPool.getFrozenVSR();
272+
}
274273
}

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRPool.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* in the Project Mustang design. Each ParquetWriter maintains a single ACTIVE VSR
1818
* for writing and a single FROZEN VSR for Rust handoff.
1919
*/
20-
public class VSRPool {
20+
public class VSRPool implements AutoCloseable {
2121

2222
private static final Logger logger = LogManager.getLogger(VSRPool.class);
2323

@@ -181,18 +181,45 @@ public void freezeAll() {
181181

182182
/**
183183
* Closes the pool and cleans up all resources.
184+
* Uses defensive cleanup to ensure resources are not orphaned if close operations fail.
184185
*/
186+
@Override
185187
public void close() {
186-
// Close active VSR
187-
ManagedVSR active = activeVSR.getAndSet(null);
188+
// Get references without clearing them yet - defensive cleanup approach
189+
ManagedVSR active = activeVSR.get();
190+
ManagedVSR frozen = frozenVSR.get();
191+
192+
Exception firstException = null;
193+
194+
// Try to close active VSR
188195
if (active != null) {
189-
active.close();
196+
try {
197+
active.close();
198+
activeVSR.set(null); // Only clear if successful
199+
} catch (Exception e) {
200+
firstException = e;
201+
// Don't set to null - leave reference so subsequent close attempts can retry
202+
}
190203
}
191204

192-
// Close frozen VSR
193-
ManagedVSR frozen = frozenVSR.getAndSet(null);
205+
// Try to close frozen VSR regardless of active VSR result
194206
if (frozen != null) {
195-
frozen.close();
207+
try {
208+
frozen.close();
209+
frozenVSR.set(null); // Only clear if successful
210+
} catch (Exception e) {
211+
if (firstException != null) {
212+
firstException.addSuppressed(e);
213+
} else {
214+
firstException = e;
215+
}
216+
// Don't set to null - leave reference so subsequent close attempts can retry
217+
}
218+
}
219+
220+
// Throw the most relevant exception after attempting all cleanup
221+
if (firstException != null) {
222+
throw new RuntimeException("VSRPool cleanup failed", firstException);
196223
}
197224
}
198225

0 commit comments

Comments
 (0)