Skip to content

Commit

Permalink
Add cancellation to initial socket connection
Browse files Browse the repository at this point in the history
Part of the fix to #1420

Use a linked token source to combine external cancellation and timeout cancelltion for initial RMQ connection

Fix API since we have to use the same for all target frameworks.

Fix the order of operations when establishing a connection.

Add missing ConfigureAwait

Add some more missing ConfigureAwait(false) statements.

A timeout of 1 second can result in OperationCanceledException

Validate hostname in SocketFrameHandler to restore expectation that an exception will be thrown. Ugh.

Increase connection timeout for CI

Add helpful message to connection failure.

Fix tests in a very ugly way. Much TODO

Make endpoint resolver SelectOne async.

Pass on CancellationToken

Pass cancellation token into more connection establishment methods.

Fix expected exception for net472

Use Async method otherwise GetAwaiter/GetResult locks up

Change expected exception type

Increase connection timeout for CI due to slow runners

Try out idea to fix odd CI error in GHA

Bump version

Upgrade dependencies.

Modify Windows GHA setup to explicitly create firewall rules for Erlang programs.

Tweak setting up firewalls on Windows

Use Get-Random to help ensure unique fw rule names
  • Loading branch information
lukebakken committed Dec 11, 2023
1 parent 52f494c commit 2aed524
Show file tree
Hide file tree
Showing 39 changed files with 691 additions and 409 deletions.
83 changes: 67 additions & 16 deletions .ci/windows/gha-setup.ps1
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
$ProgressPreference = 'Continue'
$VerbosePreference = 'Continue'
$ErrorActionPreference = 'Stop'
Set-StrictMode -Version 2.0

Expand Down Expand Up @@ -38,7 +39,7 @@ $erlang_install_dir = Join-Path -Path $HOME -ChildPath 'erlang'

Write-Host '[INFO] Downloading Erlang...'

if (-Not (Test-Path $erlang_installer_path))
if (-Not (Test-Path -LiteralPath $erlang_installer_path))
{
Invoke-WebRequest -UseBasicParsing -Uri $erlang_download_url -OutFile $erlang_installer_path
}
Expand All @@ -54,26 +55,76 @@ $rabbitmq_installer_download_url = "https://github.com/rabbitmq/rabbitmq-server/
$rabbitmq_installer_path = Join-Path -Path $base_installers_dir -ChildPath "rabbitmq-server-$rabbitmq_ver.exe"
Write-Host "[INFO] rabbitmq installer path $rabbitmq_installer_path"

