Skip to content

Commit

Permalink
add support for WriteBatchWithIndex
Browse files Browse the repository at this point in the history
  • Loading branch information
warrenfalk committed Jun 6, 2017
1 parent d57c2c4 commit cca984f
Show file tree
Hide file tree
Showing 8 changed files with 845 additions and 31 deletions.
11 changes: 11 additions & 0 deletions RocksDbSharp/Iterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ public void Dispose()
}
}

/// <summary>
/// Detach the iterator from its handle but don't dispose the handle
/// </summary>
/// <returns></returns>
public IntPtr Detach()
{
var r = handle;
handle = IntPtr.Zero;
return r;
}

public bool Valid()
{
return Native.Instance.rocksdb_iter_valid(handle);
Expand Down
174 changes: 174 additions & 0 deletions RocksDbSharp/Native.Marshaled.cs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,33 @@ public void rocksdb_writebatch_put(IntPtr writeOptions, string key, string val,
}
}

public void rocksdb_writebatch_wi_put(IntPtr writeOptions, string key, string val, Encoding encoding)
{
unsafe
{
if (encoding == null)
encoding = Encoding.UTF8;
fixed (char* k = key, v = val)
{
int klength = key.Length;
int vlength = val.Length;
int bklength = encoding.GetByteCount(k, klength);
int bvlength = encoding.GetByteCount(v, vlength);
var buffer = Marshal.AllocHGlobal(bklength + bvlength);
byte* bk = (byte*)buffer.ToPointer();
encoding.GetBytes(k, klength, bk, bklength);
byte* bv = bk + bklength;
encoding.GetBytes(v, vlength, bv, bvlength);

rocksdb_writebatch_wi_put(writeOptions, bk, (ulong)bklength, bv, (ulong)bvlength);
#if DEBUG
Zero(bk, bklength);
#endif
Marshal.FreeHGlobal(buffer);
}
}
}

public void rocksdb_iter_seek(
IntPtr iter,
string key,
Expand Down Expand Up @@ -463,6 +490,16 @@ public byte[] rocksdb_writebatch_data(IntPtr wbHandle)
return data;
}

public byte[] rocksdb_writebatch_wi_data(IntPtr wbHandle)
{
var resultPtr = rocksdb_writebatch_wi_data(wbHandle, out ulong size);
var data = new byte[size];
Marshal.Copy(resultPtr, data, 0, (int)size);
// Do not free this memory because it is owned by the write batch and will be freed when it is disposed
// rocksdb_free(resultPtr);
return data;
}

public int rocksdb_writebatch_data(IntPtr wbHandle, byte[] buffer, int offset, int length)
{
var resultPtr = rocksdb_writebatch_data(wbHandle, out ulong size);
Expand All @@ -479,6 +516,22 @@ public int rocksdb_writebatch_data(IntPtr wbHandle, byte[] buffer, int offset, i
return (int)size;
}

public int rocksdb_writebatch_wi_data(IntPtr wbHandle, byte[] buffer, int offset, int length)
{
var resultPtr = rocksdb_writebatch_wi_data(wbHandle, out ulong size);
bool fits = (int)size <= length;
if (!fits)
{
// Do not free this memory because it is owned by the write batch and will be freed when it is disposed
// rocksdb_free(resultPtr);
return -1;
}
Marshal.Copy(resultPtr, buffer, offset, (int)size);
// Do not free this memory because it is owned by the write batch and will be freed when it is disposed
// rocksdb_free(resultPtr);
return (int)size;
}

public string rocksdb_property_value_string(IntPtr db, string propname)
{
return MarshalNullTermAsciiStr(rocksdb_property_value(db, propname));
Expand Down Expand Up @@ -522,5 +575,126 @@ public unsafe void rocksdb_sstfilewriter_add(
}
}
}

public string rocksdb_writebatch_wi_get_from_batch(
IntPtr wb,
IntPtr options,
string key,
out IntPtr errptr,
ColumnFamilyHandle cf = null,
Encoding encoding = null)
{
if (encoding == null)
encoding = Encoding.UTF8;
unsafe
{
fixed (char* k = key)
{
int klength = key.Length;
int bklength = encoding.GetByteCount(k, klength);
var buffer = Marshal.AllocHGlobal(bklength);
byte* bk = (byte*)buffer.ToPointer();
encoding.GetBytes(k, klength, bk, bklength);

var resultPtr = cf == null
? rocksdb_writebatch_wi_get_from_batch(wb, options, bk, (ulong)bklength, out ulong bvlength, out errptr)
: rocksdb_writebatch_wi_get_from_batch_cf(wb, options, cf.Handle, bk, (ulong)bklength, out bvlength, out errptr);
#if DEBUG
Zero(bk, bklength);
#endif
Marshal.FreeHGlobal(buffer);

if (errptr != IntPtr.Zero)
return null;
if (resultPtr == IntPtr.Zero)
return null;

return MarshalAndFreeRocksDbString(resultPtr, (long)bvlength, encoding);
}
}
}

