Skip to content

Commit

Permalink
Refactor pipeline and node pool components
Browse files Browse the repository at this point in the history
Refactor to decouple date provider and introduce Auditor usage.

Renamed and altered functions within pipeline components to utilize the `Auditor` class, improving flexibility and modularity. Removed the embedded `DateTimeProvider` instance from several classes and ensured that such dependencies are injected or fetched through associated components like the node pools. This change enhances monitoring and logging capabilities during request processing.
  • Loading branch information
Mpdreamz committed Nov 1, 2024
1 parent 84f4b38 commit 6aee59f
Show file tree
Hide file tree
Showing 20 changed files with 237 additions and 219 deletions.
2 changes: 1 addition & 1 deletion src/Elastic.Transport.VirtualizedCluster/Audit/Auditor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private Auditor(Components.VirtualizedCluster cluster, Components.VirtualizedClu

public IEnumerable<Diagnostics.Auditing.Audit> AsyncAuditTrail { get; set; }
public IEnumerable<Diagnostics.Auditing.Audit> AuditTrail { get; set; }
public Func<Components.VirtualizedCluster> Cluster { get; set; }
public Func<Components.VirtualizedCluster> Cluster { get; }

public TransportResponse Response { get; internal set; }
public TransportResponse ResponseAsync { get; internal set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ internal SealedVirtualCluster(VirtualCluster cluster, NodePool pool, TestableDat
}

private TransportConfigurationDescriptor CreateSettings() =>
new TransportConfigurationDescriptor(_nodePool, _requestInvoker, serializer: null, _productRegistration.ProductRegistration)
.DateTimeProvider(_dateTimeProvider);
new(_nodePool, _requestInvoker, serializer: null, _productRegistration.ProductRegistration);


/// <summary> Create the cluster using all defaults on <see cref="TransportConfigurationDescriptor"/> </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ protected VirtualCluster(IEnumerable<Node> nodes, MockProductRegistration produc
InternalNodes = nodes.ToList();
}

public List<IClientCallRule> ClientCallRules { get; } = new List<IClientCallRule>();
public TestableDateTimeProvider DateTimeProvider { get; } = new TestableDateTimeProvider();
public List<IClientCallRule> ClientCallRules { get; } = new();
private TestableDateTimeProvider TestDateTimeProvider { get; } = new();

protected List<Node> InternalNodes { get; }
public IReadOnlyList<Node> Nodes => InternalNodes;
public List<IRule> PingingRules { get; } = new List<IRule>();
public List<IRule> PingingRules { get; } = new();

public List<ISniffRule> SniffingRules { get; } = new List<ISniffRule>();
public List<ISniffRule> SniffingRules { get; } = new();
internal string PublishAddressOverride { get; private set; }

internal bool SniffShouldReturnFqnd { get; private set; }
Expand Down Expand Up @@ -73,32 +73,34 @@ public VirtualCluster ClientCalls(Func<ClientCallRule, IClientCallRule> selector
public SealedVirtualCluster SingleNodeConnection(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new SingleNodePool(nodes.First().Uri), DateTimeProvider, ProductRegistration);
return new SealedVirtualCluster(this, new SingleNodePool(nodes.First().Uri), TestDateTimeProvider, ProductRegistration);
}

public SealedVirtualCluster StaticNodePool(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new StaticNodePool(nodes, false, DateTimeProvider), DateTimeProvider, ProductRegistration);
var dateTimeProvider = TestDateTimeProvider;
var nodePool = new StaticNodePool(nodes, false) { DateTimeProvider = dateTimeProvider };
return new SealedVirtualCluster(this, nodePool , TestDateTimeProvider, ProductRegistration);
}

public SealedVirtualCluster SniffingNodePool(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new SniffingNodePool(nodes, false, DateTimeProvider), DateTimeProvider, ProductRegistration);
return new SealedVirtualCluster(this, new SniffingNodePool(nodes, false) { DateTimeProvider = TestDateTimeProvider }, TestDateTimeProvider, ProductRegistration);
}

