Skip to content

Commit

Permalink
refactor(repeater): stop sending ping event
Browse files Browse the repository at this point in the history
Ping historically was there to signal over RMQ that repeater is still connected
Socket.IO implementation has built in mechanism for disconnection detection https://socket.io/docs/v4/engine-io-protocol/#heartbeat
  • Loading branch information
denis-maiorov-brightsec committed Aug 8, 2024
1 parent c6e8897 commit 2b9e7a7
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 100 deletions.
11 changes: 2 additions & 9 deletions src/SecTester.Repeater/Bus/DefaultRepeaterBusFactory.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using SecTester.Core;
using SecTester.Core.Utils;
using SocketIO.Serializer.MessagePack;
using SocketIOClient;
using SocketIOClient.Transport;
Expand All @@ -13,13 +11,11 @@ public class DefaultRepeaterBusFactory : IRepeaterBusFactory
{
private readonly Configuration _config;
private readonly ILoggerFactory _loggerFactory;
private readonly IServiceScopeFactory _scopeFactory;

public DefaultRepeaterBusFactory(Configuration config, ILoggerFactory loggerFactory, IServiceScopeFactory scopeFactory)
public DefaultRepeaterBusFactory(Configuration config, ILoggerFactory loggerFactory)
{
_config = config ?? throw new ArgumentNullException(nameof(config));
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
_scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
}

public IRepeaterBus Create(string repeaterId)
Expand All @@ -46,9 +42,6 @@ public IRepeaterBus Create(string repeaterId)
};
var wrapper = new SocketIoConnection(client);

var scope = _scopeFactory.CreateAsyncScope();
var timerProvider = scope.ServiceProvider.GetRequiredService<ITimerProvider>();

return new SocketIoRepeaterBus(options, wrapper, timerProvider, _loggerFactory.CreateLogger<IRepeaterBus>());
return new SocketIoRepeaterBus(options, wrapper, _loggerFactory.CreateLogger<IRepeaterBus>());
}
}
28 changes: 1 addition & 27 deletions src/SecTester.Repeater/Bus/SocketIoRepeaterBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,22 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using Microsoft.Extensions.Logging;
using SecTester.Core.Utils;

namespace SecTester.Repeater.Bus;

internal sealed class SocketIoRepeaterBus : IRepeaterBus
{
private static readonly TimeSpan PingInterval = TimeSpan.FromSeconds(10);

private readonly ITimerProvider _heartbeat;
private readonly ISocketIoConnection _connection;
private readonly ILogger<IRepeaterBus> _logger;
private readonly SocketIoRepeaterBusOptions _options;

internal SocketIoRepeaterBus(SocketIoRepeaterBusOptions options, ISocketIoConnection connection, ITimerProvider heartbeat, ILogger<IRepeaterBus> logger)
internal SocketIoRepeaterBus(SocketIoRepeaterBusOptions options, ISocketIoConnection connection, ILogger<IRepeaterBus> logger)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_connection = connection ?? throw new ArgumentNullException(nameof(connection));
_heartbeat = heartbeat ?? throw new ArgumentNullException(nameof(heartbeat));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

Expand All @@ -37,8 +33,6 @@ public async Task Connect()

await _connection.Connect().ConfigureAwait(false);

await SchedulePing().ConfigureAwait(false);

_logger.LogDebug("Repeater connected to {BaseUrl}", _options.BaseUrl);
}
}
Expand Down Expand Up @@ -75,8 +69,6 @@ public async ValueTask DisposeAsync()
{
if (_connection is { Connected: true })
{
_heartbeat.Elapsed -= Ping;
_heartbeat.Stop();
await _connection.Disconnect().ConfigureAwait(false);
_logger.LogDebug("Repeater disconnected from {BaseUrl}", _options.BaseUrl);
}
Expand Down Expand Up @@ -111,22 +103,4 @@ public async Task Deploy(string repeaterId, CancellationToken? cancellationToken
_connection.Off("deployed");
}
}

private async Task SchedulePing()
{
await Ping().ConfigureAwait(false);
_heartbeat.Interval = PingInterval.TotalMilliseconds;
_heartbeat.Elapsed += Ping;
_heartbeat.Start();
}

private async void Ping(object sender, ElapsedEventArgs args)
{
await Ping().ConfigureAwait(false);
}