public byte[] rocksdb_writebatch_wi_get_from_batch(
IntPtr wb,
IntPtr options,
byte[] key,
ulong keyLength,
out IntPtr errptr,
ColumnFamilyHandle cf = null)
{
var resultPtr = cf == null
? rocksdb_writebatch_wi_get_from_batch(wb, options, key, keyLength, out ulong valueLength, out errptr)
: rocksdb_writebatch_wi_get_from_batch_cf(wb, options, cf.Handle, key, keyLength, out valueLength, out errptr);
if (errptr != IntPtr.Zero)
return null;
if (resultPtr == IntPtr.Zero)
return null;
var result = new byte[valueLength];
Marshal.Copy(resultPtr, result, 0, (int)valueLength);
rocksdb_free(resultPtr);
return result;
}

public string rocksdb_writebatch_wi_get_from_batch_and_db(
IntPtr wb,
IntPtr db,
IntPtr read_options,
string key,
out IntPtr errptr,
ColumnFamilyHandle cf = null,
Encoding encoding = null)
{
if (encoding == null)
encoding = Encoding.UTF8;
unsafe
{
fixed (char* k = key)
{
int klength = key.Length;
int bklength = encoding.GetByteCount(k, klength);
var buffer = Marshal.AllocHGlobal(bklength);
byte* bk = (byte*)buffer.ToPointer();
encoding.GetBytes(k, klength, bk, bklength);

var resultPtr = cf == null
? rocksdb_writebatch_wi_get_from_batch_and_db(wb, db, read_options, bk, (ulong)bklength, out ulong bvlength, out errptr)
: rocksdb_writebatch_wi_get_from_batch_and_db_cf(wb, db, read_options, cf.Handle, bk, (ulong)bklength, out bvlength, out errptr);
#if DEBUG
Zero(bk, bklength);
#endif
Marshal.FreeHGlobal(buffer);

if (errptr != IntPtr.Zero)
return null;
if (resultPtr == IntPtr.Zero)
return null;

return MarshalAndFreeRocksDbString(resultPtr, (long)bvlength, encoding);
}
}
}

