Skip to content

Commit

Permalink
Add Iterator::Refresh(Snapshot*) to RocksJava
Browse files Browse the repository at this point in the history
  • Loading branch information
adamretter committed Dec 14, 2023
1 parent c75fadc commit d5a635f
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 1 deletion.
19 changes: 19 additions & 0 deletions java/rocksjni/iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,25 @@ void Java_org_rocksdb_RocksIterator_refresh0(JNIEnv* env, jobject /*jobj*/,
ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s);
}

/*
* Class: org_rocksdb_RocksIterator
* Method: refresh1
* Signature: (JJ)V
*/
void Java_org_rocksdb_RocksIterator_refresh1(JNIEnv* env, jobject /*jobj*/,
jlong handle,
jlong snapshot_handle) {
auto* it = reinterpret_cast<ROCKSDB_NAMESPACE::Iterator*>(handle);
auto* snapshot = reinterpret_cast<ROCKSDB_NAMESPACE::Snapshot*>(snapshot_handle);
ROCKSDB_NAMESPACE::Status s = it->Refresh(snapshot);

if (s.ok()) {
return;
}

ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s);
}

/*
* Class: org_rocksdb_RocksIterator
* Method: seek0
Expand Down
20 changes: 20 additions & 0 deletions java/rocksjni/sst_file_reader_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -371,3 +371,23 @@ void Java_org_rocksdb_SstFileReaderIterator_refresh0(JNIEnv* env,

ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s);
}

/*
* Class: org_rocksdb_SstFileReaderIterator
* Method: refresh1
* Signature: (JJ)V
*/
void Java_org_rocksdb_SstFileReaderIterator_refresh1(JNIEnv* env,
jobject /*jobj*/,
jlong handle,
jlong snapshot_handle) {
auto* it = reinterpret_cast<ROCKSDB_NAMESPACE::Iterator*>(handle);
auto* snapshot = reinterpret_cast<ROCKSDB_NAMESPACE::Snapshot*>(snapshot_handle);
ROCKSDB_NAMESPACE::Status s = it->Refresh(snapshot);

if (s.ok()) {
return;
}

ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s);
}
12 changes: 12 additions & 0 deletions java/rocksjni/write_batch_with_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -952,3 +952,15 @@ void Java_org_rocksdb_WBWIRocksIterator_refresh0(JNIEnv* env, jobject /*jobj*/,
ROCKSDB_NAMESPACE::Status::NotSupported("Refresh() is not supported");
ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s);
}

/*
* Class: org_rocksdb_WBWIRocksIterator
* Method: refresh1
* Signature: (JJ)V
*/
void Java_org_rocksdb_WBWIRocksIterator_refresh1(JNIEnv* env, jobject /*jobj*/,
jlong /*handle*/, jlong /*snapshot_handle*/) {
ROCKSDB_NAMESPACE::Status s =
ROCKSDB_NAMESPACE::Status::NotSupported("Refresh(Snapshot*) is not supported");
ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s);
}
7 changes: 7 additions & 0 deletions java/src/main/java/org/rocksdb/AbstractRocksIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ public void refresh() throws RocksDBException {
refresh0(nativeHandle_);
}

@Override
public void refresh(final Snapshot snapshot) throws RocksDBException {
assert (isOwningHandle());
refresh1(nativeHandle_, snapshot.getNativeHandle());
}