public SealedVirtualCluster StickyNodePool(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new StickyNodePool(nodes, DateTimeProvider), DateTimeProvider, ProductRegistration);
return new SealedVirtualCluster(this, new StickyNodePool(nodes) { DateTimeProvider = TestDateTimeProvider}, TestDateTimeProvider, ProductRegistration);
}

public SealedVirtualCluster StickySniffingNodePool(Func<Node, float> sorter = null,
Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null
)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new StickySniffingNodePool(nodes, sorter, DateTimeProvider), DateTimeProvider, ProductRegistration);
return new SealedVirtualCluster(this, new StickySniffingNodePool(nodes, sorter) { DateTimeProvider = TestDateTimeProvider }, TestDateTimeProvider, ProductRegistration);
}
}
8 changes: 2 additions & 6 deletions src/Elastic.Transport/Components/NodePool/CloudNodePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ public sealed class CloudNodePool : SingleNodePool
/// <para> Read more here: https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html</para>
/// </param>
/// <param name="credentials"></param>
/// <param name="dateTimeProvider">Optionally inject an instance of <see cref="DateTimeProvider"/> used to set <see cref="NodePool.LastUpdate"/></param>
public CloudNodePool(string cloudId, AuthorizationHeader credentials, DateTimeProvider dateTimeProvider = null) : this(ParseCloudId(cloudId), dateTimeProvider) =>
public CloudNodePool(string cloudId, AuthorizationHeader credentials) : this(ParseCloudId(cloudId)) =>
AuthenticationHeader = credentials;

private CloudNodePool(ParsedCloudId parsedCloudId, DateTimeProvider dateTimeProvider = null) : base(parsedCloudId.Uri, dateTimeProvider) =>
private CloudNodePool(ParsedCloudId parsedCloudId) : base(parsedCloudId.Uri) =>
ClusterName = parsedCloudId.Name;

//TODO implement debugger display for NodePool implementations and display it there and its ToString()
Expand Down Expand Up @@ -92,7 +91,4 @@ private static ParsedCloudId ParseCloudId(string cloudId)

return new ParsedCloudId(clusterName, new Uri($"https://{elasticsearchUuid}.{domainName}"));
}

/// <inheritdoc />
protected override void Dispose(bool disposing) => base.Dispose(disposing);
}
14 changes: 6 additions & 8 deletions src/Elastic.Transport/Components/NodePool/NodePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ public abstract class NodePool : IDisposable
{
private bool _disposed;

internal NodePool() { }

/// <summary>
/// The last time that this instance was updated.
/// </summary>
public abstract DateTimeOffset LastUpdate { get; protected set; }
public abstract DateTimeOffset? LastUpdate { get; protected set; }

/// <inheritdoc cref="DateTimeProvider"/>>
public DateTimeProvider DateTimeProvider { get; set; } = DefaultDateTimeProvider.Default;

/// <summary>
/// Returns the default maximum retries for the connection pool implementation.
Expand Down Expand Up @@ -82,18 +83,15 @@ public void Dispose()
/// <param name="disposing"></param>
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
_disposed = true;
}
if (!_disposed) _disposed = true;
}

/// <summary>
/// Creates a view over the nodes, with changing starting positions, that wraps over on each call
/// e.g Thread A might get 1,2,3,4,5 and thread B will get 2,3,4,5,1.
/// if there are no live nodes yields a different dead node to try once
/// </summary>
public abstract IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null);
public abstract IEnumerable<Node> CreateView(Auditor? auditor = null);

