Skip to content

Commit

Permalink
Renames
Browse files Browse the repository at this point in the history
- TransportClient => interface IRequestInvoker
- HttpTransport => interface ITransport
- DefaultHttpTransport => DistributedTransport
  • Loading branch information
Mpdreamz committed Aug 30, 2023
1 parent dcd0fdc commit 112c8da
Show file tree
Hide file tree
Showing 55 changed files with 356 additions and 357 deletions.
10 changes: 5 additions & 5 deletions benchmarks/Elastic.Transport.Benchmarks/TransportBenchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,22 @@ namespace Elastic.Transport.Benchmarks
{
public class TransportBenchmarks
{
private DefaultHttpTransport _transport;
private DistributedTransport _requestHandler;

[GlobalSetup]
public void Setup()
{
var connection = new InMemoryTransportClient();
var connection = new InMemoryRequestInvoker();
var pool = new SingleNodePool(new Uri("http://localhost:9200"));
var settings = new TransportConfiguration(pool, connection);

_transport = new DefaultHttpTransport(settings);
_requestHandler = new DistributedTransport(settings);
}

[Benchmark]
public void TransportSuccessfulRequestBenchmark() => _transport.Get<VoidResponse>("/");
public void TransportSuccessfulRequestBenchmark() => _requestHandler.Get<VoidResponse>("/");

[Benchmark]
public async Task TransportSuccessfulAsyncRequestBenchmark() => await _transport.GetAsync<VoidResponse>("/");
public async Task TransportSuccessfulAsyncRequestBenchmark() => await _requestHandler.GetAsync<VoidResponse>("/");
}
}
2 changes: 1 addition & 1 deletion benchmarks/Elastic.Transport.Profiling/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ private static async Task Main()
MemoryProfiler.GetSnapshot("start");

var config = new TransportConfiguration(new Uri("http://localhost:9200"), new ElasticsearchProductRegistration());
var transport = new DefaultHttpTransport(config);
var transport = new DistributedTransport(config);

// WARMUP
for (var i = 0; i < 50; i++) _ = await transport.GetAsync<VoidResponse>("/");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ public ExposingPipelineFactory(TConfiguration connectionSettings, DateTimeProvid
MemoryStreamFactory = TransportConfiguration.DefaultMemoryStreamFactory;
Settings = connectionSettings;
Pipeline = Create(Settings, DateTimeProvider, MemoryStreamFactory, new DefaultRequestParameters());
Transport = new DefaultHttpTransport<TConfiguration>(Settings, this, DateTimeProvider, MemoryStreamFactory);
RequestHandler = new DistributedTransport<TConfiguration>(Settings, this, DateTimeProvider, MemoryStreamFactory);
}

// ReSharper disable once MemberCanBePrivate.Global
public RequestPipeline Pipeline { get; }
private DateTimeProvider DateTimeProvider { get; }
private MemoryStreamFactory MemoryStreamFactory { get; }
private TConfiguration Settings { get; }
public HttpTransport<TConfiguration> Transport { get; }
public ITransport<TConfiguration> RequestHandler { get; }