$erlang_reg_path = 'HKLM:\SOFTWARE\Ericsson\Erlang'
if (Test-Path 'HKLM:\SOFTWARE\WOW6432Node\')
if (Test-Path -LiteralPath 'HKLM:\SOFTWARE\WOW6432Node\')
{
$erlang_reg_path = 'HKLM:\SOFTWARE\WOW6432Node\Ericsson\Erlang'
New-Variable -Name erlangRegKeyPath -Option Constant `
-Value 'HKLM:\SOFTWARE\WOW6432Node\Ericsson\Erlang'
}
else
{
New-Variable -Name erlangRegKeyPath -Option Constant `
-Value 'HKLM:\SOFTWARE\Ericsson\Erlang'
}

New-Variable -Name erlangRegKey -Option Constant `
-Value (Get-ChildItem $erlangRegKeyPath)

if ($erlangRegKey -eq $null) {
Write-Error "Could not find Erlang installation registry key at $erlangRegKeyPath"
}

New-Variable -Name erlangErtsVersion -Option Constant `
-Value (Select-Object -InputObject $erlangRegKey -Last 1).PSChildName
Write-Verbose "erlangErtsVersion: $erlangErtsVersion"

New-Variable -Name erlangErtsRegKeyPath -Option Constant `
-Value "HKLM:\SOFTWARE\WOW6432Node\Ericsson\Erlang\$erlangErtsVersion"

New-Variable -Name erlangErtsRegKey -Option Constant `
-Value (Get-ItemProperty -LiteralPath HKLM:\SOFTWARE\WOW6432Node\Ericsson\Erlang\$erlangErtsVersion)

if ($erlangErtsRegKey -eq $null) {
Write-Error "Could not find Erlang erts registry key at $erlangErtsRegKeyPath"
}

New-Variable -Name erlangProgramFilesPath -Option Constant `
-Value ($erlangErtsRegKey.'(default)')

if (Test-Path -LiteralPath $erlangProgramFilesPath) {
Write-Verbose "Erlang installation directory: '$erlangProgramFilesPath'"
}
else {
Write-Error 'Could not find Erlang installation directory!'
}

New-Variable -Name allowedExes -Option Constant -Value @('erl.exe', 'epmd.exe', 'werl.exe')

New-Variable -Name exes -Option Constant -Value `
$(Get-ChildItem -Filter '*.exe' -Recurse -LiteralPath $erlangProgramFilesPath | Where-Object { $_.Name -in $allowedExes })

foreach ($exe in $exes) {
$fwRuleName = "rabbitmq-allow-$($exe.Name)-$(Get-Random)"
Write-Verbose "Updating or creating firewall rule for '$exe' - fwRuleName: $fwRuleName"
if (!(Get-NetFirewallRule -ErrorAction 'SilentlyContinue' -Name $fwRuleName)) {
New-NetFirewallRule -Enabled True -Name $fwRuleName -DisplayName $fwRuleName -Direction In -Program $exe -Profile Any -Action Allow
}
else {
Set-NetFirewallRule -Enabled True -Name $fwRuleName -DisplayName $fwRuleName -Direction In -Program $exe -Profile Any -Action Allow
}
}
$erlang_erts_version = Get-ChildItem -Path $erlang_reg_path -Name
$erlang_home = (Get-ItemProperty -LiteralPath $erlang_reg_path\$erlang_erts_version).'(default)'

Write-Host "[INFO] Setting ERLANG_HOME to '$erlang_home'..."
$env:ERLANG_HOME = $erlang_home
[Environment]::SetEnvironmentVariable('ERLANG_HOME', $erlang_home, 'Machine')
Add-Content -Verbose -LiteralPath $env:GITHUB_ENV -Value "ERLANG_HOME=$erlang_home"
Write-Host "[INFO] Setting ERLANG_HOME to '$erlangProgramFilesPath'..."
$env:ERLANG_HOME = $erlangProgramFilesPath
[Environment]::SetEnvironmentVariable('ERLANG_HOME', $erlangProgramFilesPath, 'Machine')
Add-Content -Verbose -LiteralPath $env:GITHUB_ENV -Value "ERLANG_HOME=$erlangProgramFilesPath"

Write-Host "[INFO] Setting RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS..."
$env:RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS = '-rabbitmq_stream advertised_host localhost'
[Environment]::SetEnvironmentVariable('RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS', '-rabbitmq_stream advertised_host localhost', 'Machine')

Write-Host '[INFO] Downloading RabbitMQ...'

if (-Not (Test-Path $rabbitmq_installer_path))
if (-Not (Test-Path -LiteralPath $rabbitmq_installer_path))
{
Invoke-WebRequest -UseBasicParsing -Uri $rabbitmq_installer_download_url -OutFile $rabbitmq_installer_path
}
Expand All @@ -83,15 +134,15 @@ else
}

Write-Host "[INFO] Installer dir '$base_installers_dir' contents:"
Get-ChildItem -Verbose -Path $base_installers_dir
Get-ChildItem -Verbose -LiteralPath $base_installers_dir

$rabbitmq_conf_in_file = Join-Path -Path $ci_windows_dir -ChildPath 'rabbitmq.conf.in'
$rabbitmq_appdata_dir = Join-Path -Path $env:AppData -ChildPath 'RabbitMQ'
New-Item -Path $rabbitmq_appdata_dir -ItemType Directory
$rabbitmq_conf_file = Join-Path -Path $rabbitmq_appdata_dir -ChildPath 'rabbitmq.conf'

Write-Host "[INFO] Creating RabbitMQ configuration file in '$rabbitmq_appdata_dir'"
Get-Content $rabbitmq_conf_in_file | %{ $_ -replace '@@CERTS_DIR@@', $certs_dir } | %{ $_ -replace '\\', '/' } | Set-Content -Path $rabbitmq_conf_file
Get-Content $rabbitmq_conf_in_file | %{ $_ -replace '@@CERTS_DIR@@', $certs_dir } | %{ $_ -replace '\\', '/' } | Set-Content -LiteralPath $rabbitmq_conf_file
Get-Content $rabbitmq_conf_file

Write-Host '[INFO] Creating Erlang cookie files...'
Expand All @@ -114,9 +165,9 @@ Write-Host '[INFO] Installing and starting RabbitMQ...'
& $rabbitmq_installer_path '/S' | Out-Null
(Get-Service -Name RabbitMQ).Status

$rabbitmq_base_path = (Get-ItemProperty -Name Install_Dir -Path 'HKLM:\SOFTWARE\WOW6432Node\VMware, Inc.\RabbitMQ Server').Install_Dir
$rabbitmq_base_path = (Get-ItemProperty -Name Install_Dir -LiteralPath 'HKLM:\SOFTWARE\WOW6432Node\VMware, Inc.\RabbitMQ Server').Install_Dir
$regPath = 'HKLM:\SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall\RabbitMQ'
if (Test-Path 'HKLM:\SOFTWARE\WOW6432Node\')
if (Test-Path -LiteralPath 'HKLM:\SOFTWARE\WOW6432Node\')
{
$regPath = 'HKLM:\SOFTWARE\WOW6432Node\Microsoft\Windows\CurrentVersion\Uninstall\RabbitMQ'
}
Expand All @@ -138,7 +189,7 @@ $env:RABBITMQ_RABBITMQCTL_PATH = $rabbitmqctl_path
$epmd_running = $false
[int]$count = 1

$epmd_exe = Join-Path -Path $erlang_home -ChildPath "erts-$erlang_erts_version" | Join-Path -ChildPath 'bin' | Join-Path -ChildPath 'epmd.exe'
$epmd_exe = Join-Path -Path $erlangProgramFilesPath -ChildPath "erts-$erlangErtsVersion" | Join-Path -ChildPath 'bin' | Join-Path -ChildPath 'epmd.exe'

Write-Host "[INFO] Waiting for epmd ($epmd_exe) to report that RabbitMQ has started..."

Expand Down
2 changes: 1 addition & 1 deletion .ci/windows/versions.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"erlang": "26.1.2",
"rabbitmq": "3.12.6"
"rabbitmq": "3.12.10"
}
2 changes: 1 addition & 1 deletion projects/Benchmarks/Benchmarks.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.13.9" />
<PackageReference Include="BenchmarkDotNet" Version="0.13.11" />
<PackageReference Include="Ductus.FluentDocker" Version="2.10.59" />
</ItemGroup>

Expand Down
8 changes: 4 additions & 4 deletions projects/RabbitMQ.Client.OAuth2/RabbitMQ.Client.OAuth2.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
</PropertyGroup>

<ItemGroup Condition="'$(Configuration)' == 'Release' and '$(SourceRoot)' == ''">
<SourceRoot Include="$(MSBuildThisFileDirectory)/"/>
<SourceRoot Include="$(MSBuildThisFileDirectory)/" />
</ItemGroup>

<ItemGroup>
Expand All @@ -54,10 +54,10 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="all" />
<PackageReference Include="MinVer" Version="4.3.0" PrivateAssets="all" />
<PackageReference Include="System.Net.Http.Json" Version="7.0.1" />
<PackageReference Include="System.Text.Json" Version="7.0.3" />
<PackageReference Include="System.Net.Http.Json" Version="8.0.0" />
<PackageReference Include="System.Text.Json" Version="8.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.3" PrivateAssets="all" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="all" />
<PackageReference Include="MinVer" Version="4.3.0" PrivateAssets="all" />
<PackageReference Include="System.Memory" Version="4.5.5" />
<PackageReference Include="System.Threading.Channels" Version="8.0.0" />
Expand Down
17 changes: 17 additions & 0 deletions projects/RabbitMQ.Client/client/TaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,23 @@ internal static class TaskExtensions
#if !NET6_0_OR_GREATER
private static readonly TaskContinuationOptions s_tco = TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously;
private static void IgnoreTaskContinuation(Task t, object s) => t.Exception.Handle(e => true);

public static async Task WithCancellation(this Task task, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<bool>();

// https://devblogs.microsoft.com/pfxteam/how-do-i-cancel-non-cancelable-async-operations/
using (cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs))
{
if (task != await Task.WhenAny(task, tcs.Task))
{
task.Ignore();
throw new OperationCanceledException(cancellationToken);
}
}

await task.ConfigureAwait(false);
}
#endif

public static Task TimeoutAfter(this Task task, TimeSpan timeout)
Expand Down
9 changes: 5 additions & 4 deletions projects/RabbitMQ.Client/client/api/ConnectionConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@

using System;
using System.Collections.Generic;

using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client
Expand Down Expand Up @@ -142,7 +143,7 @@ public sealed class ConnectionConfig
/// </summary>
public readonly int DispatchConsumerConcurrency;

internal readonly Func<AmqpTcpEndpoint, IFrameHandler> FrameHandlerFactory;
internal readonly Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> FrameHandlerFactoryAsync;

internal ConnectionConfig(string virtualHost, string userName, string password,
ICredentialsProvider credentialsProvider, ICredentialsRefresher credentialsRefresher,
Expand All @@ -152,7 +153,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler,
TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout,
bool dispatchConsumersAsync, int dispatchConsumerConcurrency,
Func<AmqpTcpEndpoint, IFrameHandler> frameHandlerFactory)
Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
{
VirtualHost = virtualHost;
UserName = userName;
Expand All @@ -174,7 +175,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
RequestedConnectionTimeout = requestedConnectionTimeout;
DispatchConsumersAsync = dispatchConsumersAsync;
DispatchConsumerConcurrency = dispatchConsumerConcurrency;
FrameHandlerFactory = frameHandlerFactory;
FrameHandlerFactoryAsync = frameHandlerFactoryAsync;
}
}
}
Loading

0 comments on commit 2aed524

Please sign in to comment.