private async Task Ping()
{
await _connection.EmitAsync("ping").ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,10 @@ public class DefaultRepeaterBusFactoryTests : IDisposable

private readonly ILoggerFactory _loggerFactory = Substitute.For<ILoggerFactory>();
private readonly ITimerProvider _timerProvider = Substitute.For<ITimerProvider>();
private readonly IServiceScopeFactory _serviceScopeFactory = Substitute.For<IServiceScopeFactory>();

public DefaultRepeaterBusFactoryTests()
{
// ADHOC: since GetRequiredService is part of extension we should explicitly mock an instance method
_serviceScopeFactory.CreateAsyncScope().ServiceProvider.GetService(typeof(ITimerProvider)).Returns(_timerProvider);
}

public void Dispose()
{
_timerProvider.ClearSubstitute();
_serviceScopeFactory.ClearSubstitute();
_loggerFactory.ClearSubstitute();
GC.SuppressFinalize(this);
}
Expand All @@ -29,7 +21,7 @@ public async Task Create_CreatesBus()
{
// arrange
Configuration config = new(Hostname, new Credentials(Token));
DefaultRepeaterBusFactory sut = new(config, _loggerFactory, _serviceScopeFactory);
DefaultRepeaterBusFactory sut = new(config, _loggerFactory);

// act
await using var bus = sut.Create(Id);
Expand All @@ -43,7 +35,7 @@ public async Task Create_CredentialsNotDefined_ThrowsError()
{
// arrange
Configuration config = new(Hostname);
DefaultRepeaterBusFactory sut = new(config, _loggerFactory, _serviceScopeFactory);
DefaultRepeaterBusFactory sut = new(config, _loggerFactory);

// act
var act = async () =>
Expand Down
47 changes: 1 addition & 46 deletions test/SecTester.Repeater.Tests/Bus/SocketIoRepeaterBusTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,19 @@ public sealed class SocketIoRepeaterBusTests : IDisposable
private static readonly SocketIoRepeaterBusOptions Options = new(Url);

private readonly ISocketIoConnection _connection = Substitute.For<ISocketIoConnection>();
private readonly ITimerProvider _heartbeat = Substitute.For<ITimerProvider>();
private readonly ILogger<IRepeaterBus> _logger = Substitute.For<ILogger<IRepeaterBus>>();
private readonly ISocketIoMessage _socketIoMessage = Substitute.For<ISocketIoMessage>();
private readonly SocketIoRepeaterBus _sut;

public SocketIoRepeaterBusTests()
{
_sut = new SocketIoRepeaterBus(Options, _connection, _heartbeat, _logger);
_sut = new SocketIoRepeaterBus(Options, _connection, _logger);
}

public void Dispose()
{
_socketIoMessage.ClearSubstitute();
_connection.ClearSubstitute();
_heartbeat.ClearSubstitute();
_logger.ClearSubstitute();

GC.SuppressFinalize(this);
Expand Down Expand Up @@ -117,36 +115,6 @@ public async Task Connect_AlreadyConnected_DoNothing()
await _connection.DidNotReceive().Connect();
}

[Fact]
public async Task Connect_SchedulesPing()
{
// arrange
_connection.Connect().Returns(Task.CompletedTask);

// act
await _sut.Connect();

// assert
_heartbeat.Received().Elapsed += Arg.Any<ElapsedEventHandler>();
_heartbeat.Received().Start();
}

[Fact]
public async Task Connect_ShouldSendPingMessage()
{
// arrange
var elapsedEventArgs = EventArgs.Empty as ElapsedEventArgs;
_connection.Connect().Returns(Task.CompletedTask);
await _sut.Connect();

// act
_heartbeat.Elapsed += Raise.Event<ElapsedEventHandler>(new object(), elapsedEventArgs);

// assert
_heartbeat.Interval.Should().BeGreaterOrEqualTo(10_000);
await _connection.Received(2).EmitAsync("ping");
}

[Fact]
public async Task Deploy_Success()
{
Expand Down Expand Up @@ -198,17 +166,4 @@ public async Task DisposeAsync_NotConnectedYet_Success()
await _connection.DidNotReceive().Disconnect();
_connection.Received().Dispose();
}

[Fact]
public async Task DisposeAsync_StopsPingMessages()
{
// arrange
_connection.Connected.Returns(true);

// act
await _sut.DisposeAsync();

// assert
_heartbeat.Received().Stop();
}
}
8 changes: 0 additions & 8 deletions test/SecTester.Repeater.Tests/Usings.cs
Original file line number Diff line number Diff line change
@@ -1,26 +1,18 @@
global using System.Net;
global using System.Net.Http.Json;
global using System.Net.Sockets;
global using System.Text;
global using System.Text.Json;
global using System.Text.Json.Serialization;
global using System.Text.RegularExpressions;
global using System.Timers;
global using FluentAssertions;
global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.DependencyInjection.Extensions;
global using Microsoft.Extensions.Logging;
global using NSubstitute;
global using NSubstitute.ClearExtensions;
global using NSubstitute.ExceptionExtensions;
global using NSubstitute.ReturnsExtensions;
global using RichardSzalay.MockHttp;
global using SecTester.Core;
global using SecTester.Core.Bus;
global using SecTester.Core.Commands;
global using SecTester.Core.Dispatchers;
global using SecTester.Core.Exceptions;
global using SecTester.Core.RetryStrategies;
global using SecTester.Core.Logger;
global using SecTester.Core.Utils;
global using SecTester.Repeater.Api;
Expand Down

0 comments on commit 2b9e7a7

Please sign in to comment.