Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package com.parquet.parquetdataformat.vsr;

import com.parquet.parquetdataformat.bridge.ArrowExport;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.c.ArrowArray;
Expand All @@ -13,15 +10,12 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.apache.arrow.vector.BitVectorHelper.byteIndex;
import org.apache.arrow.vector.types.pojo.Schema;

/**
* Managed wrapper around VectorSchemaRoot that handles state transitions
* and provides thread-safe access for the ACTIVE/FROZEN lifecycle.
* for the ACTIVE/FROZEN/CLOSED lifecycle with controlled access methods.
*/
public class ManagedVSR implements AutoCloseable {

Expand All @@ -30,43 +24,23 @@ public class ManagedVSR implements AutoCloseable {
private final String id;
private final VectorSchemaRoot vsr;
private final BufferAllocator allocator;
private final AtomicReference<VSRState> state;
private final ReadWriteLock lock;
private final long createdTime;
private VSRState state;


public ManagedVSR(String id, VectorSchemaRoot vsr, BufferAllocator allocator) {
public ManagedVSR(String id, Schema schema, BufferAllocator allocator) {
this.id = id;
this.vsr = vsr;
this.vsr = VectorSchemaRoot.create(schema, allocator);
this.allocator = allocator;
this.state = new AtomicReference<>(VSRState.ACTIVE);
this.lock = new ReentrantReadWriteLock();
this.createdTime = System.currentTimeMillis();
}

/**
* Gets the underlying VectorSchemaRoot.
* Should only be used when holding appropriate locks.
*
* @return VectorSchemaRoot instance
*/
public VectorSchemaRoot getVSR() {
return vsr;
this.state = VSRState.ACTIVE;
}

/**
* Gets the current row count in this VSR.
* Thread-safe read operation.
*
* @return Number of rows currently in the VSR
*/
public int getRowCount() {
lock.readLock().lock();
try {
return vsr.getRowCount();
} finally {
lock.readLock().unlock();
}
return vsr.getRowCount();
}

/**
Expand All @@ -77,94 +51,114 @@ public int getRowCount() {
* @throws IllegalStateException if VSR is not active or is immutable
*/
public void setRowCount(int rowCount) {
lock.writeLock().lock();
try {
if (state.get() != VSRState.ACTIVE) {
throw new IllegalStateException("Cannot modify VSR in state: " + state.get());
}
vsr.setRowCount(rowCount);
} finally {
lock.writeLock().unlock();
if (state != VSRState.ACTIVE) {
throw new IllegalStateException("Cannot modify VSR in state: " + state);
}
vsr.setRowCount(rowCount);
}

/**
* Gets a field vector by name.
* Thread-safe read operation.
* Only allowed when VSR is in ACTIVE state.
*
* @param fieldName Name of the field
* @return FieldVector for the field, or null if not found
* @throws IllegalStateException if VSR is not in ACTIVE state
*/
public FieldVector getVector(String fieldName) {
lock.readLock().lock();
try {
return vsr.getVector(fieldName);
} finally {
lock.readLock().unlock();
if (state != VSRState.ACTIVE) {
throw new IllegalStateException("Cannot access vector in VSR state: " + state + ". VSR must be ACTIVE to access vectors.");
}
return vsr.getVector(fieldName);
}

/**
* Changes the state of this VSR.
* Handles state transition logic and immutability.
* This method is private to ensure controlled state transitions.
*
* @param newState New state to transition to
*/
public void setState(VSRState newState) {
VSRState oldState = state.getAndSet(newState);
private void setState(VSRState newState) {
VSRState oldState = state;
state = newState;

logger.debug("State transition: {} -> {} for VSR {}", oldState, newState, id);
}

/**
 * Freezes this VSR by moving it from ACTIVE to FROZEN.
 * The transition is delegated to {@code setState}; any starting state other
 * than ACTIVE is rejected.
 *
 * @throws IllegalStateException if VSR is not in ACTIVE state
 */
public void moveToFrozen() {
    final VSRState current = state;
    if (current == VSRState.ACTIVE) {
        setState(VSRState.FROZEN);
        return;
    }
    final String message = String.format(
            "Cannot freeze VSR %s: expected ACTIVE state but was %s", id, current);
    throw new IllegalStateException(message);
}

/**
 * Transitions the VSR from FROZEN to CLOSED state and releases its resources.
 * This method is private and is called by close().
 *
 * <p>The state is switched to CLOSED first, then the VectorSchemaRoot is
 * closed, then the allocator. The allocator is closed in a {@code finally}
 * block: close() returns early once the state is CLOSED, so an exception from
 * closing the VectorSchemaRoot must not be allowed to skip the allocator,
 * otherwise it would never be released.
 *
 * @throws IllegalStateException if VSR is not in FROZEN state
 */
private void moveToClosed() {
    if (state != VSRState.FROZEN) {
        throw new IllegalStateException(String.format(
                "Cannot close VSR %s: expected FROZEN state but was %s", id, state));
    }
    setState(VSRState.CLOSED);

    // Clean up resources in the original order: root first, allocator second.
    try {
        if (vsr != null) {
            vsr.close();
        }
    } finally {
        // Runs even when vsr.close() throws, so the allocator is not leaked.
        if (allocator != null) {
            allocator.close();
        }
    }
}

/**
* Gets the current state of this VSR.
*
* @return Current VSRState
*/
public VSRState getState() {
return state.get();
return state;
}

/**
* Exports this VSR to Arrow C Data Interface for Rust handoff.
* Only allowed when VSR is FROZEN or FLUSHING.
* Only allowed when VSR is FROZEN.
*
* @return ArrowExport containing ArrowArray and ArrowSchema
* @throws IllegalStateException if VSR is not in correct state
*/
public ArrowExport exportToArrow() {
VSRState currentState = state.get();
if (currentState != VSRState.FROZEN &&
currentState != VSRState.FLUSHING) {
throw new IllegalStateException("Cannot export VSR in state: " + currentState);
if (state != VSRState.FROZEN) {
throw new IllegalStateException("Cannot export VSR in state: " + state + ". VSR must be FROZEN to export.");
}

lock.readLock().lock();
try {
ArrowArray arrowArray = ArrowArray.allocateNew(allocator);
ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator);
ArrowArray arrowArray = ArrowArray.allocateNew(allocator);
ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator);

// Export the VectorSchemaRoot to C Data Interface
Data.exportVectorSchemaRoot(allocator, vsr, null, arrowArray, arrowSchema);
// Export the VectorSchemaRoot to C Data Interface
Data.exportVectorSchemaRoot(allocator, vsr, null, arrowArray, arrowSchema);

return new ArrowExport(arrowArray, arrowSchema);
} finally {
lock.readLock().unlock();
}
return new ArrowExport(arrowArray, arrowSchema);
}

public ArrowExport exportSchema() {
lock.readLock().lock();
try {
ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator);
ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator);

// Export only the schema (no data) to the C Data Interface
Data.exportSchema(allocator, vsr.getSchema(), null, arrowSchema);
// Export only the schema (no data) to the C Data Interface
Data.exportSchema(allocator, vsr.getSchema(), null, arrowSchema);

return new ArrowExport(null, arrowSchema);
} finally {
lock.readLock().unlock();
}
return new ArrowExport(null, arrowSchema);
}

/**
Expand All @@ -173,8 +167,7 @@ public ArrowExport exportSchema() {
* @return true if VSR cannot be modified
*/
public boolean isImmutable() {
VSRState currentState = state.get();
return currentState != VSRState.ACTIVE;
return state != VSRState.ACTIVE;
}


Expand All @@ -187,15 +180,6 @@ public String getId() {
return id;
}

/**
* Gets the creation timestamp.
*
* @return Creation time in milliseconds
*/
public long getCreatedTime() {
return createdTime;
}

/**
* Gets the associated BufferAllocator.
*
Expand All @@ -207,25 +191,38 @@ public BufferAllocator getAllocator() {

/**
* Closes this VSR and releases all resources.
* This is the only way to transition a VSR to CLOSED state.
* VSR must be in FROZEN state before it can be closed.
*
* @throws IllegalStateException if VSR is in ACTIVE state (must freeze first)
*/
@Override
public void close() {
lock.writeLock().lock();
try {
if (state.get() != VSRState.CLOSED) {
state.set(VSRState.CLOSED);
vsr.close();
allocator.close();
}
} finally {
lock.writeLock().unlock();
// If already CLOSED, do nothing (idempotent)
if (state == VSRState.CLOSED) {
return;
}

// If ACTIVE, must freeze first
if (state == VSRState.ACTIVE) {
throw new IllegalStateException(String.format(
"Cannot close VSR %s: VSR is still ACTIVE. Must freeze VSR before closing.", id));
}

// If FROZEN, transition to CLOSED
if (state == VSRState.FROZEN) {
moveToClosed();
} else {
// Unreachable with the current set of states; kept as a defensive guard
throw new IllegalStateException(String.format(
"Cannot close VSR %s: unexpected state %s", id, state));
Comment on lines +201 to +218
Copy link
Contributor

@Bukhtawar Bukhtawar Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we need a dedicated, centralised VSRState class for state-machine validation:

    private static final Set<VSRState> ACTIVE_VALID_TRANSITIONS = EnumSet.of(FROZEN, CLOSED);
    private static final Set<VSRState> FROZEN_VALID_TRANSITIONS = EnumSet.of(FLUSHING, CLOSED);
    private static final Set<VSRState> FLUSHING_VALID_TRANSITIONS = EnumSet.of(CLOSED);
    private static final Set<VSRState> CLOSED_VALID_TRANSITIONS = EnumSet.noneOf(VSRState.class);


    /**
     * Looks up the set of states this state may transition to.
     *
     * @return the transition set declared for this state
     * @throws IllegalStateException if this state has no declared transition set
     */
    private Set<VSRState> getValidTransitions() {
        final Set<VSRState> allowedTargets;
        switch (this) {
            case ACTIVE:
                allowedTargets = ACTIVE_VALID_TRANSITIONS;
                break;
            case FROZEN:
                allowedTargets = FROZEN_VALID_TRANSITIONS;
                break;
            case FLUSHING:
                allowedTargets = FLUSHING_VALID_TRANSITIONS;
                break;
            case CLOSED:
                allowedTargets = CLOSED_VALID_TRANSITIONS;
                break;
            default:
                throw new IllegalStateException("Unknown state: " + this);
        }
        return allowedTargets;
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see a use case for the FLUSHING state as of now, so I removed it. Between ACTIVE, FROZEN and CLOSED there is only a small set of valid transitions. Do you think we need the FLUSHING state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, if we do introduce a FLUSHING state, should we still allow FROZEN to go directly to CLOSED? Ideally, every FROZEN VSR should first move to the FLUSHING state and only then to CLOSED.

}
}


@Override
public String toString() {
return String.format("ManagedVSR{id='%s', state=%s, rows=%d, immutable=%s}",
id, state.get(), getRowCount(), isImmutable());
id, state, getRowCount(), isImmutable());
}
}
Loading
Loading