/// <summary>
/// Reseeds the nodes. The implementation is responsible for thread safety.
Expand Down
10 changes: 3 additions & 7 deletions src/Elastic.Transport/Components/NodePool/SingleNodePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ namespace Elastic.Transport;
public class SingleNodePool : NodePool
{
/// <inheritdoc cref="SingleNodePool"/>
public SingleNodePool(Uri uri, DateTimeProvider dateTimeProvider = null)
public SingleNodePool(Uri uri)
{
var node = new Node(uri);
UsingSsl = node.Uri.Scheme == "https";
Nodes = new List<Node> { node };
LastUpdate = (dateTimeProvider ?? DefaultDateTimeProvider.Default).Now();
}

/// <inheritdoc />
public override DateTimeOffset LastUpdate { get; protected set; }
public override DateTimeOffset? LastUpdate { get; protected set; }

/// <inheritdoc />
public override int MaxRetries => 0;
Expand All @@ -39,11 +38,8 @@ public SingleNodePool(Uri uri, DateTimeProvider dateTimeProvider = null)
public override bool UsingSsl { get; protected set; }

/// <inheritdoc />
public override IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null) => Nodes;
public override IEnumerable<Node> CreateView(Auditor? auditor) => Nodes;

/// <inheritdoc />
public override void Reseed(IEnumerable<Node> nodes) { } //ignored

/// <inheritdoc />
protected override void Dispose(bool disposing) => base.Dispose(disposing);
}
13 changes: 5 additions & 8 deletions src/Elastic.Transport/Components/NodePool/SniffingNodePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,13 @@ public class SniffingNodePool : StaticNodePool
private readonly ReaderWriterLockSlim _readerWriter = new();

/// <inheritdoc cref="SniffingNodePool"/>>
public SniffingNodePool(IEnumerable<Uri> uris, bool randomize = true, DateTimeProvider dateTimeProvider = null)
: base(uris, randomize, dateTimeProvider) { }
public SniffingNodePool(IEnumerable<Uri> uris, bool randomize = true) : base(uris, randomize) { }

/// <inheritdoc cref="SniffingNodePool"/>>
public SniffingNodePool(IEnumerable<Node> nodes, bool randomize = true, DateTimeProvider dateTimeProvider = null)
: base(nodes, randomize, dateTimeProvider) { }
public SniffingNodePool(IEnumerable<Node> nodes, bool randomize = true) : base(nodes, randomize) { }

/// <inheritdoc cref="SniffingNodePool"/>>
public SniffingNodePool(IEnumerable<Node> nodes, Func<Node, float> nodeScorer, DateTimeProvider dateTimeProvider = null)
: base(nodes, nodeScorer, dateTimeProvider) { }
public SniffingNodePool(IEnumerable<Node> nodes, Func<Node, float> nodeScorer) : base(nodes, nodeScorer) { }

/// <inheritdoc />
public override IReadOnlyCollection<Node> Nodes
Expand Down Expand Up @@ -81,12 +78,12 @@ public override void Reseed(IEnumerable<Node> nodes)
}

