Skip to content

Commit

Permalink
Add option to clear RocksDB store
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed Jan 8, 2025
1 parent 4678c30 commit 1493b3e
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 30 deletions.
15 changes: 15 additions & 0 deletions src/RocksDb.Extensions/ColumnFamily.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using RocksDbSharp;

namespace RocksDb.Extensions;

internal class ColumnFamily
{
public ColumnFamilyHandle Handle { get; set; }
public string Name { get; }

public ColumnFamily(ColumnFamilyHandle handle, string name)
{
Handle = handle;
Name = name;
}
}
1 change: 1 addition & 0 deletions src/RocksDb.Extensions/IRocksDbAccessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public interface IRocksDbAccessor<TKey, TValue>
void PutRange(IReadOnlyList<(TKey key, TValue value)> items);
IEnumerable<TValue> GetAll();
bool HasKey(TKey key);
void Clear();
}

#pragma warning restore CS1591
11 changes: 11 additions & 0 deletions src/RocksDb.Extensions/IRocksDbStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,15 @@ public abstract class RocksDbStore<TKey, TValue>
/// <param name="key">The key to check in the store for an associated value.</param>
/// <returns><c>true</c> if the store contains an element with the specified key; otherwise, <c>false</c>.</returns>
public bool HasKey(TKey key) => _rocksDbAccessor.HasKey(key);

/// <summary>
/// Resets the column family associated with the store.
/// This operation destroys the current column family and creates a new one,
/// effectively removing all stored key-value pairs.
///
/// Note: This method is intended for scenarios where a complete reset of the column family
/// is required. The operation may involve internal reallocation and metadata changes, which
/// can impact performance during execution. Use with caution in high-frequency workflows.
/// </summary>
public void Clear() => _rocksDbAccessor.Clear();
}
38 changes: 22 additions & 16 deletions src/RocksDb.Extensions/RocksDbAccessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ internal class RocksDbAccessor<TKey, TValue> : IRocksDbAccessor<TKey, TValue>, I

private readonly ISerializer<TKey> _keySerializer;
private readonly ISerializer<TValue> _valueSerializer;
private readonly RocksDbSharp.RocksDb _rocksDb;
private readonly ColumnFamilyHandle _columnFamilyHandle;
private readonly RocksDbContext _rocksDbContext;
private readonly ColumnFamily _columnFamily;
private readonly bool _checkIfExists;

public RocksDbAccessor(RocksDbSharp.RocksDb rocksDb,
ColumnFamilyHandle columnFamilyHandle,
public RocksDbAccessor(RocksDbContext rocksDbContext,
ColumnFamily columnFamily,
ISerializer<TKey> keySerializer,
ISerializer<TValue> valueSerializer)
{
_rocksDb = rocksDb;
_columnFamilyHandle = columnFamilyHandle;
_rocksDbContext = rocksDbContext;
_columnFamily = columnFamily;
_keySerializer = keySerializer;
_valueSerializer = valueSerializer;

Expand Down Expand Up @@ -56,7 +56,7 @@ public void Remove(TKey key)
keySpan = keyBufferWriter.WrittenSpan;
}

_rocksDb.Remove(keySpan, _columnFamilyHandle);
_rocksDbContext.Db.Remove(keySpan, _columnFamily.Handle);
}
finally
{
Expand Down Expand Up @@ -119,7 +119,7 @@ public void Put(TKey key, TValue value)
valueSpan = valueBufferWriter.WrittenSpan;
}

_rocksDb.Put(keySpan, valueSpan, _columnFamilyHandle);
_rocksDbContext.Db.Put(keySpan, valueSpan, _columnFamily.Handle);
}
finally
{
Expand Down Expand Up @@ -165,13 +165,13 @@ public bool TryGet(TKey key, [MaybeNullWhen(false)] out TValue value)
keySpan = keyBufferWriter.WrittenSpan;
}

if (_checkIfExists && _rocksDb.HasKey(keySpan, _columnFamilyHandle) == false)
if (_checkIfExists && _rocksDbContext.Db.HasKey(keySpan, _columnFamily.Handle) == false)
{
value = default;
return false;
}

value = _rocksDb.Get(keySpan, this, _columnFamilyHandle);
value = _rocksDbContext.Db.Get(keySpan, this, _columnFamily.Handle);
return value != null;
}
finally
Expand Down Expand Up @@ -202,7 +202,7 @@ public void PutRange(ReadOnlySpan<TKey> keys, ReadOnlySpan<TValue> values)
AddToBatch(keys[i], values[i], batch);
}

