Skip to content

Commit df26537

Browse files
Merge pull request #1440 from Particular/reliability-r90
Improve reliability of channel provider in case of reconnects
2 parents e0bf989 + 96e2a9b commit df26537

File tree

8 files changed

+243
-21
lines changed

8 files changed

+243
-21
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ jobs:
5151
creds: ${{ secrets.AZURE_ACI_CREDENTIALS }}
5252
enable-AzPSSession: true
5353
- name: Setup RabbitMQ
54-
uses: Particular/setup-rabbitmq-action@v1.6.0
54+
uses: Particular/setup-rabbitmq-action@v1.7.0
5555
with:
5656
connection-string-name: RabbitMQTransport_ConnectionString
5757
tag: RabbitMQTransport

src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
<PackageReference Include="Particular.Packaging" Version="4.0.0" PrivateAssets="All" />
2424
</ItemGroup>
2525

26+
<ItemGroup Label="Direct references to transitive dependencies to avoid versions with CVE">
27+
<PackageReference Include="System.Formats.Asn1" Version="8.0.1" />
28+
</ItemGroup>
29+
2630
<ItemGroup>
2731
<Compile Include="..\NServiceBus.Transport.RabbitMQ\Configuration\ConnectionConfiguration.cs" Link="Transport\ConnectionConfiguration.cs" />
2832
<Compile Include="..\NServiceBus.Transport.RabbitMQ\Configuration\QueueType.cs" Link="Transport\QueueType.cs" />
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
namespace NServiceBus.Transport.RabbitMQ.Tests.ConnectionString
2+
{
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using global::RabbitMQ.Client;
8+
using global::RabbitMQ.Client.Events;
9+
using NUnit.Framework;
10+
11+
[TestFixture]
12+
public class ChannelProviderTests
13+
{
14+
[Test]
15+
public async Task Should_recover_connection_and_dispose_old_one_when_connection_shutdown()
16+
{
17+
var channelProvider = new TestableChannelProvider();
18+
channelProvider.CreateConnection();
19+
20+
var publishConnection = channelProvider.PublishConnections.Dequeue();
21+
publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test"));
22+
23+
channelProvider.DelayTaskCompletionSource.SetResult();
24+
25+
await channelProvider.FireAndForgetAction(CancellationToken.None);
26+
27+
var recoveredConnection = channelProvider.PublishConnections.Dequeue();
28+
29+
Assert.That(publishConnection.WasDisposed, Is.True);
30+
Assert.That(recoveredConnection.WasDisposed, Is.False);
31+
}
32+
33+
[Test]
34+
public void Should_dispose_connection_when_disposed()
35+
{
36+
var channelProvider = new TestableChannelProvider();
37+
channelProvider.CreateConnection();
38+
39+
var publishConnection = channelProvider.PublishConnections.Dequeue();
40+
channelProvider.Dispose();
41+
42+
Assert.That(publishConnection.WasDisposed, Is.True);
43+
}
44+
45+
[Test]
46+
public async Task Should_not_attempt_to_recover_during_dispose_when_retry_delay_still_pending()
47+
{
48+
var channelProvider = new TestableChannelProvider();
49+
channelProvider.CreateConnection();
50+
51+
var publishConnection = channelProvider.PublishConnections.Dequeue();
52+
publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test"));
53+
54+
// Deliberately not completing the delay task with channelProvider.DelayTaskCompletionSource.SetResult(); before disposing
55+
// to simulate a pending delay task
56+
channelProvider.Dispose();
57+
58+
await channelProvider.FireAndForgetAction(CancellationToken.None);
59+
60+
Assert.That(publishConnection.WasDisposed, Is.True);
61+
Assert.That(channelProvider.PublishConnections.TryDequeue(out _), Is.False);
62+
}
63+
64+
[Test]
65+
public async Task Should_dispose_newly_established_connection()
66+
{
67+
var channelProvider = new TestableChannelProvider();
68+
channelProvider.CreateConnection();
69+
70+
var publishConnection = channelProvider.PublishConnections.Dequeue();
71+
publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test"));
72+
73+
// This simulates the race of the reconnection loop being fired off with the delay task completed during
74+
// the disposal of the channel provider. To achieve that it is necessary to kick off the reconnection loop
75+
// and await its completion after the channel provider has been disposed.
76+
var fireAndForgetTask = channelProvider.FireAndForgetAction(CancellationToken.None);
77+
channelProvider.DelayTaskCompletionSource.SetResult();
78+
channelProvider.Dispose();
79+
80+
await fireAndForgetTask;
81+
82+
var recoveredConnection = channelProvider.PublishConnections.Dequeue();
83+
84+
Assert.That(publishConnection.WasDisposed, Is.True);
85+
Assert.That(recoveredConnection.WasDisposed, Is.True);
86+
}
87+
88+
class TestableChannelProvider() : ChannelProvider(null!, TimeSpan.Zero, null!)
89+
{
90+
public Queue<FakeConnection> PublishConnections { get; } = new();
91+
92+
public TaskCompletionSource DelayTaskCompletionSource { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously);
93+
94+
public Func<CancellationToken, Task> FireAndForgetAction { get; private set; }
95+
96+
protected override IConnection CreatePublishConnection()
97+
{
98+
var connection = new FakeConnection();
99+
PublishConnections.Enqueue(connection);
100+
return connection;
101+
}
102+
103+
protected override void FireAndForget(Func<CancellationToken, Task> action, CancellationToken cancellationToken = default)
104+
=> FireAndForgetAction = _ => action(cancellationToken);
105+
106+
protected override async Task DelayReconnect(CancellationToken cancellationToken = default)
107+
{
108+
await using var _ = cancellationToken.Register(() => DelayTaskCompletionSource.TrySetCanceled(cancellationToken));
109+
await DelayTaskCompletionSource.Task;
110+
}
111+
}
112+
113+
class FakeConnection : IConnection
114+
{
115+
public int LocalPort { get; }
116+
public int RemotePort { get; }
117+
118+
public void Dispose() => WasDisposed = true;
119+
120+
public bool WasDisposed { get; private set; }
121+
122+
public void UpdateSecret(string newSecret, string reason) => throw new NotImplementedException();
123+
124+
public void Abort() => throw new NotImplementedException();
125+
126+
public void Abort(ushort reasonCode, string reasonText) => throw new NotImplementedException();
127+
128+
public void Abort(TimeSpan timeout) => throw new NotImplementedException();
129+
130+
public void Abort(ushort reasonCode, string reasonText, TimeSpan timeout) => throw new NotImplementedException();
131+
132+
public void Close() => throw new NotImplementedException();
133+
134+
public void Close(ushort reasonCode, string reasonText) => throw new NotImplementedException();
135+
136+
public void Close(TimeSpan timeout) => throw new NotImplementedException();
137+
138+
public void Close(ushort reasonCode, string reasonText, TimeSpan timeout) => throw new NotImplementedException();
139+
140+
public IModel CreateModel() => throw new NotImplementedException();
141+
142+
public void HandleConnectionBlocked(string reason) => throw new NotImplementedException();
143+
144+
public void HandleConnectionUnblocked() => throw new NotImplementedException();
145+
146+
public ushort ChannelMax { get; }
147+
public IDictionary<string, object> ClientProperties { get; }
148+
public ShutdownEventArgs CloseReason { get; }
149+
public AmqpTcpEndpoint Endpoint { get; }
150+
public uint FrameMax { get; }
151+
public TimeSpan Heartbeat { get; }
152+
public bool IsOpen { get; }
153+
public AmqpTcpEndpoint[] KnownHosts { get; }
154+
public IProtocol Protocol { get; }
155+
public IDictionary<string, object> ServerProperties { get; }
156+
public IList<ShutdownReportEntry> ShutdownReport { get; }
157+
public string ClientProvidedName { get; } = $"FakeConnection{Interlocked.Increment(ref connectionCounter)}";
158+
public event EventHandler<CallbackExceptionEventArgs> CallbackException = (_, _) => { };
159+
public event EventHandler<ConnectionBlockedEventArgs> ConnectionBlocked = (_, _) => { };
160+
public event EventHandler<ShutdownEventArgs> ConnectionShutdown = (_, _) => { };
161+
public event EventHandler<EventArgs> ConnectionUnblocked = (_, _) => { };
162+
163+
public void RaiseConnectionShutdown(ShutdownEventArgs args) => ConnectionShutdown?.Invoke(this, args);
164+
165+
static int connectionCounter;
166+
}
167+
}
168+
}

