diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml index d209b533..f841e058 100644 --- a/.github/workflows/ci-pr-validation.yaml +++ b/.github/workflows/ci-pr-validation.yaml @@ -99,10 +99,25 @@ jobs: key: vcpkg-${{ runner.os }}-${{ hashFiles('vcpkg.json') }} restore-keys: vcpkg-${{ runner.os }}- + - name: Restore vcpkg downloads cache + uses: actions/cache@v4 + with: + path: vcpkg/downloads + key: vcpkg-downloads-${{ runner.os }}-${{ hashFiles('vcpkg.json') }} + restore-keys: vcpkg-downloads-${{ runner.os }}- + - name: Build the project run: | - cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON -DCMAKE_EXPORT_COMPILE_COMMANDS=ON - cmake --build build -j8 + for attempt in 1 2 3; do + echo "Build attempt $attempt/3" + if cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON -DCMAKE_EXPORT_COMPILE_COMMANDS=ON && cmake --build build -j8; then + exit 0 + fi + echo "Attempt $attempt failed (e.g. vcpkg download 502), retrying in 90s..." + sleep 90 + done + echo "All build attempts failed" + exit 1 - name: Tidy check run: | @@ -137,10 +152,25 @@ jobs: key: vcpkg-${{ runner.os }}-${{ hashFiles('vcpkg.json') }} restore-keys: vcpkg-${{ runner.os }}- + - name: Restore vcpkg downloads cache + uses: actions/cache@v4 + with: + path: vcpkg/downloads + key: vcpkg-downloads-${{ runner.os }}-${{ hashFiles('vcpkg.json') }} + restore-keys: vcpkg-downloads-${{ runner.os }}- + - name: Build core libraries run: | - cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=OFF - cmake --build build -j8 + for attempt in 1 2 3; do + echo "Build attempt $attempt/3" + if cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=OFF && cmake --build build -j8; then + exit 0 + fi + echo "Attempt $attempt failed (e.g. vcpkg download 502), retrying in 90s..." + sleep 90 + done + echo "All build attempts failed" + exit 1 - name: Check formatting run: | @@ -164,8 +194,16 @@ jobs: - name: Build with Boost.Asio run: | - cmake -B build-boost-asio -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON - cmake --build build-boost-asio -j8 + for attempt in 1 2 3; do + echo "Build Boost.Asio attempt $attempt/3" + if cmake -B build-boost-asio -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON && cmake --build build-boost-asio -j8; then + exit 0 + fi + echo "Attempt $attempt failed (e.g. vcpkg download 502), retrying in 90s..." + sleep 90 + done + echo "All build attempts failed" + exit 1 - name: Build perf tools run: | diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 026f146d..045f3662 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -334,11 +334,11 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result return ResultAlreadyClosed; } - mutexLock.unlock(); - LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString()); incomingMessages_.clear(); possibleSendToDeadLetterTopicMessages_.clear(); backoff_.reset(); + mutexLock.unlock(); + LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString()); if (!messageListener_ && config_.getReceiverQueueSize() == 0) { // Complicated logic since we don't have a isLocked() function for mutex if (waitingForZeroQueueSizeMessage) { @@ -1819,7 +1819,9 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c if (result == ResultOk) { LockGuard lock(mutex_); if (getCnx().expired() || reconnectionPending_) { - // It's during reconnection, complete the seek future after connection is established + // Reconnection path: delay the seek callback until connectionOpened. clearReceiveQueue() + // and handleCreateConsumer() (which clears incomingMessages_ under the lock) run before + // the seek callback is invoked, so hasMessageAvailable() after seek sees cleared state. seekStatus_ = SeekStatus::COMPLETED; LOG_INFO(getName() << "Delay the seek future until the reconnection is done"); } else {