@Override
public void status() throws RocksDBException {
assert (isOwningHandle());
Expand Down Expand Up @@ -135,6 +141,7 @@ protected void disposeInternal() {
abstract void next0(long handle);
abstract void prev0(long handle);
abstract void refresh0(long handle) throws RocksDBException;
abstract void refresh1(long handle, long snapshotHandle) throws RocksDBException;
abstract void seek0(long handle, byte[] target, int targetLen);
abstract void seekForPrev0(long handle, byte[] target, int targetLen);
abstract void seekDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen);
Expand Down
1 change: 1 addition & 0 deletions java/src/main/java/org/rocksdb/RocksIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public int value(final ByteBuffer value) {
@Override final native void next0(long handle);
@Override final native void prev0(long handle);
@Override final native void refresh0(long handle);
@Override final native void refresh1(long handle, long snapshotHandle);
@Override final native void seek0(long handle, byte[] target, int targetLen);
@Override final native void seekForPrev0(long handle, byte[] target, int targetLen);
@Override
Expand Down
6 changes: 6 additions & 0 deletions java/src/main/java/org/rocksdb/RocksIteratorInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,10 @@ public interface RocksIteratorInterface {
* underlying native library
*/
void refresh() throws RocksDBException;

/**
* Similar to {@link #refresh()} but the iterator will be reading the latest DB state under the
* given snapshot.
*/
void refresh(Snapshot snapshot) throws RocksDBException;
}
1 change: 1 addition & 0 deletions java/src/main/java/org/rocksdb/SstFileReaderIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public int value(final ByteBuffer value) {
@Override final native void next0(long handle);
@Override final native void prev0(long handle);
@Override final native void refresh0(long handle) throws RocksDBException;
@Override final native void refresh1(long handle, long snapshotHandle);
@Override final native void seek0(long handle, byte[] target, int targetLen);
@Override final native void seekForPrev0(long handle, byte[] target, int targetLen);
@Override final native void status0(long handle) throws RocksDBException;
Expand Down
1 change: 1 addition & 0 deletions java/src/main/java/org/rocksdb/WBWIRocksIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public WriteEntry entry() {
@Override final native void next0(long handle);
@Override final native void prev0(long handle);
@Override final native void refresh0(final long handle) throws RocksDBException;
@Override final native void refresh1(long handle, long snapshotHandle);
@Override final native void seek0(long handle, byte[] target, int targetLen);
@Override final native void seekForPrev0(long handle, byte[] target, int targetLen);
@Override final native void status0(long handle) throws RocksDBException;
Expand Down
66 changes: 65 additions & 1 deletion java/src/test/java/org/rocksdb/RocksIteratorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void rocksIterator() throws RocksDBException {
@Test
public void rocksIteratorSeekAndInsert() throws RocksDBException {
try (final Options options =
new Options().setCreateIfMissing(true).setCreateMissingColumnFamilies(true);
new Options().setCreateIfMissing(true).setCreateMissingColumnFamilies(true);
final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) {
db.put("key1".getBytes(), "value1".getBytes());
db.put("key2".getBytes(), "value2".getBytes());
Expand Down Expand Up @@ -236,6 +236,70 @@ public void rocksIteratorSeekAndInsert() throws RocksDBException {
}
}

@Test
public void rocksIteratorSeekAndInsertOnSnapshot() throws RocksDBException {
try (final Options options =
new Options().setCreateIfMissing(true).setCreateMissingColumnFamilies(true);
final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) {
db.put("key1".getBytes(), "value1".getBytes());
db.put("key2".getBytes(), "value2".getBytes());

try (final Snapshot snapshot = db.getSnapshot()) {

try (final RocksIterator iterator = db.newIterator()) {
// check for just keys 1 and 2
iterator.seek("key0".getBytes());
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key1".getBytes());

iterator.seek("key2".getBytes());
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key2".getBytes());

iterator.seek("key3".getBytes());
assertThat(iterator.isValid()).isFalse();
}

// add a new key (after the snapshot was taken)
db.put("key3".getBytes(), "value3".getBytes());

try (final RocksIterator iterator = db.newIterator()) {
// check for keys 1, 2, and 3
iterator.seek("key0".getBytes());
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key1".getBytes());

iterator.seek("key2".getBytes());
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key2".getBytes());

iterator.seek("key3".getBytes());
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key3".getBytes());

iterator.seek("key4".getBytes());
assertThat(iterator.isValid()).isFalse();

// reset iterator to snapshot, iterator should now only see keys
// there were present in the db when the snapshot was taken
iterator.refresh(snapshot);

// again check for just keys 1 and 2
iterator.seek("key0".getBytes());
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key1".getBytes());

iterator.seek("key2".getBytes());
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key2".getBytes());

iterator.seek("key3".getBytes());
assertThat(iterator.isValid()).isFalse();
}
}
}
}

@Test
public void rocksIteratorReleaseAfterCfClose() throws RocksDBException {
try (final Options options = new Options()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,11 @@ public void refresh() throws RocksDBException {
offset = -1;
}

@Override
public void refresh(final Snapshot snapshot) throws RocksDBException {
offset = -1;
}

@Override
public void status() throws RocksDBException {
if(offset < 0 || offset >= entries.size()) {
Expand Down

0 comments on commit d5a635f

Please sign in to comment.