Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor/request data v3 #130

Merged
merged 11 commits into from
Nov 8, 2024
2 changes: 1 addition & 1 deletion src/Elastic.Transport.VirtualizedCluster/Audit/Auditor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private Auditor(Components.VirtualizedCluster cluster, Components.VirtualizedClu

public IEnumerable<Diagnostics.Auditing.Audit> AsyncAuditTrail { get; set; }
public IEnumerable<Diagnostics.Auditing.Audit> AuditTrail { get; set; }
public Func<Components.VirtualizedCluster> Cluster { get; set; }
public Func<Components.VirtualizedCluster> Cluster { get; }

public TransportResponse Response { get; internal set; }
public TransportResponse ResponseAsync { get; internal set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,16 @@ namespace Elastic.Transport.VirtualizedCluster.Components;
public sealed class ExposingPipelineFactory<TConfiguration> : RequestPipelineFactory
where TConfiguration : class, ITransportConfiguration
{
public ExposingPipelineFactory(TConfiguration configuration, DateTimeProvider dateTimeProvider)
public ExposingPipelineFactory(TConfiguration configuration)
{
DateTimeProvider = dateTimeProvider;
Configuration = configuration;
Pipeline = Create(new RequestData(Configuration, null), DateTimeProvider);
RequestHandler = new DistributedTransport<TConfiguration>(Configuration, this, DateTimeProvider);
Transport = new DistributedTransport<TConfiguration>(Configuration);
}

// ReSharper disable once MemberCanBePrivate.Global
public RequestPipeline Pipeline { get; }
private DateTimeProvider DateTimeProvider { get; }
private TConfiguration Configuration { get; }
public ITransport<TConfiguration> RequestHandler { get; }
public ITransport<TConfiguration> Transport { get; }

public override RequestPipeline Create(RequestData requestData, DateTimeProvider dateTimeProvider) =>
new DefaultRequestPipeline(requestData, DateTimeProvider);
public override RequestPipeline Create(RequestData requestData) =>
new RequestPipeline(requestData);
}
#nullable restore
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,21 @@ internal SealedVirtualCluster(VirtualCluster cluster, NodePool pool, TestableDat
private TransportConfigurationDescriptor CreateSettings() =>
new(_nodePool, _requestInvoker, serializer: null, _productRegistration.ProductRegistration);


/// <summary> Create the cluster using all defaults on <see cref="TransportConfigurationDescriptor"/> </summary>
public VirtualizedCluster AllDefaults() =>
new(_dateTimeProvider, CreateSettings());
new(CreateSettings());

/// <summary> Create the cluster using <paramref name="selector"/> to provide configuration changes </summary>
/// <param name="selector">Provide custom configuration options</param>
public VirtualizedCluster Settings(Func<TransportConfigurationDescriptor, TransportConfigurationDescriptor> selector) =>
new(_dateTimeProvider, selector(CreateSettings()));
new(selector(CreateSettings()));

/// <summary>
/// Allows you to create an instance of `<see cref="VirtualClusterConnection"/> using the DSL provided by <see cref="Virtual"/>
/// </summary>
/// <param name="selector">Provide custom configuration options</param>
public VirtualClusterRequestInvoker VirtualClusterConnection(Func<TransportConfigurationDescriptor, TransportConfigurationDescriptor> selector = null) =>
new VirtualizedCluster(_dateTimeProvider, selector == null ? CreateSettings() : selector(CreateSettings()))
new VirtualizedCluster(selector == null ? CreateSettings() : selector(CreateSettings()))
.Connection;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ protected VirtualCluster(IEnumerable<Node> nodes, MockProductRegistration produc
InternalNodes = nodes.ToList();
}

public List<IClientCallRule> ClientCallRules { get; } = new List<IClientCallRule>();
public TestableDateTimeProvider DateTimeProvider { get; } = new TestableDateTimeProvider();
public List<IClientCallRule> ClientCallRules { get; } = new();
private TestableDateTimeProvider TestDateTimeProvider { get; } = new();

protected List<Node> InternalNodes { get; }
public IReadOnlyList<Node> Nodes => InternalNodes;
public List<IRule> PingingRules { get; } = new List<IRule>();
public List<IRule> PingingRules { get; } = new();

public List<ISniffRule> SniffingRules { get; } = new List<ISniffRule>();
public List<ISniffRule> SniffingRules { get; } = new();
internal string PublishAddressOverride { get; private set; }

internal bool SniffShouldReturnFqnd { get; private set; }
Expand Down Expand Up @@ -73,32 +73,34 @@ public VirtualCluster ClientCalls(Func<ClientCallRule, IClientCallRule> selector
public SealedVirtualCluster SingleNodeConnection(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new SingleNodePool(nodes.First().Uri), DateTimeProvider, ProductRegistration);
return new SealedVirtualCluster(this, new SingleNodePool(nodes.First().Uri), TestDateTimeProvider, ProductRegistration);
}

public SealedVirtualCluster StaticNodePool(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new StaticNodePool(nodes, false, DateTimeProvider), DateTimeProvider, ProductRegistration);
var dateTimeProvider = TestDateTimeProvider;
var nodePool = new StaticNodePool(nodes, false) { DateTimeProvider = dateTimeProvider };
return new SealedVirtualCluster(this, nodePool , TestDateTimeProvider, ProductRegistration);
}

public SealedVirtualCluster SniffingNodePool(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new SniffingNodePool(nodes, false, DateTimeProvider), DateTimeProvider, ProductRegistration);
return new SealedVirtualCluster(this, new SniffingNodePool(nodes, false) { DateTimeProvider = TestDateTimeProvider }, TestDateTimeProvider, ProductRegistration);
}

public SealedVirtualCluster StickyNodePool(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new StickyNodePool(nodes, DateTimeProvider), DateTimeProvider, ProductRegistration);
return new SealedVirtualCluster(this, new StickyNodePool(nodes) { DateTimeProvider = TestDateTimeProvider}, TestDateTimeProvider, ProductRegistration);
}

