Skip to content

Commit 8c44afb

Browse files
committed
Removing atomic handling for ManagedVSR and cleaning up state transitions
Signed-off-by: Raghuvansh Raj <raghraaj@amazon.com>
1 parent c3a8415 commit 8c44afb

File tree

4 files changed

+98
-213
lines changed

4 files changed

+98
-213
lines changed
Lines changed: 88 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,40 @@
11
package com.parquet.parquetdataformat.vsr;
22

33
import com.parquet.parquetdataformat.bridge.ArrowExport;
4-
import org.apache.arrow.memory.ArrowBuf;
54
import org.apache.arrow.memory.BufferAllocator;
6-
import org.apache.arrow.memory.RootAllocator;
7-
import org.apache.arrow.vector.BigIntVector;
85
import org.apache.arrow.vector.VectorSchemaRoot;
96
import org.apache.arrow.vector.FieldVector;
107
import org.apache.arrow.c.ArrowArray;
118
import org.apache.arrow.c.ArrowSchema;
129
import org.apache.arrow.c.Data;
13-
14-
import java.util.concurrent.atomic.AtomicReference;
15-
import java.util.concurrent.locks.ReadWriteLock;
16-
import java.util.concurrent.locks.ReentrantReadWriteLock;
17-
18-
import static org.apache.arrow.vector.BitVectorHelper.byteIndex;
10+
import org.apache.arrow.vector.types.pojo.Schema;
1911

