From e81c025743303c297f9f3b53bcf02099b18a792b Mon Sep 17 00:00:00 2001 From: Tom Kay Date: Thu, 19 Mar 2026 11:05:36 +0000 Subject: [PATCH 1/7] Add heartbeat() method to AmqpQueueProvider Allows callers to explicitly trigger a heartbeat check on a specific connection mode or all active connections, disconnecting on missed heartbeats so the next operation triggers a reconnect. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/Provider/Amqp/AmqpQueueProvider.php | 40 +++++++++++++++++++++++++ tests/Provider/AmqpTest.php | 39 ++++++++++++++++++++++++ 2 files changed, 79 insertions(+) diff --git a/src/Provider/Amqp/AmqpQueueProvider.php b/src/Provider/Amqp/AmqpQueueProvider.php index ffa136c..00ec832 100644 --- a/src/Provider/Amqp/AmqpQueueProvider.php +++ b/src/Provider/Amqp/AmqpQueueProvider.php @@ -892,4 +892,44 @@ public function unbindQueue() ); return $this; } + + /** + * Send a heartbeat on the given connection mode, or all active connections + * + * @param string|null $connectionMode One of CONN_PUSH, CONN_CONSUME, CONN_OTHER, or null for all + * + * @return $this + */ + public function heartbeat($connectionMode = null) + { + if($connectionMode !== null) + { + $this->_heartbeat($connectionMode); + } + else + { + foreach(array_keys($this->_connections) as $mode) + { + $this->_heartbeat($mode); + } + } + return $this; + } + + protected function _heartbeat($connectionMode) + { + if(!empty($this->_connections[$connectionMode]) + && $this->_connections[$connectionMode]->isConnected() + ) + { + try + { + $this->_connections[$connectionMode]->checkHeartBeat(); + } + catch(AMQPHeartbeatMissedException $e) + { + $this->disconnect($connectionMode); + } + } + } } diff --git a/tests/Provider/AmqpTest.php b/tests/Provider/AmqpTest.php index b49cc23..fa71533 100644 --- a/tests/Provider/AmqpTest.php +++ b/tests/Provider/AmqpTest.php @@ -40,6 +40,45 @@ function ($msg, $tag) use ($q) { ); } + public function testHeartbeat() + { + $q = $this->_getProvider('test_heartbeat_fn'); + $q->declareExchange() + ->declareQueue() + ->bindQueue(); + + // heartbeat on a specific connection should not disconnect + $q->push('heartbeat_test'); + $disconnectsBefore = $q->getDisconnectCount(); + $q->heartbeat(AmqpMockProvider::CONN_PUSH); + self::assertEquals($disconnectsBefore, $q->getDisconnectCount()); + + // heartbeat on all connections should not disconnect + $q->heartbeat(); + self::assertEquals($disconnectsBefore, $q->getDisconnectCount()); + } + + public function testHeartbeatAfterMissed() + { + $q = $this->_getProvider('test_heartbeat_missed')->unregisterHeartbeat(); + $q->declareExchange() + ->declareQueue() + ->bindQueue(); + $q->push('trigger'); + + // sleep past the heartbeat interval to cause a missed heartbeat + $timeLeft = (int)$q->config()->getItem('heartbeat') * 3; + while($timeLeft > 0) + { + $timeLeft = sleep($timeLeft); + } + + $disconnectsBefore = $q->getDisconnectCount(); + $q->heartbeat(AmqpMockProvider::CONN_PUSH); + // should have disconnected due to missed heartbeat + self::assertGreaterThan($disconnectsBefore, $q->getDisconnectCount()); + } + public function testAmqp() { $q = $this->_getProvider('test', 'testexchange'); From d6eec8c85bd5883cd11a5449e07b47cb7f5e9d73 Mon Sep 17 00:00:00 2001 From: Tom Kay Date: Thu, 19 Mar 2026 11:10:54 +0000 Subject: [PATCH 2/7] Improve heartbeat tests to verify code path execution Add heartbeat check counter to AmqpMockProvider to assert that _heartbeat() is actually invoked rather than short-circuited. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/Provider/AmqpTest.php | 8 +++++++- tests/Provider/Mock/AmqpMockProvider.php | 12 ++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/tests/Provider/AmqpTest.php b/tests/Provider/AmqpTest.php index fa71533..bd8ca42 100644 --- a/tests/Provider/AmqpTest.php +++ b/tests/Provider/AmqpTest.php @@ -49,12 +49,16 @@ public function testHeartbeat() // heartbeat on a specific connection should not disconnect $q->push('heartbeat_test'); + $checksBefore = $q->getHeartbeatCheckCount(); $disconnectsBefore = $q->getDisconnectCount(); $q->heartbeat(AmqpMockProvider::CONN_PUSH); + self::assertGreaterThan($checksBefore, $q->getHeartbeatCheckCount()); self::assertEquals($disconnectsBefore, $q->getDisconnectCount()); // heartbeat on all connections should not disconnect + $checksBefore = $q->getHeartbeatCheckCount(); $q->heartbeat(); + self::assertGreaterThan($checksBefore, $q->getHeartbeatCheckCount()); self::assertEquals($disconnectsBefore, $q->getDisconnectCount()); } @@ -73,9 +77,11 @@ public function testHeartbeatAfterMissed() $timeLeft = sleep($timeLeft); } + $checksBefore = $q->getHeartbeatCheckCount(); $disconnectsBefore = $q->getDisconnectCount(); $q->heartbeat(AmqpMockProvider::CONN_PUSH); - // should have disconnected due to missed heartbeat + // should have checked and disconnected due to missed heartbeat + self::assertGreaterThan($checksBefore, $q->getHeartbeatCheckCount()); self::assertGreaterThan($disconnectsBefore, $q->getDisconnectCount()); } diff --git a/tests/Provider/Mock/AmqpMockProvider.php b/tests/Provider/Mock/AmqpMockProvider.php index 45be720..4259362 100644 --- a/tests/Provider/Mock/AmqpMockProvider.php +++ b/tests/Provider/Mock/AmqpMockProvider.php @@ -8,6 +8,7 @@ class AmqpMockProvider extends AmqpQueueProvider { protected $_disconnectCount = 0; protected $_unregisterHeartbeat = false; + protected $_heartbeatCheckCount = 0; protected function _getConnection($connectionMode) { @@ -34,6 +35,17 @@ public function getDisconnectCount() return $this->_disconnectCount; } + public function getHeartbeatCheckCount() + { + return $this->_heartbeatCheckCount; + } + + protected function _heartbeat($connectionMode) + { + $this->_heartbeatCheckCount++; + parent::_heartbeat($connectionMode); + } + public function disconnect($connectionMode = null) { parent::disconnect($connectionMode); From 48832ab1fe34710d406b377b5239e88ed6c91dd3 Mon Sep 17 00:00:00 2001 From: Tom Kay Date: Thu, 19 Mar 2026 12:34:37 +0000 Subject: [PATCH 3/7] Add PubSub test coverage and fix gRPC error code handling Add integration tests for GooglePubSubProvider against the PubSub emulator. Fix auto_create not working on gRPC transport - the code compared exception codes against HTTP status codes (404/409) but the Google Cloud PHP library passes gRPC codes (5/6) when using gRPC transport. Co-Authored-By: Claude Opus 4.6 (1M context) --- .github/workflows/unit-test.yml | 10 + src/Provider/Google/GooglePubSubProvider.php | 6 +- tests/Provider/GooglePubSubTest.php | 259 +++++++++++++++++++ 3 files changed, 272 insertions(+), 3 deletions(-) create mode 100644 tests/Provider/GooglePubSubTest.php diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index 1019f54..3481417 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -21,6 +21,14 @@ jobs: image: rabbitmq:${{matrix.rabbit}}-management ports: - 5672:5672 + pubsub: + image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators + ports: + - 8085:8085 + options: >- + --entrypoint "gcloud beta emulators pubsub start --host-port=0.0.0.0:8085" + env: + CLOUDSDK_CORE_PROJECT: test-project steps: - uses: actions/checkout@v6 - uses: shivammathur/setup-php@v2 @@ -30,4 +38,6 @@ jobs: tools: composer, phpunit - run: composer install -n --prefer-dist --no-security-blocking - run: php vendor/phpunit/phpunit/phpunit -c phpunit.xml --coverage-clover=coverage.xml + env: + PUBSUB_EMULATOR_HOST: localhost:8085 - run: php vendor/bin/coverage-check coverage.xml 10 diff --git a/src/Provider/Google/GooglePubSubProvider.php b/src/Provider/Google/GooglePubSubProvider.php index 99f1359..06c1329 100644 --- a/src/Provider/Google/GooglePubSubProvider.php +++ b/src/Provider/Google/GooglePubSubProvider.php @@ -157,7 +157,7 @@ private function _createTopicAndSub() } catch(ConflictException $e) { - if($e->getCode() != 409) + if($e->getCode() != 409 && $e->getCode() != 6) { throw $e; } @@ -169,7 +169,7 @@ private function _createTopicAndSub() } catch(ConflictException $e) { - if($e->getCode() != 409) + if($e->getCode() != 409 && $e->getCode() != 6) { throw $e; } @@ -216,7 +216,7 @@ public function pushBatch(array $batch) } catch(NotFoundException $e) { - if($this->_getAutoCreate() && ($e->getCode() == 404)) + if($this->_getAutoCreate() && ($e->getCode() == 404 || $e->getCode() == 5)) { $this->_createTopicAndSub(); return $topic->publishBatch($messages); diff --git a/tests/Provider/GooglePubSubTest.php b/tests/Provider/GooglePubSubTest.php new file mode 100644 index 0000000..d6a483e --- /dev/null +++ b/tests/Provider/GooglePubSubTest.php @@ -0,0 +1,259 @@ +configure(new ConfigSection('', ['auto_create' => true])); + return $q; + } + + protected function _uniqueName(string $base): string + { + return $base . '_' . uniqid(); + } + + public function testPushAndConsume() + { + $name = $this->_uniqueName('test_push_consume'); + $q = $this->_getProvider($name); + + $q->push('hello world'); + + $result = null; + $q->consume(function ($data) use (&$result) { + $result = $data; + return true; // ack + }); + + $this->assertEquals('hello world', $result); + } + + public function testPushBatch() + { + $name = $this->_uniqueName('test_push_batch'); + $q = $this->_getProvider($name); + + $result = $q->pushBatch(['msg1', 'msg2', 'msg3']); + $this->assertCount(3, $result['messageIds']); + } + + public function testPushBatchEmpty() + { + $name = $this->_uniqueName('test_push_batch_empty'); + $q = $this->_getProvider($name); + + $result = $q->pushBatch([]); + $this->assertNull($result); + } + + public function testConsumeNack() + { + $name = $this->_uniqueName('test_nack'); + $q = $this->_getProvider($name); + + $q->push('nack me'); + + // Nack the message (return false) + $q->consume(function ($data) { + return false; + }); + + // Message should be redelivered after nack + $result = null; + $q->consume(function ($data) use (&$result) { + $result = $data; + return true; + }); + + $this->assertEquals('nack me', $result); + } + + public function testBatchConsume() + { + $name = $this->_uniqueName('test_batch_consume'); + $q = $this->_getProvider($name); + + $q->pushBatch(['b1', 'b2', 'b3']); + + $received = []; + $hadMessages = $q->batchConsume(function ($messages) use ($q, &$received) { + foreach($messages as $ackId => $data) + { + $received[] = $data; + $q->ack($ackId); + } + }, 10); + + $this->assertTrue($hadMessages); + $this->assertCount(3, $received); + } + + public function testBatchAck() + { + $name = $this->_uniqueName('test_batch_ack'); + $q = $this->_getProvider($name); + + $q->pushBatch(['a1', 'a2', 'a3']); + + $received = []; + $q->batchConsume(function ($messages) use ($q, &$received) { + $results = []; + foreach($messages as $ackId => $data) + { + $received[] = $data; + $results[$ackId] = true; + } + $q->batchAck($results); + }, 10); + + $this->assertCount(3, $received); + } + + public function testBatchNack() + { + $name = $this->_uniqueName('test_batch_nack'); + $q = $this->_getProvider($name); + + $q->push('nack_batch'); + + $q->batchConsume(function ($messages) use ($q) { + $results = []; + foreach($messages as $ackId => $data) + { + $results[$ackId] = false; // nack + } + $q->batchAck($results); + }, 10); + + // Nacked message should be redelivered + $redelivered = null; + $q->batchConsume(function ($messages) use ($q, &$redelivered) { + foreach($messages as $ackId => $data) + { + $redelivered = $data; + $q->ack($ackId); + } + }, 10); + + $this->assertEquals('nack_batch', $redelivered); + } + + public function testSingleAckAndNack() + { + $name = $this->_uniqueName('test_single_ack_nack'); + $q = $this->_getProvider($name); + + $q->pushBatch(['keep', 'reject']); + + $nackId = null; + $q->batchConsume(function ($messages) use ($q, &$nackId) { + foreach($messages as $ackId => $data) + { + if($data === 'keep') + { + $q->ack($ackId); + } + else + { + $q->nack($ackId); + $nackId = $ackId; + } + } + }, 10); + + // The nacked message should be redelivered + $redelivered = null; + $q->batchConsume(function ($messages) use ($q, &$redelivered) { + foreach($messages as $ackId => $data) + { + $redelivered = $data; + $q->ack($ackId); + } + }, 10); + + $this->assertEquals('reject', $redelivered); + } + + public function testSeparateTopicAndSubscription() + { + $topic = $this->_uniqueName('test_topic'); + $sub = $this->_uniqueName('test_sub'); + $q = $this->_getProvider($topic, $sub); + + $q->push('separate names'); + + $result = null; + $q->consume(function ($data) use (&$result) { + $result = $data; + return true; + }); + + $this->assertEquals('separate names', $result); + } + + public function testCreateDefaultsSubscriptionToTopic() + { + $name = $this->_uniqueName('test_default_sub'); + $q = GooglePubSubProvider::create($name); + $q->configure(new ConfigSection('', ['auto_create' => true])); + + $q->push('default sub'); + + $result = null; + $q->consume(function ($data) use (&$result) { + $result = $data; + return true; + }); + + $this->assertEquals('default sub', $result); + } + + public function testAckDeadlineConfig() + { + $name = $this->_uniqueName('test_ack_deadline'); + $q = GooglePubSubProvider::create($name); + $q->configure(new ConfigSection('', ['auto_create' => true, 'ack_deadline' => 30])); + + $q->push('with deadline'); + + $result = null; + $q->consume(function ($data) use (&$result) { + $result = $data; + return true; + }); + + $this->assertEquals('with deadline', $result); + } + + public function testMessageEncoding() + { + $name = $this->_uniqueName('test_encoding'); + $q = $this->_getProvider($name); + + $complex = ['key' => 'value', 'nested' => ['a' => 1]]; + $q->push($complex); + + $result = null; + $q->consume(function ($data) use (&$result) { + $result = $data; + return true; + }); + + $this->assertEquals('value', $result->key); + $this->assertEquals(1, $result->nested->a); + } +} \ No newline at end of file From 9c7af6d67178f652ad81bb33d54d995980fe6900 Mon Sep 17 00:00:00 2001 From: Tom Kay Date: Thu, 19 Mar 2026 12:40:19 +0000 Subject: [PATCH 4/7] Fix CI: run PubSub emulator as a step instead of service Docker entrypoint doesn't support multi-word commands as a single string. Run the emulator via docker run in a step with a health check wait. Co-Authored-By: Claude Opus 4.6 (1M context) --- .github/workflows/unit-test.yml | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index 3481417..454d775 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -21,15 +21,12 @@ jobs: image: rabbitmq:${{matrix.rabbit}}-management ports: - 5672:5672 - pubsub: - image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators - ports: - - 8085:8085 - options: >- - --entrypoint "gcloud beta emulators pubsub start --host-port=0.0.0.0:8085" - env: - CLOUDSDK_CORE_PROJECT: test-project steps: + - name: Start PubSub emulator + run: | + docker run -d -p 8085:8085 gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators \ + gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 + timeout 30 bash -c 'until curl -s http://localhost:8085 > /dev/null 2>&1; do sleep 1; done' - uses: actions/checkout@v6 - uses: shivammathur/setup-php@v2 with: From a79cee5b957223bd113854269bcfef98e4d4ac0e Mon Sep 17 00:00:00 2001 From: Tom Kay Date: Thu, 19 Mar 2026 12:48:17 +0000 Subject: [PATCH 5/7] Fix CI: add grpc extension for PubSub emulator on PHP 8.x PubSub v2 requires the grpc extension to use the emulator - without it, the REST transport validates credentials before checking the emulator flag. Co-Authored-By: Claude Opus 4.6 (1M context) --- .github/workflows/unit-test.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index 454d775..8fdcdab 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -31,6 +31,7 @@ jobs: - uses: shivammathur/setup-php@v2 with: php-version: ${{matrix.php}} + extensions: grpc coverage: xdebug tools: composer, phpunit - run: composer install -n --prefer-dist --no-security-blocking From 4c8eb65db34e5d3fbce4cb6f25b9eb4db7bb1f24 Mon Sep 17 00:00:00 2001 From: Tom Kay Date: Thu, 19 Mar 2026 12:53:30 +0000 Subject: [PATCH 6/7] remove grpc --- .github/workflows/unit-test.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index 8fdcdab..454d775 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -31,7 +31,6 @@ jobs: - uses: shivammathur/setup-php@v2 with: php-version: ${{matrix.php}} - extensions: grpc coverage: xdebug tools: composer, phpunit - run: composer install -n --prefer-dist --no-security-blocking From c7ec9c38250f538d89b8ddd355eacc430629b09d Mon Sep 17 00:00:00 2001 From: Tom Kay Date: Thu, 19 Mar 2026 12:53:32 +0000 Subject: [PATCH 7/7] Fix PubSub emulator auth on cloud-pubsub v2 PubSub v2 validates credentials before checking the emulator flag. Pass InsecureCredentials when PUBSUB_EMULATOR_HOST is set and no explicit credentials are configured. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/Provider/Google/GooglePubSubProvider.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Provider/Google/GooglePubSubProvider.php b/src/Provider/Google/GooglePubSubProvider.php index 06c1329..cefafb3 100644 --- a/src/Provider/Google/GooglePubSubProvider.php +++ b/src/Provider/Google/GooglePubSubProvider.php @@ -64,6 +64,10 @@ private function _getClient() { $options['keyFile'] = $this->_loadCredentials($rawCreds); } + elseif(getenv('PUBSUB_EMULATOR_HOST')) + { + $options['credentials'] = new \Google\Auth\Credentials\InsecureCredentials(); + } $this->_client = new PubSubClient($options); } return $this->_client;