public SealedVirtualCluster StickySniffingNodePool(Func<Node, float> sorter = null,
Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null
)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new StickySniffingNodePool(nodes, sorter, DateTimeProvider), DateTimeProvider, ProductRegistration);
return new SealedVirtualCluster(this, new StickySniffingNodePool(nodes, sorter) { DateTimeProvider = TestDateTimeProvider }, TestDateTimeProvider, ProductRegistration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ private class VirtualResponse : TransportResponse;

private static readonly EndpointPath RootPath = new(HttpMethod.GET, "/");

internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, TransportConfigurationDescriptor settings)
internal VirtualizedCluster(TransportConfigurationDescriptor settings)
{
_dateTimeProvider = dateTimeProvider;
_settings = settings;
_exposingRequestPipeline = new ExposingPipelineFactory<ITransportConfiguration>(settings, _dateTimeProvider);
_dateTimeProvider = ((ITransportConfiguration)_settings).DateTimeProvider as TestableDateTimeProvider
?? throw new ArgumentException("DateTime provider is not a TestableDateTimeProvider", nameof(_dateTimeProvider));
_exposingRequestPipeline = new ExposingPipelineFactory<ITransportConfiguration>(settings);

_syncCall = (t, r) => t.Request<VirtualResponse>(
path: RootPath,
Expand All @@ -50,7 +51,7 @@ internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, Transport

public VirtualClusterRequestInvoker Connection => RequestHandler.Configuration.RequestInvoker as VirtualClusterRequestInvoker;
public NodePool ConnectionPool => RequestHandler.Configuration.NodePool;
public ITransport<ITransportConfiguration> RequestHandler => _exposingRequestPipeline?.RequestHandler;
public ITransport<ITransportConfiguration> RequestHandler => _exposingRequestPipeline?.Transport;

public VirtualizedCluster TransportProxiesTo(
Func<ITransport<ITransportConfiguration>, Func<RequestConfigurationDescriptor, IRequestConfiguration>, TransportResponse> sync,
Expand Down
8 changes: 2 additions & 6 deletions src/Elastic.Transport/Components/NodePool/CloudNodePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ public sealed class CloudNodePool : SingleNodePool
/// <para> Read more here: https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html</para>
/// </param>
/// <param name="credentials"></param>
/// <param name="dateTimeProvider">Optionally inject an instance of <see cref="DateTimeProvider"/> used to set <see cref="NodePool.LastUpdate"/></param>
public CloudNodePool(string cloudId, AuthorizationHeader credentials, DateTimeProvider dateTimeProvider = null) : this(ParseCloudId(cloudId), dateTimeProvider) =>
public CloudNodePool(string cloudId, AuthorizationHeader credentials) : this(ParseCloudId(cloudId)) =>
AuthenticationHeader = credentials;

private CloudNodePool(ParsedCloudId parsedCloudId, DateTimeProvider dateTimeProvider = null) : base(parsedCloudId.Uri, dateTimeProvider) =>
private CloudNodePool(ParsedCloudId parsedCloudId) : base(parsedCloudId.Uri) =>
ClusterName = parsedCloudId.Name;

//TODO implement debugger display for NodePool implementations and display it there and its ToString()
Expand Down Expand Up @@ -92,7 +91,4 @@ private static ParsedCloudId ParseCloudId(string cloudId)

return new ParsedCloudId(clusterName, new Uri($"https://{elasticsearchUuid}.{domainName}"));
}

/// <inheritdoc />
protected override void Dispose(bool disposing) => base.Dispose(disposing);
}
14 changes: 6 additions & 8 deletions src/Elastic.Transport/Components/NodePool/NodePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ public abstract class NodePool : IDisposable
{
private bool _disposed;

internal NodePool() { }

/// <summary>
/// The last time that this instance was updated.
/// </summary>
public abstract DateTimeOffset LastUpdate { get; protected set; }
public abstract DateTimeOffset? LastUpdate { get; protected set; }

/// <inheritdoc cref="DateTimeProvider"/>>
public DateTimeProvider DateTimeProvider { get; set; } = DefaultDateTimeProvider.Default;

/// <summary>
/// Returns the default maximum retries for the connection pool implementation.
Expand Down Expand Up @@ -82,18 +83,15 @@ public void Dispose()
/// <param name="disposing"></param>
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
_disposed = true;
}
if (!_disposed) _disposed = true;
}