_rocksDb.Write(batch);
_rocksDbContext.Db.Write(batch);
}

public void PutRange(ReadOnlySpan<TValue> values, Func<TValue, TKey> keySelector)
Expand All @@ -215,7 +215,7 @@ public void PutRange(ReadOnlySpan<TValue> values, Func<TValue, TKey> keySelector
AddToBatch(key, value, batch);
}

_rocksDb.Write(batch);
_rocksDbContext.Db.Write(batch);
}

public void PutRange(IReadOnlyList<(TKey key, TValue value)> items)
Expand All @@ -227,7 +227,7 @@ public void PutRange(IReadOnlyList<(TKey key, TValue value)> items)
AddToBatch(key, value, batch);
}

_rocksDb.Write(batch);
_rocksDbContext.Db.Write(batch);
}

private void AddToBatch(TKey key, TValue value, WriteBatch batch)
Expand Down Expand Up @@ -281,7 +281,7 @@ private void AddToBatch(TKey key, TValue value, WriteBatch batch)
valueSpan = valueBufferWriter.WrittenSpan;
}

_ = batch.Put(keySpan, valueSpan, _columnFamilyHandle);
_ = batch.Put(keySpan, valueSpan, _columnFamily.Handle);
}
finally
{
Expand All @@ -301,7 +301,7 @@ private void AddToBatch(TKey key, TValue value, WriteBatch batch)

public IEnumerable<TValue> GetAll()
{
using var iterator = _rocksDb.NewIterator(_columnFamilyHandle);
using var iterator = _rocksDbContext.Db.NewIterator(_columnFamily.Handle);
_ = iterator.SeekToFirst();
while (iterator.Valid())
{
Expand Down Expand Up @@ -338,7 +338,7 @@ public bool HasKey(TKey key)
keySpan = keyBufferWriter.WrittenSpan;
}

return _rocksDb.HasKey(keySpan, _columnFamilyHandle);
return _rocksDbContext.Db.HasKey(keySpan, _columnFamily.Handle);
}
finally
{
Expand All @@ -349,5 +349,11 @@ public bool HasKey(TKey key)
}
}
}

public void Clear()
{
_rocksDbContext.Db.DropColumnFamily(_columnFamily.Name);
_columnFamily.Handle = _rocksDbContext.Db.CreateColumnFamily(_rocksDbContext.ColumnFamilyOptions, _columnFamily.Name);
}
}

