Skip to content

Commit

Permalink
Java API for merge operator.
Browse files Browse the repository at this point in the history
This is the first implementation of the merge operator API,
where merge operators can be implemented in Java without any C++ code.
It prefers simplicity and community feedback is appreciated.
  • Loading branch information
rhubner committed Dec 5, 2023
1 parent acc078f commit 42a4836
Show file tree
Hide file tree
Showing 13 changed files with 608 additions and 23 deletions.
4 changes: 4 additions & 0 deletions java/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ set(JNI_NATIVE_SOURCES
rocksjni/hyper_clock_cache.cc
rocksjni/ingest_external_file_options.cc
rocksjni/iterator.cc
rocksjni/jni_merge_operator_v2.cc
rocksjni/jnicallback.cc
rocksjni/loggerjnicallback.cc
rocksjni/lru_cache.cc
Expand Down Expand Up @@ -174,6 +175,7 @@ set(JAVA_MAIN_CLASSES
src/main/java/org/rocksdb/HistogramType.java
src/main/java/org/rocksdb/Holder.java
src/main/java/org/rocksdb/ImportColumnFamilyOptions.java
src/main/java/org/rocksdb/InBuiltMergeOperator.java
src/main/java/org/rocksdb/HyperClockCache.java
src/main/java/org/rocksdb/IndexShorteningMode.java
src/main/java/org/rocksdb/IndexType.java
Expand All @@ -192,6 +194,8 @@ set(JAVA_MAIN_CLASSES
src/main/java/org/rocksdb/MemTableConfig.java
src/main/java/org/rocksdb/MemTableInfo.java
src/main/java/org/rocksdb/MergeOperator.java
src/main/java/org/rocksdb/MergeOperatorV2.java
src/main/java/org/rocksdb/MergeOperatorOutput.java
src/main/java/org/rocksdb/MutableColumnFamilyOptions.java
src/main/java/org/rocksdb/MutableColumnFamilyOptionsInterface.java
src/main/java/org/rocksdb/MutableDBOptions.java
Expand Down
220 changes: 220 additions & 0 deletions java/rocksjni/jni_merge_operator_v2.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
//
// Created by rhubner on 29-Nov-23.
//

#include "include/org_rocksdb_MergeOperatorV2.h"
#include "jni_merge_operator_v2.h"
#include "rocksjni/cplusplus_to_java_convert.h"
#include "rocksjni/portal.h"
#include <iostream>


jlong Java_org_rocksdb_MergeOperatorV2_toCString
(JNIEnv* env, jclass, jstring operator_name) {
auto operator_name_utf = env->GetStringUTFChars(operator_name, nullptr);
if(operator_name_utf == nullptr) {
return 0; //Exception
}
auto operator_name_len = env->GetStringUTFLength(operator_name);

char* ret_value = new char[operator_name_len + 1];
strcpy_s(ret_value, operator_name_len + 1, operator_name_utf);

env->ReleaseStringUTFChars(operator_name, operator_name_utf);

return GET_CPLUSPLUS_POINTER(ret_value);
}

jlong Java_org_rocksdb_MergeOperatorV2_newMergeOperator
(JNIEnv* env, jobject java_merge_operator, jlong _operator_name) {

char* operator_name = reinterpret_cast<char*>(_operator_name);

auto* jni_merge_operator =
new std::shared_ptr<ROCKSDB_NAMESPACE::MergeOperator>(
new rocksdb::JniMergeOperatorV2(env, java_merge_operator, operator_name)
);

return GET_CPLUSPLUS_POINTER(jni_merge_operator);
}

void Java_org_rocksdb_MergeOperatorV2_disposeInternal
(JNIEnv *, jclass, jlong j_handle) {
auto* jni_merge_operator = reinterpret_cast<std::shared_ptr<ROCKSDB_NAMESPACE::MergeOperator>*>(j_handle);
delete jni_merge_operator;
}

namespace ROCKSDB_NAMESPACE {

JniMergeOperatorV2::JniMergeOperatorV2(JNIEnv* env, jobject java_merge_operator, char* _operator_name)
: JniCallback(env,java_merge_operator) {
operator_name = _operator_name;

j_merge_class = env->GetObjectClass(java_merge_operator);
if(j_merge_class == nullptr) {
return; //Exception
}
j_merge_class = static_cast<jclass>(env->NewGlobalRef(j_merge_class));
if(j_merge_class == nullptr) {
return; //Exception
}

j_merge_internal = env->GetMethodID(j_merge_class, "mergeInternal",
"(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;[Ljava/nio/ByteBuffer;)Lorg/rocksdb/MergeOperatorOutput;");
if(j_merge_internal == nullptr) {
return;
}

return_value_clazz = env->FindClass("org/rocksdb/MergeOperatorOutput");
if(return_value_clazz == nullptr) {
return ; //Exception
}
return_value_clazz = static_cast<jclass>(env->NewGlobalRef(return_value_clazz));
if(return_value_clazz == nullptr) {
return ; //Exception
}


return_value_method = env->GetMethodID(return_value_clazz, "getDirectValue", "()Ljava/nio/ByteBuffer;");
if(return_value_method == nullptr) {
return ;
}

return_status_method = env->GetMethodID(return_value_clazz, "getOpStatus",
"()I");
if(return_status_method == nullptr) {
return ;
}

j_byte_buffer_class = ByteBufferJni::getJClass(env);
if(j_byte_buffer_class == nullptr) {
return ;
}
j_byte_buffer_class = static_cast<jclass>(env->NewGlobalRef(j_byte_buffer_class));
if(j_byte_buffer_class == nullptr) {
return; //Exception
}

byte_buffer_position = env->GetMethodID(j_byte_buffer_class, "position", "()I");
if(byte_buffer_position == nullptr) {
return ;
}

byte_buffer_remaining = env->GetMethodID(j_byte_buffer_class, "remaining", "()I");
if(byte_buffer_remaining == nullptr) {
return ;
}


return ;
}

bool JniMergeOperatorV2::FullMergeV2(const MergeOperationInput &merge_in, MergeOperationOutput *merge_out) const {

jboolean attached_thread = JNI_FALSE;
auto env = getJniEnv(&attached_thread);

auto j_operand_list = env->NewObjectArray(static_cast<jsize>(merge_in.operand_list.size()),
j_byte_buffer_class, nullptr);
if(j_operand_list == nullptr) {
return clean_and_return_error(attached_thread, merge_out);
}

for(int i = 0; i < merge_in.operand_list.size(); i++) {
//TODO - Setup array
auto operand = merge_in.operand_list[i];
//auto byte_buffer = env->NewDirectByteBuffer((void *)operand.data(), operand.size()); //TODO - replace with C++ cast
auto byte_buffer = env->NewDirectByteBuffer(
const_cast<void*>(reinterpret_cast<const void*>(operand.data())),
operand.size()); //TODO - replace with C++ cast

if(byte_buffer == nullptr) {
return clean_and_return_error(attached_thread, merge_out);
}
env->SetObjectArrayElement(j_operand_list, i, byte_buffer);
}

auto key = env->NewDirectByteBuffer(
const_cast<void*>(reinterpret_cast<const void*>(merge_in.key.data())),
merge_in.key.size());
if(key == nullptr) {
return clean_and_return_error(attached_thread, merge_out);
}

jobject exising_value = nullptr;
if(merge_in.existing_value != nullptr) {
exising_value = env->NewDirectByteBuffer(
const_cast<void*>(reinterpret_cast<const void*>(merge_in.existing_value->data())),
merge_in.existing_value->size());
}


jobject result = env->CallObjectMethod(m_jcallback_obj, j_merge_internal,
key, exising_value, j_operand_list );
if(env->ExceptionCheck() == JNI_TRUE) {

env->ExceptionClear();
Error(merge_in.logger, "Unable to merge, Java code throw exception");
return clean_and_return_error(attached_thread, merge_out);
}

if(result == nullptr) {
Error(merge_in.logger, "Unable to merge, Java code return nullptr result");
return clean_and_return_error(attached_thread, merge_out);
}

merge_out->op_failure_scope = javaToOpFailureScope(env->CallIntMethod(result, return_status_method));
if(merge_out->op_failure_scope != MergeOperator::OpFailureScope::kDefault) {
releaseJniEnv(attached_thread);
return false;
}

auto result_byte_buff = env->CallObjectMethod(result, return_value_method);
if(result_byte_buff == nullptr) {
Error(merge_in.logger, "Unable to merge, Java code return nullptr ByteBuffer");
return clean_and_return_error(attached_thread, merge_out);
}

auto result_byte_buff_data = env->GetDirectBufferAddress(result_byte_buff);


auto position = env->CallIntMethod(result_byte_buff, byte_buffer_position);
auto remaining = env->CallIntMethod(result_byte_buff, byte_buffer_remaining);

merge_out->new_value.assign(static_cast<char*>(result_byte_buff_data) + position, remaining);

releaseJniEnv(attached_thread);

return true;
}

JniMergeOperatorV2::~JniMergeOperatorV2() {
jboolean attached_thread = JNI_FALSE;
auto env = getJniEnv(&attached_thread);
env->DeleteGlobalRef(j_merge_class);
env->DeleteGlobalRef(j_byte_buffer_class);
env->DeleteGlobalRef(return_value_clazz);
delete operator_name;
releaseJniEnv(attached_thread);
}

bool JniMergeOperatorV2::clean_and_return_error(jboolean& attached_thread, MergeOperationOutput *merge_out) const {
merge_out->op_failure_scope = MergeOperator::OpFailureScope::kOpFailureScopeMax;
releaseJniEnv(attached_thread);
return false;
}

const MergeOperator::OpFailureScope JniMergeOperatorV2::javaToOpFailureScope(jint failure) const {
switch (failure) {
case 0: return MergeOperator::OpFailureScope::kDefault;
case 1: return MergeOperator::OpFailureScope::kTryMerge;
case 2: return MergeOperator::OpFailureScope::kMustMerge;
case 3: return MergeOperator::OpFailureScope::kOpFailureScopeMax;
default: return MergeOperator::OpFailureScope::kOpFailureScopeMax;
}
}

const char *JniMergeOperatorV2::Name() const {
return operator_name;
}
}
39 changes: 39 additions & 0 deletions java/rocksjni/jni_merge_operator_v2.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//
// Created by rhubner on 29-Nov-23.
//
#ifndef ROCKSDB_JNI_MERGE_OPERATOR_V2_H
#define ROCKSDB_JNI_MERGE_OPERATOR_V2_H

#include <jni.h>
#include "rocksjni/jnicallback.h"
#include "rocksdb/merge_operator.h"

namespace ROCKSDB_NAMESPACE {

class JniMergeOperatorV2 : public JniCallback,
public MergeOperator {

public:
JniMergeOperatorV2(JNIEnv* env1, jobject java_merge_operator, char* operator_name);
bool FullMergeV2(const MergeOperationInput &merge_in, MergeOperationOutput *merge_out) const override;
const char* Name() const override;
~JniMergeOperatorV2() override;

private:
const MergeOperator::OpFailureScope javaToOpFailureScope(jint failure) const;
bool clean_and_return_error(jboolean& attached_thread, MergeOperationOutput *merge_out) const;
jclass j_merge_class;
jclass j_byte_buffer_class;
jmethodID j_merge_internal;
jclass return_value_clazz;
jmethodID return_value_method;
jmethodID return_status_method;
jmethodID byte_buffer_position;
jmethodID byte_buffer_remaining;
char* operator_name;


};

}
#endif // ROCKSDB_JNI_MERGE_OPERATOR_V2_H
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* CassandraValueMergeOperator is a merge operator that merges two cassandra wide column
* values.
*/
public class CassandraValueMergeOperator extends MergeOperator {
public class CassandraValueMergeOperator extends InBuiltMergeOperator {
public CassandraValueMergeOperator(final int gcGracePeriodInSeconds) {
super(newSharedCassandraValueMergeOperator(gcGracePeriodInSeconds, 0));
}
Expand Down
2 changes: 1 addition & 1 deletion java/src/main/java/org/rocksdb/ColumnFamilyOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public ColumnFamilyOptions setMergeOperatorName(final String name) {
@Override
public ColumnFamilyOptions setMergeOperator(
final MergeOperator mergeOperator) {
setMergeOperator(nativeHandle_, mergeOperator.nativeHandle_);
setMergeOperator(nativeHandle_, mergeOperator.nativeHandler());
return this;
}

Expand Down
31 changes: 31 additions & 0 deletions java/src/main/java/org/rocksdb/InBuiltMergeOperator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// Copyright (c) 2014, Vlad Balan (vlad.gm@gmail.com). All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

package org.rocksdb;

/**
* MergeOperator holds an operator to be applied when compacting
* two merge operands held under the same key in order to obtain a single
* value.
*/
public abstract class InBuiltMergeOperator extends RocksObject implements MergeOperator {
protected InBuiltMergeOperator(final long nativeHandle) {
super(nativeHandle);
}

@Override
public long nativeHandler() {
return nativeHandle_;
}
}


//
// InBuiltMergeOperator
// interface MergeOperator
//
//interface MergeOperatorV2 extends MergeOperator
//interface MergeOperatorV3 extends MergeOperator
25 changes: 7 additions & 18 deletions java/src/main/java/org/rocksdb/MergeOperator.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,7 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// Copyright (c) 2014, Vlad Balan (vlad.gm@gmail.com). All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

package org.rocksdb;

/**
* MergeOperator holds an operator to be applied when compacting
* two merge operands held under the same key in order to obtain a single
* value.
*/
public abstract class MergeOperator extends RocksObject {
protected MergeOperator(final long nativeHandle) {
super(nativeHandle);
}
}
package org.rocksdb;

public interface MergeOperator {

public long nativeHandler();

}
Loading

0 comments on commit 42a4836

Please sign in to comment.