Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configuration updates. #128

Merged
merged 7 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,23 @@ namespace Elastic.Transport.VirtualizedCluster.Components;
/// <summary>
/// An implementation that exposes all the components so that <see cref="VirtualCluster"/> can reference them directly.
/// </summary>
public sealed class ExposingPipelineFactory<TConfiguration> : RequestPipelineFactory<TConfiguration> where TConfiguration : class, ITransportConfiguration
public sealed class ExposingPipelineFactory<TConfiguration> : RequestPipelineFactory
where TConfiguration : class, ITransportConfiguration
{
public ExposingPipelineFactory(TConfiguration configuration, DateTimeProvider dateTimeProvider)
{
DateTimeProvider = dateTimeProvider;
MemoryStreamFactory = TransportConfiguration.DefaultMemoryStreamFactory;
Configuration = configuration;
Pipeline = Create(Configuration, DateTimeProvider, MemoryStreamFactory, null);
RequestHandler = new DistributedTransport<TConfiguration>(Configuration, this, DateTimeProvider, MemoryStreamFactory);
Pipeline = Create(new RequestData(Configuration, null, null), DateTimeProvider);
RequestHandler = new DistributedTransport<TConfiguration>(Configuration, this, DateTimeProvider);
}

// ReSharper disable once MemberCanBePrivate.Global
public RequestPipeline Pipeline { get; }
private DateTimeProvider DateTimeProvider { get; }
private MemoryStreamFactory MemoryStreamFactory { get; }
private TConfiguration Configuration { get; }
public ITransport<TConfiguration> RequestHandler { get; }

public override RequestPipeline Create(TConfiguration configurationValues, DateTimeProvider dateTimeProvider,
MemoryStreamFactory memoryStreamFactory, IRequestConfiguration? requestConfiguration) =>
new DefaultRequestPipeline<TConfiguration>(Configuration, DateTimeProvider, MemoryStreamFactory, requestConfiguration);
public override RequestPipeline Create(RequestData requestData, DateTimeProvider dateTimeProvider) =>
new DefaultRequestPipeline(requestData, DateTimeProvider);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Elastic.Transport.VirtualizedCluster.Components;

/// <summary>
/// A continuation of <see cref="VirtualCluster"/>'s builder methods that creates
/// an instance of <see cref="TransportConfiguration"/> for the cluster after which the components such as
/// an instance of <see cref="TransportConfigurationDescriptor"/> for the cluster after which the components such as
/// <see cref="IRequestInvoker"/> and <see cref="NodePool"/> can no longer be updated.
/// </summary>
public sealed class SealedVirtualCluster
Expand All @@ -28,23 +28,23 @@ internal SealedVirtualCluster(VirtualCluster cluster, NodePool pool, TestableDat
_productRegistration = productRegistration;
}

private TransportConfiguration CreateSettings() =>
private TransportConfigurationDescriptor CreateSettings() =>
new(_nodePool, _requestInvoker, serializer: null, _productRegistration.ProductRegistration);

/// <summary> Create the cluster using all defaults on <see cref="TransportConfiguration"/> </summary>
/// <summary> Create the cluster using all defaults on <see cref="TransportConfigurationDescriptor"/> </summary>
public VirtualizedCluster AllDefaults() =>
new(_dateTimeProvider, CreateSettings());

/// <summary> Create the cluster using <paramref name="selector"/> to provide configuration changes </summary>
/// <param name="selector">Provide custom configuration options</param>
public VirtualizedCluster Settings(Func<TransportConfiguration, TransportConfiguration> selector) =>
public VirtualizedCluster Settings(Func<TransportConfigurationDescriptor, TransportConfigurationDescriptor> selector) =>
new(_dateTimeProvider, selector(CreateSettings()));

/// <summary>
/// Allows you to create an instance of `<see cref="VirtualClusterConnection"/> using the DSL provided by <see cref="Virtual"/>
/// </summary>
/// <param name="selector">Provide custom configuration options</param>
public VirtualClusterRequestInvoker VirtualClusterConnection(Func<TransportConfiguration, TransportConfiguration> selector = null) =>
public VirtualClusterRequestInvoker VirtualClusterConnection(Func<TransportConfigurationDescriptor, TransportConfigurationDescriptor> selector = null) =>
new VirtualizedCluster(_dateTimeProvider, selector == null ? CreateSettings() : selector(CreateSettings()))
.Connection;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class VirtualizedCluster
{
private readonly ExposingPipelineFactory<ITransportConfiguration> _exposingRequestPipeline;
private readonly TestableDateTimeProvider _dateTimeProvider;
private readonly TransportConfiguration _settings;
private readonly TransportConfigurationDescriptor _settings;

private Func<ITransport<ITransportConfiguration>, Func<RequestConfigurationDescriptor, IRequestConfiguration>, Task<TransportResponse>> _asyncCall;
private Func<ITransport<ITransportConfiguration>, Func<RequestConfigurationDescriptor, IRequestConfiguration>, TransportResponse> _syncCall;
Expand All @@ -22,7 +22,7 @@ private class VirtualResponse : TransportResponse;

private static readonly EndpointPath RootPath = new(HttpMethod.GET, "/");

internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, TransportConfiguration settings)
internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, TransportConfigurationDescriptor settings)
{
_dateTimeProvider = dateTimeProvider;
_settings = settings;
Expand Down
66 changes: 66 additions & 0 deletions src/Elastic.Transport/Components/Endpoint.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;

namespace Elastic.Transport;

/// <summary>
/// Represents the path of an endpoint in a transport request, including the HTTP method
/// and the path and query information.
/// </summary>
/// <remarks>
/// This struct is used to store information about the HTTP method and the path and query of an endpoint,
/// which are essential components when constructing a request URI.
/// </remarks>
public readonly record struct EndpointPath(HttpMethod Method, string PathAndQuery);

/// <summary>
/// Represents an endpoint in a transport request, encapsulating the HTTP method, path and query,
/// and the node to which the request is being sent.
/// </summary>
/// <remarks>
/// This class is used to construct the URI for the request based on the node's URI and the path and query.
/// An empty endpoint can be created using the <see cref="Empty"/> method as a default or placeholder instance.
/// </remarks>
public record Endpoint(in EndpointPath Path, Node Node)
{
/// <summary> Represents an empty endpoint used as a default or placeholder instance of <see cref="Endpoint"/>. </summary>
public static Endpoint Empty(in EndpointPath path) => new(path, EmptyNode);

private static readonly Node EmptyNode = new(new Uri("http://empty.example"));

/// <summary> Indicates whether the endpoint is an empty placeholder instance. </summary>
public bool IsEmpty => Node == EmptyNode;

/// <summary> The <see cref="Uri" /> for the request. </summary>
public Uri Uri { get; private init; } = new(Node.Uri, Path.PathAndQuery);

/// <summary> The HTTP method used for the request (e.g., GET, POST, PUT, DELETE, HEAD). </summary>
public HttpMethod Method => Path.Method;

/// <summary> Gets the path and query of the endpoint.</summary>
public string PathAndQuery => Path.PathAndQuery;

private readonly Node _node = Node;

/// <summary>
/// Represents a node within the transport layer of the Elastic search client.
/// This object encapsulates the characteristics of a node, allowing for comparisons and operations
/// within the broader search infrastructure.
/// </summary>
public Node Node
{
get => _node;
init
{
_node = value;
Uri = new(Node.Uri, Path.PathAndQuery);
}
}

/// <inheritdoc/>
public override string ToString() => $"{Path.Method.GetStringValue()} {Uri}";

}
74 changes: 29 additions & 45 deletions src/Elastic.Transport/Components/Pipeline/DefaultRequestPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,40 @@
namespace Elastic.Transport;

/// <inheritdoc cref="RequestPipeline" />
public class DefaultRequestPipeline<TConfiguration> : RequestPipeline
where TConfiguration : class, ITransportConfiguration
public class DefaultRequestPipeline : RequestPipeline
{
private readonly IRequestInvoker _requestInvoker;
private readonly NodePool _nodePool;
private readonly RequestData _requestData;
private readonly DateTimeProvider _dateTimeProvider;
private readonly MemoryStreamFactory _memoryStreamFactory;
private readonly Func<Node, bool> _nodePredicate;
private readonly ProductRegistration _productRegistration;
private readonly TConfiguration _settings;
private readonly ResponseBuilder _responseBuilder;

private RequestConfiguration? _pingAndSniffRequestConfiguration;
private List<Audit> _auditTrail = null;
private List<Audit>? _auditTrail;
private readonly ITransportConfiguration _settings;

/// <inheritdoc cref="RequestPipeline" />
internal DefaultRequestPipeline(
TConfiguration configurationValues,
DateTimeProvider dateTimeProvider,
MemoryStreamFactory memoryStreamFactory,
IRequestConfiguration? requestConfiguration
)
internal DefaultRequestPipeline(RequestData requestData, DateTimeProvider dateTimeProvider)
{
_settings = configurationValues;
_nodePool = _settings.NodePool;
_requestInvoker = _settings.Connection;
_requestData = requestData;
_settings = requestData.ConnectionSettings;

_nodePool = requestData.ConnectionSettings.NodePool;
_requestInvoker = requestData.ConnectionSettings.Connection;
_dateTimeProvider = dateTimeProvider;
_memoryStreamFactory = memoryStreamFactory;
_productRegistration = configurationValues.ProductRegistration;
_memoryStreamFactory = requestData.MemoryStreamFactory;
_productRegistration = requestData.ConnectionSettings.ProductRegistration;
_responseBuilder = _productRegistration.ResponseBuilder;
_nodePredicate = _settings.NodePredicate ?? _productRegistration.NodePredicate;
RequestConfig = requestConfiguration;
_nodePredicate = requestData.ConnectionSettings.NodePredicate ?? _productRegistration.NodePredicate;

StartedOn = dateTimeProvider.Now();
}

/// <inheritdoc cref="RequestPipeline.AuditTrail" />
public override IEnumerable<Audit> AuditTrail => _auditTrail ?? (IEnumerable<Audit>)Array.Empty<Audit>();
public override IEnumerable<Audit> AuditTrail => _auditTrail;

private RequestConfiguration PingAndSniffRequestConfiguration
{
Expand All @@ -66,9 +63,9 @@ private RequestConfiguration PingAndSniffRequestConfiguration
{
PingTimeout = PingTimeout,
RequestTimeout = PingTimeout,
Authentication = RequestConfig?.Authentication ?? _settings.Authentication,
EnableHttpPipelining = RequestConfig?.HttpPipeliningEnabled ?? _settings.HttpPipeliningEnabled,
ForceNode = RequestConfig?.ForceNode
Authentication = _requestData.AuthenticationHeader,
EnableHttpPipelining = _requestData.HttpPipeliningEnabled,
ForceNode = _requestData.ForceNode
};

return _pingAndSniffRequestConfiguration;
Expand Down Expand Up @@ -99,10 +96,7 @@ public override bool IsTakingTooLong
}
}

public override int MaxRetries =>
RequestConfig?.ForceNode != null
? 0
: Math.Min(RequestConfig?.MaxRetries ?? _settings.MaxRetries.GetValueOrDefault(int.MaxValue), _nodePool.MaxRetries);
public override int MaxRetries => _requestData.MaxRetries;

public bool Refresh { get; private set; }

Expand Down Expand Up @@ -140,18 +134,13 @@ public override bool StaleClusterState

public override DateTimeOffset StartedOn { get; }

private TimeSpan PingTimeout =>
RequestConfig?.PingTimeout
?? _settings.PingTimeout
?? (_nodePool.UsingSsl ? RequestConfiguration.DefaultPingTimeoutOnSsl : RequestConfiguration.DefaultPingTimeout);
private TimeSpan PingTimeout => _requestData.PingTimeout;

private IRequestConfiguration RequestConfig { get; }
private bool RequestDisabledSniff => _requestData.DisableSniff;

private bool RequestDisabledSniff => RequestConfig != null && (RequestConfig.DisableSniff ?? false);
private TimeSpan RequestTimeout => _requestData.RequestTimeout;

private TimeSpan RequestTimeout => RequestConfig?.RequestTimeout ?? _settings.RequestTimeout ?? RequestConfiguration.DefaultRequestTimeout;

public override void AuditCancellationRequested() => Audit(CancellationRequested).Dispose();
public override void AuditCancellationRequested() => Audit(CancellationRequested)?.Dispose();

public override void BadResponse<TResponse>(ref TResponse response, ApiCallDetails callDetails, Endpoint endpoint, RequestData data, PostData? postData, TransportException exception)
{
Expand Down Expand Up @@ -362,9 +351,9 @@ public override bool TryGetSingleNode(out Node node)

public override IEnumerable<Node> NextNode()
{
if (RequestConfig?.ForceNode != null)
if (_requestData.ForceNode != null)
{
yield return new Node(RequestConfig.ForceNode);
yield return new Node(_requestData.ForceNode);

yield break;
}
Expand Down Expand Up @@ -416,15 +405,12 @@ public async ValueTask PingCoreAsync(bool isAsync, Node node, CancellationToken

TransportResponse response;

//TODO remove
var requestData = new RequestData(_settings, null, null, _memoryStreamFactory);

try
{
if (isAsync)
response = await _productRegistration.PingAsync(_requestInvoker, pingEndpoint, requestData, cancellationToken).ConfigureAwait(false);
response = await _productRegistration.PingAsync(_requestInvoker, pingEndpoint, _requestData, cancellationToken).ConfigureAwait(false);
else
response = _productRegistration.Ping(_requestInvoker, pingEndpoint, requestData);
response = _productRegistration.Ping(_requestInvoker, pingEndpoint, _requestData);

ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails);

Expand Down Expand Up @@ -462,7 +448,7 @@ public async ValueTask SniffCoreAsync(bool isAsync, CancellationToken cancellati
{
var sniffEndpoint = _productRegistration.CreateSniffEndpoint(node, PingAndSniffRequestConfiguration, _settings);
//TODO remove
var requestData = new RequestData(_settings, null, null, _memoryStreamFactory);
var requestData = new RequestData(_settings, null, null);

using var audit = Audit(SniffSuccess, node);

Expand Down Expand Up @@ -554,9 +540,7 @@ public override void ThrowNoNodesAttempted(Endpoint endpoint, List<PipelineExcep
throw new UnexpectedTransportException(clientException, seenExceptions) { Endpoint = endpoint, AuditTrail = AuditTrail };
}

private bool PingDisabled(Node node) =>
(RequestConfig?.DisablePings).GetValueOrDefault(false)
|| (_settings.DisablePings ?? false) || !_nodePool.SupportsPinging || !node.IsResurrected;
private bool PingDisabled(Node node) => _requestData.DisablePings || !node.IsResurrected;

private Auditable? Audit(AuditEvent type, Node node = null) =>
!_settings.DisableAuditTrail ?? true ? new(type, ref _auditTrail, _dateTimeProvider, node) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ private async ValueTask<TResponse> SetBodyCoreAsync<TResponse>(bool isAsync,
return response;
}

if (!requestData.ValidateResponseContentType(mimeType))
if (!ValidateResponseContentType(requestData.Accept, mimeType))
{
ConditionalDisposal(responseStream, ownsStream, response);
return default;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace Elastic.Transport;

/// <summary>
/// A pipeline exception is throw when ever a known failing exit point is reached in <see cref="DefaultRequestPipeline{TConfiguration}"/>
/// A pipeline exception is throw when ever a known failing exit point is reached in <see cref="DefaultRequestPipeline"/>
/// <para>See <see cref="PipelineFailure"/> for known exits points</para>
/// </summary>
public class PipelineException : Exception
Expand Down
4 changes: 2 additions & 2 deletions src/Elastic.Transport/Components/Pipeline/PipelineFailure.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace Elastic.Transport;

/// <summary>
/// A failure in <see cref="DefaultRequestPipeline{TConfiguration}"/>'s workflow that caused it to end prematurely.
/// A failure in <see cref="DefaultRequestPipeline"/>'s workflow that caused it to end prematurely.
/// </summary>
public enum PipelineFailure
{
Expand Down Expand Up @@ -43,7 +43,7 @@ public enum PipelineFailure
MaxRetriesReached,

/// <summary>
/// An exception occurred during <see cref="DefaultRequestPipeline{TConfiguration}"/> that could not be handled
/// An exception occurred during <see cref="DefaultRequestPipeline"/> that could not be handled
/// </summary>
Unexpected,

Expand Down
Loading
Loading