4 changes: 2 additions & 2 deletions src/RocksDb.Extensions/RocksDbBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public IRocksDbBuilder AddStore<TKey, TValue, TStore>(string columnFamily) where
var keySerializer = CreateSerializer<TKey>(rocksDbOptions.Value.SerializerFactories);
var valueSerializer = CreateSerializer<TValue>(rocksDbOptions.Value.SerializerFactories);
var rocksDbAccessor = new RocksDbAccessor<TKey, TValue>(
rocksDbContext.Db,
columnFamilyHandle,
rocksDbContext,
new ColumnFamily(columnFamilyHandle, columnFamily),
keySerializer,
valueSerializer
);
Expand Down
27 changes: 15 additions & 12 deletions src/RocksDb.Extensions/RocksDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ internal class RocksDbContext : IDisposable
{
private readonly RocksDbSharp.RocksDb _rocksDb;
private readonly Cache _cache;
private readonly ColumnFamilyOptions _userSpecifiedOptions;

private const long BlockCacheSize = 50 * 1024 * 1024L;
private const long BlockSize = 4096L;
Expand All @@ -16,23 +17,23 @@ internal class RocksDbContext : IDisposable
public RocksDbContext(IOptions<RocksDbOptions> options)
{
var dbOptions = new DbOptions();
var userSpecifiedOptions = new ColumnFamilyOptions();
_userSpecifiedOptions = new ColumnFamilyOptions();
var tableConfig = new BlockBasedTableOptions();
_cache = Cache.CreateLru(BlockCacheSize);
tableConfig.SetBlockCache(_cache);
tableConfig.SetBlockSize(BlockSize);

var filter = BloomFilterPolicy.Create();
tableConfig.SetFilterPolicy(filter);
userSpecifiedOptions.SetBlockBasedTableFactory(tableConfig);
userSpecifiedOptions.SetWriteBufferSize(WriteBufferSize);
userSpecifiedOptions.SetCompression(Compression.No);
userSpecifiedOptions.SetCompactionStyle(Compaction.Universal);
userSpecifiedOptions.SetMaxWriteBufferNumberToMaintain(MaxWriteBuffers);
userSpecifiedOptions.SetCreateIfMissing();
userSpecifiedOptions.SetCreateMissingColumnFamilies();
userSpecifiedOptions.SetErrorIfExists(false);
userSpecifiedOptions.SetInfoLogLevel(InfoLogLevel.Error);
_userSpecifiedOptions.SetBlockBasedTableFactory(tableConfig);
_userSpecifiedOptions.SetWriteBufferSize(WriteBufferSize);
_userSpecifiedOptions.SetCompression(Compression.No);
_userSpecifiedOptions.SetCompactionStyle(Compaction.Universal);
_userSpecifiedOptions.SetMaxWriteBufferNumberToMaintain(MaxWriteBuffers);
_userSpecifiedOptions.SetCreateIfMissing();
_userSpecifiedOptions.SetCreateMissingColumnFamilies();
_userSpecifiedOptions.SetErrorIfExists(false);
_userSpecifiedOptions.SetInfoLogLevel(InfoLogLevel.Error);

// this is the recommended way to increase parallelism in RocksDb
// note that the current implementation of setIncreaseParallelism affects the number
Expand All @@ -51,9 +52,9 @@ public RocksDbContext(IOptions<RocksDbOptions> options)
var writeOptions = new WriteOptions();
writeOptions.DisableWal(1);

userSpecifiedOptions.EnableStatistics();
_userSpecifiedOptions.EnableStatistics();

var columnFamilies = CreateColumnFamilies(options.Value.ColumnFamilies, userSpecifiedOptions);
var columnFamilies = CreateColumnFamilies(options.Value.ColumnFamilies, _userSpecifiedOptions);

if (options.Value.DeleteExistingDatabaseOnStartup)
{
Expand All @@ -71,6 +72,8 @@ private static void DestroyDatabase(string path)

public RocksDbSharp.RocksDb Db => _rocksDb;

public ColumnFamilyOptions ColumnFamilyOptions => _userSpecifiedOptions;

private static ColumnFamilies CreateColumnFamilies(IReadOnlyList<string> columnFamilyNames,
ColumnFamilyOptions columnFamilyOptions)
{
Expand Down
61 changes: 61 additions & 0 deletions test/RocksDb.Extensions.Tests/ClearStoreTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using NUnit.Framework;
using RocksDb.Extensions.Tests.Utils;
using Shouldly;

namespace RocksDb.Extensions.Tests;

public class ClearStoreTests
{
[Test]
public void should_reset_store_range_data_to_store()
{
// Setup RocksDbStore
using var testFixture = CreateTestFixture<int, string>();

// Put some data
var store = testFixture.GetStore<RocksDbGenericStore<int, string>>();
var cacheKeys = Enumerable.Range(0, 100)
.Select(x => (key: x, value: x.ToString()))
.ToList();

store.PutRange(cacheKeys);

// Verify that data was added
foreach (var (key, expectedValue) in cacheKeys)
{
store.HasKey(key).ShouldBeTrue();
store.TryGet(key, out var value).ShouldBeTrue();
value.ShouldBe(expectedValue);
}

// Clear the store
store.Clear();

// Verify that data is no longer there
foreach (var (key, expectedValue) in cacheKeys)
{
store.HasKey(key).ShouldBeFalse();
store.TryGet(key, out _).ShouldBeFalse();
}

// Try to put the data again
store.PutRange(cacheKeys);

// Verify that data is available again
foreach (var (key, expectedValue) in cacheKeys)
{
store.HasKey(key).ShouldBeTrue();
store.TryGet(key, out var value).ShouldBeTrue();
value.ShouldBe(expectedValue);
}
}

private static TestFixture CreateTestFixture<TKey, TValue>()
{
var testFixture = TestFixture.Create(rockDb =>
{
_ = rockDb.AddStore<TKey, TValue, RocksDbGenericStore<TKey, TValue>>("my-store");
});
return testFixture;
}
}

0 comments on commit 1493b3e

Please sign in to comment.