From cca984f86dc25a748fe4af77bd9ad386a7038039 Mon Sep 17 00:00:00 2001 From: Warren Falk Date: Tue, 6 Jun 2017 09:47:24 -0400 Subject: [PATCH] add support for WriteBatchWithIndex --- RocksDbSharp/Iterator.cs | 11 + RocksDbSharp/Native.Marshaled.cs | 174 +++++++++++ RocksDbSharp/Native.Raw.cs | 81 ++++- RocksDbSharp/Native.Wrap.cs | 104 +++++++ RocksDbSharp/RocksDb.cs | 41 +-- RocksDbSharp/WriteBatch.cs | 65 +++- RocksDbSharp/WriteBatchWithIndex.cs | 351 ++++++++++++++++++++++ tests/RocksDbSharpTest/FunctionalTests.cs | 49 +++ 8 files changed, 845 insertions(+), 31 deletions(-) create mode 100644 RocksDbSharp/WriteBatchWithIndex.cs diff --git a/RocksDbSharp/Iterator.cs b/RocksDbSharp/Iterator.cs index 023dbc8..352b6f9 100644 --- a/RocksDbSharp/Iterator.cs +++ b/RocksDbSharp/Iterator.cs @@ -42,6 +42,17 @@ public void Dispose() } } + /// + /// Detach the iterator from its handle but don't dispose the handle + /// + /// + public IntPtr Detach() + { + var r = handle; + handle = IntPtr.Zero; + return r; + } + public bool Valid() { return Native.Instance.rocksdb_iter_valid(handle); diff --git a/RocksDbSharp/Native.Marshaled.cs b/RocksDbSharp/Native.Marshaled.cs index e29c725..81b5329 100644 --- a/RocksDbSharp/Native.Marshaled.cs +++ b/RocksDbSharp/Native.Marshaled.cs @@ -362,6 +362,33 @@ public void rocksdb_writebatch_put(IntPtr writeOptions, string key, string val, } } + public void rocksdb_writebatch_wi_put(IntPtr writeOptions, string key, string val, Encoding encoding) + { + unsafe + { + if (encoding == null) + encoding = Encoding.UTF8; + fixed (char* k = key, v = val) + { + int klength = key.Length; + int vlength = val.Length; + int bklength = encoding.GetByteCount(k, klength); + int bvlength = encoding.GetByteCount(v, vlength); + var buffer = Marshal.AllocHGlobal(bklength + bvlength); + byte* bk = (byte*)buffer.ToPointer(); + encoding.GetBytes(k, klength, bk, bklength); + byte* bv = bk + bklength; + encoding.GetBytes(v, vlength, bv, bvlength); + + rocksdb_writebatch_wi_put(writeOptions, bk, (ulong)bklength, bv, (ulong)bvlength); +#if DEBUG + Zero(bk, bklength); +#endif + Marshal.FreeHGlobal(buffer); + } + } + } + public void rocksdb_iter_seek( IntPtr iter, string key, @@ -463,6 +490,16 @@ public byte[] rocksdb_writebatch_data(IntPtr wbHandle) return data; } + public byte[] rocksdb_writebatch_wi_data(IntPtr wbHandle) + { + var resultPtr = rocksdb_writebatch_wi_data(wbHandle, out ulong size); + var data = new byte[size]; + Marshal.Copy(resultPtr, data, 0, (int)size); + // Do not free this memory because it is owned by the write batch and will be freed when it is disposed + // rocksdb_free(resultPtr); + return data; + } + public int rocksdb_writebatch_data(IntPtr wbHandle, byte[] buffer, int offset, int length) { var resultPtr = rocksdb_writebatch_data(wbHandle, out ulong size); @@ -479,6 +516,22 @@ public int rocksdb_writebatch_data(IntPtr wbHandle, byte[] buffer, int offset, i return (int)size; } + public int rocksdb_writebatch_wi_data(IntPtr wbHandle, byte[] buffer, int offset, int length) + { + var resultPtr = rocksdb_writebatch_wi_data(wbHandle, out ulong size); + bool fits = (int)size <= length; + if (!fits) + { + // Do not free this memory because it is owned by the write batch and will be freed when it is disposed + // rocksdb_free(resultPtr); + return -1; + } + Marshal.Copy(resultPtr, buffer, offset, (int)size); + // Do not free this memory because it is owned by the write batch and will be freed when it is disposed + // rocksdb_free(resultPtr); + return (int)size; + } + public string rocksdb_property_value_string(IntPtr db, string propname) { return MarshalNullTermAsciiStr(rocksdb_property_value(db, propname)); @@ -522,5 +575,126 @@ public unsafe void rocksdb_sstfilewriter_add( } } } + + public string rocksdb_writebatch_wi_get_from_batch( + IntPtr wb, + IntPtr options, + string key, + out IntPtr errptr, + ColumnFamilyHandle cf = null, + Encoding encoding = null) + { + if (encoding == null) + encoding = Encoding.UTF8; + unsafe + { + fixed (char* k = key) + { + int klength = key.Length; + int bklength = encoding.GetByteCount(k, klength); + var buffer = Marshal.AllocHGlobal(bklength); + byte* bk = (byte*)buffer.ToPointer(); + encoding.GetBytes(k, klength, bk, bklength); + + var resultPtr = cf == null + ? rocksdb_writebatch_wi_get_from_batch(wb, options, bk, (ulong)bklength, out ulong bvlength, out errptr) + : rocksdb_writebatch_wi_get_from_batch_cf(wb, options, cf.Handle, bk, (ulong)bklength, out bvlength, out errptr); +#if DEBUG + Zero(bk, bklength); +#endif + Marshal.FreeHGlobal(buffer); + + if (errptr != IntPtr.Zero) + return null; + if (resultPtr == IntPtr.Zero) + return null; + + return MarshalAndFreeRocksDbString(resultPtr, (long)bvlength, encoding); + } + } + } + + public byte[] rocksdb_writebatch_wi_get_from_batch( + IntPtr wb, + IntPtr options, + byte[] key, + ulong keyLength, + out IntPtr errptr, + ColumnFamilyHandle cf = null) + { + var resultPtr = cf == null + ? rocksdb_writebatch_wi_get_from_batch(wb, options, key, keyLength, out ulong valueLength, out errptr) + : rocksdb_writebatch_wi_get_from_batch_cf(wb, options, cf.Handle, key, keyLength, out valueLength, out errptr); + if (errptr != IntPtr.Zero) + return null; + if (resultPtr == IntPtr.Zero) + return null; + var result = new byte[valueLength]; + Marshal.Copy(resultPtr, result, 0, (int)valueLength); + rocksdb_free(resultPtr); + return result; + } + + public string rocksdb_writebatch_wi_get_from_batch_and_db( + IntPtr wb, + IntPtr db, + IntPtr read_options, + string key, + out IntPtr errptr, + ColumnFamilyHandle cf = null, + Encoding encoding = null) + { + if (encoding == null) + encoding = Encoding.UTF8; + unsafe + { + fixed (char* k = key) + { + int klength = key.Length; + int bklength = encoding.GetByteCount(k, klength); + var buffer = Marshal.AllocHGlobal(bklength); + byte* bk = (byte*)buffer.ToPointer(); + encoding.GetBytes(k, klength, bk, bklength); + + var resultPtr = cf == null + ? rocksdb_writebatch_wi_get_from_batch_and_db(wb, db, read_options, bk, (ulong)bklength, out ulong bvlength, out errptr) + : rocksdb_writebatch_wi_get_from_batch_and_db_cf(wb, db, read_options, cf.Handle, bk, (ulong)bklength, out bvlength, out errptr); +#if DEBUG + Zero(bk, bklength); +#endif + Marshal.FreeHGlobal(buffer); + + if (errptr != IntPtr.Zero) + return null; + if (resultPtr == IntPtr.Zero) + return null; + + return MarshalAndFreeRocksDbString(resultPtr, (long)bvlength, encoding); + } + } + } + + public byte[] rocksdb_writebatch_wi_get_from_batch_and_db( + IntPtr wb, + IntPtr db, + IntPtr read_options, + byte[] key, + ulong keyLength, + out IntPtr errptr, + ColumnFamilyHandle cf = null) + { + var resultPtr = cf == null + ? rocksdb_writebatch_wi_get_from_batch_and_db(wb, db, read_options, key, keyLength, out ulong valueLength, out errptr) + : rocksdb_writebatch_wi_get_from_batch_and_db_cf(wb, db, read_options, cf.Handle, key, keyLength, out valueLength, out errptr); + if (errptr != IntPtr.Zero) + return null; + if (resultPtr == IntPtr.Zero) + return null; + var result = new byte[valueLength]; + Marshal.Copy(resultPtr, result, 0, (int)valueLength); + rocksdb_free(resultPtr); + return result; + } + } } diff --git a/RocksDbSharp/Native.Raw.cs b/RocksDbSharp/Native.Raw.cs index 8568996..c4b89b8 100644 --- a/RocksDbSharp/Native.Raw.cs +++ b/RocksDbSharp/Native.Raw.cs @@ -532,11 +532,19 @@ public abstract void rocksdb_writebatch_wi_destroy( /*(rocksdb_writebatch_wi_t*)*/ IntPtr b); public abstract void rocksdb_writebatch_wi_clear(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b); public abstract int rocksdb_writebatch_wi_count(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b); +public abstract void rocksdb_writebatch_wi_put(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, + /*(const char*)*/ byte[] key, + /*(size_t)*/ ulong klen, + /*(const char*)*/ byte[] val, + /*(size_t)*/ ulong vlen); public abstract unsafe void rocksdb_writebatch_wi_put(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(const char*)*/ byte* key, /*(size_t)*/ ulong klen, /*(const char*)*/ byte* val, /*(size_t)*/ ulong vlen); +public abstract void rocksdb_writebatch_wi_put_cf( + /*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family, + /*(const char*)*/ byte[] key, /*(size_t)*/ ulong klen, /*(const char*)*/ byte[] val, /*(size_t)*/ ulong vlen); public abstract unsafe void rocksdb_writebatch_wi_put_cf( /*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family, /*(const char*)*/ byte* key, /*(size_t)*/ ulong klen, /*(const char*)*/ byte* val, /*(size_t)*/ ulong vlen); @@ -549,11 +557,19 @@ public abstract void rocksdb_writebatch_wi_putv_cf( int num_keys, /*(const char* const*)*/ IntPtr keys_list, /*(const size_t*)*/ IntPtr keys_list_sizes, int num_values, /*(const char* const*)*/ IntPtr values_list, /*(const size_t*)*/ IntPtr values_list_sizes); +public abstract void rocksdb_writebatch_wi_merge(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, + /*(const char*)*/ byte[] key, + /*(size_t)*/ ulong klen, + /*(const char*)*/ byte[] val, + /*(size_t)*/ ulong vlen); public abstract unsafe void rocksdb_writebatch_wi_merge(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(const char*)*/ byte* key, /*(size_t)*/ ulong klen, /*(const char*)*/ byte* val, /*(size_t)*/ ulong vlen); +public abstract void rocksdb_writebatch_wi_merge_cf( + /*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family, + /*(const char*)*/ byte[] key, /*(size_t)*/ ulong klen, /*(const char*)*/ byte[] val, /*(size_t)*/ ulong vlen); public abstract unsafe void rocksdb_writebatch_wi_merge_cf( /*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family, /*(const char*)*/ byte* key, /*(size_t)*/ ulong klen, /*(const char*)*/ byte* val, /*(size_t)*/ ulong vlen); @@ -566,9 +582,15 @@ public abstract void rocksdb_writebatch_wi_mergev_cf( int num_keys, /*(const char* const*)*/ IntPtr keys_list, /*(const size_t*)*/ IntPtr keys_list_sizes, int num_values, /*(const char* const*)*/ IntPtr values_list, /*(const size_t*)*/ IntPtr values_list_sizes); +public abstract void rocksdb_writebatch_wi_delete(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, + /*(const char*)*/ byte[] key, + /*(size_t)*/ ulong klen); public abstract unsafe void rocksdb_writebatch_wi_delete(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(const char*)*/ byte* key, /*(size_t)*/ ulong klen); +public abstract void rocksdb_writebatch_wi_delete_cf( + /*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family, + /*(const char*)*/ byte[] key, /*(size_t)*/ ulong klen); public abstract unsafe void rocksdb_writebatch_wi_delete_cf( /*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family, /*(const char*)*/ byte* key, /*(size_t)*/ ulong klen); @@ -578,9 +600,16 @@ public abstract void rocksdb_writebatch_wi_deletev( public abstract void rocksdb_writebatch_wi_deletev_cf( /*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family, int num_keys, /*(const char* const*)*/ IntPtr keys_list, /*(const size_t*)*/ IntPtr keys_list_sizes); +public abstract void rocksdb_writebatch_wi_delete_range( + /*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(const char*)*/ byte[] start_key, /*(size_t)*/ ulong start_key_len, + /*(const char*)*/ byte[] end_key, /*(size_t)*/ ulong end_key_len); public abstract unsafe void rocksdb_writebatch_wi_delete_range( /*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(const char*)*/ byte* start_key, /*(size_t)*/ ulong start_key_len, /*(const char*)*/ byte* end_key, /*(size_t)*/ ulong end_key_len); +public abstract void rocksdb_writebatch_wi_delete_range_cf( + /*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family, + /*(const char*)*/ byte[] start_key, /*(size_t)*/ ulong start_key_len, /*(const char*)*/ byte[] end_key, + /*(size_t)*/ ulong end_key_len); public abstract unsafe void rocksdb_writebatch_wi_delete_range_cf( /*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family, /*(const char*)*/ byte* start_key, /*(size_t)*/ ulong start_key_len, /*(const char*)*/ byte* end_key, @@ -594,53 +623,83 @@ public abstract void rocksdb_writebatch_wi_delete_rangev_cf( int num_keys, /*(const char* const*)*/ IntPtr start_keys_list, /*(const size_t*)*/ IntPtr start_keys_list_sizes, /*(const char* const*)*/ IntPtr end_keys_list, /*(const size_t*)*/ IntPtr end_keys_list_sizes); +public abstract void rocksdb_writebatch_wi_put_log_data( + /*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(const char*)*/ byte[] blob, /*(size_t)*/ ulong len); public abstract void rocksdb_writebatch_wi_put_log_data( /*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(const char*)*/ IntPtr blob, /*(size_t)*/ ulong len); public abstract void rocksdb_writebatch_wi_iterate( /*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(void*)*/ IntPtr state, - /*(void (*put)(void*, const char* k, size_t klen, const char* v, size_t vlen))*/ IntPtr put, - /*(void (*deleted)(void*, const char* k, size_t klen))*/ IntPtr deleted); + /*(void (*put)(void*, const char* k, size_t klen, const char* v, size_t vlen))*/ WriteBatchIteratePutCallback put, + /*(void (*deleted)(void*, const char* k, size_t klen))*/ WriteBatchIterateDeleteCallback deleted); public abstract /*(const char*)*/ IntPtr rocksdb_writebatch_wi_data( /*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(size_t*)*/ out ulong size); public abstract void rocksdb_writebatch_wi_set_save_point( /*(rocksdb_writebatch_wi_t*)*/ IntPtr b); public abstract void rocksdb_writebatch_wi_rollback_to_save_point( - /*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(char**)*/ IntPtr errptr); + /*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(char**)*/ out IntPtr errptr); +public abstract /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch( + /*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi, + /*(const rocksdb_options_t*)*/ IntPtr options, + /*(const char*)*/ byte[] key, /*(size_t)*/ ulong keylen, + /*(size_t*)*/ out ulong vallen, + /*(char**)*/ out IntPtr errptr); public abstract unsafe /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch( /*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi, /*(const rocksdb_options_t*)*/ IntPtr options, /*(const char*)*/ byte* key, /*(size_t)*/ ulong keylen, /*(size_t*)*/ out ulong vallen, - /*(char**)*/ IntPtr errptr); + /*(char**)*/ out IntPtr errptr); +public abstract /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch_cf( + /*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi, + /*(const rocksdb_options_t*)*/ IntPtr options, + /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family, + /*(const char*)*/ byte[] key, /*(size_t)*/ ulong keylen, + /*(size_t*)*/ out ulong vallen, + /*(char**)*/ out IntPtr errptr); public abstract unsafe /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch_cf( /*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi, /*(const rocksdb_options_t*)*/ IntPtr options, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family, /*(const char*)*/ byte* key, /*(size_t)*/ ulong keylen, /*(size_t*)*/ out ulong vallen, - /*(char**)*/ IntPtr errptr); + /*(char**)*/ out IntPtr errptr); +public abstract /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch_and_db( + /*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi, + /*(rocksdb_t*)*/ IntPtr db, + /*(const rocksdb_readoptions_t*)*/ IntPtr read_options, + /*(const char*)*/ byte[] key, /*(size_t)*/ ulong keylen, + /*(size_t*)*/ out ulong vallen, + /*(char**)*/ out IntPtr errptr); public abstract unsafe /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch_and_db( /*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi, /*(rocksdb_t*)*/ IntPtr db, - /*(const rocksdb_readoptions_t*)*/ IntPtr options, + /*(const rocksdb_readoptions_t*)*/ IntPtr read_options, /*(const char*)*/ byte* key, /*(size_t)*/ ulong keylen, /*(size_t*)*/ out ulong vallen, - /*(char**)*/ IntPtr errptr); + /*(char**)*/ out IntPtr errptr); +public abstract /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch_and_db_cf( + /*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi, + /*(rocksdb_t*)*/ IntPtr db, + /*(const rocksdb_readoptions_t*)*/ IntPtr read_options, + /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family, + /*(const char*)*/ byte[] key, /*(size_t)*/ ulong keylen, + /*(size_t*)*/ out ulong vallen, + /*(char**)*/ out IntPtr errptr); public abstract unsafe /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch_and_db_cf( /*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi, /*(rocksdb_t*)*/ IntPtr db, - /*(const rocksdb_readoptions_t*)*/ IntPtr options, + /*(const rocksdb_readoptions_t*)*/ IntPtr read_options, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family, /*(const char*)*/ byte* key, /*(size_t)*/ ulong keylen, /*(size_t*)*/ out ulong vallen, - /*(char**)*/ IntPtr errptr); + /*(char**)*/ out IntPtr errptr); public abstract void rocksdb_write_writebatch_wi( /*(rocksdb_t*)*/ IntPtr db, - /*(const rocksdb_writeoptions_t*)*/ IntPtr options, + /*(const rocksdb_writeoptions_t*)*/ IntPtr write_options, /*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi, - /*(char**)*/ IntPtr errptr); + /*(char**)*/ out IntPtr errptr); public abstract /*(rocksdb_iterator_t*)*/ IntPtr rocksdb_writebatch_wi_create_iterator_with_base( /*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi, /*(rocksdb_iterator_t*)*/ IntPtr base_iterator); diff --git a/RocksDbSharp/Native.Wrap.cs b/RocksDbSharp/Native.Wrap.cs index 45a1499..32eb951 100644 --- a/RocksDbSharp/Native.Wrap.cs +++ b/RocksDbSharp/Native.Wrap.cs @@ -212,6 +212,16 @@ public void rocksdb_write( throw new RocksDbException(errptr); } + public void rocksdb_write_writebatch_wi( + /*rocksdb_t**/ IntPtr db, + /*const rocksdb_writeoptions_t**/ IntPtr writeOptions, + /*(rocksdb_writebatch_wi_t*)*/ IntPtr writeBatchWithIndex) + { + rocksdb_write_writebatch_wi(db, writeOptions, writeBatchWithIndex, out IntPtr errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + public byte[] rocksdb_iter_key(IntPtr iterator) { IntPtr buffer = rocksdb_iter_key(iterator, out ulong length); @@ -281,6 +291,13 @@ public void rocksdb_writebatch_rollback_to_save_point(IntPtr writeBatch) throw new RocksDbException(errptr); } + public void rocksdb_writebatch_wi_rollback_to_save_point(IntPtr writeBatch) + { + rocksdb_writebatch_wi_rollback_to_save_point(writeBatch, out IntPtr errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + public void rocksdb_ingest_external_file(IntPtr db, string[] file_list, ulong list_len, IntPtr opt) { rocksdb_ingest_external_file(db, file_list, list_len, opt, out IntPtr errptr); @@ -330,5 +347,92 @@ public unsafe void rocksdb_sstfilewriter_add( if (errptr != IntPtr.Zero) throw new RocksDbException(errptr); } + + public string rocksdb_writebatch_wi_get_from_batch( + IntPtr wb, + IntPtr options, + string key, + ColumnFamilyHandle cf, + Encoding encoding = null) + { + var result = rocksdb_writebatch_wi_get_from_batch(wb, options, key, out IntPtr errptr, cf, encoding); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + return result; + } + + public IntPtr rocksdb_writebatch_wi_get_from_batch( + IntPtr wb, + IntPtr options, + byte[] key, + ulong keyLength, + out ulong vallen, + ColumnFamilyHandle cf) + { + var result = cf == null + ? rocksdb_writebatch_wi_get_from_batch(wb, options, key, keyLength, out vallen, out IntPtr errptr) + : rocksdb_writebatch_wi_get_from_batch_cf(wb, options, cf.Handle, key, keyLength, out vallen, out errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + return result; + } + + public byte[] rocksdb_writebatch_wi_get_from_batch( + IntPtr wb, + IntPtr options, + byte[] key, + ulong keyLength = 0, + ColumnFamilyHandle cf = null) + { + var result = rocksdb_writebatch_wi_get_from_batch(wb, options, key, keyLength == 0 ? (ulong)key.Length : keyLength, out IntPtr errptr, cf); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + return result; + } + + public string rocksdb_writebatch_wi_get_from_batch_and_db( + IntPtr wb, + IntPtr db, + IntPtr read_options, + string key, + ColumnFamilyHandle cf, + Encoding encoding = null) + { + var result = rocksdb_writebatch_wi_get_from_batch_and_db(wb, db, read_options, key, out IntPtr errptr, cf, encoding); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + return result; + } + + public IntPtr rocksdb_writebatch_wi_get_from_batch_and_db( + IntPtr wb, + IntPtr db, + IntPtr read_options, + byte[] key, + ulong keyLength, + out ulong vallen, + ColumnFamilyHandle cf) + { + var result = cf == null + ? rocksdb_writebatch_wi_get_from_batch_and_db(wb, db, read_options, key, keyLength, out vallen, out IntPtr errptr) + : rocksdb_writebatch_wi_get_from_batch_and_db_cf(wb, db, read_options, cf.Handle, key, keyLength, out vallen, out errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + return result; + } + + public byte[] rocksdb_writebatch_wi_get_from_batch_and_db( + IntPtr wb, + IntPtr db, + IntPtr read_options, + byte[] key, + ulong keyLength = 0, + ColumnFamilyHandle cf = null) + { + var result = rocksdb_writebatch_wi_get_from_batch_and_db(wb, db, read_options, key, keyLength == 0 ? (ulong)key.Length : keyLength, out IntPtr errptr, cf); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + return result; + } } } diff --git a/RocksDbSharp/RocksDb.cs b/RocksDbSharp/RocksDb.cs index c085a5f..1bdac6f 100644 --- a/RocksDbSharp/RocksDb.cs +++ b/RocksDbSharp/RocksDb.cs @@ -9,10 +9,11 @@ namespace RocksDbSharp { public class RocksDb : IDisposable - { - private ReadOptions defaultReadOptions; - private WriteOptions defaultWriteOptions; - private Encoding defaultEncoding; + { + internal static ReadOptions DefaultReadOptions { get; } = new ReadOptions(); + internal static OptionsHandle DefaultOptions { get; } = new DbOptions(); + internal static WriteOptions DefaultWriteOptions { get; } = new WriteOptions(); + internal static Encoding DefaultEncoding => Encoding.UTF8; private Dictionary columnFamilies; // Managed references to unmanaged resources that need to live at least as long as the db @@ -25,9 +26,6 @@ private RocksDb(IntPtr handle, dynamic optionsReferences, dynamic cfOptionsRefs, this.Handle = handle; References.Options = optionsReferences; References.CfOptions = cfOptionsRefs; - defaultReadOptions = new ReadOptions(); - defaultWriteOptions = new WriteOptions(); - defaultEncoding = Encoding.UTF8; this.columnFamilies = columnFamilies; } @@ -92,7 +90,7 @@ public void SetOptions(IEnumerable> options) public string Get(string key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null, Encoding encoding = null) { - return Native.Instance.rocksdb_get(Handle, (readOptions ?? defaultReadOptions).Handle, key, cf, encoding ?? defaultEncoding); + return Native.Instance.rocksdb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, cf, encoding ?? DefaultEncoding); } public byte[] Get(byte[] key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) @@ -102,7 +100,7 @@ public byte[] Get(byte[] key, ColumnFamilyHandle cf = null, ReadOptions readOpti public byte[] Get(byte[] key, long keyLength, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) { - return Native.Instance.rocksdb_get(Handle, (readOptions ?? defaultReadOptions).Handle, key, keyLength, cf); + return Native.Instance.rocksdb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, keyLength, cf); } public long Get(byte[] key, byte[] buffer, long offset, long length, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) @@ -114,7 +112,7 @@ public long Get(byte[] key, long keyLength, byte[] buffer, long offset, long len { unsafe { - var ptr = Native.Instance.rocksdb_get(Handle, (readOptions ?? defaultReadOptions).Handle, key, keyLength, out long valLength, cf); + var ptr = Native.Instance.rocksdb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, keyLength, out long valLength, cf); valLength = Math.Min(length, valLength); Marshal.Copy(ptr, buffer, (int)offset, (int)valLength); Native.Instance.rocksdb_free(ptr); @@ -124,22 +122,27 @@ public long Get(byte[] key, long keyLength, byte[] buffer, long offset, long len public KeyValuePair[] MultiGet(byte[][] keys, ColumnFamilyHandle[] cf = null, ReadOptions readOptions = null) { - return Native.Instance.rocksdb_multi_get(Handle, (readOptions ?? defaultReadOptions).Handle, keys); + return Native.Instance.rocksdb_multi_get(Handle, (readOptions ?? DefaultReadOptions).Handle, keys); } public KeyValuePair[] MultiGet(string[] keys, ColumnFamilyHandle[] cf = null, ReadOptions readOptions = null) { - return Native.Instance.rocksdb_multi_get(Handle, (readOptions ?? defaultReadOptions).Handle, keys); + return Native.Instance.rocksdb_multi_get(Handle, (readOptions ?? DefaultReadOptions).Handle, keys); } public void Write(WriteBatch writeBatch, WriteOptions writeOptions = null) { - Native.Instance.rocksdb_write(Handle, (writeOptions ?? defaultWriteOptions).Handle, writeBatch.Handle); + Native.Instance.rocksdb_write(Handle, (writeOptions ?? DefaultWriteOptions).Handle, writeBatch.Handle); + } + + public void Write(WriteBatchWithIndex writeBatch, WriteOptions writeOptions = null) + { + Native.Instance.rocksdb_write_writebatch_wi(Handle, (writeOptions ?? DefaultWriteOptions).Handle, writeBatch.Handle); } public void Remove(string key, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) { - Native.Instance.rocksdb_delete(Handle, (writeOptions ?? defaultWriteOptions).Handle, key, cf); + Native.Instance.rocksdb_delete(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, cf); } public void Remove(byte[] key, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) @@ -149,12 +152,12 @@ public void Remove(byte[] key, ColumnFamilyHandle cf = null, WriteOptions writeO public void Remove(byte[] key, long keyLength, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) { - Native.Instance.rocksdb_delete(Handle, (writeOptions ?? defaultWriteOptions).Handle, key, keyLength); + Native.Instance.rocksdb_delete(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, keyLength); } public void Put(string key, string value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null, Encoding encoding = null) { - Native.Instance.rocksdb_put(Handle, (writeOptions ?? defaultWriteOptions).Handle, key, value, cf, encoding ?? defaultEncoding); + Native.Instance.rocksdb_put(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, value, cf, encoding ?? DefaultEncoding); } public void Put(byte[] key, byte[] value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) @@ -164,14 +167,14 @@ public void Put(byte[] key, byte[] value, ColumnFamilyHandle cf = null, WriteOpt public void Put(byte[] key, long keyLength, byte[] value, long valueLength, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) { - Native.Instance.rocksdb_put(Handle, (writeOptions ?? defaultWriteOptions).Handle, key, keyLength, value, valueLength, cf); + Native.Instance.rocksdb_put(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, keyLength, value, valueLength, cf); } public Iterator NewIterator(ColumnFamilyHandle cf = null, ReadOptions readOptions = null) { IntPtr iteratorHandle = cf == null - ? Native.Instance.rocksdb_create_iterator(Handle, (readOptions ?? defaultReadOptions).Handle) - : Native.Instance.rocksdb_create_iterator_cf(Handle, (readOptions ?? defaultReadOptions).Handle, cf.Handle); + ? Native.Instance.rocksdb_create_iterator(Handle, (readOptions ?? DefaultReadOptions).Handle) + : Native.Instance.rocksdb_create_iterator_cf(Handle, (readOptions ?? DefaultReadOptions).Handle, cf.Handle); // Note: passing in read options here only to ensure that it is not collected before the iterator return new Iterator(iteratorHandle, readOptions); } diff --git a/RocksDbSharp/WriteBatch.cs b/RocksDbSharp/WriteBatch.cs index 5c4dec4..4ad0f95 100644 --- a/RocksDbSharp/WriteBatch.cs +++ b/RocksDbSharp/WriteBatch.cs @@ -3,7 +3,39 @@ namespace RocksDbSharp { - public class WriteBatch : IDisposable + public interface IWriteBatch : IDisposable + { + IntPtr Handle { get; } + IWriteBatch Clear(); + int Count(); + IWriteBatch Put(string key, string val, Encoding encoding = null); + IWriteBatch Put(byte[] key, byte[] val, ColumnFamilyHandle cf = null); + IWriteBatch Put(byte[] key, ulong klen, byte[] val, ulong vlen, ColumnFamilyHandle cf = null); + unsafe void Put(byte* key, ulong klen, byte* val, ulong vlen, ColumnFamilyHandle cf = null); + IWriteBatch Putv(int numKeys, IntPtr keysList, IntPtr keysListSizes, int numValues, IntPtr valuesList, IntPtr valuesListSizes); + IWriteBatch PutvCf(IntPtr columnFamily, int numKeys, IntPtr keysList, IntPtr keysListSizes, int numValues, IntPtr valuesList, IntPtr valuesListSizes); + IWriteBatch Merge(byte[] key, ulong klen, byte[] val, ulong vlen, ColumnFamilyHandle cf = null); + unsafe void Merge(byte* key, ulong klen, byte* val, ulong vlen, ColumnFamilyHandle cf = null); + IWriteBatch MergeCf(IntPtr columnFamily, byte[] key, ulong klen, byte[] val, ulong vlen); + unsafe void MergeCf(IntPtr columnFamily, byte* key, ulong klen, byte* val, ulong vlen); + IWriteBatch Mergev(int numKeys, IntPtr keysList, IntPtr keysListSizes, int numValues, IntPtr valuesList, IntPtr valuesListSizes); + IWriteBatch MergevCf(IntPtr columnFamily, int numKeys, IntPtr keysList, IntPtr keysListSizes, int numValues, IntPtr valuesList, IntPtr valuesListSizes); + IWriteBatch Delete(byte[] key, ColumnFamilyHandle cf = null); + IWriteBatch Delete(byte[] key, ulong klen, ColumnFamilyHandle cf = null); + unsafe void Delete(byte* key, ulong klen, ColumnFamilyHandle cf = null); + unsafe void Deletev(int numKeys, IntPtr keysList, IntPtr keysListSizes, ColumnFamilyHandle cf = null); + IWriteBatch DeleteRange(byte[] startKey, ulong sklen, byte[] endKey, ulong eklen, ColumnFamilyHandle cf = null); + unsafe void DeleteRange(byte* startKey, ulong sklen, byte* endKey, ulong eklen, ColumnFamilyHandle cf = null); + unsafe void DeleteRangev(int numKeys, IntPtr startKeysList, IntPtr startKeysListSizes, IntPtr endKeysList, IntPtr endKeysListSizes, ColumnFamilyHandle cf = null); + IWriteBatch PutLogData(byte[] blob, ulong len); + IWriteBatch Iterate(IntPtr state, WriteBatchIteratePutCallback put, WriteBatchIterateDeleteCallback deleted); + byte[] ToBytes(); + byte[] ToBytes(byte[] buffer, int offset = 0, int size = -1); + void SetSavePoint(); + void RollbackToSavePoint(); + } + + public class WriteBatch : IWriteBatch, IDisposable { private IntPtr handle; private Encoding defaultEncoding = Encoding.UTF8; @@ -230,5 +262,36 @@ public void RollbackToSavePoint() { Native.Instance.rocksdb_writebatch_rollback_to_save_point(handle); } + + IWriteBatch IWriteBatch.Clear() + => Clear(); + IWriteBatch IWriteBatch.Put(string key, string val, Encoding encoding) + => Put(key, val, encoding); + IWriteBatch IWriteBatch.Put(byte[] key, byte[] val, ColumnFamilyHandle cf) + => Put(key, val, cf); + IWriteBatch IWriteBatch.Put(byte[] key, ulong klen, byte[] val, ulong vlen, ColumnFamilyHandle cf) + => Put(key, klen, val, vlen, cf); + IWriteBatch IWriteBatch.Putv(int numKeys, IntPtr keysList, IntPtr keysListSizes, int numValues, IntPtr valuesList, IntPtr valuesListSizes) + => Putv(numKeys, keysList, keysListSizes, numValues, valuesList, valuesListSizes); + IWriteBatch IWriteBatch.PutvCf(IntPtr columnFamily, int numKeys, IntPtr keysList, IntPtr keysListSizes, int numValues, IntPtr valuesList, IntPtr valuesListSizes) + => PutvCf(columnFamily, numKeys, keysList, keysListSizes, numValues, valuesList, valuesListSizes); + IWriteBatch IWriteBatch.Merge(byte[] key, ulong klen, byte[] val, ulong vlen, ColumnFamilyHandle cf) + => Merge(key, klen, val, vlen, cf); + IWriteBatch IWriteBatch.MergeCf(IntPtr columnFamily, byte[] key, ulong klen, byte[] val, ulong vlen) + => MergeCf(columnFamily, key, klen, val, vlen); + IWriteBatch IWriteBatch.Mergev(int numKeys, IntPtr keysList, IntPtr keysListSizes, int numValues, IntPtr valuesList, IntPtr valuesListSizes) + => Mergev(numKeys, keysList, keysListSizes, numValues, valuesList, valuesListSizes); + IWriteBatch IWriteBatch.MergevCf(IntPtr columnFamily, int numKeys, IntPtr keysList, IntPtr keysListSizes, int numValues, IntPtr valuesList, IntPtr valuesListSizes) + => MergevCf(columnFamily, numKeys, keysList, keysListSizes, numValues, valuesList, valuesListSizes); + IWriteBatch IWriteBatch.Delete(byte[] key, ColumnFamilyHandle cf) + => Delete(key, cf); + IWriteBatch IWriteBatch.Delete(byte[] key, ulong klen, ColumnFamilyHandle cf) + => Delete(key, klen, cf); + IWriteBatch IWriteBatch.DeleteRange(byte[] startKey, ulong sklen, byte[] endKey, ulong eklen, ColumnFamilyHandle cf) + => DeleteRange(startKey, sklen, endKey, eklen, cf); + IWriteBatch IWriteBatch.PutLogData(byte[] blob, ulong len) + => PutLogData(blob, len); + IWriteBatch IWriteBatch.Iterate(IntPtr state, WriteBatchIteratePutCallback put, WriteBatchIterateDeleteCallback deleted) + => Iterate(state, put, deleted); } } diff --git a/RocksDbSharp/WriteBatchWithIndex.cs b/RocksDbSharp/WriteBatchWithIndex.cs new file mode 100644 index 0000000..1d6c45a --- /dev/null +++ b/RocksDbSharp/WriteBatchWithIndex.cs @@ -0,0 +1,351 @@ +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using System.Text; +using Transitional; + +namespace RocksDbSharp +{ + public class WriteBatchWithIndex : IWriteBatch + { + private IntPtr handle; + private Encoding defaultEncoding = Encoding.UTF8; + + public WriteBatchWithIndex(ulong reservedBytes = 0, bool overwriteKeys = true) + : this(Native.Instance.rocksdb_writebatch_wi_create(reservedBytes, overwriteKeys)) + { + } + + public WriteBatchWithIndex(byte[] rep, long size = -1) + : this(Native.Instance.rocksdb_writebatch_wi_create_from(rep, (ulong)(size < 0 ? rep.Length : size))) + { + } + + private WriteBatchWithIndex(IntPtr handle) + { + this.handle = handle; + } + + public IntPtr Handle { get { return handle; } } + + public void Dispose() + { + if (handle != IntPtr.Zero) + { +#if !NODESTROY + Native.Instance.rocksdb_writebatch_wi_destroy(handle); +#endif + handle = IntPtr.Zero; + } + } + + public WriteBatchWithIndex Clear() + { + Native.Instance.rocksdb_writebatch_wi_clear(handle); + return this; + } + + public int Count() + { + return Native.Instance.rocksdb_writebatch_wi_count(handle); + } + + public Iterator CreateIteratorWithBase(Iterator baseIterator, ColumnFamilyHandle cf = null) + { + var handle = cf == null + ? Native.Instance.rocksdb_writebatch_wi_create_iterator_with_base(Handle, baseIterator.Handle) + : Native.Instance.rocksdb_writebatch_wi_create_iterator_with_base_cf(Handle, baseIterator.Handle, cf.Handle); + return new Iterator(handle); + } + + public string Get(string key, ColumnFamilyHandle cf = null, OptionsHandle options = null, Encoding encoding = null) + { + return Native.Instance.rocksdb_writebatch_wi_get_from_batch(Handle, (options ?? RocksDb.DefaultOptions).Handle, key, cf, encoding ?? defaultEncoding); + } + + public byte[] Get(byte[] key, ColumnFamilyHandle cf = null, OptionsHandle options = null) + { + return Get(key, (ulong)key.GetLongLength(0), cf, options); + } + + public byte[] Get(byte[] key, ulong keyLength, ColumnFamilyHandle cf = null, OptionsHandle options = null) + { + return Native.Instance.rocksdb_writebatch_wi_get_from_batch(Handle, (options ?? RocksDb.DefaultOptions).Handle, key, keyLength, cf); + } + + public ulong Get(byte[] key, byte[] buffer, ulong offset, ulong length, ColumnFamilyHandle cf = null, OptionsHandle options = null) + { + return Get(key, (ulong)key.GetLongLength(0), buffer, offset, length, cf, options); + } + + public ulong Get(byte[] key, ulong keyLength, byte[] buffer, ulong offset, ulong length, ColumnFamilyHandle cf = null, OptionsHandle options = null) + { + unsafe + { + var ptr = Native.Instance.rocksdb_writebatch_wi_get_from_batch(Handle, (options ?? RocksDb.DefaultOptions).Handle, key, keyLength, out ulong valLength, cf); + valLength = Math.Min(length, valLength); + Marshal.Copy(ptr, buffer, (int)offset, (int)valLength); + Native.Instance.rocksdb_free(ptr); + return valLength; + } + } + + public string Get(RocksDb db, string key, ColumnFamilyHandle cf = null, ReadOptions options = null, Encoding encoding = null) + { + return Native.Instance.rocksdb_writebatch_wi_get_from_batch_and_db(Handle, db.Handle, (options ?? RocksDb.DefaultReadOptions).Handle, key, cf, encoding ?? defaultEncoding); + } + + public byte[] Get(RocksDb db, byte[] key, ColumnFamilyHandle cf = null, ReadOptions options = null) + { + return Get(db, key, (ulong)key.GetLongLength(0), cf, options); + } + + public byte[] Get(RocksDb db, byte[] key, ulong keyLength, ColumnFamilyHandle cf = null, ReadOptions options = null) + { + return Native.Instance.rocksdb_writebatch_wi_get_from_batch_and_db(Handle, db.Handle, (options ?? RocksDb.DefaultReadOptions).Handle, key, keyLength, cf); + } + + public ulong Get(RocksDb db, byte[] key, byte[] buffer, ulong offset, ulong length, ColumnFamilyHandle cf = null, ReadOptions options = null) + { + return Get(db, key, (ulong)key.GetLongLength(0), buffer, offset, length, cf, options); + } + + public ulong Get(RocksDb db, byte[] key, ulong keyLength, byte[] buffer, ulong offset, ulong length, ColumnFamilyHandle cf = null, ReadOptions options = null) + { + unsafe + { + var ptr = Native.Instance.rocksdb_writebatch_wi_get_from_batch_and_db(Handle, db.Handle, (options ?? RocksDb.DefaultReadOptions).Handle, key, keyLength, out ulong valLength, cf); + valLength = Math.Min(length, valLength); + Marshal.Copy(ptr, buffer, (int)offset, (int)valLength); + Native.Instance.rocksdb_free(ptr); + return valLength; + } + } + + public Iterator NewIterator(Iterator baseIterator, ColumnFamilyHandle cf = null) + { + IntPtr iteratorHandle = cf == null + ? Native.Instance.rocksdb_writebatch_wi_create_iterator_with_base(Handle, baseIterator.Handle) + : Native.Instance.rocksdb_writebatch_wi_create_iterator_with_base_cf(Handle, baseIterator.Handle, cf.Handle); + baseIterator.Detach(); + // Note: passing in base iterator here only to ensure that it is not collected before the iterator + return new Iterator(iteratorHandle); + } + + public WriteBatchWithIndex Put(string key, string val, Encoding encoding = null) + { + if (encoding == null) + encoding = defaultEncoding; + Native.Instance.rocksdb_writebatch_wi_put(handle, key, val, encoding); + return this; + } + + public WriteBatchWithIndex Put(byte[] key, byte[] val, ColumnFamilyHandle cf = null) + { + return Put(key, (ulong)key.Length, val, (ulong)val.Length, cf); + } + + public WriteBatchWithIndex Put(byte[] key, ulong klen, byte[] val, ulong vlen, ColumnFamilyHandle cf = null) + { + if (cf == null) + Native.Instance.rocksdb_writebatch_wi_put(handle, key, klen, val, vlen); + else + Native.Instance.rocksdb_writebatch_wi_put_cf(handle, cf.Handle, key, klen, val, vlen); + return this; + } + + public unsafe void Put(byte* key, ulong klen, byte* val, ulong vlen, ColumnFamilyHandle cf = null) + { + if (cf == null) + Native.Instance.rocksdb_writebatch_wi_put(handle, key, klen, val, vlen); + else + Native.Instance.rocksdb_writebatch_wi_put_cf(handle, cf.Handle, key, klen, val, vlen); + } + + public WriteBatchWithIndex Putv(int numKeys, IntPtr keysList, IntPtr keysListSizes, int numValues, IntPtr valuesList, IntPtr valuesListSizes) + { + Native.Instance.rocksdb_writebatch_wi_putv(handle, numKeys, keysList, keysListSizes, numValues, valuesList, valuesListSizes); + return this; + } + + public WriteBatchWithIndex PutvCf(IntPtr columnFamily, int numKeys, IntPtr keysList, IntPtr keysListSizes, int numValues, IntPtr valuesList, IntPtr valuesListSizes) + { + Native.Instance.rocksdb_writebatch_wi_putv_cf(handle, columnFamily, numKeys, keysList, keysListSizes, numValues, valuesList, valuesListSizes); + return this; + } + + public WriteBatchWithIndex Merge(byte[] key, ulong klen, byte[] val, ulong vlen, ColumnFamilyHandle cf = null) + { + if (cf == null) + Native.Instance.rocksdb_writebatch_wi_merge(handle, key, klen, val, vlen); + else + Native.Instance.rocksdb_writebatch_wi_merge_cf(handle, cf.Handle, key, klen, val, vlen); + return this; + } + + public unsafe void Merge(byte* key, ulong klen, byte* val, ulong vlen, ColumnFamilyHandle cf = null) + { + if (cf == null) + Native.Instance.rocksdb_writebatch_wi_merge(handle, key, klen, val, vlen); + else + Native.Instance.rocksdb_writebatch_wi_merge_cf(handle, cf.Handle, key, klen, val, vlen); + } + + public WriteBatchWithIndex MergeCf(IntPtr columnFamily, byte[] key, ulong klen, byte[] val, ulong vlen) + { + Native.Instance.rocksdb_writebatch_wi_merge_cf(handle, columnFamily, key, klen, val, vlen); + return this; + } + + public unsafe void MergeCf(IntPtr columnFamily, byte* key, ulong klen, byte* val, ulong vlen) + { + Native.Instance.rocksdb_writebatch_wi_merge_cf(handle, columnFamily, key, klen, val, vlen); + } + + public WriteBatchWithIndex Mergev(int numKeys, IntPtr keysList, IntPtr keysListSizes, int numValues, IntPtr valuesList, IntPtr valuesListSizes) + { + Native.Instance.rocksdb_writebatch_wi_mergev(handle, numKeys, keysList, keysListSizes, numValues, valuesList, valuesListSizes); + return this; + } + + public WriteBatchWithIndex MergevCf(IntPtr columnFamily, int numKeys, IntPtr keysList, IntPtr keysListSizes, int numValues, IntPtr valuesList, IntPtr valuesListSizes) + { + Native.Instance.rocksdb_writebatch_wi_mergev_cf(handle, columnFamily, numKeys, keysList, keysListSizes, numValues, valuesList, valuesListSizes); + return this; + } + + public WriteBatchWithIndex Delete(byte[] key, ColumnFamilyHandle cf = null) + { + return Delete(key, (ulong)key.Length, cf); + } + + public WriteBatchWithIndex Delete(byte[] key, ulong klen, ColumnFamilyHandle cf = null) + { + if (cf == null) + Native.Instance.rocksdb_writebatch_wi_delete(handle, key, klen); + else + Native.Instance.rocksdb_writebatch_wi_delete_cf(handle, cf.Handle, key, klen); + return this; + } + + public unsafe void Delete(byte* key, ulong klen, ColumnFamilyHandle cf = null) + { + if (cf == null) + Native.Instance.rocksdb_writebatch_wi_delete(handle, key, klen); + else + Native.Instance.rocksdb_writebatch_wi_delete_cf(handle, cf.Handle, key, klen); + } + + public unsafe void Deletev(int numKeys, IntPtr keysList, IntPtr keysListSizes, ColumnFamilyHandle cf = null) + { + if (cf == null) + Native.Instance.rocksdb_writebatch_wi_deletev(handle, numKeys, keysList, keysListSizes); + else + Native.Instance.rocksdb_writebatch_wi_deletev_cf(handle, cf.Handle, numKeys, keysList, keysListSizes); + } + + public WriteBatchWithIndex DeleteRange(byte[] startKey, ulong sklen, byte[] endKey, ulong eklen, ColumnFamilyHandle cf = null) + { + if (cf == null) + Native.Instance.rocksdb_writebatch_wi_delete_range(handle, startKey, sklen, endKey, eklen); + else + Native.Instance.rocksdb_writebatch_wi_delete_range_cf(handle, cf.Handle, startKey, sklen, endKey, eklen); + return this; + } + + public unsafe void DeleteRange(byte* startKey, ulong sklen, byte* endKey, ulong eklen, ColumnFamilyHandle cf = null) + { + if (cf == null) + Native.Instance.rocksdb_writebatch_wi_delete_range(handle, startKey, sklen, endKey, eklen); + else + Native.Instance.rocksdb_writebatch_wi_delete_range_cf(handle, cf.Handle, startKey, sklen, endKey, eklen); + } + + public unsafe void DeleteRangev(int numKeys, IntPtr startKeysList, IntPtr startKeysListSizes, IntPtr endKeysList, IntPtr endKeysListSizes, ColumnFamilyHandle cf = null) + { + if (cf == null) + Native.Instance.rocksdb_writebatch_wi_delete_rangev(handle, numKeys, startKeysList, startKeysListSizes, endKeysList, endKeysListSizes); + else + Native.Instance.rocksdb_writebatch_wi_delete_rangev_cf(handle, cf.Handle, numKeys, startKeysList, startKeysListSizes, endKeysList, endKeysListSizes); + } + + public WriteBatchWithIndex PutLogData(byte[] blob, ulong len) + { + Native.Instance.rocksdb_writebatch_wi_put_log_data(handle, blob, len); + return this; + } + + public WriteBatchWithIndex Iterate(IntPtr state, WriteBatchIteratePutCallback put, WriteBatchIterateDeleteCallback deleted) + { + Native.Instance.rocksdb_writebatch_wi_iterate(handle, state, put, deleted); + return this; + } + + /// + /// Get the write batch as bytes + /// + /// + public byte[] ToBytes() + { + return Native.Instance.rocksdb_writebatch_wi_data(handle); + } + + /// + /// Get the write batch as bytes + /// + /// + /// + /// + /// null if size was not large enough to hold the data + public byte[] ToBytes(byte[] buffer, int offset = 0, int size = -1) + { + if (size < 0) + size = buffer.Length; + if (Native.Instance.rocksdb_writebatch_wi_data(handle, buffer, 0, size) > 0) + return buffer; + return null; + } + + public void SetSavePoint() + { + Native.Instance.rocksdb_writebatch_wi_set_save_point(handle); + } + + public void RollbackToSavePoint() + { + Native.Instance.rocksdb_writebatch_wi_rollback_to_save_point(handle); + } + + + IWriteBatch IWriteBatch.Clear() + => Clear(); + IWriteBatch IWriteBatch.Put(string key, string val, Encoding encoding) + => Put(key, val, encoding); + IWriteBatch IWriteBatch.Put(byte[] key, byte[] val, ColumnFamilyHandle cf) + => Put(key, val, cf); + IWriteBatch IWriteBatch.Put(byte[] key, ulong klen, byte[] val, ulong vlen, ColumnFamilyHandle cf) + => Put(key, klen, val, vlen, cf); + IWriteBatch IWriteBatch.Putv(int numKeys, IntPtr keysList, IntPtr keysListSizes, int numValues, IntPtr valuesList, IntPtr valuesListSizes) + => Putv(numKeys, keysList, keysListSizes, numValues, valuesList, valuesListSizes); + IWriteBatch IWriteBatch.PutvCf(IntPtr columnFamily, int numKeys, IntPtr keysList, IntPtr keysListSizes, int numValues, IntPtr valuesList, IntPtr valuesListSizes) + => PutvCf(columnFamily, numKeys, keysList, keysListSizes, numValues, valuesList, valuesListSizes); + IWriteBatch IWriteBatch.Merge(byte[] key, ulong klen, byte[] val, ulong vlen, ColumnFamilyHandle cf) + => Merge(key, klen, val, vlen, cf); + IWriteBatch IWriteBatch.MergeCf(IntPtr columnFamily, byte[] key, ulong klen, byte[] val, ulong vlen) + => MergeCf(columnFamily, key, klen, val, vlen); + IWriteBatch IWriteBatch.Mergev(int numKeys, IntPtr keysList, IntPtr keysListSizes, int numValues, IntPtr valuesList, IntPtr valuesListSizes) + => Mergev(numKeys, keysList, keysListSizes, numValues, valuesList, valuesListSizes); + IWriteBatch IWriteBatch.MergevCf(IntPtr columnFamily, int numKeys, IntPtr keysList, IntPtr keysListSizes, int numValues, IntPtr valuesList, IntPtr valuesListSizes) + => MergevCf(columnFamily, numKeys, keysList, keysListSizes, numValues, valuesList, valuesListSizes); + IWriteBatch IWriteBatch.Delete(byte[] key, ColumnFamilyHandle cf) + => Delete(key, cf); + IWriteBatch IWriteBatch.Delete(byte[] key, ulong klen, ColumnFamilyHandle cf) + => Delete(key, klen, cf); + IWriteBatch IWriteBatch.DeleteRange(byte[] startKey, ulong sklen, byte[] endKey, ulong eklen, ColumnFamilyHandle cf) + => DeleteRange(startKey, sklen, endKey, eklen, cf); + IWriteBatch IWriteBatch.PutLogData(byte[] blob, ulong len) + => PutLogData(blob, len); + IWriteBatch IWriteBatch.Iterate(IntPtr state, WriteBatchIteratePutCallback put, WriteBatchIterateDeleteCallback deleted) + => Iterate(state, put, deleted); + } +} diff --git a/tests/RocksDbSharpTest/FunctionalTests.cs b/tests/RocksDbSharpTest/FunctionalTests.cs index d55e45d..eb56c6d 100644 --- a/tests/RocksDbSharpTest/FunctionalTests.cs +++ b/tests/RocksDbSharpTest/FunctionalTests.cs @@ -247,6 +247,55 @@ public void FunctionalTest() sst.Add("1001", "1001"); // this order is only allowed using an integer comparator sst.Finish(); } + + // test write batch with index + { + var wbwi = new WriteBatchWithIndex(reservedBytes: 1024); + wbwi.Put("one", "un"); + wbwi.Put("two", "deux"); + var oneValueIn = Encoding.UTF8.GetBytes("one"); + var oneValueOut = wbwi.Get("one"); + Assert.Equal("un", oneValueOut); + using (var db = RocksDb.Open(options, path, columnFamilies)) + { + var oneCombinedOut = wbwi.Get(db, "one"); + var threeCombinedOut = wbwi.Get(db, "three"); + Assert.Equal("un", oneCombinedOut); + Assert.Equal("tres", threeCombinedOut); + + using (var wbIterator = wbwi.NewIterator(db.NewIterator())) + { + wbIterator.Seek("o"); + Assert.True(wbIterator.Valid()); + var itkey = wbIterator.StringKey(); + Assert.Equal("one", itkey); + var itval = wbIterator.StringValue(); + Assert.Equal("un", itval); + + wbIterator.Next(); + Assert.True(wbIterator.Valid()); + itkey = wbIterator.StringKey(); + Assert.Equal("three", itkey); + itval = wbIterator.StringValue(); + Assert.Equal("tres", itval); + + wbIterator.Next(); + Assert.True(wbIterator.Valid()); + itkey = wbIterator.StringKey(); + Assert.Equal("two", itkey); + itval = wbIterator.StringValue(); + Assert.Equal("deux", itval); + + wbIterator.Next(); + Assert.False(wbIterator.Valid()); + } + + db.Write(wbwi); + + var oneDbOut = wbwi.Get("one"); + Assert.Equal("un", oneDbOut); + } + } } class IntegerStringComparator : StringComparatorBase