/// <inheritdoc />
public override IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null)
public override IEnumerable<Node> CreateView(Auditor? auditor)
{
_readerWriter.EnterReadLock();
try
{
return base.CreateView(audit);
return base.CreateView(auditor);
}
finally
{
Expand Down
48 changes: 20 additions & 28 deletions src/Elastic.Transport/Components/NodePool/StaticNodePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,36 @@ public class StaticNodePool : NodePool
private readonly Func<Node, float> _nodeScorer;

/// <inheritdoc cref="StaticNodePool"/>
public StaticNodePool(IEnumerable<Uri> uris, bool randomize = true, DateTimeProvider dateTimeProvider = null)
: this(uris.Select(uri => new Node(uri)), randomize, null, dateTimeProvider) { }
public StaticNodePool(IEnumerable<Uri> uris, bool randomize = true)
: this(uris.Select(uri => new Node(uri)), randomize, null) { }

/// <inheritdoc cref="StaticNodePool"/>
public StaticNodePool(IEnumerable<Node> nodes, bool randomize = true, DateTimeProvider dateTimeProvider = null)
: this(nodes, randomize, null, dateTimeProvider) { }
public StaticNodePool(IEnumerable<Node> nodes, bool randomize = true)
: this(nodes, randomize, null) { }

/// <inheritdoc cref="StaticNodePool"/>
protected StaticNodePool(IEnumerable<Node> nodes, bool randomize, int? randomizeSeed = null, DateTimeProvider dateTimeProvider = null)
protected StaticNodePool(IEnumerable<Node> nodes, bool randomize, int? randomizeSeed = null)
{
Randomize = randomize;
Random = !randomize || !randomizeSeed.HasValue
? new Random()
: new Random(randomizeSeed.Value);

Initialize(nodes, dateTimeProvider);
Initialize(nodes);
}

//this constructor is protected because nodeScorer only makes sense on subclasses that support reseeding otherwise just manually sort `nodes` before instantiating.
/// <inheritdoc cref="StaticNodePool"/>
protected StaticNodePool(IEnumerable<Node> nodes, Func<Node, float> nodeScorer = null, DateTimeProvider dateTimeProvider = null)
protected StaticNodePool(IEnumerable<Node> nodes, Func<Node, float> nodeScorer = null)
{
_nodeScorer = nodeScorer;
Initialize(nodes, dateTimeProvider);
Initialize(nodes);
}

private void Initialize(IEnumerable<Node> nodes, DateTimeProvider dateTimeProvider)
private void Initialize(IEnumerable<Node> nodes)
{
var nodesProvided = nodes?.ToList() ?? throw new ArgumentNullException(nameof(nodes));
nodesProvided.ThrowIfEmpty(nameof(nodes));
DateTimeProvider = dateTimeProvider ?? Elastic.Transport.DefaultDateTimeProvider.Default;

string scheme = null;
foreach (var node in nodesProvided)
Expand All @@ -76,11 +75,10 @@ private void Initialize(IEnumerable<Node> nodes, DateTimeProvider dateTimeProvid
InternalNodes = SortNodes(nodesProvided)
.DistinctByCustom(n => n.Uri)
.ToList();
LastUpdate = DateTimeProvider.Now();
}

/// <inheritdoc />
public override DateTimeOffset LastUpdate { get; protected set; }
public override DateTimeOffset? LastUpdate { get; protected set; }

/// <inheritdoc />
public override int MaxRetries => InternalNodes.Count - 1;
Expand Down Expand Up @@ -112,9 +110,6 @@ protected IReadOnlyList<Node> AliveNodes
}
}

/// <inheritdoc cref="DateTimeProvider"/>>
protected DateTimeProvider DateTimeProvider { get; private set; }

/// <summary>
/// The list of nodes we are operating over. This is protected so that subclasses that DO implement <see cref="SupportsReseeding"/>
/// can update this list. Its up to subclasses to make this thread safe.
Expand All @@ -137,7 +132,7 @@ protected IReadOnlyList<Node> AliveNodes
/// e.g Thread A might get 1,2,3,4,5 and thread B will get 2,3,4,5,1.
/// if there are no live nodes yields a different dead node to try once
/// </summary>
public override IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null)
public override IEnumerable<Node> CreateView(Auditor? auditor)
{
var nodes = AliveNodes;

Expand All @@ -146,13 +141,13 @@ public override IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = nu
if (nodes.Count == 0)
{
//could not find a suitable node retrying on first node off globalCursor
yield return RetryInternalNodes(globalCursor, audit);
yield return RetryInternalNodes(globalCursor, auditor);

yield break;
}

var localCursor = globalCursor % nodes.Count;
foreach (var aliveNode in SelectAliveNodes(localCursor, nodes, audit)) yield return aliveNode;
foreach (var aliveNode in SelectAliveNodes(localCursor, nodes, auditor)) yield return aliveNode;
}

