From 42a4836c08c12368624b3cca2d5fba20317cdf2a Mon Sep 17 00:00:00 2001 From: Radek Hubner Date: Tue, 5 Dec 2023 21:49:20 +0400 Subject: [PATCH] Java API for merge operator. This is the first implementation of the merge operator API, where merge operators can be implemented in Java without any C++ code. It prefers simplicity and community feedback is appreciated. --- java/CMakeLists.txt | 4 + java/rocksjni/jni_merge_operator_v2.cc | 220 ++++++++++++++++++ java/rocksjni/jni_merge_operator_v2.h | 39 ++++ .../rocksdb/CassandraValueMergeOperator.java | 2 +- .../java/org/rocksdb/ColumnFamilyOptions.java | 2 +- .../org/rocksdb/InBuiltMergeOperator.java | 31 +++ .../main/java/org/rocksdb/MergeOperator.java | 25 +- .../java/org/rocksdb/MergeOperatorOutput.java | 51 ++++ .../java/org/rocksdb/MergeOperatorV2.java | 64 +++++ java/src/main/java/org/rocksdb/Options.java | 2 +- .../org/rocksdb/StringAppendOperator.java | 2 +- .../java/org/rocksdb/UInt64AddOperator.java | 2 +- .../java/org/rocksdb/MergeOperatorV2Test.java | 187 +++++++++++++++ 13 files changed, 608 insertions(+), 23 deletions(-) create mode 100644 java/rocksjni/jni_merge_operator_v2.cc create mode 100644 java/rocksjni/jni_merge_operator_v2.h create mode 100644 java/src/main/java/org/rocksdb/InBuiltMergeOperator.java create mode 100644 java/src/main/java/org/rocksdb/MergeOperatorOutput.java create mode 100644 java/src/main/java/org/rocksdb/MergeOperatorV2.java create mode 100644 java/src/test/java/org/rocksdb/MergeOperatorV2Test.java diff --git a/java/CMakeLists.txt b/java/CMakeLists.txt index 0fc503e69ed7..b31c563f32ca 100644 --- a/java/CMakeLists.txt +++ b/java/CMakeLists.txt @@ -46,6 +46,7 @@ set(JNI_NATIVE_SOURCES rocksjni/hyper_clock_cache.cc rocksjni/ingest_external_file_options.cc rocksjni/iterator.cc + rocksjni/jni_merge_operator_v2.cc rocksjni/jnicallback.cc rocksjni/loggerjnicallback.cc rocksjni/lru_cache.cc @@ -174,6 +175,7 @@ set(JAVA_MAIN_CLASSES src/main/java/org/rocksdb/HistogramType.java src/main/java/org/rocksdb/Holder.java src/main/java/org/rocksdb/ImportColumnFamilyOptions.java + src/main/java/org/rocksdb/InBuiltMergeOperator.java src/main/java/org/rocksdb/HyperClockCache.java src/main/java/org/rocksdb/IndexShorteningMode.java src/main/java/org/rocksdb/IndexType.java @@ -192,6 +194,8 @@ set(JAVA_MAIN_CLASSES src/main/java/org/rocksdb/MemTableConfig.java src/main/java/org/rocksdb/MemTableInfo.java src/main/java/org/rocksdb/MergeOperator.java + src/main/java/org/rocksdb/MergeOperatorV2.java + src/main/java/org/rocksdb/MergeOperatorOutput.java src/main/java/org/rocksdb/MutableColumnFamilyOptions.java src/main/java/org/rocksdb/MutableColumnFamilyOptionsInterface.java src/main/java/org/rocksdb/MutableDBOptions.java diff --git a/java/rocksjni/jni_merge_operator_v2.cc b/java/rocksjni/jni_merge_operator_v2.cc new file mode 100644 index 000000000000..b396a0e9b872 --- /dev/null +++ b/java/rocksjni/jni_merge_operator_v2.cc @@ -0,0 +1,220 @@ +// +// Created by rhubner on 29-Nov-23. +// + +#include "include/org_rocksdb_MergeOperatorV2.h" +#include "jni_merge_operator_v2.h" +#include "rocksjni/cplusplus_to_java_convert.h" +#include "rocksjni/portal.h" +#include + + +jlong Java_org_rocksdb_MergeOperatorV2_toCString + (JNIEnv* env, jclass, jstring operator_name) { + auto operator_name_utf = env->GetStringUTFChars(operator_name, nullptr); + if(operator_name_utf == nullptr) { + return 0; //Exception + } + auto operator_name_len = env->GetStringUTFLength(operator_name); + + char* ret_value = new char[operator_name_len + 1]; + strcpy_s(ret_value, operator_name_len + 1, operator_name_utf); + + env->ReleaseStringUTFChars(operator_name, operator_name_utf); + + return GET_CPLUSPLUS_POINTER(ret_value); +} + +jlong Java_org_rocksdb_MergeOperatorV2_newMergeOperator + (JNIEnv* env, jobject java_merge_operator, jlong _operator_name) { + + char* operator_name = reinterpret_cast(_operator_name); + + auto* jni_merge_operator = + new std::shared_ptr( + new rocksdb::JniMergeOperatorV2(env, java_merge_operator, operator_name) + ); + + return GET_CPLUSPLUS_POINTER(jni_merge_operator); +} + +void Java_org_rocksdb_MergeOperatorV2_disposeInternal + (JNIEnv *, jclass, jlong j_handle) { + auto* jni_merge_operator = reinterpret_cast*>(j_handle); + delete jni_merge_operator; +} + +namespace ROCKSDB_NAMESPACE { + +JniMergeOperatorV2::JniMergeOperatorV2(JNIEnv* env, jobject java_merge_operator, char* _operator_name) + : JniCallback(env,java_merge_operator) { + operator_name = _operator_name; + + j_merge_class = env->GetObjectClass(java_merge_operator); + if(j_merge_class == nullptr) { + return; //Exception + } + j_merge_class = static_cast(env->NewGlobalRef(j_merge_class)); + if(j_merge_class == nullptr) { + return; //Exception + } + + j_merge_internal = env->GetMethodID(j_merge_class, "mergeInternal", + "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;[Ljava/nio/ByteBuffer;)Lorg/rocksdb/MergeOperatorOutput;"); + if(j_merge_internal == nullptr) { + return; + } + + return_value_clazz = env->FindClass("org/rocksdb/MergeOperatorOutput"); + if(return_value_clazz == nullptr) { + return ; //Exception + } + return_value_clazz = static_cast(env->NewGlobalRef(return_value_clazz)); + if(return_value_clazz == nullptr) { + return ; //Exception + } + + + return_value_method = env->GetMethodID(return_value_clazz, "getDirectValue", "()Ljava/nio/ByteBuffer;"); + if(return_value_method == nullptr) { + return ; + } + + return_status_method = env->GetMethodID(return_value_clazz, "getOpStatus", + "()I"); + if(return_status_method == nullptr) { + return ; + } + + j_byte_buffer_class = ByteBufferJni::getJClass(env); + if(j_byte_buffer_class == nullptr) { + return ; + } + j_byte_buffer_class = static_cast(env->NewGlobalRef(j_byte_buffer_class)); + if(j_byte_buffer_class == nullptr) { + return; //Exception + } + + byte_buffer_position = env->GetMethodID(j_byte_buffer_class, "position", "()I"); + if(byte_buffer_position == nullptr) { + return ; + } + + byte_buffer_remaining = env->GetMethodID(j_byte_buffer_class, "remaining", "()I"); + if(byte_buffer_remaining == nullptr) { + return ; + } + + + return ; +} + +bool JniMergeOperatorV2::FullMergeV2(const MergeOperationInput &merge_in, MergeOperationOutput *merge_out) const { + + jboolean attached_thread = JNI_FALSE; + auto env = getJniEnv(&attached_thread); + + auto j_operand_list = env->NewObjectArray(static_cast(merge_in.operand_list.size()), + j_byte_buffer_class, nullptr); + if(j_operand_list == nullptr) { + return clean_and_return_error(attached_thread, merge_out); + } + + for(int i = 0; i < merge_in.operand_list.size(); i++) { + //TODO - Setup array + auto operand = merge_in.operand_list[i]; + //auto byte_buffer = env->NewDirectByteBuffer((void *)operand.data(), operand.size()); //TODO - replace with C++ cast + auto byte_buffer = env->NewDirectByteBuffer( + const_cast(reinterpret_cast(operand.data())), + operand.size()); //TODO - replace with C++ cast + + if(byte_buffer == nullptr) { + return clean_and_return_error(attached_thread, merge_out); + } + env->SetObjectArrayElement(j_operand_list, i, byte_buffer); + } + + auto key = env->NewDirectByteBuffer( + const_cast(reinterpret_cast(merge_in.key.data())), + merge_in.key.size()); + if(key == nullptr) { + return clean_and_return_error(attached_thread, merge_out); + } + + jobject exising_value = nullptr; + if(merge_in.existing_value != nullptr) { + exising_value = env->NewDirectByteBuffer( + const_cast(reinterpret_cast(merge_in.existing_value->data())), + merge_in.existing_value->size()); + } + + + jobject result = env->CallObjectMethod(m_jcallback_obj, j_merge_internal, + key, exising_value, j_operand_list ); + if(env->ExceptionCheck() == JNI_TRUE) { + + env->ExceptionClear(); + Error(merge_in.logger, "Unable to merge, Java code throw exception"); + return clean_and_return_error(attached_thread, merge_out); + } + + if(result == nullptr) { + Error(merge_in.logger, "Unable to merge, Java code return nullptr result"); + return clean_and_return_error(attached_thread, merge_out); + } + + merge_out->op_failure_scope = javaToOpFailureScope(env->CallIntMethod(result, return_status_method)); + if(merge_out->op_failure_scope != MergeOperator::OpFailureScope::kDefault) { + releaseJniEnv(attached_thread); + return false; + } + + auto result_byte_buff = env->CallObjectMethod(result, return_value_method); + if(result_byte_buff == nullptr) { + Error(merge_in.logger, "Unable to merge, Java code return nullptr ByteBuffer"); + return clean_and_return_error(attached_thread, merge_out); + } + + auto result_byte_buff_data = env->GetDirectBufferAddress(result_byte_buff); + + + auto position = env->CallIntMethod(result_byte_buff, byte_buffer_position); + auto remaining = env->CallIntMethod(result_byte_buff, byte_buffer_remaining); + + merge_out->new_value.assign(static_cast(result_byte_buff_data) + position, remaining); + + releaseJniEnv(attached_thread); + + return true; +} + +JniMergeOperatorV2::~JniMergeOperatorV2() { + jboolean attached_thread = JNI_FALSE; + auto env = getJniEnv(&attached_thread); + env->DeleteGlobalRef(j_merge_class); + env->DeleteGlobalRef(j_byte_buffer_class); + env->DeleteGlobalRef(return_value_clazz); + delete operator_name; + releaseJniEnv(attached_thread); +} + +bool JniMergeOperatorV2::clean_and_return_error(jboolean& attached_thread, MergeOperationOutput *merge_out) const { + merge_out->op_failure_scope = MergeOperator::OpFailureScope::kOpFailureScopeMax; + releaseJniEnv(attached_thread); + return false; +} + +const MergeOperator::OpFailureScope JniMergeOperatorV2::javaToOpFailureScope(jint failure) const { + switch (failure) { + case 0: return MergeOperator::OpFailureScope::kDefault; + case 1: return MergeOperator::OpFailureScope::kTryMerge; + case 2: return MergeOperator::OpFailureScope::kMustMerge; + case 3: return MergeOperator::OpFailureScope::kOpFailureScopeMax; + default: return MergeOperator::OpFailureScope::kOpFailureScopeMax; + } +} + +const char *JniMergeOperatorV2::Name() const { + return operator_name; +} +} \ No newline at end of file diff --git a/java/rocksjni/jni_merge_operator_v2.h b/java/rocksjni/jni_merge_operator_v2.h new file mode 100644 index 000000000000..a96a4aeff813 --- /dev/null +++ b/java/rocksjni/jni_merge_operator_v2.h @@ -0,0 +1,39 @@ +// +// Created by rhubner on 29-Nov-23. +// +#ifndef ROCKSDB_JNI_MERGE_OPERATOR_V2_H +#define ROCKSDB_JNI_MERGE_OPERATOR_V2_H + +#include +#include "rocksjni/jnicallback.h" +#include "rocksdb/merge_operator.h" + +namespace ROCKSDB_NAMESPACE { + +class JniMergeOperatorV2 : public JniCallback, + public MergeOperator { + + public: + JniMergeOperatorV2(JNIEnv* env1, jobject java_merge_operator, char* operator_name); + bool FullMergeV2(const MergeOperationInput &merge_in, MergeOperationOutput *merge_out) const override; + const char* Name() const override; + ~JniMergeOperatorV2() override; + + private: + const MergeOperator::OpFailureScope javaToOpFailureScope(jint failure) const; + bool clean_and_return_error(jboolean& attached_thread, MergeOperationOutput *merge_out) const; + jclass j_merge_class; + jclass j_byte_buffer_class; + jmethodID j_merge_internal; + jclass return_value_clazz; + jmethodID return_value_method; + jmethodID return_status_method; + jmethodID byte_buffer_position; + jmethodID byte_buffer_remaining; + char* operator_name; + + +}; + +} +#endif // ROCKSDB_JNI_MERGE_OPERATOR_V2_H \ No newline at end of file diff --git a/java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java b/java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java index 732faee207a6..5788d17139f5 100644 --- a/java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java +++ b/java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java @@ -9,7 +9,7 @@ * CassandraValueMergeOperator is a merge operator that merges two cassandra wide column * values. */ -public class CassandraValueMergeOperator extends MergeOperator { +public class CassandraValueMergeOperator extends InBuiltMergeOperator { public CassandraValueMergeOperator(final int gcGracePeriodInSeconds) { super(newSharedCassandraValueMergeOperator(gcGracePeriodInSeconds, 0)); } diff --git a/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java b/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java index 607a17936e16..acf7f92c33f8 100644 --- a/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java +++ b/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java @@ -221,7 +221,7 @@ public ColumnFamilyOptions setMergeOperatorName(final String name) { @Override public ColumnFamilyOptions setMergeOperator( final MergeOperator mergeOperator) { - setMergeOperator(nativeHandle_, mergeOperator.nativeHandle_); + setMergeOperator(nativeHandle_, mergeOperator.nativeHandler()); return this; } diff --git a/java/src/main/java/org/rocksdb/InBuiltMergeOperator.java b/java/src/main/java/org/rocksdb/InBuiltMergeOperator.java new file mode 100644 index 000000000000..8a44e2988d00 --- /dev/null +++ b/java/src/main/java/org/rocksdb/InBuiltMergeOperator.java @@ -0,0 +1,31 @@ +// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. +// Copyright (c) 2014, Vlad Balan (vlad.gm@gmail.com). All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +package org.rocksdb; + +/** + * MergeOperator holds an operator to be applied when compacting + * two merge operands held under the same key in order to obtain a single + * value. + */ +public abstract class InBuiltMergeOperator extends RocksObject implements MergeOperator { + protected InBuiltMergeOperator(final long nativeHandle) { + super(nativeHandle); + } + + @Override + public long nativeHandler() { + return nativeHandle_; + } +} + + +// +// InBuiltMergeOperator +// interface MergeOperator +// +//interface MergeOperatorV2 extends MergeOperator +//interface MergeOperatorV3 extends MergeOperator \ No newline at end of file diff --git a/java/src/main/java/org/rocksdb/MergeOperator.java b/java/src/main/java/org/rocksdb/MergeOperator.java index c299f62210fa..1c6e914de0a7 100644 --- a/java/src/main/java/org/rocksdb/MergeOperator.java +++ b/java/src/main/java/org/rocksdb/MergeOperator.java @@ -1,18 +1,7 @@ -// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. -// Copyright (c) 2014, Vlad Balan (vlad.gm@gmail.com). All rights reserved. -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). - -package org.rocksdb; - -/** - * MergeOperator holds an operator to be applied when compacting - * two merge operands held under the same key in order to obtain a single - * value. - */ -public abstract class MergeOperator extends RocksObject { - protected MergeOperator(final long nativeHandle) { - super(nativeHandle); - } -} +package org.rocksdb; + +public interface MergeOperator { + + public long nativeHandler(); + +} diff --git a/java/src/main/java/org/rocksdb/MergeOperatorOutput.java b/java/src/main/java/org/rocksdb/MergeOperatorOutput.java new file mode 100644 index 000000000000..4ca46755c9e6 --- /dev/null +++ b/java/src/main/java/org/rocksdb/MergeOperatorOutput.java @@ -0,0 +1,51 @@ +package org.rocksdb; + + +import java.nio.ByteBuffer; + +/* + std::string& new_value; + Slice& existing_operand; + OpFailureScope op_failure_scope = OpFailureScope::kDefault; + */ +public class MergeOperatorOutput { + public enum OpFailureScope { + Default(0), + TryMerge(1), + MustMerge(2), + OpFailureScopeMax(3); + private final int status; + OpFailureScope(int status) { + this.status = status; + } + } + + private ByteBuffer directValue; + private OpFailureScope op_failure_scope = OpFailureScope.Default; + + + public MergeOperatorOutput(final ByteBuffer directValue) { + this.directValue = directValue; + } + + public MergeOperatorOutput(final ByteBuffer directValue, final OpFailureScope op_failure_scope) { + this.directValue = directValue; + this.op_failure_scope = op_failure_scope; + } + + public ByteBuffer getDirectValue() { + return directValue; + } + + public OpFailureScope getOp_failure_scope() { + return op_failure_scope; + } + + /** + * For JNI. Called from JniMergeOperatorV2 + */ + private int getOpStatus() { + return this.op_failure_scope.status; + } + +} \ No newline at end of file diff --git a/java/src/main/java/org/rocksdb/MergeOperatorV2.java b/java/src/main/java/org/rocksdb/MergeOperatorV2.java new file mode 100644 index 000000000000..b6ccb96496f7 --- /dev/null +++ b/java/src/main/java/org/rocksdb/MergeOperatorV2.java @@ -0,0 +1,64 @@ +package org.rocksdb; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +public abstract class MergeOperatorV2 extends RocksCallbackObject implements MergeOperator { + + private final String operatorName; + + public MergeOperatorV2(final String operatorName) { + super(new long[]{toCString(operatorName)}); + this.operatorName = operatorName; + } + + /** + * All parameters of this method are valid only during the call. If you want to keep this, you need to make deep copy/clone. + * + * + * @param key + * @param existingValue + * @param operand + * @return + */ + public abstract MergeOperatorOutput fullMergeV2(final ByteBuffer key, final ByteBuffer existingValue, final List operand); + + + final MergeOperatorOutput mergeInternal(final ByteBuffer key, final ByteBuffer existingValue, final ByteBuffer[] operand) { + List operandList = Arrays.stream(operand) + .map(ByteBuffer::asReadOnlyBuffer) + .collect(Collectors.toUnmodifiableList()); + + return fullMergeV2(key.asReadOnlyBuffer(), existingValue.asReadOnlyBuffer(), operandList); + } + + + @Override + final public long nativeHandler() { + return nativeHandle_; + } + + @Override + protected long initializeNative(long... nativeParameterHandles) { + return newMergeOperator(nativeParameterHandles[0]); + } + + @Override + protected void disposeInternal() { + disposeInternal(nativeHandle_); + } + + public String getOperatorName() { + return operatorName; + } + + private native long newMergeOperator(long operatorName); + + private static native long toCString(String operatorName); + + private static native void disposeInternal(long nativeHandle); + +} \ No newline at end of file diff --git a/java/src/main/java/org/rocksdb/Options.java b/java/src/main/java/org/rocksdb/Options.java index 29f5e8e0d233..1b02c5c22aa1 100644 --- a/java/src/main/java/org/rocksdb/Options.java +++ b/java/src/main/java/org/rocksdb/Options.java @@ -238,7 +238,7 @@ public Options setMergeOperatorName(final String name) { @Override public Options setMergeOperator(final MergeOperator mergeOperator) { - setMergeOperator(nativeHandle_, mergeOperator.nativeHandle_); + setMergeOperator(nativeHandle_, mergeOperator.nativeHandler()); return this; } diff --git a/java/src/main/java/org/rocksdb/StringAppendOperator.java b/java/src/main/java/org/rocksdb/StringAppendOperator.java index 547371e7c08b..0671b044b329 100644 --- a/java/src/main/java/org/rocksdb/StringAppendOperator.java +++ b/java/src/main/java/org/rocksdb/StringAppendOperator.java @@ -10,7 +10,7 @@ * StringAppendOperator is a merge operator that concatenates * two strings. */ -public class StringAppendOperator extends MergeOperator { +public class StringAppendOperator extends InBuiltMergeOperator { public StringAppendOperator() { this(','); } diff --git a/java/src/main/java/org/rocksdb/UInt64AddOperator.java b/java/src/main/java/org/rocksdb/UInt64AddOperator.java index 0cffdce8c117..837abdff31dd 100644 --- a/java/src/main/java/org/rocksdb/UInt64AddOperator.java +++ b/java/src/main/java/org/rocksdb/UInt64AddOperator.java @@ -9,7 +9,7 @@ * Uint64AddOperator is a merge operator that accumlates a long * integer value. */ -public class UInt64AddOperator extends MergeOperator { +public class UInt64AddOperator extends InBuiltMergeOperator { public UInt64AddOperator() { super(newSharedUInt64AddOperator()); } diff --git a/java/src/test/java/org/rocksdb/MergeOperatorV2Test.java b/java/src/test/java/org/rocksdb/MergeOperatorV2Test.java new file mode 100644 index 000000000000..de65e3c1f20b --- /dev/null +++ b/java/src/test/java/org/rocksdb/MergeOperatorV2Test.java @@ -0,0 +1,187 @@ +package org.rocksdb; + +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.in; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Optional; + +public class MergeOperatorV2Test { + + @ClassRule + public static final RocksNativeLibraryResource ROCKS_NATIVE_LIBRARY_RESOURCE = + new RocksNativeLibraryResource(); + + @Rule + public TemporaryFolder dbFolder = new TemporaryFolder(); + + private static byte[] KEY = "thisIsKey".getBytes(StandardCharsets.UTF_8); + + @Test + public void testMergeOperator() throws RocksDBException { + + try( TestMergeOperator mergeOperator = new TestMergeOperator(); + Options options = new Options()) { + options.setMergeOperator(mergeOperator); + options.setCreateIfMissing(true); + + try(RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { + db.put(KEY, "value".getBytes()); + db.merge(KEY, "value1".getBytes()); + db.merge(KEY, "value2".getBytes()); + db.merge(KEY, "value3".getBytes()); + byte[] valueFromDb = db.get(KEY); + assertThat(valueFromDb).containsExactly("10".getBytes(StandardCharsets.UTF_8)); + + } + } + } + + @Test + public void testMergeOperator2() throws RocksDBException { + + try (MergeOperatorV2 mergeOperator = new MergeOperatorV2("Second operator") { + + @Override + public MergeOperatorOutput fullMergeV2(ByteBuffer key, ByteBuffer existingValue, List operand) { + ByteBuffer b = ByteBuffer.allocateDirect(10); + b.put("xxx".getBytes(StandardCharsets.UTF_8)); + b.put(new byte[]{0,0}); + b.flip(); + b.position(3); + return new MergeOperatorOutput(b); + } + }) { + + + try (Options options = new Options()) { + + options.setMergeOperator(mergeOperator); + options.setCreateIfMissing(true); + + try (RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { + db.put(KEY, "value".getBytes()); + db.merge(KEY, "value1".getBytes()); + db.merge(KEY, "value2".getBytes()); + db.merge(KEY, "value3".getBytes()); + byte[] valueFromDb = db.get(KEY); + assertThat(valueFromDb).containsExactly(new byte[]{0,0}); + + } + } + } + } + + @Test + public void testMergeOperator3() throws RocksDBException { + + try (MergeOperatorV2 mergeOperator = new MergeOperatorV2("Third operator") { + + @Override + public MergeOperatorOutput fullMergeV2(ByteBuffer key, ByteBuffer existingValue, List operand) { + return new MergeOperatorOutput(operand.get(1)); + } + }) { + + try (Options options = new Options()) { + + options.setMergeOperator(mergeOperator); + options.setCreateIfMissing(true); + + try (RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { + db.put(KEY, "value".getBytes()); + db.merge(KEY, "value1".getBytes()); + db.merge(KEY, "value2".getBytes()); + db.merge(KEY, "value3".getBytes()); + byte[] valueFromDb = db.get(KEY); + assertThat(valueFromDb).containsExactly("value2".getBytes(StandardCharsets.UTF_8)); + + } + } + } + } + + @Test(expected = RocksDBException.class) + public void testMergeOperatorCrash() throws RocksDBException { + + try (MergeOperatorV2 mergeOperator = new MergeOperatorV2("CrashOperator") { + + @Override + public MergeOperatorOutput fullMergeV2(ByteBuffer key, ByteBuffer existingValue, List operand) { + return new MergeOperatorOutput(null, MergeOperatorOutput.OpFailureScope.OpFailureScopeMax); + } + }) { + + try (Options options = new Options()) { + + options.setMergeOperator(mergeOperator); + options.setCreateIfMissing(true); + + try (RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { + db.put(KEY, "value".getBytes()); + db.merge(KEY, "value1".getBytes()); + db.merge(KEY, "value2".getBytes()); + db.merge(KEY, "value3".getBytes()); + byte[] valueFromDb = db.get(KEY); + assertThat(valueFromDb).containsExactly("value2".getBytes(StandardCharsets.UTF_8)); + + } + } + } + } + + @Test(expected = RocksDBException.class) + public void testMergeOperatorJavaException() throws RocksDBException { + + try (MergeOperatorV2 mergeOperator = new MergeOperatorV2("CrashOperator") { + + @Override + public MergeOperatorOutput fullMergeV2(ByteBuffer key, ByteBuffer existingValue, List operand) { + throw new RuntimeException("Never do this"); + } + }) { + + try (Options options = new Options()) { + options.setMergeOperator(mergeOperator); + options.setCreateIfMissing(true); + + try (RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { + db.put(KEY, "value".getBytes()); + db.merge(KEY, "value1".getBytes()); + db.merge(KEY, "value2".getBytes()); + db.merge(KEY, "value3".getBytes()); + byte[] valueFromDb = db.get(KEY); + assertThat(valueFromDb).containsExactly("value2".getBytes(StandardCharsets.UTF_8)); + + } + } + } + } + + public static class TestMergeOperator extends MergeOperatorV2 { + + public TestMergeOperator() { + super("TestMergeOperator"); + } + + @Override + public MergeOperatorOutput fullMergeV2(ByteBuffer key, ByteBuffer existingValue, List operand) { + + ByteBuffer b = ByteBuffer.allocateDirect(10); + b.put("10".getBytes(StandardCharsets.UTF_8)); + b.flip(); + return new MergeOperatorOutput(b); + } + + } + + + +}