2012
/**
2113
* Managed wrapper around VectorSchemaRoot that handles state transitions
22-
* and provides thread-safe access for the ACTIVE/FROZEN lifecycle.
14+
* for the ACTIVE/FROZEN/CLOSED lifecycle with controlled access methods.
2315
*/
2416
public class ManagedVSR implements AutoCloseable {
2517

2618
private final String id;
2719
private final VectorSchemaRoot vsr;
2820
private final BufferAllocator allocator;
29-
private final AtomicReference<VSRState> state;
30-
private final ReadWriteLock lock;
31-
private final long createdTime;
21+
private VSRState state;
3222

3323

34-
public ManagedVSR(String id, VectorSchemaRoot vsr, BufferAllocator allocator) {
24+
public ManagedVSR(String id, Schema schema, BufferAllocator allocator) {
3525
this.id = id;
36-
this.vsr = vsr;
26+
this.vsr = VectorSchemaRoot.create(schema, allocator);
3727
this.allocator = allocator;
38-
this.state = new AtomicReference<>(VSRState.ACTIVE);
39-
this.lock = new ReentrantReadWriteLock();
40-
this.createdTime = System.currentTimeMillis();
41-
}
42-
43-
/**
44-
* Gets the underlying VectorSchemaRoot.
45-
* Should only be used when holding appropriate locks.
46-
*
47-
* @return VectorSchemaRoot instance
48-
*/
49-
public VectorSchemaRoot getVSR() {
50-
return vsr;
28+
this.state = VSRState.ACTIVE;
5129
}
5230

5331
/**
5432
* Gets the current row count in this VSR.
55-
* Thread-safe read operation.
5633
*
5734
* @return Number of rows currently in the VSR
5835
*/
5936
public int getRowCount() {
60-
lock.readLock().lock();
61-
try {
62-
return vsr.getRowCount();
63-
} finally {
64-
lock.readLock().unlock();
65-
}
37+
return vsr.getRowCount();
6638
}
6739

6840
/**
@@ -73,96 +45,111 @@ public int getRowCount() {
7345
* @throws IllegalStateException if VSR is not active or is immutable
7446
*/
7547
public void setRowCount(int rowCount) {
76-
lock.writeLock().lock();
77-
try {
78-
if (state.get() != VSRState.ACTIVE) {
79-
throw new IllegalStateException("Cannot modify VSR in state: " + state.get());
80-
}
81-
vsr.setRowCount(rowCount);
82-
} finally {
83-
lock.writeLock().unlock();
48+
if (state != VSRState.ACTIVE) {
49+
throw new IllegalStateException("Cannot modify VSR in state: " + state);
8450
}
51+
vsr.setRowCount(rowCount);
8552
}
8653

8754
/**
8855
* Gets a field vector by name.
89-
* Thread-safe read operation.
9056
*
9157
* @param fieldName Name of the field
9258
* @return FieldVector for the field, or null if not found
9359
*/
9460
public FieldVector getVector(String fieldName) {
95-
lock.readLock().lock();
96-
try {
97-
return vsr.getVector(fieldName);
98-
} finally {
99-
lock.readLock().unlock();
100-
}
61+
return vsr.getVector(fieldName);
10162
}
10263

10364
/**
10465
* Changes the state of this VSR.
10566
* Handles state transition logic and immutability.
67+
* This method is private to ensure controlled state transitions.
10668
*
10769
* @param newState New state to transition to
10870
*/
109-
public void setState(VSRState newState) {
110-
VSRState oldState = state.getAndSet(newState);
71+
private void setState(VSRState newState) {
72+
VSRState oldState = state;
73+
state = newState;
11174

11275
System.out.println(String.format(
11376
"[VSR] State transition: %s -> %s for VSR %s",
11477
oldState, newState, id));
11578
}
11679

80+
/**
81+
* Transitions the VSR from ACTIVE to FROZEN state.
82+
* This is the only way to freeze a VSR.
83+
*
84+
* @throws IllegalStateException if VSR is not in ACTIVE state
85+
*/
86+
public void moveToFrozen() {
87+
if (state != VSRState.ACTIVE) {
88+
throw new IllegalStateException(String.format(
89+
"Cannot freeze VSR %s: expected ACTIVE state but was %s", id, state));
90+
}
91+
setState(VSRState.FROZEN);
92+
}
93+
94+
/**
95+
* Transitions the VSR from FROZEN to CLOSED state.
96+
* This method is private and only called by close().
97+
*
98+
* @throws IllegalStateException if VSR is not in FROZEN state
99+
*/
100+
private void moveToClosed() {
101+
if (state != VSRState.FROZEN) {
102+
throw new IllegalStateException(String.format(
103+
"Cannot close VSR %s: expected FROZEN state but was %s", id, state));
104+
}
105+
setState(VSRState.CLOSED);
106+
107+
// Clean up resources
108+
if (vsr != null) {
109+
vsr.close();
110+
}
111+
if (allocator != null) {
112+
allocator.close();
113+
}
114+
}
115+
117116
/**
118117
* Gets the current state of this VSR.
119118
*
120119
* @return Current VSRState
121120
*/
122121
public VSRState getState() {
123-
return state.get();
122+
return state;
124123
}
125124

126125
/**
127126
* Exports this VSR to Arrow C Data Interface for Rust handoff.
128-
* Only allowed when VSR is FROZEN or FLUSHING.
127+
* Only allowed when VSR is FROZEN.
129128
*
130129
* @return ArrowExport containing ArrowArray and ArrowSchema
131130
* @throws IllegalStateException if VSR is not in correct state
132131
*/
133132
public ArrowExport exportToArrow() {
134-
VSRState currentState = state.get();
135-
if (currentState != VSRState.FROZEN &&
136-
currentState != VSRState.FLUSHING) {
137-
throw new IllegalStateException("Cannot export VSR in state: " + currentState);
133+
if (state != VSRState.FROZEN) {
134+
throw new IllegalStateException("Cannot export VSR in state: " + state + ". VSR must be FROZEN to export.");
138135
}
139136

140-
lock.readLock().lock();
141-
try {
142-
ArrowArray arrowArray = ArrowArray.allocateNew(allocator);
143-
ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator);
137+
ArrowArray arrowArray = ArrowArray.allocateNew(allocator);
138+
ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator);
144139

145-
// Export the VectorSchemaRoot to C Data Interface
146-
Data.exportVectorSchemaRoot(allocator, vsr, null, arrowArray, arrowSchema);
140+
// Export the VectorSchemaRoot to C Data Interface
141+
Data.exportVectorSchemaRoot(allocator, vsr, null, arrowArray, arrowSchema);
147142

148-
return new ArrowExport(arrowArray, arrowSchema);
149-
} finally {
150-
lock.readLock().unlock();
151-
}
143+
return new ArrowExport(arrowArray, arrowSchema);
152144
}
153145

154146
public ArrowExport exportSchema() {
155-
lock.readLock().lock();
156-
try {
157-
ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator);
147+
ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator);
158148

159-
// Export the VectorSchemaRoot to C Data Interface
160-
Data.exportSchema(allocator, vsr.getSchema(), null, arrowSchema);
149+
// Export the VectorSchemaRoot to C Data Interface
150+
Data.exportSchema(allocator, vsr.getSchema(), null, arrowSchema);
161151

162-
return new ArrowExport(null, arrowSchema);
163-
} finally {
164-
lock.readLock().unlock();
165-
}
152+
return new ArrowExport(null, arrowSchema);
166153
}
167154

168155
/**
@@ -171,8 +158,7 @@ public ArrowExport exportSchema() {
171158
* @return true if VSR cannot be modified
172159
*/
173160
public boolean isImmutable() {
174-
VSRState currentState = state.get();
175-
return currentState != VSRState.ACTIVE;
161+
return state != VSRState.ACTIVE;
176162
}
177163

178164

@@ -185,15 +171,6 @@ public String getId() {
185171
return id;
186172
}
187173

188-
/**
189-
* Gets the creation timestamp.
190-
*
191-
* @return Creation time in milliseconds
192-
*/
193-
public long getCreatedTime() {
194-
return createdTime;
195-
}
196-
197174
/**
198175
* Gets the associated BufferAllocator.
199176
*
@@ -205,55 +182,38 @@ public BufferAllocator getAllocator() {
205182

206183
/**
207184
* Closes this VSR and releases all resources.
185+
* This is the only way to transition a VSR to CLOSED state.
186+
* VSR must be in FROZEN state before it can be closed.
187+
*
188+
* @throws IllegalStateException if VSR is in ACTIVE state (must freeze first)
208189
*/
209190
@Override
210191
public void close() {
211-
lock.writeLock().lock();
212-
try {
213-
if (state.get() != VSRState.CLOSED) {
214-
state.set(VSRState.CLOSED);
215-
vsr.close();
216-
allocator.close();
217-
}
218-
} finally {
219-
lock.writeLock().unlock();
192+
// If already CLOSED, do nothing (idempotent)
193+
if (state == VSRState.CLOSED) {
194+
return;
195+
}
196+
197+
// If ACTIVE, must freeze first
198+
if (state == VSRState.ACTIVE) {
199+
throw new IllegalStateException(String.format(
200+
"Cannot close VSR %s: VSR is still ACTIVE. Must freeze VSR before closing.", id));
201+
}
202+
203+
// If FROZEN, transition to CLOSED
204+
if (state == VSRState.FROZEN) {
205+
moveToClosed();
206+
} else {
207+
// This should never happen with current states, but defensive programming
208+
throw new IllegalStateException(String.format(
209+
"Cannot close VSR %s: unexpected state %s", id, state));
220210
}
221211
}
222212

223213

224214
@Override
225215
public String toString() {
226216
return String.format("ManagedVSR{id='%s', state=%s, rows=%d, immutable=%s}",
227-
id, state.get(), getRowCount(), isImmutable());
228-
}
229-
230-
public static void main(String[] args) {
231-
RootAllocator allocator = new RootAllocator();
232-
BigIntVector vector = new BigIntVector("vector", allocator);
233-
vector.allocateNew(10);
234-
vector.set(0, 100); // Set position 0
235-
// vector.setNull(1);
236-
vector.set(2, 300); // Set position 2
237-
// Position 1 is not set!
238-
vector.setValueCount(3); // Claims vector has 3 elements
239-
240-
// Position 1 now contains undefined data
241-
// long value = vector.get(1); // Could be any value!
242-
System.out.println(readBit(vector.getValidityBuffer(), 0));
243-
System.out.println(readBit(vector.getValidityBuffer(), 1));
244-
System.out.println(readBit(vector.getValidityBuffer(), 2));
245-
System.out.println(readBit(vector.getValidityBuffer(), 3));
246-
}
247-
248-
public static byte readBit(ArrowBuf validityBuffer, long index) {
249-
// it can be observed that some logic is duplicate of the logic in setValidityBit.
250-
// this is because JIT cannot always remove the if branch in setValidityBit,
251-
// so we give a dedicated implementation for setting bits.
252-
final long byteIndex = byteIndex(index);
253-
254-
// the byte is promoted to an int, because according to Java specification,
255-
// bytes will be promoted to ints automatically, upon expression evaluation.
256-
// by promoting it manually, we avoid the unnecessary conversions.
257-
return validityBuffer.getByte(byteIndex);
217+
id, state, getRowCount(), isImmutable());
258218
}
259219
}

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRManager.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,9 @@ public String flush(FlushIn flushIn) throws IOException {
118118
}
119119

120120
// Transition VSR to FROZEN state before flushing
121-
managedVSR.setState(VSRState.FROZEN);
121+
managedVSR.moveToFrozen();
122122
System.out.println("[JAVA] Flushing " + managedVSR.getRowCount() + " rows");
123123

124-
// Transition to FLUSHING state
125-
managedVSR.setState(VSRState.FLUSHING);
126-
127124
// Direct native call - write the managed VSR data
128125
try (ArrowExport export = managedVSR.exportToArrow()) {
129126
RustBridge.write(fileName, export.getArrayAddress(), export.getSchemaAddress());
@@ -181,7 +178,6 @@ public void maybeRotateActiveVSR() throws IOException {
181178
" with " + frozenVSR.getRowCount() + " rows");
182179

183180
// Write the frozen VSR data immediately
184-
frozenVSR.setState(VSRState.FLUSHING);
185181
try (ArrowExport export = frozenVSR.exportToArrow()) {
186182
RustBridge.write(fileName, export.getArrayAddress(), export.getSchemaAddress());
187183
}

0 commit comments

Comments
 (0)