public byte[] rocksdb_writebatch_wi_get_from_batch_and_db(
IntPtr wb,
IntPtr db,
IntPtr read_options,
byte[] key,
ulong keyLength,
out IntPtr errptr,
ColumnFamilyHandle cf = null)
{
var resultPtr = cf == null
? rocksdb_writebatch_wi_get_from_batch_and_db(wb, db, read_options, key, keyLength, out ulong valueLength, out errptr)
: rocksdb_writebatch_wi_get_from_batch_and_db_cf(wb, db, read_options, cf.Handle, key, keyLength, out valueLength, out errptr);
if (errptr != IntPtr.Zero)
return null;
if (resultPtr == IntPtr.Zero)
return null;
var result = new byte[valueLength];
Marshal.Copy(resultPtr, result, 0, (int)valueLength);
rocksdb_free(resultPtr);
return result;
}

}
}
81 changes: 70 additions & 11 deletions RocksDbSharp/Native.Raw.cs
Original file line number Diff line number Diff line change
Expand Up @@ -532,11 +532,19 @@ public abstract void rocksdb_writebatch_wi_destroy(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b);
public abstract void rocksdb_writebatch_wi_clear(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b);
public abstract int rocksdb_writebatch_wi_count(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b);
public abstract void rocksdb_writebatch_wi_put(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b,
/*(const char*)*/ byte[] key,
/*(size_t)*/ ulong klen,
/*(const char*)*/ byte[] val,
/*(size_t)*/ ulong vlen);
public abstract unsafe void rocksdb_writebatch_wi_put(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b,
/*(const char*)*/ byte* key,
/*(size_t)*/ ulong klen,
/*(const char*)*/ byte* val,
/*(size_t)*/ ulong vlen);
public abstract void rocksdb_writebatch_wi_put_cf(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
/*(const char*)*/ byte[] key, /*(size_t)*/ ulong klen, /*(const char*)*/ byte[] val, /*(size_t)*/ ulong vlen);
public abstract unsafe void rocksdb_writebatch_wi_put_cf(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
/*(const char*)*/ byte* key, /*(size_t)*/ ulong klen, /*(const char*)*/ byte* val, /*(size_t)*/ ulong vlen);
Expand All @@ -549,11 +557,19 @@ public abstract void rocksdb_writebatch_wi_putv_cf(
int num_keys, /*(const char* const*)*/ IntPtr keys_list, /*(const size_t*)*/ IntPtr keys_list_sizes,
int num_values, /*(const char* const*)*/ IntPtr values_list,
/*(const size_t*)*/ IntPtr values_list_sizes);
public abstract void rocksdb_writebatch_wi_merge(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b,
/*(const char*)*/ byte[] key,
/*(size_t)*/ ulong klen,
/*(const char*)*/ byte[] val,
/*(size_t)*/ ulong vlen);
public abstract unsafe void rocksdb_writebatch_wi_merge(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b,
/*(const char*)*/ byte* key,
/*(size_t)*/ ulong klen,
/*(const char*)*/ byte* val,
/*(size_t)*/ ulong vlen);
public abstract void rocksdb_writebatch_wi_merge_cf(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
/*(const char*)*/ byte[] key, /*(size_t)*/ ulong klen, /*(const char*)*/ byte[] val, /*(size_t)*/ ulong vlen);
public abstract unsafe void rocksdb_writebatch_wi_merge_cf(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
/*(const char*)*/ byte* key, /*(size_t)*/ ulong klen, /*(const char*)*/ byte* val, /*(size_t)*/ ulong vlen);
Expand All @@ -566,9 +582,15 @@ public abstract void rocksdb_writebatch_wi_mergev_cf(
int num_keys, /*(const char* const*)*/ IntPtr keys_list, /*(const size_t*)*/ IntPtr keys_list_sizes,
int num_values, /*(const char* const*)*/ IntPtr values_list,
/*(const size_t*)*/ IntPtr values_list_sizes);
public abstract void rocksdb_writebatch_wi_delete(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b,
/*(const char*)*/ byte[] key,
/*(size_t)*/ ulong klen);
public abstract unsafe void rocksdb_writebatch_wi_delete(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b,
/*(const char*)*/ byte* key,
/*(size_t)*/ ulong klen);
public abstract void rocksdb_writebatch_wi_delete_cf(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
/*(const char*)*/ byte[] key, /*(size_t)*/ ulong klen);
public abstract unsafe void rocksdb_writebatch_wi_delete_cf(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
/*(const char*)*/ byte* key, /*(size_t)*/ ulong klen);
Expand All @@ -578,9 +600,16 @@ public abstract void rocksdb_writebatch_wi_deletev(
public abstract void rocksdb_writebatch_wi_deletev_cf(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
int num_keys, /*(const char* const*)*/ IntPtr keys_list, /*(const size_t*)*/ IntPtr keys_list_sizes);
public abstract void rocksdb_writebatch_wi_delete_range(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(const char*)*/ byte[] start_key, /*(size_t)*/ ulong start_key_len,
/*(const char*)*/ byte[] end_key, /*(size_t)*/ ulong end_key_len);
public abstract unsafe void rocksdb_writebatch_wi_delete_range(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(const char*)*/ byte* start_key, /*(size_t)*/ ulong start_key_len,
/*(const char*)*/ byte* end_key, /*(size_t)*/ ulong end_key_len);
public abstract void rocksdb_writebatch_wi_delete_range_cf(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
/*(const char*)*/ byte[] start_key, /*(size_t)*/ ulong start_key_len, /*(const char*)*/ byte[] end_key,
/*(size_t)*/ ulong end_key_len);
public abstract unsafe void rocksdb_writebatch_wi_delete_range_cf(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
/*(const char*)*/ byte* start_key, /*(size_t)*/ ulong start_key_len, /*(const char*)*/ byte* end_key,
Expand All @@ -594,53 +623,83 @@ public abstract void rocksdb_writebatch_wi_delete_rangev_cf(
int num_keys, /*(const char* const*)*/ IntPtr start_keys_list,
/*(const size_t*)*/ IntPtr start_keys_list_sizes, /*(const char* const*)*/ IntPtr end_keys_list,
/*(const size_t*)*/ IntPtr end_keys_list_sizes);
public abstract void rocksdb_writebatch_wi_put_log_data(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(const char*)*/ byte[] blob, /*(size_t)*/ ulong len);
public abstract void rocksdb_writebatch_wi_put_log_data(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(const char*)*/ IntPtr blob, /*(size_t)*/ ulong len);
public abstract void rocksdb_writebatch_wi_iterate(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b,
/*(void*)*/ IntPtr state,
/*(void (*put)(void*, const char* k, size_t klen, const char* v, size_t vlen))*/ IntPtr put,
/*(void (*deleted)(void*, const char* k, size_t klen))*/ IntPtr deleted);
/*(void (*put)(void*, const char* k, size_t klen, const char* v, size_t vlen))*/ WriteBatchIteratePutCallback put,
/*(void (*deleted)(void*, const char* k, size_t klen))*/ WriteBatchIterateDeleteCallback deleted);
public abstract /*(const char*)*/ IntPtr rocksdb_writebatch_wi_data(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b,
/*(size_t*)*/ out ulong size);
public abstract void rocksdb_writebatch_wi_set_save_point(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b);
public abstract void rocksdb_writebatch_wi_rollback_to_save_point(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(char**)*/ IntPtr errptr);
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(char**)*/ out IntPtr errptr);
public abstract /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
/*(const rocksdb_options_t*)*/ IntPtr options,
/*(const char*)*/ byte[] key, /*(size_t)*/ ulong keylen,
/*(size_t*)*/ out ulong vallen,
/*(char**)*/ out IntPtr errptr);
public abstract unsafe /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
/*(const rocksdb_options_t*)*/ IntPtr options,
/*(const char*)*/ byte* key, /*(size_t)*/ ulong keylen,
/*(size_t*)*/ out ulong vallen,
/*(char**)*/ IntPtr errptr);
/*(char**)*/ out IntPtr errptr);
public abstract /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch_cf(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
/*(const rocksdb_options_t*)*/ IntPtr options,
/*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
/*(const char*)*/ byte[] key, /*(size_t)*/ ulong keylen,
/*(size_t*)*/ out ulong vallen,
/*(char**)*/ out IntPtr errptr);
public abstract unsafe /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch_cf(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
/*(const rocksdb_options_t*)*/ IntPtr options,
/*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
/*(const char*)*/ byte* key, /*(size_t)*/ ulong keylen,
/*(size_t*)*/ out ulong vallen,
/*(char**)*/ IntPtr errptr);
/*(char**)*/ out IntPtr errptr);
public abstract /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch_and_db(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
/*(rocksdb_t*)*/ IntPtr db,
/*(const rocksdb_readoptions_t*)*/ IntPtr read_options,
/*(const char*)*/ byte[] key, /*(size_t)*/ ulong keylen,
/*(size_t*)*/ out ulong vallen,
/*(char**)*/ out IntPtr errptr);
public abstract unsafe /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch_and_db(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
/*(rocksdb_t*)*/ IntPtr db,
/*(const rocksdb_readoptions_t*)*/ IntPtr options,
/*(const rocksdb_readoptions_t*)*/ IntPtr read_options,
/*(const char*)*/ byte* key, /*(size_t)*/ ulong keylen,
/*(size_t*)*/ out ulong vallen,
/*(char**)*/ IntPtr errptr);
/*(char**)*/ out IntPtr errptr);
public abstract /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch_and_db_cf(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
/*(rocksdb_t*)*/ IntPtr db,
/*(const rocksdb_readoptions_t*)*/ IntPtr read_options,
/*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
/*(const char*)*/ byte[] key, /*(size_t)*/ ulong keylen,
/*(size_t*)*/ out ulong vallen,
/*(char**)*/ out IntPtr errptr);
public abstract unsafe /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch_and_db_cf(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
/*(rocksdb_t*)*/ IntPtr db,
/*(const rocksdb_readoptions_t*)*/ IntPtr options,
/*(const rocksdb_readoptions_t*)*/ IntPtr read_options,
/*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
/*(const char*)*/ byte* key, /*(size_t)*/ ulong keylen,
/*(size_t*)*/ out ulong vallen,
/*(char**)*/ IntPtr errptr);
/*(char**)*/ out IntPtr errptr);
public abstract void rocksdb_write_writebatch_wi(
/*(rocksdb_t*)*/ IntPtr db,
/*(const rocksdb_writeoptions_t*)*/ IntPtr options,
/*(const rocksdb_writeoptions_t*)*/ IntPtr write_options,
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
/*(char**)*/ IntPtr errptr);
/*(char**)*/ out IntPtr errptr);
public abstract /*(rocksdb_iterator_t*)*/ IntPtr rocksdb_writebatch_wi_create_iterator_with_base(
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
/*(rocksdb_iterator_t*)*/ IntPtr base_iterator);
Expand Down
Loading

0 comments on commit cca984f

Please sign in to comment.