public override RequestPipeline Create(TConfiguration configurationValues, DateTimeProvider dateTimeProvider,
MemoryStreamFactory memoryStreamFactory, RequestParameters requestParameters) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@ namespace Elastic.Transport.VirtualizedCluster.Components;
/// <summary>
/// A continuation of <see cref="VirtualCluster"/>'s builder methods that creates
/// an instance of <see cref="TransportConfiguration"/> for the cluster after which the components such as
/// <see cref="TransportClient"/> and <see cref="NodePool"/> can no longer be updated.
/// <see cref="IRequestInvoker"/> and <see cref="NodePool"/> can no longer be updated.
/// </summary>
public sealed class SealedVirtualCluster
{
private readonly TransportClient _connection;
private readonly IRequestInvoker _connection;
private readonly NodePool _connectionPool;
private readonly TestableDateTimeProvider _dateTimeProvider;
private readonly MockProductRegistration _productRegistration;

internal SealedVirtualCluster(VirtualCluster cluster, NodePool pool, TestableDateTimeProvider dateTimeProvider, MockProductRegistration productRegistration)
{
_connectionPool = pool;
_connection = new VirtualClusterTransportClient(cluster, dateTimeProvider);
_connection = new VirtualClusterTransport(cluster, dateTimeProvider);
_dateTimeProvider = dateTimeProvider;
_productRegistration = productRegistration;
}
Expand All @@ -44,7 +44,7 @@ public VirtualizedCluster Settings(Func<TransportConfiguration, TransportConfigu
/// Allows you to create an instance of `<see cref="VirtualClusterConnection"/> using the DSL provided by <see cref="Virtual"/>
/// </summary>
/// <param name="selector">Provide custom configuration options</param>
public VirtualClusterTransportClient VirtualClusterConnection(Func<TransportConfiguration, TransportConfiguration> selector = null) =>
public VirtualClusterTransport VirtualClusterConnection(Func<TransportConfiguration, TransportConfiguration> selector = null) =>
new VirtualizedCluster(_dateTimeProvider, selector == null ? CreateSettings() : selector(CreateSettings()))
.Connection;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace Elastic.Transport.VirtualizedCluster.Components;
/// <see cref="SealedVirtualCluster.VirtualClusterConnection"/> becomes available
/// </pre>
/// </summary>
public class VirtualClusterTransportClient : InMemoryTransportClient
public class VirtualClusterTransport : IRequestInvoker
{
private static readonly object Lock = new();

Expand All @@ -41,18 +41,23 @@ public class VirtualClusterTransportClient : InMemoryTransportClient
private MockProductRegistration _productRegistration;
private IDictionary<int, State> _calls = new Dictionary<int, State>();

internal VirtualClusterTransportClient(VirtualCluster cluster, TestableDateTimeProvider dateTimeProvider)
private readonly InMemoryRequestInvoker _inMemoryRequestInvoker;

internal VirtualClusterTransport(VirtualCluster cluster, TestableDateTimeProvider dateTimeProvider)
{
UpdateCluster(cluster);
_dateTimeProvider = dateTimeProvider;
_productRegistration = cluster.ProductRegistration;
_inMemoryRequestInvoker = new InMemoryRequestInvoker();
}

void IDisposable.Dispose() { }

/// <summary>
/// Create a <see cref="VirtualClusterTransportClient"/> instance that always returns a successful response.
/// Create a <see cref="VirtualClusterTransport"/> instance that always returns a successful response.
/// </summary>
/// <param name="response">The bytes to be returned on every API call invocation</param>
public static VirtualClusterTransportClient Success(byte[] response) =>
public static VirtualClusterTransport Success(byte[] response) =>
Virtual.Elasticsearch
.Bootstrap(1)
.ClientCalls(r => r.SucceedAlways().ReturnByteResponse(response))
Expand All @@ -61,9 +66,9 @@ public static VirtualClusterTransportClient Success(byte[] response) =>
.Connection;

/// <summary>
/// Create a <see cref="VirtualClusterTransportClient"/> instance that always returns a failed response.
/// Create a <see cref="VirtualClusterTransport"/> instance that always returns a failed response.
/// </summary>
public static VirtualClusterTransportClient Error() =>
public static VirtualClusterTransport Error() =>
Virtual.Elasticsearch
.Bootstrap(1)
.ClientCalls(r => r.FailAlways(400))
Expand Down Expand Up @@ -109,12 +114,14 @@ private void UpdateCluster(VirtualCluster cluster)

private bool IsPingRequest(RequestData requestData) => _productRegistration.IsPingRequest(requestData);

/// <inheritdoc cref="TransportClient.RequestAsync{TResponse}"/>>
public override Task<TResponse> RequestAsync<TResponse>(RequestData requestData, CancellationToken cancellationToken) =>
/// <inheritdoc cref="IRequestInvoker.RequestAsync{TResponse}"/>>
public Task<TResponse> RequestAsync<TResponse>(RequestData requestData, CancellationToken cancellationToken)
where TResponse : TransportResponse, new() =>
Task.FromResult(Request<TResponse>(requestData));

/// <inheritdoc cref="TransportClient.Request{TResponse}"/>>
public override TResponse Request<TResponse>(RequestData requestData)
/// <inheritdoc cref="IRequestInvoker.Request{TResponse}"/>>
public TResponse Request<TResponse>(RequestData requestData)
where TResponse : TransportResponse, new()
{
if (!_calls.ContainsKey(requestData.Uri.Port))
throw new Exception($"Expected a call to happen on port {requestData.Uri.Port} but received none");
Expand Down Expand Up @@ -265,7 +272,7 @@ private TResponse Fail<TResponse, TRule>(RequestData requestData, TRule rule, Ru

return ret.Match(
(e) => throw e,
(statusCode) => ReturnConnectionStatus<TResponse>(requestData, CallResponse(rule),
(statusCode) => _inMemoryRequestInvoker.BuildResponse<TResponse>(requestData, CallResponse(rule),
//make sure we never return a valid status code in Fail responses because of a bad rule.
statusCode >= 200 && statusCode < 300 ? 502 : statusCode, rule.ReturnContentType)
);
Expand All @@ -282,7 +289,7 @@ TRule rule
rule.RecordExecuted();

beforeReturn?.Invoke(rule);
return ReturnConnectionStatus<TResponse>(requestData, successResponse(rule), contentType: rule.ReturnContentType);
return _inMemoryRequestInvoker.BuildResponse<TResponse>(requestData, successResponse(rule), contentType: rule.ReturnContentType);
}

private static byte[] CallResponse<TRule>(TRule rule)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ public class VirtualizedCluster
private readonly TestableDateTimeProvider _dateTimeProvider;
private readonly TransportConfiguration _settings;

private Func<HttpTransport<ITransportConfiguration>, Func<RequestConfigurationDescriptor, IRequestConfiguration>, Task<TransportResponse>> _asyncCall;
private Func<HttpTransport<ITransportConfiguration>, Func<RequestConfigurationDescriptor, IRequestConfiguration>, TransportResponse> _syncCall;
private Func<ITransport<ITransportConfiguration>, Func<RequestConfigurationDescriptor, IRequestConfiguration>, Task<TransportResponse>> _asyncCall;
private Func<ITransport<ITransportConfiguration>, Func<RequestConfigurationDescriptor, IRequestConfiguration>, TransportResponse> _syncCall;

private class VirtualResponse : TransportResponse { }

Expand Down Expand Up @@ -48,13 +48,13 @@ internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, Transport
};
}

public VirtualClusterTransportClient Connection => Transport.Settings.Connection as VirtualClusterTransportClient;
public NodePool ConnectionPool => Transport.Settings.NodePool;
public HttpTransport<ITransportConfiguration> Transport => _exposingRequestPipeline?.Transport;
public VirtualClusterTransport Connection => RequestHandler.Configuration.Connection as VirtualClusterTransport;
public NodePool ConnectionPool => RequestHandler.Configuration.NodePool;
public ITransport<ITransportConfiguration> RequestHandler => _exposingRequestPipeline?.RequestHandler;

public VirtualizedCluster TransportProxiesTo(
Func<HttpTransport<ITransportConfiguration>, Func<RequestConfigurationDescriptor, IRequestConfiguration>, TransportResponse> sync,
Func<HttpTransport<ITransportConfiguration>, Func<RequestConfigurationDescriptor, IRequestConfiguration>, Task<TransportResponse>> async
Func<ITransport<ITransportConfiguration>, Func<RequestConfigurationDescriptor, IRequestConfiguration>, TransportResponse> sync,
Func<ITransport<ITransportConfiguration>, Func<RequestConfigurationDescriptor, IRequestConfiguration>, Task<TransportResponse>> async
)
{
_syncCall = sync;
Expand All @@ -63,10 +63,10 @@ Func<HttpTransport<ITransportConfiguration>, Func<RequestConfigurationDescriptor
}

public TransportResponse ClientCall(Func<RequestConfigurationDescriptor, IRequestConfiguration> requestOverrides = null) =>
_syncCall(Transport, requestOverrides);
_syncCall(RequestHandler, requestOverrides);

public async Task<TransportResponse> ClientCallAsync(Func<RequestConfigurationDescriptor, IRequestConfiguration> requestOverrides = null) =>
await _asyncCall(Transport, requestOverrides).ConfigureAwait(false);
await _asyncCall(RequestHandler, requestOverrides).ConfigureAwait(false);

public void ChangeTime(Func<DateTimeOffset, DateTimeOffset> change) => _dateTimeProvider.ChangeTime(change);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@
namespace Elastic.Transport.VirtualizedCluster.Products;

/// <summary>
/// Makes sure <see cref="VirtualClusterTransportClient"/> is mockable by providing a different sniff response based on the current <see cref="ProductRegistration"/>
/// Makes sure <see cref="VirtualClusterTransport"/> is mockable by providing a different sniff response based on the current <see cref="ProductRegistration"/>
/// </summary>
public abstract class MockProductRegistration
{
/// <summary>
/// Information about the current product we are injecting into <see cref="HttpTransport{TConnectionSettings}"/>
/// Information about the current product we are injecting into <see cref="ITransport{TConfiguration}"/>
/// </summary>
public abstract ProductRegistration ProductRegistration { get; }

/// <summary>
/// Return the sniff response for the product as raw bytes for <see cref="TransportClient.Request{TResponse}"/> to return.
/// Return the sniff response for the product as raw bytes for <see cref="IRequestInvoker.Request{TResponse}"/> to return.
/// </summary>
/// <param name="nodes">The nodes we expect to be returned in the response</param>
/// <param name="stackVersion">The current version under test</param>
Expand All @@ -28,7 +28,7 @@ public abstract class MockProductRegistration
public abstract byte[] CreateSniffResponseBytes(IReadOnlyList<Node> nodes, string stackVersion, string publishAddressOverride, bool returnFullyQualifiedDomainNames);

/// <summary>
/// see <see cref="VirtualClusterTransportClient.Request{TResponse}"/> uses this to determine if the current request is a sniff request and should follow
/// see <see cref="VirtualClusterTransport.Request{TResponse}"/> uses this to determine if the current request is a sniff request and should follow
/// the sniffing rules
/// </summary>
public abstract bool IsSniffRequest(RequestData requestData);
Expand Down
4 changes: 2 additions & 2 deletions src/Elastic.Transport/Components/NodePool/Node.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
namespace Elastic.Transport;

/// <summary>
/// Represents an endpoint <see cref="Uri"/> with additional associated metadata on which the <see cref="HttpTransport{TConnectionSettings}"/> can act.
/// Represents an endpoint <see cref="Uri"/> with additional associated metadata on which the <see cref="ITransport{TConfiguration}"/> can act.
/// </summary>
public sealed class Node : IEquatable<Node>
{
Expand Down Expand Up @@ -65,7 +65,7 @@ public IReadOnlyCollection<string> Features
public Uri Uri { get; }

/// <summary>
/// Indicates whether the node is alive. <see cref="HttpTransport{TConnectionSettings}"/> can take nodes out of rotation by calling
/// Indicates whether the node is alive. <see cref="ITransport{TConfiguration}"/> can take nodes out of rotation by calling
/// <see cref="MarkDead"/> on <see cref="Node"/>.
/// </summary>
public bool IsAlive { get; private set; }
Expand Down
8 changes: 4 additions & 4 deletions src/Elastic.Transport/Components/NodePool/NodePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ namespace Elastic.Transport;
/// <summary>
/// A node pool is responsible for maintaining a read only collection of <see cref="Node"/>(s) under <see cref="Nodes"/>.
/// <para>
/// Unlike the name might suggest this component is not responsible for IO level pooling. For that we rely on <see cref="TransportClient"/> abstracting away
/// Unlike the name might suggest this component is not responsible for IO level pooling. For that we rely on <see cref="IRequestInvoker"/> abstracting away
/// the connection IO pooling.
/// </para>
/// <para>This interface signals the current connection strategy to <see cref="HttpTransport{TConnectionSettings}"/>.</para>
/// <para>This interface signals the current connection strategy to <see cref="ITransport{TConfiguration}"/>.</para>
/// </summary>
public abstract class NodePool : IDisposable
{
Expand Down Expand Up @@ -68,7 +68,7 @@ internal NodePool() { }
public abstract bool UsingSsl { get; protected set; }

/// <summary>
///
///
/// </summary>
public void Dispose()
{
Expand All @@ -77,7 +77,7 @@ public void Dispose()
}

/// <summary>
///
///
/// </summary>
/// <param name="disposing"></param>
protected virtual void Dispose(bool disposing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
namespace Elastic.Transport;

/// <summary>
/// A node pool that enables <see cref="SupportsReseeding"/> which in turn allows the <see cref="HttpTransport{TConnectionSettings}"/> to enable sniffing to
/// A node pool that enables <see cref="SupportsReseeding"/> which in turn allows the <see cref="ITransport{TConfiguration}"/> to enable sniffing to
/// discover the current cluster's list of active nodes.
/// </summary>
public class SniffingNodePool : StaticNodePool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
namespace Elastic.Transport;

/// <summary>
/// A node pool that disables <see cref="SupportsReseeding"/> which in turn disallows the <see cref="HttpTransport{TConnectionSettings}"/> to enable sniffing to
/// A node pool that disables <see cref="SupportsReseeding"/> which in turn disallows the <see cref="ITransport{TConfiguration}"/> to enable sniffing to
/// discover the current cluster's list of active nodes.
/// <para>Therefore the nodes you supply are the list of known nodes throughout its lifetime, hence static</para>
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace Elastic.Transport;
public class DefaultRequestPipeline<TConfiguration> : RequestPipeline
where TConfiguration : class, ITransportConfiguration
{
private readonly TransportClient _transportClient;
private readonly IRequestInvoker _requestInvoker;
private readonly NodePool _nodePool;
private readonly DateTimeProvider _dateTimeProvider;
private readonly MemoryStreamFactory _memoryStreamFactory;
Expand All @@ -48,7 +48,7 @@ RequestParameters requestParameters
{
_settings = configurationValues;
_nodePool = _settings.NodePool;
_transportClient = _settings.Connection;
_requestInvoker = _settings.Connection;
_dateTimeProvider = dateTimeProvider;
_memoryStreamFactory = memoryStreamFactory;
_productRegistration = configurationValues.ProductRegistration;
Expand Down Expand Up @@ -195,7 +195,7 @@ public override TResponse CallProductEndpoint<TResponse>(RequestData requestData

try
{
var response = _transportClient.Request<TResponse>(requestData);
var response = _requestInvoker.Request<TResponse>(requestData);

#if NET6_0_OR_GREATER
activity?.SetStatus(response.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType ? ActivityStatusCode.Ok : ActivityStatusCode.Error);
Expand Down Expand Up @@ -247,7 +247,7 @@ public override async Task<TResponse> CallProductEndpointAsync<TResponse>(Reques

try
{
var response = await _transportClient.RequestAsync<TResponse>(requestData, cancellationToken).ConfigureAwait(false);
var response = await _requestInvoker.RequestAsync<TResponse>(requestData, cancellationToken).ConfigureAwait(false);

#if NET6_0_OR_GREATER
activity?.SetStatus(response.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType ? ActivityStatusCode.Ok : ActivityStatusCode.Error);
Expand Down Expand Up @@ -473,7 +473,7 @@ public override void Ping(Node node)

try
{
var response = _productRegistration.Ping(_transportClient, pingData);
var response = _productRegistration.Ping(_requestInvoker, pingData);

ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails);

Expand Down Expand Up @@ -508,7 +508,7 @@ public override async Task PingAsync(Node node, CancellationToken cancellationTo

try
{
var response = await _productRegistration.PingAsync(_transportClient, pingData, cancellationToken).ConfigureAwait(false);
var response = await _productRegistration.PingAsync(_requestInvoker, pingData, cancellationToken).ConfigureAwait(false);

ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails);

Expand Down Expand Up @@ -546,7 +546,7 @@ public override void Sniff()

try
{
var (response, nodes) = _productRegistration.Sniff(_transportClient, _nodePool.UsingSsl, requestData);
var (response, nodes) = _productRegistration.Sniff(_requestInvoker, _nodePool.UsingSsl, requestData);

ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails);

Expand Down Expand Up @@ -591,7 +591,7 @@ public override async Task SniffAsync(CancellationToken cancellationToken)
try
{
var (response, nodes) = await _productRegistration
.SniffAsync(_transportClient, _nodePool.UsingSsl, requestData, cancellationToken)
.SniffAsync(_requestInvoker, _nodePool.UsingSsl, requestData, cancellationToken)
.ConfigureAwait(false);

ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails);
Expand Down
Loading

0 comments on commit 112c8da

Please sign in to comment.