Skip to content

[Test] Fix flaky test: ReaderSeekTest.testHasMessageAvailableAfterSeekToEnd#549

Open
zhanglistar wants to merge 5 commits intoapache:mainfrom
bigo-sg:fix/issue-528
Open

[Test] Fix flaky test: ReaderSeekTest.testHasMessageAvailableAfterSeekToEnd#549
zhanglistar wants to merge 5 commits intoapache:mainfrom
bigo-sg:fix/issue-528

Conversation

@zhanglistar
Copy link
Contributor

@zhanglistar zhanglistar commented Mar 8, 2026

Fixes #528

Motivation

When a reader seeks to MessageId::latest(), the broker may close the consumer and trigger a reconnect. In seekAsyncInternal, when the seek succeeds but the connection is already expired or reconnectionPending_ is true, we only set seekStatus_ = COMPLETED and did not clear the local receive queue or update startMessageId_. As a result, incomingMessages_ still held prefetched messages from before the seek until connectionOpened() later called clearReceiveQueue(). During that window, hasMessageAvailable() saw !incomingMessages_.empty() and returned true, causing the flaky test ReaderSeekTest.testHasMessageAvailableAfterSeekToEnd.

Modifications

  • In ConsumerImpl::seekAsyncInternal, when seek result is OK and getCnx().expired() || reconnectionPending_ (reconnection path), we now perform the same local state updates as the non-reconnection path:
    • ackGroupingTrackerPtr_->flushAndClean()
    • incomingMessages_.clear()
    • If lastSeekArg_ holds a MessageId, set startMessageId_ from it

So after the seek is acknowledged, stale prefetched messages are cleared immediately and hasMessageAvailable() no longer returns true from old queue content before reconnect completes.

Verifying this change

  • Make sure that the change passes the CI checks.

This change is already covered by existing tests:

  • ReaderSeekTest.testHasMessageAvailableAfterSeekToEnd (parameterized) – asserts that after seek to latest, hasMessageAvailable() becomes false, then after producing a new message it becomes true and the message can be read; the fix removes the flakiness when the broker triggers reconnect on seek-to-end.

Documentation

  • doc-not-needed

Bug fix in C++ client implementation only; no API or behavior contract change and no user-facing docs update required.

Comment on lines +1824 to +1828
ackGroupingTrackerPtr_->flushAndClean();
incomingMessages_.clear();
if (lastSeekArg_.has_value() && std::holds_alternative<MessageId>(lastSeekArg_.value())) {
startMessageId_ = std::get<MessageId>(lastSeekArg_.value());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes the same changes in clearReceiveQueue. As you can see the seek callback is only executed after that, so when hasMessageAvailable is called after seek, the state is guaranteed to be cleared.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the clear in handleCreateConsumer() inside the lock (before unlock()), so the queue is cleared while holding mutex_. That way the seek callback and hasMessageAvailable() always see the cleared state and the flaky test is fixed. No lock-order change, so no new deadlock risk; the only effect is a bit longer lock hold during reconnection.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] Flaky test: ReaderSeekTest.testHasMessageAvailableAfterSeekToEnd

2 participants