/// <inheritdoc />
Expand All @@ -164,14 +159,13 @@ public override void Reseed(IEnumerable<Node> nodes) { } //ignored
/// <paramref name="globalCursor"/>
/// </summary>
/// <param name="globalCursor"></param>
/// <param name="audit">Trace action to document the fact all nodes were dead and were resurrecting one at random</param>
protected Node RetryInternalNodes(int globalCursor, Action<AuditEvent, Node> audit = null)
/// <param name="auditor">Trace action to document the fact all nodes were dead and were resurrecting one at random</param>
protected Node RetryInternalNodes(int globalCursor, Auditor? auditor = null)
{
audit?.Invoke(AuditEvent.AllNodesDead, null);
auditor?.Emit(AuditEvent.AllNodesDead);
var node = InternalNodes[globalCursor % InternalNodes.Count];
node.IsResurrected = true;
audit?.Invoke(AuditEvent.Resurrection, node);

auditor?.Emit(AuditEvent.Resurrection, node);
return node;
}

Expand All @@ -181,8 +175,8 @@ protected Node RetryInternalNodes(int globalCursor, Action<AuditEvent, Node> aud
/// </summary>
/// <param name="cursor">The starting point into <paramref name="aliveNodes"/> from wich to start.</param>
/// <param name="aliveNodes"></param>
/// <param name="audit">Trace action to notify if a resurrection occured</param>
protected static IEnumerable<Node> SelectAliveNodes(int cursor, IReadOnlyList<Node> aliveNodes, Action<AuditEvent, Node> audit = null)
/// <param name="auditor">Trace action to notify if a resurrection occured</param>
protected static IEnumerable<Node> SelectAliveNodes(int cursor, IReadOnlyList<Node> aliveNodes, Auditor? auditor = null)
{
// ReSharper disable once ForCanBeConvertedToForeach
for (var attempts = 0; attempts < aliveNodes.Count; attempts++)
Expand All @@ -192,7 +186,7 @@ protected static IEnumerable<Node> SelectAliveNodes(int cursor, IReadOnlyList<No
//if this node is not alive or no longer dead mark it as resurrected
if (!node.IsAlive)
{
audit?.Invoke(AuditEvent.Resurrection, node);
auditor?.Emit(AuditEvent.Resurrection, node);
node.IsResurrected = true;
}

Expand All @@ -209,6 +203,4 @@ protected IOrderedEnumerable<Node> SortNodes(IEnumerable<Node> nodes) =>
? nodes.OrderByDescending(_nodeScorer)
: nodes.OrderBy(n => Randomize ? Random.Next() : 1);

/// <inheritdoc />
protected override void Dispose(bool disposing) => base.Dispose(disposing);
}
12 changes: 5 additions & 7 deletions src/Elastic.Transport/Components/NodePool/StickyNodePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@ namespace Elastic.Transport;
public sealed class StickyNodePool : StaticNodePool
{
/// <inheritdoc cref="StickyNodePool"/>
public StickyNodePool(IEnumerable<Uri> uris, DateTimeProvider dateTimeProvider = null)
: base(uris, false, dateTimeProvider) { }
public StickyNodePool(IEnumerable<Uri> uris) : base(uris, false) { }

/// <inheritdoc cref="StickyNodePool"/>
public StickyNodePool(IEnumerable<Node> nodes, DateTimeProvider dateTimeProvider = null)
: base(nodes, false, dateTimeProvider) { }
public StickyNodePool(IEnumerable<Node> nodes) : base(nodes, false) { }

/// <inheritdoc cref="StaticNodePool.CreateView"/>
public override IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null)
public override IEnumerable<Node> CreateView(Auditor? auditor)
{
var nodes = AliveNodes;

Expand All @@ -33,7 +31,7 @@ public override IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = nu
var globalCursor = Interlocked.Increment(ref GlobalCursor);

//could not find a suitable node retrying on first node off globalCursor
yield return RetryInternalNodes(globalCursor, audit);
yield return RetryInternalNodes(globalCursor, auditor);

yield break;
}
Expand All @@ -44,7 +42,7 @@ public override IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = nu
Interlocked.Exchange(ref GlobalCursor, -1);

var localCursor = 0;
foreach (var aliveNode in SelectAliveNodes(localCursor, nodes, audit))
foreach (var aliveNode in SelectAliveNodes(localCursor, nodes, auditor))
yield return aliveNode;
}

Expand Down
Loading

0 comments on commit 6aee59f

Please sign in to comment.