[Bug] unacked message count is zero when using exclusive subscription. #24159

Open
3 tasks done
nodece opened this issue Apr 8, 2025 · 2 comments
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

@nodece
Member

nodece commented Apr 8, 2025

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

Pulsar: master

Minimal reproduce step

@Test
public void testUnackedMessages() throws Exception {
    String topicName = "testUnackedMessages";
    String subscriptionName = "my-sub";
    String serviceUrl = "pulsar://localhost:6650";

    @Cleanup PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();

    @Cleanup Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscriptionType(SubscriptionType.Exclusive)
            .messageListener((c, message) -> {
                // noop
            })
            .subscribe();

    @Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
    producer.send("1".getBytes());

    TopicStats stats = admin.topics().getStats(topicName);
    assertThat(stats.getSubscriptions().get(subscriptionName).getUnackedMessages()).isNotEqualTo(0);
}

What did you expect to see?

unackedMessages is not equal to 0.

What did you see instead?

unackedMessages is equal to 0.

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

nodece added the type/bug label Apr 8, 2025

@fretory

fretory commented Apr 14, 2025

It seems that exclusive subscriptions lack support for counting unacknowledged messages.

    if (Subscription.isIndividualAckMode(subType)) {
        if (dispatcher instanceof AbstractPersistentDispatcherMultipleConsumers) {
            AbstractPersistentDispatcherMultipleConsumers d =
                    (AbstractPersistentDispatcherMultipleConsumers) dispatcher;
            subStats.unackedMessages = d.getTotalUnackedMessages();
            ....
        }
    }

https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L1332

    static boolean isIndividualAckMode(SubType subType) {
        return SubType.Shared.equals(subType) || SubType.Key_Shared.equals(subType);
    }

https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java#L134
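
To make the gating concrete, here is a minimal, self-contained sketch of the logic quoted above. It is a simplified model, not the broker's code: the local `SubType` enum, the class `UnackedStatsSketch`, and the helper `reportedUnacked` are hypothetical names introduced for illustration, and the `instanceof` check on the dispatcher is left out. Only `isIndividualAckMode` mirrors the quoted source.

```java
// Simplified model of the stats gating quoted above; not the actual broker code.
class UnackedStatsSketch {

    // Local stand-in for the subscription types (assumption: same four names as Pulsar's SubType).
    enum SubType { Exclusive, Shared, Failover, Key_Shared }

    // Mirrors the quoted Subscription.isIndividualAckMode.
    static boolean isIndividualAckMode(SubType subType) {
        return SubType.Shared.equals(subType) || SubType.Key_Shared.equals(subType);
    }

    // Hypothetical helper: the value that would end up in the stats for a subscription type,
    // given how many messages were dispatched but not yet acknowledged.
    static long reportedUnacked(SubType subType, long dispatchedButUnacked) {
        long unackedMessages = 0; // stays at the default when the branch below is skipped
        if (isIndividualAckMode(subType)) {
            unackedMessages = dispatchedButUnacked;
        }
        return unackedMessages;
    }

    public static void main(String[] args) {
        // One message dispatched and not acknowledged, for every subscription type.
        for (SubType t : SubType.values()) {
            System.out.println(t + " -> " + reportedUnacked(t, 1));
        }
    }
}
```

With one outstanding message, this model reports 0 for Exclusive and Failover and 1 for Shared and Key_Shared, which matches the behavior described in the report.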

@nodece
Member Author

nodece commented Apr 14, 2025

You are right. By the way, we should also implement org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer#addUnAckedMessages.
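
One possible shape for such a counter is sketched below. This is a hypothetical illustration, not the code of `PersistentDispatcherSingleActiveConsumer`: the class name `SingleConsumerUnackedCounterSketch`, the clamping at zero, and the convention that a positive delta means "dispatched" and a negative delta means "acknowledged" are all assumptions. A real change would also need the stats code to read this value for Exclusive and Failover subscriptions.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not PersistentDispatcherSingleActiveConsumer.
class SingleConsumerUnackedCounterSketch {

    // Messages dispatched to the active consumer and not yet acknowledged.
    private final AtomicInteger totalUnackedMessages = new AtomicInteger();

    // Assumed convention: positive delta on dispatch, negative delta on acknowledgment.
    // The count is clamped at zero so a late or duplicate ack cannot make it negative.
    void addUnAckedMessages(int delta) {
        totalUnackedMessages.updateAndGet(current -> Math.max(0, current + delta));
    }

    int getTotalUnackedMessages() {
        return totalUnackedMessages.get();
    }

    public static void main(String[] args) {
        SingleConsumerUnackedCounterSketch dispatcher = new SingleConsumerUnackedCounterSketch();
        dispatcher.addUnAckedMessages(3);   // three messages dispatched
        dispatcher.addUnAckedMessages(-2);  // a cumulative ack covers two of them
        System.out.println(dispatcher.getTotalUnackedMessages());
    }
}
```

An atomic counter is used here because dispatch and acknowledgment can happen on different threads; whether that matches the broker's threading model for this dispatcher is not confirmed by the discussion above.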
