diff --git a/.azure/pipelines/azure-pipelines-external-release.yml b/.azure/pipelines/azure-pipelines-external-release.yml
index 218e0d98ed..a7de9c0fcd 100644
--- a/.azure/pipelines/azure-pipelines-external-release.yml
+++ b/.azure/pipelines/azure-pipelines-external-release.yml
@@ -1,9 +1,9 @@
######################################
# NOTE: Before running this pipeline to generate a new nuget package, update the version string in two places
# 1) update the name: string below (line 6) -- this is the version for the nuget package (e.g. 1.0.0)
-# 2) update \libs\host\GarnetServer.cs readonly string version (~line 45) -- NOTE - these two values need to be the same
+# 2) update \libs\host\GarnetServer.cs readonly string version (~line 53) -- NOTE - these two values need to be the same
######################################
-name: 1.0.19
+name: 1.0.30
trigger:
branches:
include:
diff --git a/Directory.Packages.props b/Directory.Packages.props
index c025628c1a..df66545fc2 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -25,5 +25,7 @@
+
+
\ No newline at end of file
diff --git a/Garnet.nuspec b/Garnet.nuspec
index 7d5d13bae5..199256eea4 100644
--- a/Garnet.nuspec
+++ b/Garnet.nuspec
@@ -37,6 +37,9 @@
+
+
+
diff --git a/Garnet.sln b/Garnet.sln
index d85da9228e..1a7f8dffd2 100644
--- a/Garnet.sln
+++ b/Garnet.sln
@@ -97,8 +97,17 @@ EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SampleModule", "playground\SampleModule\SampleModule.csproj", "{A8CA619E-8F13-4EF8-943F-2D5E3FEBFB3F}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GarnetJSON", "playground\GarnetJSON\GarnetJSON.csproj", "{2C8F1F5D-31E5-4D00-A46E-F3B1D9BC098F}"
+EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MigrateBench", "playground\MigrateBench\MigrateBench.csproj", "{6B66B394-E410-4B61-9A5A-1595FF6F5E08}"
EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "hosting", "hosting", "{01823EA4-4446-4D66-B268-DFEE55951964}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Windows", "Windows", "{697766CD-2046-46D9-958A-0FD3B46C98D4}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Garnet.worker", "hosting\Windows\Garnet.worker\Garnet.worker.csproj", "{DF2DD03E-87EE-482A-9FBA-6C8FBC23BDC5}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Garnet.resources", "libs\resources\Garnet.resources.csproj", "{A48412B4-FD60-467E-A5D9-F155CAB4F907}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -299,6 +308,22 @@ Global
{6B66B394-E410-4B61-9A5A-1595FF6F5E08}.Release|Any CPU.Build.0 = Release|Any CPU
{6B66B394-E410-4B61-9A5A-1595FF6F5E08}.Release|x64.ActiveCfg = Release|Any CPU
{6B66B394-E410-4B61-9A5A-1595FF6F5E08}.Release|x64.Build.0 = Release|Any CPU
+ {DF2DD03E-87EE-482A-9FBA-6C8FBC23BDC5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {DF2DD03E-87EE-482A-9FBA-6C8FBC23BDC5}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {DF2DD03E-87EE-482A-9FBA-6C8FBC23BDC5}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {DF2DD03E-87EE-482A-9FBA-6C8FBC23BDC5}.Debug|x64.Build.0 = Debug|Any CPU
+ {DF2DD03E-87EE-482A-9FBA-6C8FBC23BDC5}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {DF2DD03E-87EE-482A-9FBA-6C8FBC23BDC5}.Release|Any CPU.Build.0 = Release|Any CPU
+ {DF2DD03E-87EE-482A-9FBA-6C8FBC23BDC5}.Release|x64.ActiveCfg = Release|Any CPU
+ {DF2DD03E-87EE-482A-9FBA-6C8FBC23BDC5}.Release|x64.Build.0 = Release|Any CPU
+ {A48412B4-FD60-467E-A5D9-F155CAB4F907}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {A48412B4-FD60-467E-A5D9-F155CAB4F907}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {A48412B4-FD60-467E-A5D9-F155CAB4F907}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {A48412B4-FD60-467E-A5D9-F155CAB4F907}.Debug|x64.Build.0 = Debug|Any CPU
+ {A48412B4-FD60-467E-A5D9-F155CAB4F907}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {A48412B4-FD60-467E-A5D9-F155CAB4F907}.Release|Any CPU.Build.0 = Release|Any CPU
+ {A48412B4-FD60-467E-A5D9-F155CAB4F907}.Release|x64.ActiveCfg = Release|Any CPU
+ {A48412B4-FD60-467E-A5D9-F155CAB4F907}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -330,6 +355,9 @@ Global
{A8CA619E-8F13-4EF8-943F-2D5E3FEBFB3F} = {69A71E2C-00E3-42F3-854E-BE157A24834E}
{2C8F1F5D-31E5-4D00-A46E-F3B1D9BC098F} = {69A71E2C-00E3-42F3-854E-BE157A24834E}
{6B66B394-E410-4B61-9A5A-1595FF6F5E08} = {69A71E2C-00E3-42F3-854E-BE157A24834E}
+ {697766CD-2046-46D9-958A-0FD3B46C98D4} = {01823EA4-4446-4D66-B268-DFEE55951964}
+ {DF2DD03E-87EE-482A-9FBA-6C8FBC23BDC5} = {697766CD-2046-46D9-958A-0FD3B46C98D4}
+ {A48412B4-FD60-467E-A5D9-F155CAB4F907} = {147FCE31-EC09-4C90-8E4D-37CA87ED18C3}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2C02C405-4798-41CA-AF98-61EDFEF6772E}
diff --git a/benchmark/BDN.benchmark/Resp/RespAofStress.cs b/benchmark/BDN.benchmark/Resp/RespAofStress.cs
new file mode 100644
index 0000000000..7e52feec90
--- /dev/null
+++ b/benchmark/BDN.benchmark/Resp/RespAofStress.cs
@@ -0,0 +1,103 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using System.Runtime.CompilerServices;
+using BenchmarkDotNet.Attributes;
+using Embedded.perftest;
+using Garnet.server;
+
+namespace BDN.benchmark.Resp
+{
+ [MemoryDiagnoser]
+ public unsafe class RespAofStress
+ {
+ EmbeddedRespServer server;
+ RespServerSession session;
+ const int batchSize = 128;
+
+ static ReadOnlySpan SET => "*3\r\n$3\r\nSET\r\n$1\r\na\r\n$1\r\na\r\n"u8;
+ byte[] setRequestBuffer;
+ byte* setRequestBufferPointer;
+
+ static ReadOnlySpan INCR => "*2\r\n$4\r\nINCR\r\n$1\r\ni\r\n"u8;
+ byte[] incrRequestBuffer;
+ byte* incrRequestBufferPointer;
+
+ static ReadOnlySpan LPUSHPOP => "*3\r\n$5\r\nLPUSH\r\n$1\r\nd\r\n$1\r\ne\r\n*2\r\n$4\r\nLPOP\r\n$1\r\nd\r\n"u8;
+ byte[] lPushPopRequestBuffer;
+ byte* lPushPopRequestBufferPointer;
+
+ [GlobalSetup]
+ public void GlobalSetup()
+ {
+ var opt = new GarnetServerOptions
+ {
+ QuietMode = true,
+ EnableAOF = true,
+ UseAofNullDevice = true,
+ MainMemoryReplication = true,
+ CommitFrequencyMs = -1,
+ AofPageSize = "128m",
+ AofMemorySize = "256m",
+ };
+ server = new EmbeddedRespServer(opt);
+
+ session = server.GetRespSession();
+
+ setRequestBuffer = GC.AllocateArray(SET.Length * batchSize, pinned: true);
+ setRequestBufferPointer = (byte*)Unsafe.AsPointer(ref setRequestBuffer[0]);
+ for (int i = 0; i < batchSize; i++)
+ SET.CopyTo(new Span(setRequestBuffer).Slice(i * SET.Length));
+
+ _ = session.TryConsumeMessages(setRequestBufferPointer, setRequestBuffer.Length);
+
+ incrRequestBuffer = GC.AllocateArray(INCR.Length * batchSize, pinned: true);
+ incrRequestBufferPointer = (byte*)Unsafe.AsPointer(ref incrRequestBuffer[0]);
+ for (int i = 0; i < batchSize; i++)
+ INCR.CopyTo(new Span(incrRequestBuffer).Slice(i * INCR.Length));
+
+ _ = session.TryConsumeMessages(incrRequestBufferPointer, incrRequestBuffer.Length);
+
+ lPushPopRequestBuffer = GC.AllocateArray(LPUSHPOP.Length * batchSize, pinned: true);
+ lPushPopRequestBufferPointer = (byte*)Unsafe.AsPointer(ref lPushPopRequestBuffer[0]);
+ for (int i = 0; i < batchSize; i++)
+ LPUSHPOP.CopyTo(new Span(lPushPopRequestBuffer).Slice(i * LPUSHPOP.Length));
+
+ // Pre-populate list with a single element to avoid repeatedly emptying it during the benchmark
+ SlowConsumeMessage("*3\r\n$5\r\nLPUSH\r\n$1\r\nd\r\n$1\r\nf\r\n"u8);
+ }
+
+ [GlobalCleanup]
+ public void GlobalCleanup()
+ {
+ session.Dispose();
+ server.Dispose();
+ }
+
+ [Benchmark]
+ public void Set()
+ {
+ _ = session.TryConsumeMessages(setRequestBufferPointer, setRequestBuffer.Length);
+ }
+
+ [Benchmark]
+ public void Increment()
+ {
+ _ = session.TryConsumeMessages(incrRequestBufferPointer, incrRequestBuffer.Length);
+ }
+
+ [Benchmark]
+ public void LPushPop()
+ {
+ _ = session.TryConsumeMessages(lPushPopRequestBufferPointer, lPushPopRequestBuffer.Length);
+ }
+
+ private void SlowConsumeMessage(ReadOnlySpan message)
+ {
+ var buffer = GC.AllocateArray(message.Length, pinned: true);
+ var bufferPointer = (byte*)Unsafe.AsPointer(ref buffer[0]);
+ message.CopyTo(new Span(buffer));
+ _ = session.TryConsumeMessages(bufferPointer, buffer.Length);
+ }
+ }
+}
\ No newline at end of file
diff --git a/benchmark/BDN.benchmark/Resp/RespParseStress.cs b/benchmark/BDN.benchmark/Resp/RespParseStress.cs
index 19ea1f41dd..a46bc8c3af 100644
--- a/benchmark/BDN.benchmark/Resp/RespParseStress.cs
+++ b/benchmark/BDN.benchmark/Resp/RespParseStress.cs
@@ -35,6 +35,10 @@ public unsafe class RespParseStress
byte[] getRequestBuffer;
byte* getRequestBufferPointer;
+ static ReadOnlySpan INCR => "*2\r\n$4\r\nINCR\r\n$1\r\ni\r\n"u8;
+ byte[] incrRequestBuffer;
+ byte* incrRequestBufferPointer;
+
static ReadOnlySpan ZADDREM => "*4\r\n$4\r\nZADD\r\n$1\r\nc\r\n$1\r\n1\r\n$1\r\nc\r\n*3\r\n$4\r\nZREM\r\n$1\r\nc\r\n$1\r\nc\r\n"u8;
byte[] zAddRemRequestBuffer;
byte* zAddRemRequestBufferPointer;
@@ -92,6 +96,11 @@ public void GlobalSetup()
for (int i = 0; i < batchSize; i++)
GET.CopyTo(new Span(getRequestBuffer).Slice(i * GET.Length));
+ incrRequestBuffer = GC.AllocateArray(INCR.Length * batchSize, pinned: true);
+ incrRequestBufferPointer = (byte*)Unsafe.AsPointer(ref incrRequestBuffer[0]);
+ for (int i = 0; i < batchSize; i++)
+ INCR.CopyTo(new Span(incrRequestBuffer).Slice(i * INCR.Length));
+
zAddRemRequestBuffer = GC.AllocateArray(ZADDREM.Length * batchSize, pinned: true);
zAddRemRequestBufferPointer = (byte*)Unsafe.AsPointer(ref zAddRemRequestBuffer[0]);
for (int i = 0; i < batchSize; i++)
@@ -116,7 +125,7 @@ public void GlobalSetup()
SlowConsumeMessage("*4\r\n$4\r\nZADD\r\n$1\r\nc\r\n$1\r\n1\r\n$1\r\nd\r\n"u8);
// Pre-populate list with a single element to avoid repeatedly emptying it during the benchmark
- SlowConsumeMessage("*3\r\n$4\r\nLPUSH\r\n$1\r\nd\r\n$1\r\nf\r\n"u8);
+ SlowConsumeMessage("*3\r\n$5\r\nLPUSH\r\n$1\r\nd\r\n$1\r\nf\r\n"u8);
// Pre-populate set with a single element to avoid repeatedly emptying it during the benchmark
SlowConsumeMessage("*3\r\n$4\r\nSADD\r\n$1\r\ne\r\n$1\r\nb\r\n"u8);
@@ -164,6 +173,12 @@ public void Get()
_ = session.TryConsumeMessages(getRequestBufferPointer, getRequestBuffer.Length);
}
+ [Benchmark]
+ public void Increment()
+ {
+ _ = session.TryConsumeMessages(incrRequestBufferPointer, incrRequestBuffer.Length);
+ }
+
[Benchmark]
public void ZAddRem()
{
diff --git a/benchmark/Resp.benchmark/Program.cs b/benchmark/Resp.benchmark/Program.cs
index 6406cadcda..cb3a151e8d 100644
--- a/benchmark/Resp.benchmark/Program.cs
+++ b/benchmark/Resp.benchmark/Program.cs
@@ -195,7 +195,7 @@ static void Main(string[] args)
static void WaitForServer(Options opts)
{
- using var client = new GarnetClientSession(opts.Address, opts.Port, opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
+ using var client = new GarnetClientSession(opts.Address, opts.Port, new(), tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
while (true)
{
try
diff --git a/benchmark/Resp.benchmark/RespOnlineBench.cs b/benchmark/Resp.benchmark/RespOnlineBench.cs
index 84a751fe40..6b89212f5a 100644
--- a/benchmark/Resp.benchmark/RespOnlineBench.cs
+++ b/benchmark/Resp.benchmark/RespOnlineBench.cs
@@ -157,7 +157,7 @@ private void InitializeClients()
{
gcsPool = new AsyncPool(opts.NumThreads.First(), () =>
{
- var c = new GarnetClientSession(address, port, opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
+ var c = new GarnetClientSession(address, port, new(), tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
c.Connect();
if (auth != null)
{
@@ -573,8 +573,8 @@ public async void OpRunnerGarnetClientSession(int thread_id)
client = new GarnetClientSession(
address,
port,
- opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null,
- bufferSize: Math.Max(bufferSizeValue, opts.ValueLength * opts.IntraThreadParallelism));
+ new(Math.Max(bufferSizeValue, opts.ValueLength * opts.IntraThreadParallelism)),
+ tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
client.Connect();
if (auth != null)
{
@@ -669,7 +669,11 @@ public async void OpRunnerGarnetClientSessionParallel(int thread_id, int paralle
GarnetClientSession client = null;
if (!opts.Pool)
{
- client = new GarnetClientSession(address, port, opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null, null, null, Math.Max(131072, opts.IntraThreadParallelism * opts.ValueLength));
+ client = new GarnetClientSession(
+ address,
+ port,
+ new NetworkBufferSettings(Math.Max(131072, opts.IntraThreadParallelism * opts.ValueLength)),
+ tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
client.Connect();
if (auth != null)
{
diff --git a/benchmark/Resp.benchmark/RespPerfBench.cs b/benchmark/Resp.benchmark/RespPerfBench.cs
index 1fc6898907..74c535104a 100644
--- a/benchmark/Resp.benchmark/RespPerfBench.cs
+++ b/benchmark/Resp.benchmark/RespPerfBench.cs
@@ -407,7 +407,7 @@ private void GarnetClientSessionOperateThreadRunner(int NumOps, OpType opType, R
default:
throw new Exception($"opType: {opType} benchmark not supported with GarnetClientSession!");
}
- var c = new GarnetClientSession(opts.Address, opts.Port, opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
+ var c = new GarnetClientSession(opts.Address, opts.Port, new(), tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
c.Connect();
if (opts.Auth != null)
{
diff --git a/benchmark/Resp.benchmark/TxnPerfBench.cs b/benchmark/Resp.benchmark/TxnPerfBench.cs
index 9670f8a5b7..89fa74e759 100644
--- a/benchmark/Resp.benchmark/TxnPerfBench.cs
+++ b/benchmark/Resp.benchmark/TxnPerfBench.cs
@@ -107,7 +107,7 @@ public void Run()
{
gcsPool = new AsyncPool(opts.NumThreads.First(), () =>
{
- var c = new GarnetClientSession(address, port, opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
+ var c = new GarnetClientSession(address, port, new(), tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
c.Connect();
if (auth != null)
{
@@ -325,7 +325,7 @@ public void OpRunnerSERedis(int thread_id)
public void LoadData()
{
var req = new OnlineReqGen(0, opts.DbSize, true, opts.Zipf, opts.KeyLength, opts.ValueLength);
- GarnetClientSession client = new(address, port, opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
+ GarnetClientSession client = new(address, port, new(), tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
client.Connect();
if (auth != null)
{
diff --git a/hosting/Windows/Garnet.worker/Garnet.worker.csproj b/hosting/Windows/Garnet.worker/Garnet.worker.csproj
new file mode 100644
index 0000000000..4f639c7e12
--- /dev/null
+++ b/hosting/Windows/Garnet.worker/Garnet.worker.csproj
@@ -0,0 +1,16 @@
+
+
+
+ net8.0
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/hosting/Windows/Garnet.worker/Program.cs b/hosting/Windows/Garnet.worker/Program.cs
new file mode 100644
index 0000000000..8418da8671
--- /dev/null
+++ b/hosting/Windows/Garnet.worker/Program.cs
@@ -0,0 +1,23 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using Garnet;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+
+class Program
+{
+ static void Main(string[] args)
+ {
+ var builder = Host.CreateApplicationBuilder(args);
+ builder.Services.AddHostedService(_ => new Worker(args));
+
+ builder.Services.AddWindowsService(options =>
+ {
+ options.ServiceName = "Microsoft Garnet Server";
+ });
+
+ var host = builder.Build();
+ host.Run();
+ }
+}
\ No newline at end of file
diff --git a/hosting/Windows/Garnet.worker/Worker.cs b/hosting/Windows/Garnet.worker/Worker.cs
new file mode 100644
index 0000000000..d69adb7e3c
--- /dev/null
+++ b/hosting/Windows/Garnet.worker/Worker.cs
@@ -0,0 +1,60 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Hosting;
+
+namespace Garnet
+{
+ public class Worker : BackgroundService
+ {
+ private bool _isDisposed = false;
+ private readonly string[] args;
+
+ private GarnetServer server;
+
+ public Worker(string[] args)
+ {
+ this.args = args;
+ }
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ try
+ {
+ server = new GarnetServer(args);
+
+ // Start the server
+ server.Start();
+
+ await Task.Delay(Timeout.Infinite, stoppingToken).ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine($"Unable to initialize server due to exception: {ex.Message}");
+ }
+ }
+
+ ///
+ /// Triggered when the application host is performing a graceful shutdown.
+ ///
+ /// Indicates that the shutdown process should no longer be graceful.
+ public override async Task StopAsync(CancellationToken cancellationToken)
+ {
+ Dispose();
+ await base.StopAsync(cancellationToken).ConfigureAwait(false);
+ }
+
+ public override void Dispose()
+ {
+ if (_isDisposed)
+ {
+ return;
+ }
+ server?.Dispose();
+ _isDisposed = true;
+ }
+ }
+}
\ No newline at end of file
diff --git a/libs/client/ClientSession/GarnetClientSession.cs b/libs/client/ClientSession/GarnetClientSession.cs
index 70622fe4e6..92aa7951fc 100644
--- a/libs/client/ClientSession/GarnetClientSession.cs
+++ b/libs/client/ClientSession/GarnetClientSession.cs
@@ -23,7 +23,6 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
{
readonly string address;
readonly int port;
- readonly int bufferSize;
readonly int bufferSizeDigits;
INetworkSender networkSender;
readonly ElasticCircularBuffer tasksTypes = new();
@@ -61,8 +60,6 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
///
public bool IsConnected => socket != null && socket.Connected && !Disposed;
- readonly LimitedFixedBufferPool networkPool;
-
///
/// Username to authenticate the session on the server.
///
@@ -73,6 +70,21 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
///
readonly string authPassword = null;
+ ///
+ /// Indicating whether this instance is using its own network pool or one that was provided
+ ///
+ readonly bool usingManagedNetworkPool = false;
+
+ ///
+ /// Instance of network buffer settings describing the send and receive buffer sizes
+ ///
+ readonly NetworkBufferSettings networkBufferSettings;
+
+ ///
+ /// NetworkPool used to allocate send and receive buffers
+ ///
+ readonly LimitedFixedBufferPool networkPool;
+
///
/// Create client instance
///
@@ -81,16 +93,29 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
/// TLS options
/// Username to authenticate with
/// Password to authenticate with
- /// Network buffer size
+ /// Settings for send and receive network buffers
+ /// Buffer pool to use for allocating send and receive buffers
/// Max outstanding network sends allowed
/// Logger
- public GarnetClientSession(string address, int port, SslClientAuthenticationOptions tlsOptions = null, string authUsername = null, string authPassword = null, int bufferSize = 1 << 17, int networkSendThrottleMax = 8, ILogger logger = null)
+ public GarnetClientSession(
+ string address,
+ int port,
+ NetworkBufferSettings networkBufferSettings,
+ LimitedFixedBufferPool networkPool = null,
+ SslClientAuthenticationOptions tlsOptions = null,
+ string authUsername = null,
+ string authPassword = null,
+ int networkSendThrottleMax = 8,
+ ILogger logger = null)
{
- this.networkPool = new LimitedFixedBufferPool(bufferSize, logger: logger);
this.address = address;
this.port = port;
- this.bufferSize = bufferSize;
- this.bufferSizeDigits = NumUtils.NumDigits(bufferSize);
+
+ this.usingManagedNetworkPool = networkPool != null;
+ this.networkBufferSettings = networkBufferSettings;
+ this.networkPool = networkPool ?? networkBufferSettings.CreateBufferPool();
+ this.bufferSizeDigits = NumUtils.NumDigits(this.networkBufferSettings.sendBufferSize);
+
this.logger = logger;
this.sslOptions = tlsOptions;
this.networkSendThrottleMax = networkSendThrottleMax;
@@ -107,7 +132,15 @@ public GarnetClientSession(string address, int port, SslClientAuthenticationOpti
public void Connect(int timeoutMs = 0, CancellationToken token = default)
{
socket = GetSendSocket(address, port, timeoutMs);
- networkHandler = new GarnetClientSessionTcpNetworkHandler(this, socket, networkPool, sslOptions != null, this, networkSendThrottleMax, logger);
+ networkHandler = new GarnetClientSessionTcpNetworkHandler(
+ this,
+ socket,
+ networkBufferSettings,
+ networkPool,
+ sslOptions != null,
+ messageConsumer: this,
+ networkSendThrottleMax: networkSendThrottleMax,
+ logger: logger);
networkHandler.StartAsync(sslOptions, $"{address}:{port}", token).ConfigureAwait(false).GetAwaiter().GetResult();
networkSender = networkHandler.GetNetworkSender();
networkSender.GetResponseObject();
@@ -159,7 +192,7 @@ public void Dispose()
networkSender?.ReturnResponseObject();
socket?.Dispose();
networkHandler?.Dispose();
- networkPool.Dispose();
+ if (!usingManagedNetworkPool) networkPool.Dispose();
}
///
@@ -259,8 +292,8 @@ public void ExecuteClusterAppendLog(string nodeId, long previousAddress, long cu
}
offset = curr;
- if (payloadLength > bufferSize)
- throw new Exception($"Payload length {payloadLength} is larger than bufferSize {bufferSize} bytes");
+ if (payloadLength > networkBufferSettings.sendBufferSize)
+ throw new Exception($"Payload length {payloadLength} is larger than bufferSize {networkBufferSettings.sendBufferSize} bytes");
while (!RespWriteUtils.WriteBulkString(new Span((void*)payloadPtr, payloadLength), ref curr, end))
{
diff --git a/libs/client/ClientSession/GarnetClientSessionTcpNetworkHandler.cs b/libs/client/ClientSession/GarnetClientSessionTcpNetworkHandler.cs
index 2398223023..4351eb683d 100644
--- a/libs/client/ClientSession/GarnetClientSessionTcpNetworkHandler.cs
+++ b/libs/client/ClientSession/GarnetClientSessionTcpNetworkHandler.cs
@@ -10,8 +10,8 @@ namespace Garnet.client
{
sealed class GarnetClientSessionTcpNetworkHandler : TcpNetworkHandlerBase
{
- public GarnetClientSessionTcpNetworkHandler(GarnetClientSession serverHook, Socket socket, LimitedFixedBufferPool networkPool, bool useTLS, IMessageConsumer messageConsumer, int networkSendThrottleMax = 8, ILogger logger = null)
- : base(serverHook, new GarnetTcpNetworkSender(socket, networkPool, networkSendThrottleMax), socket, networkPool, useTLS, messageConsumer, logger)
+ public GarnetClientSessionTcpNetworkHandler(GarnetClientSession serverHook, Socket socket, NetworkBufferSettings networkBufferSettings, LimitedFixedBufferPool networkPool, bool useTLS, IMessageConsumer messageConsumer, int networkSendThrottleMax = 8, ILogger logger = null)
+ : base(serverHook, new GarnetTcpNetworkSender(socket, networkBufferSettings, networkPool, networkSendThrottleMax), socket, networkBufferSettings, networkPool, useTLS, messageConsumer: messageConsumer, logger: logger)
{
}
diff --git a/libs/client/ClientTcpNetworkSender.cs b/libs/client/ClientTcpNetworkSender.cs
index c15d300b17..a8061a8cb0 100644
--- a/libs/client/ClientTcpNetworkSender.cs
+++ b/libs/client/ClientTcpNetworkSender.cs
@@ -22,10 +22,11 @@ public class ClientTcpNetworkSender : GarnetTcpNetworkSender
///
///
///
+ ///
///
///
- public ClientTcpNetworkSender(Socket socket, Action