Skip to content

Commit 932d51a

Browse files
Merge pull request #1442 from Particular/reliability-r70
Prevent connections from leaking in case of reconnects
2 parents ba2c20f + 096b310 commit 932d51a

File tree

6 files changed

+241
-22
lines changed

6 files changed

+241
-22
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ jobs:
5151
creds: ${{ secrets.AZURE_ACI_CREDENTIALS }}
5252
enable-AzPSSession: true
5353
- name: Setup RabbitMQ
54-
uses: Particular/setup-rabbitmq-action@v1.6.0
54+
uses: Particular/setup-rabbitmq-action@v1.7.0
5555
with:
5656
connection-string-name: RabbitMQTransport_ConnectionString
5757
tag: RabbitMQTransport

src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
<PackageReference Include="Particular.Packaging" Version="2.3.0" PrivateAssets="All" />
2424
</ItemGroup>
2525

26+
<ItemGroup Label="Direct references to transitive dependencies to avoid versions with CVE">
27+
<PackageReference Include="System.Formats.Asn1" Version="8.0.1" />
28+
</ItemGroup>
29+
2630
<ItemGroup>
2731
<Compile Include="..\NServiceBus.Transport.RabbitMQ\Configuration\ConnectionConfiguration.cs" Link="Transport\ConnectionConfiguration.cs" />
2832
<Compile Include="..\NServiceBus.Transport.RabbitMQ\Configuration\QueueType.cs" Link="Transport\QueueType.cs" />
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
namespace NServiceBus.Transport.RabbitMQ.Tests.ConnectionString
2+
{
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using global::RabbitMQ.Client;
8+
using global::RabbitMQ.Client.Events;
9+
using NUnit.Framework;
10+
11+
[TestFixture]
12+
public class ChannelProviderTests
13+
{
14+
[Test]
15+
public async Task Should_recover_connection_and_dispose_old_one_when_connection_shutdown()
16+
{
17+
var channelProvider = new TestableChannelProvider();
18+
channelProvider.CreateConnection();
19+
20+
var publishConnection = channelProvider.PublishConnections.Dequeue();
21+
publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test"));
22+
23+
channelProvider.DelayTaskCompletionSource.SetResult(true);
24+
25+
await channelProvider.FireAndForgetAction(CancellationToken.None);
26+
27+
var recoveredConnection = channelProvider.PublishConnections.Dequeue();
28+
29+
Assert.That(publishConnection.WasDisposed, Is.True);
30+
Assert.That(recoveredConnection.WasDisposed, Is.False);
31+
}
32+
33+
[Test]
34+
public void Should_dispose_connection_when_disposed()
35+
{
36+
var channelProvider = new TestableChannelProvider();
37+
channelProvider.CreateConnection();
38+
39+
var publishConnection = channelProvider.PublishConnections.Dequeue();
40+
channelProvider.Dispose();
41+
42+
Assert.That(publishConnection.WasDisposed, Is.True);
43+
}
44+
45+
[Test]
46+
public async Task Should_not_attempt_to_recover_during_dispose_when_retry_delay_still_pending()
47+
{
48+
var channelProvider = new TestableChannelProvider();
49+
channelProvider.CreateConnection();
50+
51+
var publishConnection = channelProvider.PublishConnections.Dequeue();
52+
publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test"));
53+
54+
// Deliberately not completing the delay task with channelProvider.DelayTaskCompletionSource.SetResult(); before disposing
55+
// to simulate a pending delay task
56+
channelProvider.Dispose();
57+
58+
await channelProvider.FireAndForgetAction(CancellationToken.None);
59+
60+
Assert.That(publishConnection.WasDisposed, Is.True);
61+
Assert.That(channelProvider.PublishConnections, Has.Count.Zero);
62+
}
63+
64+
[Test]
65+
public async Task Should_dispose_newly_established_connection()
66+
{
67+
var channelProvider = new TestableChannelProvider();
68+
channelProvider.CreateConnection();
69+
70+
var publishConnection = channelProvider.PublishConnections.Dequeue();
71+
publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test"));
72+
73+
// This simulates the race of the reconnection loop being fired off with the delay task completed during
74+
// the disposal of the channel provider. To achieve that it is necessary to kick off the reconnection loop
75+
// and await its completion after the channel provider has been disposed.
76+
var fireAndForgetTask = channelProvider.FireAndForgetAction(CancellationToken.None);
77+
channelProvider.DelayTaskCompletionSource.SetResult(true);
78+
channelProvider.Dispose();
79+
80+
await fireAndForgetTask;
81+
82+
var recoveredConnection = channelProvider.PublishConnections.Dequeue();
83+
84+
Assert.That(publishConnection.WasDisposed, Is.True);
85+
Assert.That(recoveredConnection.WasDisposed, Is.True);
86+
}
87+
88+
class TestableChannelProvider : ChannelProvider
89+
{
90+
public TestableChannelProvider() : base(null, TimeSpan.Zero, null)
91+
{
92+
}
93+
94+
public Queue<FakeConnection> PublishConnections { get; } = new Queue<FakeConnection>();
95+
96+
public TaskCompletionSource<bool> DelayTaskCompletionSource { get; } = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
97+
98+
public Func<CancellationToken, Task> FireAndForgetAction { get; private set; }
99+
100+
protected override IConnection CreatePublishConnection()
101+
{
102+
var connection = new FakeConnection();
103+
PublishConnections.Enqueue(connection);
104+
return connection;
105+
}
106+
107+
protected override void FireAndForget(Func<CancellationToken, Task> action, CancellationToken cancellationToken = default)
108+
=> FireAndForgetAction = _ => action(cancellationToken);
109+
110+
protected override async Task DelayReconnect(CancellationToken cancellationToken = default)
111+
{
112+
using (var _ = cancellationToken.Register(() => DelayTaskCompletionSource.TrySetCanceled(cancellationToken)))
113+
{
114+
await DelayTaskCompletionSource.Task;
115+
}
116+
}
117+
}
118+
119+
class FakeConnection : IConnection
120+
{
121+
public int LocalPort { get; }
122+
public int RemotePort { get; }
123+
124+
public void Dispose() => WasDisposed = true;
125+
126+
public bool WasDisposed { get; private set; }
127+
128+
public void UpdateSecret(string newSecret, string reason) => throw new NotImplementedException();
129+
130+
public void Abort() => throw new NotImplementedException();
131+
132+
public void Abort(ushort reasonCode, string reasonText) => throw new NotImplementedException();
133+
134+
public void Abort(TimeSpan timeout) => throw new NotImplementedException();
135+
136+
public void Abort(ushort reasonCode, string reasonText, TimeSpan timeout) => throw new NotImplementedException();
137+
138+
public void Close() => throw new NotImplementedException();
139+
140+
public void Close(ushort reasonCode, string reasonText) => throw new NotImplementedException();
141+
142+
public void Close(TimeSpan timeout) => throw new NotImplementedException();
143+
144+
public void Close(ushort reasonCode, string reasonText, TimeSpan timeout) => throw new NotImplementedException();
145+
146+
public IModel CreateModel() => throw new NotImplementedException();
147+
148+
public void HandleConnectionBlocked(string reason) => throw new NotImplementedException();
149+
150+
public void HandleConnectionUnblocked() => throw new NotImplementedException();
151+
152+
public ushort ChannelMax { get; }
153+
public IDictionary<string, object> ClientProperties { get; }
154+
public ShutdownEventArgs CloseReason { get; }
155+
public AmqpTcpEndpoint Endpoint { get; }
156+
public uint FrameMax { get; }
157+
public TimeSpan Heartbeat { get; }
158+
public bool IsOpen { get; }
159+
public AmqpTcpEndpoint[] KnownHosts { get; }
160+
public IProtocol Protocol { get; }
161+
public IDictionary<string, object> ServerProperties { get; }
162+
public IList<ShutdownReportEntry> ShutdownReport { get; }
163+
public string ClientProvidedName { get; } = $"FakeConnection{Interlocked.Increment(ref connectionCounter)}";
164+
public event EventHandler<CallbackExceptionEventArgs> CallbackException = (sender, args) => { };
165+
public event EventHandler<ConnectionBlockedEventArgs> ConnectionBlocked = (sender, args) => { };
166+
public event EventHandler<ShutdownEventArgs> ConnectionShutdown = (sender, args) => { };
167+
public event EventHandler<EventArgs> ConnectionUnblocked = (sender, args) => { };
168+
169+
public void RaiseConnectionShutdown(ShutdownEventArgs args) => ConnectionShutdown?.Invoke(this, args);
170+
171+
static int connectionCounter;
172+
}
173+
}
174+
}

