Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,11 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
if (state == Closing || state == Closed) {
return;
}
if (!listenerExecutor_) {
LOG_ERROR(getName() << " listenerExecutor_ is null, discarding message to avoid null dereference");
increaseAvailablePermits(cnx);
return;
}
uint32_t numOfMessageReceived = m.impl_->metadata.num_messages_in_batch();
if (ackGroupingTrackerPtr_->isDuplicate(m.getMessageId())) {
LOG_DEBUG(getName() << " Ignoring message as it was ACKed earlier by same consumer.");
Expand Down Expand Up @@ -663,8 +668,11 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
return;
}
// Trigger message listener callback in a separate thread
while (numOfMessageReceived--) {
listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr()));
if (listenerExecutor_) {
while (numOfMessageReceived--) {
listenerExecutor_->postWork(
std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr()));
}
}
}
}
Expand Down Expand Up @@ -713,8 +721,12 @@ void ConsumerImpl::executeNotifyCallback(Message& msg) {

// has pending receive, direct callback.
if (asyncReceivedWaiting) {
listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
get_shared_this_ptr(), ResultOk, msg, callback));
if (listenerExecutor_) {
listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
get_shared_this_ptr(), ResultOk, msg, callback));
} else {
notifyPendingReceivedCallback(ResultOk, msg, callback);
}
return;
}

Expand Down
7 changes: 6 additions & 1 deletion lib/ExecutorService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
#include "ExecutorService.h"

#include <algorithm>

#include "LogUtils.h"
#include "TimeUtils.h"
DECLARE_LOG_OBJECT()
Expand Down Expand Up @@ -130,9 +132,12 @@ void ExecutorService::postWork(std::function<void(void)> task) { ASIO::post(io_c
/////////////////////

ExecutorServiceProvider::ExecutorServiceProvider(int nthreads)
: executors_(nthreads), executorIdx_(0), mutex_() {}
: executors_(std::max(1, nthreads)), executorIdx_(0), mutex_() {}

ExecutorServicePtr ExecutorServiceProvider::get(size_t idx) {
if (executors_.empty()) {
return nullptr;
}
idx %= executors_.size();
Lock lock(mutex_);

Expand Down
45 changes: 45 additions & 0 deletions tests/ReaderTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
#include <pulsar/ClientConfiguration.h>
#include <pulsar/Reader.h>
#include <time.h>

Expand Down Expand Up @@ -955,5 +956,49 @@ TEST_F(ReaderSeekTest, testSeekInclusiveChunkMessage) {
assertStartMessageId(false, secondMsgId);
}

// Regression test for segfault when Reader is used with messageListenerThreads=0.
// Verifies ExecutorServiceProvider(0) does not cause undefined behavior and
// ConsumerImpl::messageReceived does not dereference null listenerExecutor_.
TEST(ReaderTest, testReaderWithZeroMessageListenerThreads) {
ClientConfiguration clientConf;
clientConf.setMessageListenerThreads(0);
Client client(serviceUrl, clientConf);

const std::string topicName = "testReaderWithZeroMessageListenerThreads-" + std::to_string(time(nullptr));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

constexpr int numMessages = 5;
for (int i = 0; i < numMessages; i++) {
Message msg = MessageBuilder().setContent("msg-" + std::to_string(i)).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

int received = 0;
for (int i = 0; i < numMessages + 2; i++) {
bool hasMessageAvailable = false;
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
if (!hasMessageAvailable) {
break;
}
Message msg;
Result res = reader.readNext(msg, 3000);
ASSERT_EQ(ResultOk, res) << "readNext failed at iteration " << i;
std::string content = msg.getDataAsString();
EXPECT_EQ("msg-" + std::to_string(received), content);
++received;
}
EXPECT_EQ(received, numMessages);

producer.close();
reader.close();
client.close();
}

INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, false));
13 changes: 13 additions & 0 deletions tests/RetryableOperationCacheTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <chrono>
#include <stdexcept>

#include "lib/ExecutorService.h"
#include "lib/RetryableOperationCache.h"

namespace pulsar {
Expand Down Expand Up @@ -82,6 +83,18 @@ class RetryableOperationCacheTest : public ::testing::Test {

using namespace pulsar;

// Regression test: ExecutorServiceProvider(0) must not cause undefined behavior (e.g. idx % 0).
// After fix, nthreads is clamped to at least 1, so get() returns a valid executor.
TEST(ExecutorServiceProviderTest, ZeroThreadsReturnsValidExecutor) {
ExecutorServiceProviderPtr provider = std::make_shared<ExecutorServiceProvider>(0);
for (int i = 0; i < 3; i++) {
ExecutorServicePtr executor = provider->get();
ASSERT_NE(executor, nullptr)
<< "get() must not return null when created with 0 threads (clamped to 1)";
}
provider->close();
}

TEST_F(RetryableOperationCacheTest, testRetry) {
auto cache = RetryableOperationCache<int>::create(provider_, std::chrono::seconds(30));
for (int i = 0; i < 10; i++) {
Expand Down
Loading