/// <summary>
/// Creates a view over the nodes, with changing starting positions, that wraps over on each call
/// e.g Thread A might get 1,2,3,4,5 and thread B will get 2,3,4,5,1.
/// if there are no live nodes yields a different dead node to try once
/// </summary>
public abstract IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null);
public abstract IEnumerable<Node> CreateView(Auditor? auditor = null);

/// <summary>
/// Reseeds the nodes. The implementation is responsible for thread safety.
Expand Down
10 changes: 3 additions & 7 deletions src/Elastic.Transport/Components/NodePool/SingleNodePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ namespace Elastic.Transport;
public class SingleNodePool : NodePool
{
/// <inheritdoc cref="SingleNodePool"/>
public SingleNodePool(Uri uri, DateTimeProvider dateTimeProvider = null)
public SingleNodePool(Uri uri)
{
var node = new Node(uri);
UsingSsl = node.Uri.Scheme == "https";
Nodes = new List<Node> { node };
LastUpdate = (dateTimeProvider ?? DefaultDateTimeProvider.Default).Now();
}

/// <inheritdoc />
public override DateTimeOffset LastUpdate { get; protected set; }
public override DateTimeOffset? LastUpdate { get; protected set; }

/// <inheritdoc />
public override int MaxRetries => 0;
Expand All @@ -39,11 +38,8 @@ public SingleNodePool(Uri uri, DateTimeProvider dateTimeProvider = null)
public override bool UsingSsl { get; protected set; }

/// <inheritdoc />
public override IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null) => Nodes;
public override IEnumerable<Node> CreateView(Auditor? auditor) => Nodes;

/// <inheritdoc />
public override void Reseed(IEnumerable<Node> nodes) { } //ignored

/// <inheritdoc />
protected override void Dispose(bool disposing) => base.Dispose(disposing);
}
13 changes: 5 additions & 8 deletions src/Elastic.Transport/Components/NodePool/SniffingNodePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,13 @@ public class SniffingNodePool : StaticNodePool
private readonly ReaderWriterLockSlim _readerWriter = new();

/// <inheritdoc cref="SniffingNodePool"/>>
public SniffingNodePool(IEnumerable<Uri> uris, bool randomize = true, DateTimeProvider dateTimeProvider = null)
: base(uris, randomize, dateTimeProvider) { }
public SniffingNodePool(IEnumerable<Uri> uris, bool randomize = true) : base(uris, randomize) { }

/// <inheritdoc cref="SniffingNodePool"/>>
public SniffingNodePool(IEnumerable<Node> nodes, bool randomize = true, DateTimeProvider dateTimeProvider = null)
: base(nodes, randomize, dateTimeProvider) { }
public SniffingNodePool(IEnumerable<Node> nodes, bool randomize = true) : base(nodes, randomize) { }

/// <inheritdoc cref="SniffingNodePool"/>>
public SniffingNodePool(IEnumerable<Node> nodes, Func<Node, float> nodeScorer, DateTimeProvider dateTimeProvider = null)
: base(nodes, nodeScorer, dateTimeProvider) { }
public SniffingNodePool(IEnumerable<Node> nodes, Func<Node, float> nodeScorer) : base(nodes, nodeScorer) { }

/// <inheritdoc />
public override IReadOnlyCollection<Node> Nodes
Expand Down Expand Up @@ -81,12 +78,12 @@ public override void Reseed(IEnumerable<Node> nodes)
}

/// <inheritdoc />
public override IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null)
public override IEnumerable<Node> CreateView(Auditor? auditor)
{
_readerWriter.EnterReadLock();
try
{
return base.CreateView(audit);
return base.CreateView(auditor);
}
finally
{
Expand Down
Loading
Loading