src/NServiceBus.Transport.RabbitMQ.TransportTests/When_changing_concurrency.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public class When_changing_concurrency : NServiceBusTransportTest
1616
public async Task Should_complete_current_message(TransportTransactionMode transactionMode)
1717
{
1818
var triggeredChangeConcurrency = CreateTaskCompletionSource();
19+
var sentMessageReceived = CreateTaskCompletionSource();
1920
Task concurrencyChanged = null;
2021
int invocationCounter = 0;
2122

@@ -30,6 +31,7 @@ await StartPump(async (context, ct) =>
3031
await task;
3132
}, ct);
3233

34+
sentMessageReceived.SetResult();
3335
await triggeredChangeConcurrency.Task;
3436

3537
}, (_, _) =>
@@ -40,8 +42,10 @@ await StartPump(async (context, ct) =>
4042
transactionMode);
4143

4244
await SendMessage(InputQueueName);
45+
await sentMessageReceived.Task;
4346
await concurrencyChanged;
4447
await StopPump();
48+
4549
Assert.AreEqual(1, invocationCounter, "message should successfully complete on first processing attempt");
4650
}
4751

@@ -62,6 +66,7 @@ await StartPump((context, _) =>
6266
if (context.Headers.TryGetValue("FromOnError", out var value) && value == bool.TrueString)
6367
{
6468
sentMessageReceived.SetResult();
69+
return Task.CompletedTask;
6570
}
6671

6772
throw new Exception("triggering recoverability pipeline");
@@ -84,9 +89,9 @@ await SendMessage(InputQueueName,
8489
transactionMode);
8590

8691
await SendMessage(InputQueueName);
87-
8892
await sentMessageReceived.Task;
8993
await StopPump();
94+
9095
Assert.AreEqual(2, invocationCounter, "there should be exactly 2 messages (initial message and new message from onError pipeline)");
9196
}
9297
}

