From a3d654c963342ca4d9622faf10121fbbdc4279bb Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Mon, 16 Mar 2026 20:04:28 +0800 Subject: [PATCH 1/2] fix crash --- lib/ConsumerImpl.cc | 19 +++++++++--- lib/ExecutorService.cc | 7 ++++- tests/ReaderTest.cc | 46 ++++++++++++++++++++++++++++ tests/RetryableOperationCacheTest.cc | 12 ++++++++ 4 files changed, 79 insertions(+), 5 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 757b6e84..06917e60 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -622,6 +622,11 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: if (state == Closing || state == Closed) { return; } + if (!listenerExecutor_) { + LOG_ERROR(getName() << " listenerExecutor_ is null, discarding message to avoid null dereference"); + increaseAvailablePermits(cnx); + return; + } uint32_t numOfMessageReceived = m.impl_->metadata.num_messages_in_batch(); if (ackGroupingTrackerPtr_->isDuplicate(m.getMessageId())) { LOG_DEBUG(getName() << " Ignoring message as it was ACKed earlier by same consumer."); @@ -663,8 +668,10 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: return; } // Trigger message listener callback in a separate thread - while (numOfMessageReceived--) { - listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr())); + if (listenerExecutor_) { + while (numOfMessageReceived--) { + listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr())); + } } } } @@ -713,8 +720,12 @@ void ConsumerImpl::executeNotifyCallback(Message& msg) { // has pending receive, direct callback. if (asyncReceivedWaiting) { - listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback, - get_shared_this_ptr(), ResultOk, msg, callback)); + if (listenerExecutor_) { + listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback, + get_shared_this_ptr(), ResultOk, msg, callback)); + } else { + notifyPendingReceivedCallback(ResultOk, msg, callback); + } return; } diff --git a/lib/ExecutorService.cc b/lib/ExecutorService.cc index 99e2393f..45f095ea 100644 --- a/lib/ExecutorService.cc +++ b/lib/ExecutorService.cc @@ -18,6 +18,8 @@ */ #include "ExecutorService.h" +#include + #include "LogUtils.h" #include "TimeUtils.h" DECLARE_LOG_OBJECT() @@ -130,9 +132,12 @@ void ExecutorService::postWork(std::function task) { ASIO::post(io_c ///////////////////// ExecutorServiceProvider::ExecutorServiceProvider(int nthreads) - : executors_(nthreads), executorIdx_(0), mutex_() {} + : executors_(std::max(1, nthreads)), executorIdx_(0), mutex_() {} ExecutorServicePtr ExecutorServiceProvider::get(size_t idx) { + if (executors_.empty()) { + return nullptr; + } idx %= executors_.size(); Lock lock(mutex_); diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index ba5b2a9f..a75d3beb 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -18,6 +18,7 @@ */ #include #include +#include #include #include @@ -955,5 +956,50 @@ TEST_F(ReaderSeekTest, testSeekInclusiveChunkMessage) { assertStartMessageId(false, secondMsgId); } +// Regression test for segfault when Reader is used with messageListenerThreads=0. +// Verifies ExecutorServiceProvider(0) does not cause undefined behavior and +// ConsumerImpl::messageReceived does not dereference null listenerExecutor_. +TEST(ReaderTest, testReaderWithZeroMessageListenerThreads) { + ClientConfiguration clientConf; + clientConf.setMessageListenerThreads(0); + Client client(serviceUrl, clientConf); + + const std::string topicName = + "testReaderWithZeroMessageListenerThreads-" + std::to_string(time(nullptr)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + + ReaderConfiguration readerConf; + Reader reader; + ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader)); + + constexpr int numMessages = 5; + for (int i = 0; i < numMessages; i++) { + Message msg = MessageBuilder().setContent("msg-" + std::to_string(i)).build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + + int received = 0; + for (int i = 0; i < numMessages + 2; i++) { + bool hasMessageAvailable = false; + ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); + if (!hasMessageAvailable) { + break; + } + Message msg; + Result res = reader.readNext(msg, 3000); + ASSERT_EQ(ResultOk, res) << "readNext failed at iteration " << i; + std::string content = msg.getDataAsString(); + EXPECT_EQ("msg-" + std::to_string(received), content); + ++received; + } + EXPECT_EQ(received, numMessages); + + producer.close(); + reader.close(); + client.close(); +} + INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false)); INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, false)); diff --git a/tests/RetryableOperationCacheTest.cc b/tests/RetryableOperationCacheTest.cc index c9b8a1d7..1b2d28c5 100644 --- a/tests/RetryableOperationCacheTest.cc +++ b/tests/RetryableOperationCacheTest.cc @@ -22,6 +22,7 @@ #include #include +#include "lib/ExecutorService.h" #include "lib/RetryableOperationCache.h" namespace pulsar { @@ -82,6 +83,17 @@ class RetryableOperationCacheTest : public ::testing::Test { using namespace pulsar; +// Regression test: ExecutorServiceProvider(0) must not cause undefined behavior (e.g. idx % 0). +// After fix, nthreads is clamped to at least 1, so get() returns a valid executor. +TEST(ExecutorServiceProviderTest, ZeroThreadsReturnsValidExecutor) { + ExecutorServiceProviderPtr provider = std::make_shared(0); + for (int i = 0; i < 3; i++) { + ExecutorServicePtr executor = provider->get(); + ASSERT_NE(executor, nullptr) << "get() must not return null when created with 0 threads (clamped to 1)"; + } + provider->close(); +} + TEST_F(RetryableOperationCacheTest, testRetry) { auto cache = RetryableOperationCache::create(provider_, std::chrono::seconds(30)); for (int i = 0; i < 10; i++) { From 663d3b3eb137ac7ca9b028dfce960cfd78502796 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 17 Mar 2026 09:44:45 +0800 Subject: [PATCH 2/2] format --- lib/ConsumerImpl.cc | 3 ++- tests/ReaderTest.cc | 3 +-- tests/RetryableOperationCacheTest.cc | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 06917e60..f5114064 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -670,7 +670,8 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: // Trigger message listener callback in a separate thread if (listenerExecutor_) { while (numOfMessageReceived--) { - listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr())); + listenerExecutor_->postWork( + std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr())); } } } diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index a75d3beb..d8c5317e 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -964,8 +964,7 @@ TEST(ReaderTest, testReaderWithZeroMessageListenerThreads) { clientConf.setMessageListenerThreads(0); Client client(serviceUrl, clientConf); - const std::string topicName = - "testReaderWithZeroMessageListenerThreads-" + std::to_string(time(nullptr)); + const std::string topicName = "testReaderWithZeroMessageListenerThreads-" + std::to_string(time(nullptr)); Producer producer; ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); diff --git a/tests/RetryableOperationCacheTest.cc b/tests/RetryableOperationCacheTest.cc index 1b2d28c5..2daaf3f3 100644 --- a/tests/RetryableOperationCacheTest.cc +++ b/tests/RetryableOperationCacheTest.cc @@ -89,7 +89,8 @@ TEST(ExecutorServiceProviderTest, ZeroThreadsReturnsValidExecutor) { ExecutorServiceProviderPtr provider = std::make_shared(0); for (int i = 0; i < 3; i++) { ExecutorServicePtr executor = provider->get(); - ASSERT_NE(executor, nullptr) << "get() must not return null when created with 0 threads (clamped to 1)"; + ASSERT_NE(executor, nullptr) + << "get() must not return null when created with 0 threads (clamped to 1)"; } provider->close(); }