Skip to content

Commit 4256e08

Browse files
authored
Add a way to disable broker requirement checks (#1592)
* Add a way to disable broker requirement checks * Update comments * Update API approval
1 parent a13f4a2 commit 4256e08

File tree

10 files changed

+128
-32
lines changed

10 files changed

+128
-32
lines changed

src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/MigrateQueue/QueueMigrateToQuorumTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ public async Task SetUp()
287287
var connectionConfiguration = ConnectionConfiguration.Create(connectionString);
288288

289289
var managementClient = new ManagementClient(connectionConfiguration);
290-
var brokerVerifier = new BrokerVerifier(managementClient, true);
290+
var brokerVerifier = new BrokerVerifier(managementClient, BrokerRequirementChecks.None, true);
291291

292292
var connectionFactory = new RabbitMQ.ConnectionFactory("unit-tests", connectionConfiguration, null, true, false, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30), null);
293293

src/NServiceBus.Transport.RabbitMQ.CommandLine/BrokerConnectionBinder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ protected override BrokerConnection GetBoundValue(BindingContext bindingContext)
2626
var managementApiConfiguration = ManagementApiConfiguration.Create(managementApiUrl, managementApiUserName, managementApiPassword);
2727

2828
var managementClient = new ManagementClient(connectionConfiguration, managementApiConfiguration);
29-
var brokerVerifier = new BrokerVerifier(managementClient, true);
29+
var brokerVerifier = new BrokerVerifier(managementClient, BrokerRequirementChecks.None, true);
3030

3131
var certificateCollection = new X509Certificate2Collection();
3232

src/NServiceBus.Transport.RabbitMQ.CommandLine/BrokerVerifierBinder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ protected override BrokerVerifier GetBoundValue(BindingContext bindingContext)
2020
var managementApiConfiguration = ManagementApiConfiguration.Create(managementApiUrl, managementApiUserName, managementApiPassword);
2121

2222
var managementClient = new ManagementClient(connectionConfiguration, managementApiConfiguration);
23-
var brokerVerifier = new BrokerVerifier(managementClient, true);
23+
var brokerVerifier = new BrokerVerifier(managementClient, BrokerRequirementChecks.None, true);
2424

2525
return brokerVerifier;
2626
}

src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
<ItemGroup>
3030
<Compile Include="..\NServiceBus.Transport.RabbitMQ\Administration\BrokerVerifier.cs" Link="Transport\BrokerVerifier.cs" />
3131
<Compile Include="..\NServiceBus.Transport.RabbitMQ\Administration\ManagementApi\**\*.cs" Link="Transport\ManagementApi\%(RecursiveDir)\%(Filename)%(Extension)" />
32+
<Compile Include="..\NServiceBus.Transport.RabbitMQ\Configuration\BrokerRequirementChecks.cs" Link="Transport\ManagementApi\BrokerRequirementChecks.cs" />
3233
<Compile Include="..\NServiceBus.Transport.RabbitMQ\Configuration\ConnectionConfiguration.cs" Link="Transport\ConnectionConfiguration.cs" />
3334
<Compile Include="..\NServiceBus.Transport.RabbitMQ\Configuration\ManagementApiConfiguration.cs" Link="Transport\ManagementApi\ManagementApiConfiguration.cs" />
3435
<Compile Include="..\NServiceBus.Transport.RabbitMQ\Configuration\QueueType.cs" Link="Transport\QueueType.cs" />

src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,13 @@
33
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"ServiceControl.Transports.RabbitMQ, PublicKey=0024000004800000940000000602000000240000525341310004000001000100dde965e6172e019ac82c2639ffe494dd2e7dd16347c34762a05732b492e110f2e4e2e1b5ef2d85c848ccfb671ee20a47c8d1376276708dc30a90ff1121b647ba3b7259a6bc383b2034938ef0e275b58b920375ac605076178123693c6c4f1331661a62eba28c249386855637780e3ff5f23a6d854700eaa6803ef48907513b92")]
44
namespace NServiceBus
55
{
6+
[System.Flags]
7+
public enum BrokerRequirementChecks
8+
{
9+
None = 0,
10+
Version310OrNewer = 1,
11+
StreamsEnabled = 2,
12+
}
613
public class ManagementApiConfiguration
714
{
815
public ManagementApiConfiguration(string url) { }
@@ -29,6 +36,7 @@ namespace NServiceBus
2936
public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString) { }
3037
public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery) { }
3138
public System.Security.Cryptography.X509Certificates.X509Certificate2 ClientCertificate { get; set; }
39+
public NServiceBus.BrokerRequirementChecks DisabledBrokerRequirementChecks { get; set; }
3240
public System.TimeSpan HeartbeatInterval { get; set; }
3341
public NServiceBus.ManagementApiConfiguration ManagementApiConfiguration { get; set; }
3442
public System.Func<RabbitMQ.Client.Events.BasicDeliverEventArgs, string> MessageIdStrategy { get; set; }
@@ -51,6 +59,7 @@ namespace NServiceBus
5159
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> ConnectionString(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions, System.Func<string> getConnectionString) { }
5260
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> ConnectionString(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions, string connectionString) { }
5361
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> CustomMessageIdStrategy(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions, System.Func<RabbitMQ.Client.Events.BasicDeliverEventArgs, string> customIdStrategy) { }
62+
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> DisableBrokerRequirementChecks(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions, NServiceBus.BrokerRequirementChecks brokerRequirementChecks) { }
5463
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> DisableDurableExchangesAndQueues(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions) { }
5564
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> DisableRemoteCertificateValidation(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions) { }
5665
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> DoNotValidateDeliveryLimits(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions) { }

