Skip to content

Commit 624cf2e

Browse files
Allow the dispatcher concurrency to be overridden per channel (#1669)
* Change consumer dispatch concurrency value to a `ushort` * Create `CreateChannelAsync` extension to use `DefaultConsumerDispatchConcurrency` * Standardize on `ConsumerDispatchConcurrency` name
1 parent 2cc12a5 commit 624cf2e

16 files changed

+60
-31
lines changed

projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class BasicDeliverConsumerDispatching : ConsumerDispatcherBase
3535
public int Count { get; set; }
3636

3737
[Params(1, 2)]
38-
public int Concurrency { get; set; }
38+
public ushort Concurrency { get; set; }
3939

4040
[GlobalSetup(Target = nameof(AsyncConsumerDispatcher))]
4141
public async Task SetUpAsyncConsumer()

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ RabbitMQ.Client.ConnectionFactory.ClientProperties.set -> void
189189
RabbitMQ.Client.ConnectionFactory.ClientProvidedName.get -> string
190190
RabbitMQ.Client.ConnectionFactory.ClientProvidedName.set -> void
191191
RabbitMQ.Client.ConnectionFactory.ConnectionFactory() -> void
192-
RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.get -> int
192+
RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.get -> ushort
193193
RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.set -> void
194194
RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.get -> System.TimeSpan
195195
RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.set -> void
@@ -472,7 +472,7 @@ RabbitMQ.Client.IConnectionFactory.ClientProperties.get -> System.Collections.Ge
472472
RabbitMQ.Client.IConnectionFactory.ClientProperties.set -> void
473473
RabbitMQ.Client.IConnectionFactory.ClientProvidedName.get -> string
474474
RabbitMQ.Client.IConnectionFactory.ClientProvidedName.set -> void
475-
RabbitMQ.Client.IConnectionFactory.ConsumerDispatchConcurrency.get -> int
475+
RabbitMQ.Client.IConnectionFactory.ConsumerDispatchConcurrency.get -> ushort
476476
RabbitMQ.Client.IConnectionFactory.ConsumerDispatchConcurrency.set -> void
477477
RabbitMQ.Client.IConnectionFactory.ContinuationTimeout.get -> System.TimeSpan
478478
RabbitMQ.Client.IConnectionFactory.ContinuationTimeout.set -> void
@@ -700,7 +700,6 @@ readonly RabbitMQ.Client.ConnectionConfig.AuthMechanisms -> System.Collections.G
700700
readonly RabbitMQ.Client.ConnectionConfig.ClientProperties -> System.Collections.Generic.IDictionary<string, object>
701701
readonly RabbitMQ.Client.ConnectionConfig.ClientProvidedName -> string
702702
readonly RabbitMQ.Client.ConnectionConfig.ContinuationTimeout -> System.TimeSpan
703-
readonly RabbitMQ.Client.ConnectionConfig.DispatchConsumerConcurrency -> int
704703
readonly RabbitMQ.Client.ConnectionConfig.HandshakeContinuationTimeout -> System.TimeSpan
705704
readonly RabbitMQ.Client.ConnectionConfig.HeartbeatInterval -> System.TimeSpan
706705
readonly RabbitMQ.Client.ConnectionConfig.MaxChannelCount -> ushort
@@ -826,7 +825,6 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
826825
~RabbitMQ.Client.IChannel.WaitForConfirmsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<bool>
827826
~RabbitMQ.Client.IChannel.WaitForConfirmsOrDieAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
828827
~RabbitMQ.Client.IConnection.CloseAsync(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
829-
~RabbitMQ.Client.IConnection.CreateChannelAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel>
830828
~RabbitMQ.Client.IConnection.UpdateSecretAsync(string newSecret, string reason, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
831829
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
832830
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(System.Collections.Generic.IEnumerable<RabbitMQ.Client.AmqpTcpEndpoint> endpoints, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
@@ -894,3 +892,7 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client
894892
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
895893
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
896894
RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
895+
const RabbitMQ.Client.ConnectionFactory.DefaultConsumerDispatchConcurrency = 1 -> ushort
896+
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
897+
static RabbitMQ.Client.IConnectionExtensions.CreateChannelAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
898+
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort

projects/RabbitMQ.Client/client/api/ConnectionConfig.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public sealed class ConnectionConfig
139139
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
140140
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
141141
/// </summary>
142-
public readonly int DispatchConsumerConcurrency;
142+
public readonly ushort ConsumerDispatchConcurrency;
143143

144144
internal readonly Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> FrameHandlerFactoryAsync;
145145

@@ -150,7 +150,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
150150
ushort maxChannelCount, uint maxFrameSize, uint maxInboundMessageBodySize, bool topologyRecoveryEnabled,
151151
TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler,
152152
TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout,
153-
int dispatchConsumerConcurrency, Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
153+
ushort consumerDispatchConcurrency, Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
154154
{
155155
VirtualHost = virtualHost;
156156
UserName = userName;
@@ -170,7 +170,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
170170
ContinuationTimeout = continuationTimeout;
171171
HandshakeContinuationTimeout = handshakeContinuationTimeout;
172172
RequestedConnectionTimeout = requestedConnectionTimeout;
173-
DispatchConsumerConcurrency = dispatchConsumerConcurrency;
173+
ConsumerDispatchConcurrency = consumerDispatchConcurrency;
174174
FrameHandlerFactoryAsync = frameHandlerFactoryAsync;
175175
}
176176
}

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ namespace RabbitMQ.Client
9292
///hosts with an empty name are not addressable. </para></remarks>
9393
public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
9494
{
95+
/// <summary>
96+
/// Default value for consumer dispatch concurrency.
97+
/// </summary>
98+
public const ushort DefaultConsumerDispatchConcurrency = 1;
99+
95100
/// <summary>
96101
/// Default value for the desired maximum channel number. Default: 2047.
97102
/// </summary>
@@ -175,7 +180,7 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
175180
/// </summary>
176181
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
177182
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
178-
public int ConsumerDispatchConcurrency { get; set; } = 1;
183+
public ushort ConsumerDispatchConcurrency { get; set; } = DefaultConsumerDispatchConcurrency;
179184

180185
/// <summary>The host to connect to.</summary>
181186
public string HostName { get; set; } = "localhost";

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,17 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
234234
/// <summary>
235235
/// Asynchronously create and return a fresh channel, session, and channel.
236236
/// </summary>
237+
/// <param name="consumerDispatchConcurrency">
238+
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
239+
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
240+
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
241+
///
242+
/// Defaults to <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>.
243+
///
244+
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
245+
/// In addition to that consumers need to be thread/concurrency safe.
246+
/// </param>
237247
/// <param name="cancellationToken">Cancellation token</param>
238-
Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default);
239-
248+
Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default);
240249
}
241250
}

projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ namespace RabbitMQ.Client
77
{
88
public static class IConnectionExtensions
99
{
10+
/// <summary>
11+
/// Asynchronously create and return a fresh channel, session, and channel.
12+
/// </summary>
13+
public static Task<IChannel> CreateChannelAsync(this IConnection connection, CancellationToken cancellationToken = default) =>
14+
connection.CreateChannelAsync(ConnectionFactory.DefaultConsumerDispatchConcurrency, cancellationToken);
15+
1016
/// <summary>
1117
/// Asynchronously close this connection and all its channels.
1218
/// </summary>

projects/RabbitMQ.Client/client/api/IConnectionFactory.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,6 @@ Task<IConnection> CreateConnectionAsync(IEnumerable<AmqpTcpEndpoint> endpoints,
194194
/// </summary>
195195
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
196196
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
197-
int ConsumerDispatchConcurrency { get; set; }
197+
ushort ConsumerDispatchConcurrency { get; set; }
198198
}
199199
}

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ namespace RabbitMQ.Client.Framing.Impl
3838
{
3939
internal class Channel : ChannelBase
4040
{
41-
public Channel(ConnectionConfig config, ISession session) : base(config, session)
41+
public Channel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
42+
: base(config, session, consumerDispatchConcurrency)
4243
{
4344
}
4445

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
5252
private bool _usesPublisherConfirms;
5353
private bool _tracksPublisherConfirmations;
5454
private bool _usesTransactions;
55+
private ushort _consumerDispatchConcurrency;
5556

5657
internal IConsumerDispatcher ConsumerDispatcher => InnerChannel.ConsumerDispatcher;
5758

@@ -70,10 +71,12 @@ public TimeSpan ContinuationTimeout
7071
set => InnerChannel.ContinuationTimeout = value;
7172
}
7273

73-
public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel)
74+
public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel,
75+
ushort consumerDispatchConcurrency)
7476
{
7577
_connection = conn;
7678
_innerChannel = innerChannel;
79+
_consumerDispatchConcurrency = consumerDispatchConcurrency;
7780
}
7881

7982
public event EventHandler<BasicAckEventArgs> BasicAcks
@@ -160,7 +163,8 @@ internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection con
160163

161164
_connection = conn;
162165