src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs

Lines changed: 60 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#nullable enable
2+
13
namespace NServiceBus.Transport.RabbitMQ
24
{
35
using System;
@@ -7,7 +9,7 @@ namespace NServiceBus.Transport.RabbitMQ
79
using global::RabbitMQ.Client;
810
using Logging;
911

10-
sealed class ChannelProvider : IDisposable
12+
class ChannelProvider : IDisposable
1113
{
1214
public ChannelProvider(ConnectionFactory connectionFactory, TimeSpan retryDelay, IRoutingTopology routingTopology)
1315
{
@@ -19,36 +21,56 @@ public ChannelProvider(ConnectionFactory connectionFactory, TimeSpan retryDelay,
1921
channels = new ConcurrentQueue<ConfirmsAwareChannel>();
2022
}
2123

22-
public void CreateConnection()
24+
public void CreateConnection() => connection = CreateConnectionWithShutdownListener();
25+
26+
protected virtual IConnection CreatePublishConnection() => connectionFactory.CreatePublishConnection();
27+
28+
IConnection CreateConnectionWithShutdownListener()
2329
{
24-
connection = connectionFactory.CreatePublishConnection();
25-
connection.ConnectionShutdown += Connection_ConnectionShutdown;
30+
var newConnection = CreatePublishConnection();
31+
newConnection.ConnectionShutdown += Connection_ConnectionShutdown;
32+
return newConnection;
2633
}
2734

28-
void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
35+
void Connection_ConnectionShutdown(object? sender, ShutdownEventArgs e)
2936
{
30-
if (e.Initiator != ShutdownInitiator.Application)
37+
if (e.Initiator == ShutdownInitiator.Application || sender is null)
3138
{
32-
var connection = (IConnection)sender;
33-
34-
// Task.Run() so the call returns immediately instead of waiting for the first await or return down the call stack
35-
_ = Task.Run(() => ReconnectSwallowingExceptions(connection.ClientProvidedName), CancellationToken.None);
39+
return;
3640
}
41+
42+
var connectionThatWasShutdown = (IConnection)sender;
43+
44+
FireAndForget(cancellationToken => ReconnectSwallowingExceptions(connectionThatWasShutdown.ClientProvidedName, cancellationToken), stoppingTokenSource.Token);
3745
}
3846

39-
#pragma warning disable PS0018 // A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext
40-
async Task ReconnectSwallowingExceptions(string connectionName)
41-
#pragma warning restore PS0018 // A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext
47+
async Task ReconnectSwallowingExceptions(string connectionName, CancellationToken cancellationToken)
4248
{
43-
while (true)
49+
while (!cancellationToken.IsCancellationRequested)
4450
{
4551
Logger.InfoFormat("'{0}': Attempting to reconnect in {1} seconds.", connectionName, retryDelay.TotalSeconds);
4652

47-
await Task.Delay(retryDelay).ConfigureAwait(false);
48-
4953
try
5054
{
51-
CreateConnection();
55+
await DelayReconnect(cancellationToken).ConfigureAwait(false);
56+
57+
var newConnection = CreateConnectionWithShutdownListener();
58+
59+
// A race condition is possible where CreatePublishConnection is invoked during Dispose
60+
// where the returned connection isn't disposed so invoking Dispose to be sure
61+
if (cancellationToken.IsCancellationRequested)
62+
{
63+
newConnection.Dispose();
64+
break;
65+
}
66+
67+
var oldConnection = Interlocked.Exchange(ref connection, newConnection);
68+
oldConnection?.Dispose();
69+
break;
70+
}
71+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
72+
{
73+
Logger.InfoFormat("'{0}': Stopped trying to reconnecting to the broker due to shutdown", connectionName);
5274
break;
5375
}
5476
catch (Exception ex)
@@ -60,6 +82,12 @@ async Task ReconnectSwallowingExceptions(string connectionName)
6082
Logger.InfoFormat("'{0}': Connection to the broker reestablished successfully.", connectionName);
6183
}
6284

85+
protected virtual void FireAndForget(Func<CancellationToken, Task> action, CancellationToken cancellationToken = default) =>
86+
// Task.Run() so the call returns immediately instead of waiting for the first await or return down the call stack
87+
_ = Task.Run(() => action(cancellationToken), CancellationToken.None);
88+
89+
protected virtual Task DelayReconnect(CancellationToken cancellationToken = default) => Task.Delay(retryDelay, cancellationToken);
90+
6391
public ConfirmsAwareChannel GetPublishChannel()
6492
{
6593
if (!channels.TryDequeue(out var channel) || channel.IsClosed)
@@ -86,19 +114,32 @@ public void ReturnPublishChannel(ConfirmsAwareChannel channel)
86114

87115
public void Dispose()
88116
{
89-
connection?.Dispose();
117+
if (disposed)
118+
{
119+
return;
120+
}
121+
122+
stoppingTokenSource.Cancel();
123+
stoppingTokenSource.Dispose();
124+
125+
var oldConnection = Interlocked.Exchange(ref connection, null);
126+
oldConnection?.Dispose();
90127

91128
foreach (var channel in channels)
92129
{
93130
channel.Dispose();
94131
}
132+
133+
disposed = true;
95134
}
96135

97136
readonly ConnectionFactory connectionFactory;
98137
readonly TimeSpan retryDelay;
99138
readonly IRoutingTopology routingTopology;
100139
readonly ConcurrentQueue<ConfirmsAwareChannel> channels;
101-
IConnection connection;
140+
readonly CancellationTokenSource stoppingTokenSource = new();
141+
volatile IConnection? connection;
142+
bool disposed;
102143

103144
static readonly ILog Logger = LogManager.GetLogger(typeof(ChannelProvider));
104145
}

src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
<PackageReference Include="Particular.Packaging" Version="4.0.0" PrivateAssets="All" />
1919
</ItemGroup>
2020

21+
<ItemGroup Label="Direct references to transitive dependencies to avoid versions with CVE">
22+
<PackageReference Include="System.Formats.Asn1" Version="8.0.1" />
23+
</ItemGroup>
24+
2125
<PropertyGroup>
2226
<PackageId>NServiceBus.RabbitMQ</PackageId>
2327
<Description>RabbitMQ support for NServiceBus</Description>

0 commit comments

Comments
 (0)