src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class BrokerVerifierTests
1919
public void Initialize_Should_Get_Response_When_Management_Client_Is_Available_And_Valid()
2020
{
2121
var managementClient = new ManagementClient(connectionConfiguration);
22-
using var brokerVerifier = new BrokerVerifier(managementClient, true);
22+
using var brokerVerifier = new BrokerVerifier(managementClient, BrokerRequirementChecks.None, true);
2323

2424
Assert.DoesNotThrowAsync(async () => await brokerVerifier.Initialize());
2525
}
@@ -28,7 +28,7 @@ public void Initialize_Should_Get_Response_When_Management_Client_Is_Available_A
2828
public async Task ValidateDeliveryLimit_Should_Set_Delivery_Limit_Policy()
2929
{
3030
var managementClient = new ManagementClient(connectionConfiguration);
31-
using var brokerVerifier = new BrokerVerifier(managementClient, true);
31+
using var brokerVerifier = new BrokerVerifier(managementClient, BrokerRequirementChecks.None, true);
3232
await brokerVerifier.Initialize();
3333

3434
if (brokerVerifier.BrokerVersion < BrokerVerifier.BrokerVersion4)
@@ -67,7 +67,7 @@ public async Task ValidateDeliveryLimit_Should_Throw_When_Queue_Does_Not_Exist()
6767
{
6868
var queueName = "WrongQueue";
6969
var managementClient = new ManagementClient(connectionConfiguration);
70-
using var brokerVerifier = new BrokerVerifier(managementClient, true);
70+
using var brokerVerifier = new BrokerVerifier(managementClient, BrokerRequirementChecks.None, true);
7171
await brokerVerifier.Initialize();
7272

7373
var exception = Assert.ThrowsAsync<InvalidOperationException>(async () => await brokerVerifier.ValidateDeliveryLimit(queueName));
@@ -81,7 +81,7 @@ public async Task ValidateDeliveryLimit_Should_Pass_When_Called_For_Classic_Queu
8181
await CreateQueue(queueName, queueType: "classic");
8282

8383
var managementClient = new ManagementClient(connectionConfiguration);
84-
using var brokerVerifier = new BrokerVerifier(managementClient, true);
84+
using var brokerVerifier = new BrokerVerifier(managementClient, BrokerRequirementChecks.None, true);
8585
await brokerVerifier.Initialize();
8686

8787
await brokerVerifier.ValidateDeliveryLimit(queueName);
@@ -95,7 +95,7 @@ public async Task ValidateDeliveryLimit_Should_Throw_When_Queue_Argument_Has_Del
9595
await CreateQueue(queueName, deliveryLimit: deliveryLimit);
9696

9797
var managementClient = new ManagementClient(connectionConfiguration);
98-
using var brokerVerifier = new BrokerVerifier(managementClient, true);
98+
using var brokerVerifier = new BrokerVerifier(managementClient, BrokerRequirementChecks.None, true);
9999
await brokerVerifier.Initialize();
100100

101101
var exception = Assert.ThrowsAsync<InvalidOperationException>(async () => await brokerVerifier.ValidateDeliveryLimit(queueName));
@@ -109,7 +109,7 @@ public async Task ValidateDeliveryLimit_Should_Throw_When_A_Policy_On_Queue_Has_
109109
await CreateQueue(queueName);
110110

111111
var managementClient = new ManagementClient(connectionConfiguration);
112-
using var brokerVerifier = new BrokerVerifier(managementClient, true);
112+
using var brokerVerifier = new BrokerVerifier(managementClient, BrokerRequirementChecks.None, true);
113113
await brokerVerifier.Initialize();
114114

115115
var deliveryLimit = 15;

src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs

Lines changed: 53 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,10 @@ namespace NServiceBus.Transport.RabbitMQ;
77
using System.Net.Http;
88
using System.Threading;
99
using System.Threading.Tasks;
10-
#if !COMMANDLINE
11-
using NServiceBus.Logging;
12-
#endif
1310
using NServiceBus.Transport.RabbitMQ.ManagementApi;
1411

15-
class BrokerVerifier(ManagementClient managementClient, bool validateDeliveryLimits) : IDisposable
12+
class BrokerVerifier(ManagementClient managementClient, BrokerRequirementChecks disabledBrokerRequirementChecks, bool validateDeliveryLimits) : IDisposable
1613
{
17-
#if !COMMANDLINE
18-
static readonly ILog Logger = LogManager.GetLogger(typeof(BrokerVerifier));
19-
#endif
20-
2114
static readonly Version MinimumSupportedBrokerVersion = Version.Parse("3.10.0");
2215
public static readonly Version BrokerVersion4 = Version.Parse("4.0.0");
2316

@@ -39,6 +32,14 @@ public Version BrokerVersion
3932

4033
public async Task Initialize(CancellationToken cancellationToken = default)
4134
{
35+
//This needs to stay in sync with changes to BrokerRequirementChecks
36+
var all = BrokerRequirementChecks.Version310OrNewer | BrokerRequirementChecks.StreamsEnabled;
37+
38+
if (disabledBrokerRequirementChecks == all && !validateDeliveryLimits)
39+
{
40+
return;
41+
}
42+
4243
try
4344
{
4445
var overview = await managementClient.GetOverview(cancellationToken).ConfigureAwait(false);
@@ -67,36 +68,58 @@ static Version RemovePrereleaseFromVersion(string version)
6768

6869
public async Task VerifyRequirements(CancellationToken cancellationToken = default)
6970
{
70-
if (BrokerVersion < MinimumSupportedBrokerVersion)
71+
if ((disabledBrokerRequirementChecks & BrokerRequirementChecks.Version310OrNewer) == BrokerRequirementChecks.Version310OrNewer)
7172
{
72-
throw new InvalidOperationException($"An unsupported broker version was detected: {BrokerVersion}. The broker must be at least version {MinimumSupportedBrokerVersion}.");
73+
LogWarning("Verification of the minimum supported broker version has been disabled. The transport will not be able to ensure the delayed delivery infrastructure works properly.");
74+
}
75+
else
76+
{
77+
VerifyBrokerMinimumVersion();
7378
}
7479

75-
bool streamsEnabled;
76-
77-
try
80+
if ((disabledBrokerRequirementChecks & BrokerRequirementChecks.StreamsEnabled) == BrokerRequirementChecks.StreamsEnabled)
7881
{
79-
var featureFlags = await managementClient.GetFeatureFlags(cancellationToken).ConfigureAwait(false);
80-
streamsEnabled = featureFlags.HasEnabledFeature(FeatureFlag.StreamQueue);
82+
LogWarning("Verification of the 'stream_queue' feature flag has been disabled. The transport will not be able to ensure the delayed delivery infrastructure works properly.");
8183
}
82-
catch (HttpRequestException ex)
84+
else
8385
{
84-
throw new InvalidOperationException("There was a problem accessing the RabbitMQ management API to verify broker requirements. See the inner exception for details.", ex);
86+
await VerifyStreamEnabled(cancellationToken).ConfigureAwait(false);
8587
}
8688

87-
if (!streamsEnabled)
89+
void VerifyBrokerMinimumVersion()
8890
{
89-
throw new InvalidOperationException("An unsupported broker configuration was detected. The 'stream_queue' feature flag needs to be enabled.");
91+
if (BrokerVersion < MinimumSupportedBrokerVersion)
92+
{
93+
throw new InvalidOperationException($"An unsupported broker version was detected: {BrokerVersion}. The broker must be at least version {MinimumSupportedBrokerVersion}.");
94+
}
95+
}
96+
97+
async Task VerifyStreamEnabled(CancellationToken cancellationToken)
98+
{
99+
bool streamsEnabled;
100+
101+
try
102+
{
103+
var featureFlags = await managementClient.GetFeatureFlags(cancellationToken).ConfigureAwait(false);
104+
streamsEnabled = featureFlags.HasEnabledFeature(FeatureFlag.StreamQueue);
105+
}
106+
catch (HttpRequestException ex)
107+
{
108+
throw new InvalidOperationException("There was a problem accessing the RabbitMQ management API to verify broker requirements. See the inner exception for details.", ex);
109+
}
110+
111+
if (!streamsEnabled)
112+
{
113+
throw new InvalidOperationException("An unsupported broker configuration was detected. The 'stream_queue' feature flag needs to be enabled.");
114+
}
90115
}
91116
}
92117

93118
public async Task ValidateDeliveryLimit(string queueName, CancellationToken cancellationToken = default)
94119
{
95120
if (!validateDeliveryLimits)
96121
{
97-
#if !COMMANDLINE
98-
Logger.Warn("Validation of delivery limits has been disabled. The transport will not be able to ensure that messages are not lost after repeated retries.");
99-
#endif
122+
LogWarning("Validation of delivery limits has been disabled. The transport will not be able to ensure that messages are not lost after repeated retries.");
100123
return;
101124
}
102125

@@ -193,6 +216,14 @@ async Task SetDeliveryLimitViaPolicy(Queue queue, CancellationToken cancellation
193216
}
194217
}
195218

219+
static void LogWarning(string message)
220+
{
221+
#if !COMMANDLINE
222+
var logger = Logging.LogManager.GetLogger(typeof(BrokerVerifier));
223+
logger.Warn(message);
224+
#endif
225+
}
226+
196227
protected virtual void Dispose(bool disposing)
197228
{
198229
if (!disposed)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
namespace NServiceBus;
2+
3+
using System;
4+
5+
//Any changes to BrokerRequirementChecks need to be synchronzied with the `all` value in BrokerVerifier.Initialize.
6+
7+
/// <summary>
8+
/// The broker requirements that the transport will verify.
9+
/// </summary>
10+
[Flags]
11+
public enum BrokerRequirementChecks
12+
{
13+
/// <summary>
14+
/// None
15+
/// </summary>
16+
None = 0,
17+
18+
/// <summary>
19+
/// The transport requires broker version 3.10 or newer.
20+
/// </summary>
21+
Version310OrNewer = 1,
22+
23+
/// <summary>
24+
/// The 'stream-queue' feature flag needs to be enabled.
25+
/// </summary>
26+
StreamsEnabled = 2,
27+
}
28+

src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,25 @@ public static TransportExtensions<RabbitMQTransport> CustomMessageIdStrategy(thi
114114
return transportExtensions;
115115
}
116116

117+
/// <summary>
118+
/// Disables the specified broker requirement checks.
119+
/// </summary>
120+
/// <param name="transportExtensions"></param>
121+
/// <param name="brokerRequirementChecks">The broker requirement checks to disable.</param>
122+
/// <remarks>
123+
/// Using a broker that does not meet all of the requirements can result in message loss or other incorrect operation, so disabling checks is not recommended.
124+
/// </remarks>
125+
[PreObsolete("https://github.com/Particular/NServiceBus/issues/6811",
126+
Message = "This is now part of routing topology configuration, which has been moved to the constructor of the RabbitMQTransport class.",
127+
Note = "Should not be converted to an ObsoleteEx until API mismatch described in issue is resolved.")]
128+
public static TransportExtensions<RabbitMQTransport> DisableBrokerRequirementChecks(this TransportExtensions<RabbitMQTransport> transportExtensions, BrokerRequirementChecks brokerRequirementChecks)
129+
{
130+
ArgumentNullException.ThrowIfNull(transportExtensions);
131+
132+
transportExtensions.Transport.DisabledBrokerRequirementChecks = brokerRequirementChecks;
133+
return transportExtensions;
134+
}
135+
117136
/// <summary>
118137
/// Specifies that exchanges and queues should be declared as non-durable.
119138
/// </summary>

src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,14 @@ public PrefetchCountCalculation PrefetchCountCalculation
150150
/// </summary>
151151
public ManagementApiConfiguration ManagementApiConfiguration { get; set; }
152152

153+
/// <summary>
154+
/// The broker requirement checks to disable.
155+
/// </summary>
156+
/// <remarks>
157+
/// Using a broker that does not meet all of the requirements can result in message loss or other incorrect operation, so disabling the checks is not recommended.
158+
/// </remarks>
159+
public BrokerRequirementChecks DisabledBrokerRequirementChecks { get; set; }
160+
153161
/// <summary>
154162
/// The interval for heartbeats between the endpoint and the broker.
155163
/// </summary>
@@ -229,7 +237,7 @@ public override async Task<TransportInfrastructure> Initialize(HostSettings host
229237

230238
ManagementClient = new ManagementClient(ConnectionConfiguration, ManagementApiConfiguration);
231239

232-
var brokerVerifier = new BrokerVerifier(ManagementClient, ValidateDeliveryLimits);
240+
var brokerVerifier = new BrokerVerifier(ManagementClient, DisabledBrokerRequirementChecks, ValidateDeliveryLimits);
233241
await brokerVerifier.Initialize(cancellationToken).ConfigureAwait(false);
234242
await brokerVerifier.VerifyRequirements(cancellationToken).ConfigureAwait(false);
235243

0 commit comments

Comments
 (0)