163-
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(cancellationToken)
166+
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_consumerDispatchConcurrency,
167+
cancellationToken: cancellationToken)
164168
.ConfigureAwait(false);
165169
newChannel.TakeOver(_innerChannel);
166170

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ private async ValueTask RecoverExchangesAsync(IConnection connection,
295295
{
296296
try
297297
{
298-
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false))
298+
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
299299
{
300300
await recordedExchange.RecoverAsync(ch, cancellationToken)
301301
.ConfigureAwait(false);
@@ -347,7 +347,7 @@ private async Task RecoverQueuesAsync(IConnection connection,
347347
try
348348
{
349349
string newName = string.Empty;
350-
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false))
350+
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
351351
{
352352
newName = await recordedQueue.RecoverAsync(ch, cancellationToken)
353353
.ConfigureAwait(false);
@@ -458,7 +458,7 @@ private async ValueTask RecoverBindingsAsync(IConnection connection,
458458
{
459459
try
460460
{
461-
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false))
461+
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
462462
{
463463
await binding.RecoverAsync(ch, cancellationToken)
464464
.ConfigureAwait(false);

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,11 @@ public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer
173173

174174
public IProtocol Protocol => Endpoint.Protocol;
175175

176-
public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(CancellationToken cancellationToken)
176+
public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(ushort consumerDispatchConcurrency,
177+
CancellationToken cancellationToken = default)
177178
{
178179
ISession session = InnerConnection.CreateSession();
179-
var result = new RecoveryAwareChannel(_config, session);
180+
var result = new RecoveryAwareChannel(_config, session, consumerDispatchConcurrency);
180181
return (RecoveryAwareChannel)await result.OpenAsync(cancellationToken).ConfigureAwait(false);
181182
}
182183

@@ -239,12 +240,12 @@ await CloseInnerConnectionAsync()
239240
}
240241
}
241242

242-
public async Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default)
243+
public async Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
243244
{
244245
EnsureIsOpen();
245-
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cancellationToken)
246+
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(consumerDispatchConcurrency, cancellationToken)
246247
.ConfigureAwait(false);
247-
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel);
248+
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, consumerDispatchConcurrency);
248249
await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
249250
.ConfigureAwait(false);
250251
return channel;

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,10 @@ internal abstract class ChannelBase : IChannel, IRecoverable
7373

7474
internal readonly IConsumerDispatcher ConsumerDispatcher;
7575

76-
protected ChannelBase(ConnectionConfig config, ISession session)
76+
protected ChannelBase(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
7777
{
7878
ContinuationTimeout = config.ContinuationTimeout;
79-
ConsumerDispatcher = new AsyncConsumerDispatcher(this, config.DispatchConsumerConcurrency);
79+
ConsumerDispatcher = new AsyncConsumerDispatcher(this, consumerDispatchConcurrency);
8080
Action<Exception, string> onException = (exception, context) =>
8181
OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
8282
_basicAcksWrapper = new EventingWrapper<BasicAckEventArgs>("OnBasicAck", onException);

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)
7272

7373
_sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize);
7474
_session0 = new MainSession(this, config.MaxInboundMessageBodySize);
75-
_channel0 = new Channel(_config, _session0); ;
75+
_channel0 = new Channel(_config, _session0, ConnectionFactory.DefaultConsumerDispatchConcurrency); ;
7676

7777
ClientProperties = new Dictionary<string, object?>(_config.ClientProperties)
7878
{
@@ -253,11 +253,11 @@ await CloseAsync(ea, true,
253253
}
254254
}
255255

256-
public Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default)
256+
public Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
257257
{
258258
EnsureIsOpen();
259259
ISession session = CreateSession();
260-
var channel = new Channel(_config, session);
260+
var channel = new Channel(_config, session, consumerDispatchConcurrency);
261261
return channel.OpenAsync(cancellationToken);
262262
}
263263

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace RabbitMQ.Client.ConsumerDispatching
88
{
99
internal sealed class AsyncConsumerDispatcher : ConsumerDispatcherChannelBase
1010
{
11-
internal AsyncConsumerDispatcher(ChannelBase channel, int concurrency)
11+
internal AsyncConsumerDispatcher(ChannelBase channel, ushort concurrency)
1212
: base(channel, concurrency)
1313
{
1414
}

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,
1717
private bool _quiesce = false;
1818
private bool _disposed;
1919

20-
internal ConsumerDispatcherChannelBase(ChannelBase channel, int concurrency)
20+
internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency)
2121
{
2222
_channel = channel;
2323
var workChannel = Channel.CreateUnbounded<WorkStruct>(new UnboundedChannelOptions

projects/RabbitMQ.Client/client/impl/RecoveryAwareChannel.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ namespace RabbitMQ.Client.Impl
3737
{
3838
internal sealed class RecoveryAwareChannel : Channel
3939
{
40-
public RecoveryAwareChannel(ConnectionConfig config, ISession session) : base(config, session)
40+
public RecoveryAwareChannel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
41+
: base(config, session, consumerDispatchConcurrency)
4142
{
4243
ActiveDeliveryTagOffset = 0;
4344
MaxSeenDeliveryTag = 0;

0 commit comments

Comments
 (0)