
[fix][client] Close orphan producer or consumer when the creation is interrupted #24539

Merged

Conversation

BewareMyPower
Contributor

@BewareMyPower BewareMyPower commented Jul 21, 2025

Motivation

Producer and reader creation have the same issue that #24100 fixed for consumers: when the thread is interrupted, the returned future will complete exceptionally with the wrapped InterruptedException, but the asynchronous creation will continue in the background.

Modifications

  • Add a common FutureUtils.wait method to call a callback on the completed result.
  • Apply the method above to the creation of producer, consumer and reader.
  • Add ClientInterruptTest to cover the fixes above and remove duplicated ConsumerCloseTest#testInterruptedWhenCreateConsumer.

It should be noted that the complicated fix approach of #24100 is replaced by the solution in this PR.
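As a minimal sketch of the idea behind the modifications (not the actual Pulsar implementation), the blocking wait can register a cleanup callback on the pending future when it is interrupted, so that whatever the background creation eventually produces gets closed. The class name `InterruptibleWait`, the method `waitOrCleanUp`, and the `StringBuilder` stand-in for a producer or consumer are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class InterruptibleWait {

    /**
     * Hypothetical helper: blocks on the future. If the wait is interrupted,
     * the callback is attached so the eventual result is cleaned up.
     */
    public static <T> T waitOrCleanUp(CompletableFuture<T> future, Consumer<T> onOrphan)
            throws InterruptedException, ExecutionException {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // The asynchronous creation keeps running in the background;
            // close whatever it produces once it completes successfully.
            future.thenAccept(onOrphan);
            throw e;
        }
    }

    /** Returns true if the orphaned result was handed to the cleanup callback. */
    public static boolean demo() {
        CompletableFuture<StringBuilder> creation = new CompletableFuture<>();
        AtomicBoolean closed = new AtomicBoolean();
        Thread.currentThread().interrupt(); // simulate an interrupt during creation
        try {
            waitOrCleanUp(creation, resource -> closed.set(true));
        } catch (InterruptedException e) {
            // Expected: the wait was interrupted; the flag is cleared by get().
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
        // The background creation finishes after the caller has given up.
        creation.complete(new StringBuilder("resource"));
        return closed.get();
    }
}
```

Note that this sketch rethrows the InterruptedException without setting the interrupt flag again, which mirrors the choice described in this PR.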

This PR also reverts the change that set the thread interrupt flag; setting it should be the caller's responsibility. This is actually a wrong behavior for many existing synchronous APIs.

Without this change, the 2nd creation of the consumer in the code snippet will fail when the 1st creation is interrupted:

try {
    @Cleanup final var consumer = client.newConsumer().topic("tp").subscriptionName("sub")
            .subscribe();
    fail();
} catch (PulsarClientException e) {
    assertTrue(e.getCause() instanceof InterruptedException);
}
try {
    @Cleanup final var consumer = client.newConsumer().topic("another-tp").subscriptionName("sub")
            .subscribe();
} catch (PulsarClientException e) {
    fail();
}

If we want to make the 2nd call fail, the caller should handle the InterruptedException like:

} catch (PulsarClientException e) {
    if (e.getCause() instanceof InterruptedException) {
        Thread.currentThread().interrupt();
    }
}

In short,

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jul 21, 2025
@BewareMyPower BewareMyPower self-assigned this Jul 21, 2025
@BewareMyPower BewareMyPower added this to the 4.1.0 milestone Jul 21, 2025
@BewareMyPower
Contributor Author

Actually @nodece raised this opinion about thread interruption here, but it was not addressed, maybe due to a misunderstanding. #24100 (review)

IMO, the key point should be:

When using the Pulsar Client/Admin, the users should not call the thread interrupt.

The client SDK should guarantee when the thread is interrupted, there will be no resource leak. But the SDK should not set the thread interruption flag, which might lead to unexpected behavior.

Member

@lhotari lhotari left a comment

This PR also reverts the change that set the thread interrupt flag; setting it should be the caller's responsibility. This is actually a wrong behavior for many existing synchronous APIs.

I'm not sure if I agree on this. Whenever code in some method calls another method that throws InterruptedException and it is not propagated to the caller (the method doesn't throw InterruptedException in the method signature), it should set the interrupted flag with Thread.currentThread().interrupt() (possibly first handling the exception and then setting the interrupted flag). This is the way how InterruptedException should be handled in Java according to "Java Concurrency in Practice". More details in https://stackoverflow.com/questions/3976344/handling-interruptedexception-in-java, also with the reference to "Java Concurrency in Practice".

For cancelling asynchronous tasks where a CompletableFuture is the result, .cancel(true) should be called and there should be an action that is done when the future is cancelled. There's a helper FutureUtils.whenCancelledOrTimedOut in Pulsar's code base for this purpose. This will only work if the cancel method is called on the instance where the Runnable has been "attached" with FutureUtils.whenCancelledOrTimedOut. (CompletableFuture doesn't have a way to propagate signals "upstream" like there is in ReactiveStreams)
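The point about signals not propagating "upstream" can be illustrated with plain JDK classes. In the sketch below, cancelling a dependent future leaves its source untouched, and a cancellation action only fires on the instance it was attached to. The helper `onCancel` is hypothetical and is not Pulsar's `whenCancelledOrTimedOut`; it only handles cancellation, not timeouts.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class CancelPropagation {

    /** Hypothetical helper: runs the action when this exact future instance is cancelled. */
    static <T> CompletableFuture<T> onCancel(CompletableFuture<T> future, Runnable action) {
        future.whenComplete((value, error) -> {
            if (error instanceof CancellationException) {
                action.run();
            }
        });
        return future;
    }

    /** Returns {downstream cancelled?, upstream cancelled?, cleanup ran exactly once?}. */
    public static boolean[] demo() {
        CompletableFuture<String> upstream = new CompletableFuture<>();
        AtomicInteger cleanups = new AtomicInteger();
        // The action is attached to the dependent future, not to the source.
        CompletableFuture<Integer> downstream =
                onCancel(upstream.thenApply(String::length), cleanups::incrementAndGet);
        downstream.cancel(true);
        return new boolean[] {
            downstream.isCancelled(), // the dependent future is cancelled
            upstream.isCancelled(),   // the source future is unaffected
            cleanups.get() == 1
        };
    }
}
```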

@lhotari
Member

lhotari commented Jul 21, 2025

The client SDK should guarantee when the thread is interrupted, there will be no resource leak. But the SDK should not set the thread interruption flag, which might lead to unexpected behavior.

As mentioned in my previous comment, I disagree on this point.

@BewareMyPower
Contributor Author

Whenever code in some method calls another method that throws InterruptedException and it is not propagated to the caller (the method doesn't throw InterruptedException in the method signature)

The key point is that, from my perspective, the InterruptedException is propagated to the caller as the underlying cause of the PulsarClientException. It's an issue with the poor JavaDocs for client APIs. The synchronous API should document how the caller should handle the PulsarClientException beyond simply logging it.

The real use case is that I'm using a single thread executor to perform two types of tasks:

  • LOAD: create a reader and read a topic from a given position to load key-values of this topic
  • UNLOAD: remove the key-values from this topic

They're synchronous tasks executed in a single thread, executed by ExecutorService#submit. Before scheduling each task, I will cancel the previous task (typically LOAD because it's blocking) by calling .cancel(true) and handling the possible InterruptedException after unwrapping PulsarClientException. If the interruption flag is set internally by the client SDK, there is no graceful way to cancel this flag because this thread could schedule new tasks in future.

You can see https://gist.github.com/BewareMyPower/fbd1e67fa947967c71b4722289ba7cea for reference.

Since I used Reader rather than Consumer, I didn't suffer the breaking change brought by #24100.

Since this PR uses a common method to handle interruption for the creation of producer, reader and consumer, I have to choose whether to set the interruption flag. I chose not to, from the perspective of the downstream application developer.

@BewareMyPower
Contributor Author

But the SDK should not set the thread interruption flag, which might lead to unexpected behavior.

I'm correcting the description to "in certain cases".

For example, setting the interruption flag in Consumer#receive makes sense. Because once it's interrupted, it means the internal blocking queue has been interrupted. There is no way from the caller side to continue calling receive safely, so the client SDK must set the interruption flag to fail all subsequent receive calls in the same thread.

However, for this case, take consumer for example, if the creation failed:

    @Cleanup final var consumer = client.newConsumer().topic("tp").subscriptionName("sub")
            .subscribe();

The consumer variable will never be touched by the caller. Setting the interruption flag could fail all subsequent blocking calls that might throw an InterruptedException (e.g. Thread.sleep), which would be really confusing.
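The effect described above can be reproduced without Pulsar. In this sketch, `failingCreate` is a hypothetical stand-in for a synchronous creation call that was interrupted, and `IllegalStateException` stands in for the wrapping PulsarClientException; the caller only catches the failure and then makes an unrelated blocking call.

```java
public class InterruptFlagEffect {

    /** Hypothetical stand-in for a synchronous create call that was interrupted. */
    static void failingCreate(boolean restoreFlag) {
        if (restoreFlag) {
            // The library re-sets the interrupt flag before throwing.
            Thread.currentThread().interrupt();
        }
        throw new IllegalStateException(new InterruptedException("creation interrupted"));
    }

    /** Returns true if a follow-up blocking call succeeds after the failed creation. */
    public static boolean followUpSleepSucceeds(boolean restoreFlag) {
        try {
            failingCreate(restoreFlag);
        } catch (IllegalStateException e) {
            // The caller only logs the failure and moves on.
        }
        try {
            Thread.sleep(1);
            return true;
        } catch (InterruptedException e) {
            // Thread.sleep throws immediately when the flag is set (and clears it).
            return false;
        }
    }
}
```

With `restoreFlag` set to false the follow-up sleep succeeds; with true it fails even though the caller never interacted with the failed resource.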

BTW, @shibd I'm wondering if you knew the impact when you added Thread.currentThread().interrupt(); in #24100?

@shibd
Member

shibd commented Jul 21, 2025

hi, @BewareMyPower @lhotari. Thank you all for discussing this.

BTW, @shibd I'm wondering if you knew the impact when you added Thread.currentThread().interrupt(); in #24100?

Initially, I didn't think through the use cases carefully and just followed the best practice, as mentioned here

I've reconsidered this case, and I can think of a scenario like this:

Suppose a user tries to create a consumer twice within a single thread. If the creation of the first consumer fails, the code then attempts to create the second one and subsequently receive messages.

If we don't restore the interrupt flag, the creation of the second consumer will succeed. This is not the expected behavior, because the user's intent was to stop the entire thread.

    @Test
    public void testInterruptedWhenCreateConsumer() throws InterruptedException {

        String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
        String subName = "test-sub";
        String mlCursorPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/"
                + TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + subName;

        // Make create cursor delay 1s
        CountDownLatch topicLoadLatch = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            mockZooKeeper.delay(1000, (op, path) -> {
                if (mlCursorPath.equals(path)) {
                    topicLoadLatch.countDown();
                    return true;
                }
                return false;
            });
        }

        Thread startConsumer = new Thread(() -> {
            // 1. First, try to create consumer
            Consumer<byte[]> consumer = null;
            try {
                 consumer = pulsarClient.newConsumer()
                        .topic(tpName)
                        .subscriptionName(subName)
                        .subscribe();
            } catch (PulsarClientException e) {
                log.error("failed to create consumer");
            }

            log.info("Thread stats {}", Thread.currentThread().isInterrupted());
            
            // 2. if creating pulsar consumer failed, try to create consumer again
            try {
                if (consumer == null) {
                    consumer = pulsarClient.newConsumer()
                            .topic(tpName)
                            .subscriptionName(subName)
                            .subscribe();
                }
                // receive message and do something
            } catch (PulsarClientException e) {
                throw new RuntimeException(e);
            }
            
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Message<byte[]> receive = consumer.receive();
                    log.info("Receive message {}", receive);
                } catch (PulsarClientException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        
        startConsumer.start();
        topicLoadLatch.await();
        // user try to interrupt the thread
        startConsumer.interrupt();

        PulsarClientImpl clientImpl = (PulsarClientImpl) pulsarClient;
        Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
            Assert.assertEquals(clientImpl.consumersCount(), 0);
        });
        log.info("Thread is alive {}", startConsumer.isAlive());
    }

@BewareMyPower
Contributor Author

BewareMyPower commented Jul 21, 2025

the user's intent was to stop the entire thread.

You said it's the user's intent. Then the user can manually set the thread's interruption flag to fail the subsequent receive calls:

            // 1. First, try to create consumer
            Consumer<byte[]> consumer = null;
            try {
                /* ... */
            } catch (PulsarClientException e) {
                // HERE
                if (e.getCause() instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            }

Why does it have to be the client SDK that does that? If the user doesn't want to stop the thread, there is no way to avoid it except a very ugly solution like:

            // 1. First, try to create consumer
            Consumer<byte[]> consumer = null;
            try {
                /* ... */
            } catch (PulsarClientException e) {
                // HERE
                if (e.getCause() instanceof InterruptedException) {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException ignored) {
                    }
                }
            }

The code above is also counterintuitive because when an InterruptedException is thrown, it usually means the thread interruption flag has been cleared. See the docs of Thread.sleep:

     * @throws  InterruptedException
     *          if any thread has interrupted the current thread. The
     *          <i>interrupted status</i> of the current thread is
     *          cleared when this exception is thrown.

My previous comment already mentions this example. This is a real world case.

The key point is, JDK provides the ability to set the flag by Thread#interrupt but does not provide the ability to clear the flag directly (as my example shows, users can unset the flag by manually clearing the InterruptedException). When the creation of a consumer fails, the PulsarClient's state is still healthy. There is no reason to fail the subsequent creation.

Member

@shibd shibd left a comment

I've reconsidered your example, and I now understand that in a real-world use case, it is indeed difficult for the caller to handle the situation if we set the interrupt flag internally.

Maybe in the future, we should consider explicitly declaring that our methods throw InterruptedException instead of wrapping it in a PulsarClientException.

For now, I agree with the current approach of not setting the interrupt flag.

@lhotari
Member

lhotari commented Jul 22, 2025

The key point is, JDK provides the ability to set the flag by Thread#interrupt but does not provide the ability to clear the flag directly (as my example shows, users can unset the flag by manually clearing the InterruptedException).

@BewareMyPower The interrupt flag can be cleared with the interrupted method in java.lang.Thread. It's not a very intuitively named method.
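A short JDK-only illustration of the difference between the two query methods: the instance method `isInterrupted()` only reads the flag, while the static `Thread.interrupted()` reads and clears it. The class name `ClearInterruptFlag` is just for this example.

```java
public class ClearInterruptFlag {

    /** Returns {flag before, value returned by Thread.interrupted(), flag after}. */
    public static boolean[] demo() {
        Thread.currentThread().interrupt();                       // set the flag
        boolean before = Thread.currentThread().isInterrupted();  // reads without clearing
        boolean returned = Thread.interrupted();                  // reads and clears
        boolean after = Thread.currentThread().isInterrupted();   // flag is now cleared
        return new boolean[] {before, returned, after};
    }
}
```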

When the creation of a consumer fails, the PulsarClient's state is still healthy. There is no reason to fail the subsequent creation.

When the current thread is interrupted, there's some sort of shutdown in progress. I agree that it's very context specific whether subsequent calls should fail or not. It depends a lot on which thread is in question. In most cases, when a thread's interrupted flag is set, that thread should clean up and finish its work. Typical cases are that a task running in an ExecutorService is cancelled, or that the whole JVM is shutting down and threads get interrupted as part of the shutdown sequence.
If there's a need to keep calling blocking methods to clean up, there are ways to handle that.
For example, Guava contains the com.google.common.util.concurrent.Uninterruptibles utilities, which will run the work in the current thread after resetting the possible interrupted status and then restore the interrupted status.
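The pattern behind such utilities can be sketched with the JDK alone. This is not Guava's code; `UninterruptibleAwait` and `awaitUninterruptibly` are names chosen for this example. The wait is retried whenever it is interrupted, and the interrupted status is restored once the blocking work is done.

```java
import java.util.concurrent.CountDownLatch;

public class UninterruptibleAwait {

    /** Waits for the latch, ignoring interrupts while waiting and restoring the flag afterwards. */
    public static void awaitUninterruptibly(CountDownLatch latch) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    latch.await();
                    return;
                } catch (InterruptedException e) {
                    // Remember the interrupt and retry; the flag is cleared at this point.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                // Restore the interrupted status for the caller.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns true if the interrupted status was restored after the wait completed. */
    public static boolean demo() {
        CountDownLatch latch = new CountDownLatch(1);
        Thread.currentThread().interrupt();   // interrupt arrives before the blocking call
        new Thread(latch::countDown).start(); // the awaited work finishes in the background
        awaitUninterruptibly(latch);
        return Thread.interrupted();          // reads the restored flag and clears it again
    }
}
```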

@lhotari
Member

lhotari commented Jul 22, 2025

Whenever code in some method calls another method that throws InterruptedException and it is not propagated to the caller (the method doesn't throw InterruptedException in the method signature)

The key point is that, from my perspective, the InterruptedException is propagated to the caller as the underlying cause of the PulsarClientException. It's an issue with the poor JavaDocs for client APIs. The synchronous API should document how the caller should handle the PulsarClientException beyond simply logging it.

The real use case is that I'm using a single thread executor to perform two types of tasks:

  • LOAD: create a reader and read a topic from a given position to load key-values of this topic
  • UNLOAD: remove the key-values from this topic

They're synchronous tasks executed in a single thread, executed by ExecutorService#submit. Before scheduling each task, I will cancel the previous task (typically LOAD because it's blocking) by calling .cancel(true) and handling the possible InterruptedException after unwrapping PulsarClientException. If the interruption flag is set internally by the client SDK, there is no graceful way to cancel this flag because this thread could schedule new tasks in future.

You can see https://gist.github.com/BewareMyPower/fbd1e67fa947967c71b4722289ba7cea for reference.

Since I used Reader rather than Consumer, I didn't suffer the breaking change brought by #24100.

Since this PR uses a common method to handle interruption for the creation of producer, reader and consumer, I have to choose whether to set the interruption flag. I chose not to, from the perspective of the downstream application developer.

Java Concurrency in Practice is pretty clear about the correct handling of InterruptedException:
image

@BewareMyPower
Contributor Author

Yes, the key point is that InterruptedException has already been propagated as the cause of the PulsarClientException. The issue is that this is not documented as formal behavior, even though all methods, including the one that sets the interruption flag, behave the same way.

@codecov-commenter

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 74.28%. Comparing base (bbc6224) to head (25a4687).
Report is 1210 commits behind head on master.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24539      +/-   ##
============================================
+ Coverage     73.57%   74.28%   +0.70%     
- Complexity    32624    32905     +281     
============================================
  Files          1877     1869       -8     
  Lines        139502   146081    +6579     
  Branches      15299    16760    +1461     
============================================
+ Hits         102638   108510    +5872     
- Misses        28908    28953      +45     
- Partials       7956     8618     +662     
Flag Coverage Δ
inttests 26.72% <57.14%> (+2.14%) ⬆️
systests 23.34% <57.14%> (-0.99%) ⬇️
unittests 73.77% <100.00%> (+0.92%) ⬆️


Files with missing lines Coverage Δ
...apache/pulsar/client/impl/ConsumerBuilderImpl.java 86.83% <100.00%> (-0.11%) ⬇️
...apache/pulsar/client/impl/ProducerBuilderImpl.java 83.07% <100.00%> (+0.50%) ⬆️
...g/apache/pulsar/client/impl/ReaderBuilderImpl.java 87.23% <100.00%> (+2.12%) ⬆️
...java/org/apache/pulsar/common/util/FutureUtil.java 74.21% <100.00%> (-0.33%) ⬇️

... and 1096 files with indirect coverage changes


@BewareMyPower
Contributor Author

Let me merge this PR first, since the key point of this PR is to close orphan producer or consumer.

But how to handle the interruption will be another interesting topic. This is a part that previous PRs hardly touched or discussed in detail. I'm preparing a PIP at the moment. We can discuss more after it's out.

@BewareMyPower BewareMyPower merged commit de33f3b into apache:master Jul 22, 2025
68 of 71 checks passed
@BewareMyPower BewareMyPower deleted the bewaremypower/handle-interrupt-sync branch July 22, 2025 13:09
lhotari pushed a commit that referenced this pull request Jul 22, 2025
lhotari pushed a commit that referenced this pull request Jul 23, 2025
lhotari pushed a commit that referenced this pull request Jul 23, 2025