src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs

Lines changed: 62 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@ namespace NServiceBus.Transport.RabbitMQ
22
{
33
using System;
44
using System.Collections.Concurrent;
5+
using System.Threading;
56
using System.Threading.Tasks;
67
using global::RabbitMQ.Client;
78
using Logging;
89

9-
sealed class ChannelProvider : IDisposable
10+
class ChannelProvider : IDisposable
1011
{
1112
public ChannelProvider(ConnectionFactory connectionFactory, TimeSpan retryDelay, IRoutingTopology routingTopology)
1213
{
@@ -18,46 +19,73 @@ public ChannelProvider(ConnectionFactory connectionFactory, TimeSpan retryDelay,
1819
channels = new ConcurrentQueue<ConfirmsAwareChannel>();
1920
}
2021

21-
public void CreateConnection()
22+
public void CreateConnection() => connection = CreateConnectionWithShutdownListener();
23+
24+
protected virtual IConnection CreatePublishConnection() => connectionFactory.CreatePublishConnection();
25+
26+
IConnection CreateConnectionWithShutdownListener()
2227
{
23-
connection = connectionFactory.CreatePublishConnection();
24-
connection.ConnectionShutdown += Connection_ConnectionShutdown;
28+
var newConnection = CreatePublishConnection();
29+
newConnection.ConnectionShutdown += Connection_ConnectionShutdown;
30+
return newConnection;
2531
}
2632

2733
void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
2834
{
29-
if (e.Initiator != ShutdownInitiator.Application)
35+
if (e.Initiator == ShutdownInitiator.Application || sender is null)
3036
{
31-
var connection = (IConnection)sender;
32-
33-
_ = Task.Run(() => Reconnect(connection.ClientProvidedName));
37+
return;
3438
}
39+
40+
var connectionThatWasShutdown = (IConnection)sender;
41+
42+
FireAndForget(cancellationToken => ReconnectSwallowingExceptions(connectionThatWasShutdown.ClientProvidedName, cancellationToken), stoppingTokenSource.Token);
3543
}
3644

37-
async Task Reconnect(string connectionName)
45+
async Task ReconnectSwallowingExceptions(string connectionName, CancellationToken cancellationToken)
3846
{
39-
var reconnected = false;
40-
41-
while (!reconnected)
47+
while (!cancellationToken.IsCancellationRequested)
4248
{
4349
Logger.InfoFormat("'{0}': Attempting to reconnect in {1} seconds.", connectionName, retryDelay.TotalSeconds);
4450

45-
await Task.Delay(retryDelay).ConfigureAwait(false);
46-
4751
try
4852
{
49-
CreateConnection();
50-
reconnected = true;
53+
await DelayReconnect(cancellationToken).ConfigureAwait(false);
54+
55+
var newConnection = CreateConnectionWithShutdownListener();
5156

52-
Logger.InfoFormat("'{0}': Connection to the broker reestablished successfully.", connectionName);
57+
// A race condition is possible where CreatePublishConnection is invoked during Dispose
58+
// where the returned connection isn't disposed so invoking Dispose to be sure
59+
if (cancellationToken.IsCancellationRequested)
60+
{
61+
newConnection.Dispose();
62+
break;
63+
}
64+
65+
var oldConnection = Interlocked.Exchange(ref connection, newConnection);
66+
oldConnection?.Dispose();
67+
break;
5368
}
54-
catch (Exception e)
69+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
5570
{
56-
Logger.InfoFormat("'{0}': Reconnecting to the broker failed: {1}", connectionName, e);
71+
Logger.InfoFormat("'{0}': Stopped trying to reconnecting to the broker due to shutdown", connectionName);
72+
break;
73+
}
74+
catch (Exception ex)
75+
{
76+
Logger.InfoFormat("'{0}': Reconnecting to the broker failed: {1}", connectionName, ex);
5777
}
5878
}
79+
80+
Logger.InfoFormat("'{0}': Connection to the broker reestablished successfully.", connectionName);
5981
}
6082

83+
protected virtual void FireAndForget(Func<CancellationToken, Task> action, CancellationToken cancellationToken = default) =>
84+
// Task.Run() so the call returns immediately instead of waiting for the first await or return down the call stack
85+
_ = Task.Run(() => action(cancellationToken), CancellationToken.None);
86+
87+
protected virtual Task DelayReconnect(CancellationToken cancellationToken = default) => Task.Delay(retryDelay, cancellationToken);
88+
6189
public ConfirmsAwareChannel GetPublishChannel()
6290
{
6391
if (!channels.TryDequeue(out var channel) || channel.IsClosed)
@@ -84,19 +112,32 @@ public void ReturnPublishChannel(ConfirmsAwareChannel channel)
84112

85113
public void Dispose()
86114
{
87-
connection?.Dispose();
115+
if (disposed)
116+
{
117+
return;
118+
}
119+
120+
stoppingTokenSource.Cancel();
121+
stoppingTokenSource.Dispose();
122+
123+
var oldConnection = Interlocked.Exchange(ref connection, null);
124+
oldConnection?.Dispose();
88125

89126
foreach (var channel in channels)
90127
{
91128
channel.Dispose();
92129
}
130+
131+
disposed = true;
93132
}
94133

95134
readonly ConnectionFactory connectionFactory;
96135
readonly TimeSpan retryDelay;
97136
readonly IRoutingTopology routingTopology;
98137
readonly ConcurrentQueue<ConfirmsAwareChannel> channels;
99-
IConnection connection;
138+
readonly CancellationTokenSource stoppingTokenSource = new CancellationTokenSource();
139+
volatile IConnection connection;
140+
bool disposed;
100141

101142
static readonly ILog Logger = LogManager.GetLogger(typeof(ChannelProvider));
102143
}

0 commit comments

Comments
 (0)