diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index 1019f54..454d775 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -22,6 +22,11 @@ jobs: ports: - 5672:5672 steps: + - name: Start PubSub emulator + run: | + docker run -d -p 8085:8085 gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators \ + gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 + timeout 30 bash -c 'until curl -s http://localhost:8085 > /dev/null 2>&1; do sleep 1; done' - uses: actions/checkout@v6 - uses: shivammathur/setup-php@v2 with: @@ -30,4 +35,6 @@ jobs: tools: composer, phpunit - run: composer install -n --prefer-dist --no-security-blocking - run: php vendor/phpunit/phpunit/phpunit -c phpunit.xml --coverage-clover=coverage.xml + env: + PUBSUB_EMULATOR_HOST: localhost:8085 - run: php vendor/bin/coverage-check coverage.xml 10 diff --git a/src/Provider/Amqp/AmqpQueueProvider.php b/src/Provider/Amqp/AmqpQueueProvider.php index ffa136c..00ec832 100644 --- a/src/Provider/Amqp/AmqpQueueProvider.php +++ b/src/Provider/Amqp/AmqpQueueProvider.php @@ -892,4 +892,44 @@ public function unbindQueue() ); return $this; } + + /** + * Send a heartbeat on the given connection mode, or all active connections + * + * @param string|null $connectionMode One of CONN_PUSH, CONN_CONSUME, CONN_OTHER, or null for all + * + * @return $this + */ + public function heartbeat($connectionMode = null) + { + if($connectionMode !== null) + { + $this->_heartbeat($connectionMode); + } + else + { + foreach(array_keys($this->_connections) as $mode) + { + $this->_heartbeat($mode); + } + } + return $this; + } + + protected function _heartbeat($connectionMode) + { + if(!empty($this->_connections[$connectionMode]) + && $this->_connections[$connectionMode]->isConnected() + ) + { + try + { + $this->_connections[$connectionMode]->checkHeartBeat(); + } + catch(AMQPHeartbeatMissedException $e) + { + $this->disconnect($connectionMode); + } + } + } } diff --git a/src/Provider/Google/GooglePubSubProvider.php b/src/Provider/Google/GooglePubSubProvider.php index 99f1359..cefafb3 100644 --- a/src/Provider/Google/GooglePubSubProvider.php +++ b/src/Provider/Google/GooglePubSubProvider.php @@ -64,6 +64,10 @@ private function _getClient() { $options['keyFile'] = $this->_loadCredentials($rawCreds); } + elseif(getenv('PUBSUB_EMULATOR_HOST')) + { + $options['credentials'] = new \Google\Auth\Credentials\InsecureCredentials(); + } $this->_client = new PubSubClient($options); } return $this->_client; @@ -157,7 +161,7 @@ private function _createTopicAndSub() } catch(ConflictException $e) { - if($e->getCode() != 409) + if($e->getCode() != 409 && $e->getCode() != 6) { throw $e; } @@ -169,7 +173,7 @@ private function _createTopicAndSub() } catch(ConflictException $e) { - if($e->getCode() != 409) + if($e->getCode() != 409 && $e->getCode() != 6) { throw $e; } @@ -216,7 +220,7 @@ public function pushBatch(array $batch) } catch(NotFoundException $e) { - if($this->_getAutoCreate() && ($e->getCode() == 404)) + if($this->_getAutoCreate() && ($e->getCode() == 404 || $e->getCode() == 5)) { $this->_createTopicAndSub(); return $topic->publishBatch($messages); diff --git a/tests/Provider/AmqpTest.php b/tests/Provider/AmqpTest.php index b49cc23..bd8ca42 100644 --- a/tests/Provider/AmqpTest.php +++ b/tests/Provider/AmqpTest.php @@ -40,6 +40,51 @@ function ($msg, $tag) use ($q) { ); } + public function testHeartbeat() + { + $q = $this->_getProvider('test_heartbeat_fn'); + $q->declareExchange() + ->declareQueue() + ->bindQueue(); + + // heartbeat on a specific connection should not disconnect + $q->push('heartbeat_test'); + $checksBefore = $q->getHeartbeatCheckCount(); + $disconnectsBefore = $q->getDisconnectCount(); + $q->heartbeat(AmqpMockProvider::CONN_PUSH); + self::assertGreaterThan($checksBefore, $q->getHeartbeatCheckCount()); + self::assertEquals($disconnectsBefore, $q->getDisconnectCount()); + + // heartbeat on all connections should not disconnect + $checksBefore = $q->getHeartbeatCheckCount(); + $q->heartbeat(); + self::assertGreaterThan($checksBefore, $q->getHeartbeatCheckCount()); + self::assertEquals($disconnectsBefore, $q->getDisconnectCount()); + } + + public function testHeartbeatAfterMissed() + { + $q = $this->_getProvider('test_heartbeat_missed')->unregisterHeartbeat(); + $q->declareExchange() + ->declareQueue() + ->bindQueue(); + $q->push('trigger'); + + // sleep past the heartbeat interval to cause a missed heartbeat + $timeLeft = (int)$q->config()->getItem('heartbeat') * 3; + while($timeLeft > 0) + { + $timeLeft = sleep($timeLeft); + } + + $checksBefore = $q->getHeartbeatCheckCount(); + $disconnectsBefore = $q->getDisconnectCount(); + $q->heartbeat(AmqpMockProvider::CONN_PUSH); + // should have checked and disconnected due to missed heartbeat + self::assertGreaterThan($checksBefore, $q->getHeartbeatCheckCount()); + self::assertGreaterThan($disconnectsBefore, $q->getDisconnectCount()); + } + public function testAmqp() { $q = $this->_getProvider('test', 'testexchange'); diff --git a/tests/Provider/GooglePubSubTest.php b/tests/Provider/GooglePubSubTest.php new file mode 100644 index 0000000..d6a483e --- /dev/null +++ b/tests/Provider/GooglePubSubTest.php @@ -0,0 +1,259 @@ +configure(new ConfigSection('', ['auto_create' => true])); + return $q; + } + + protected function _uniqueName(string $base): string + { + return $base . '_' . uniqid(); + } + + public function testPushAndConsume() + { + $name = $this->_uniqueName('test_push_consume'); + $q = $this->_getProvider($name); + + $q->push('hello world'); + + $result = null; + $q->consume(function ($data) use (&$result) { + $result = $data; + return true; // ack + }); + + $this->assertEquals('hello world', $result); + } + + public function testPushBatch() + { + $name = $this->_uniqueName('test_push_batch'); + $q = $this->_getProvider($name); + + $result = $q->pushBatch(['msg1', 'msg2', 'msg3']); + $this->assertCount(3, $result['messageIds']); + } + + public function testPushBatchEmpty() + { + $name = $this->_uniqueName('test_push_batch_empty'); + $q = $this->_getProvider($name); + + $result = $q->pushBatch([]); + $this->assertNull($result); + } + + public function testConsumeNack() + { + $name = $this->_uniqueName('test_nack'); + $q = $this->_getProvider($name); + + $q->push('nack me'); + + // Nack the message (return false) + $q->consume(function ($data) { + return false; + }); + + // Message should be redelivered after nack + $result = null; + $q->consume(function ($data) use (&$result) { + $result = $data; + return true; + }); + + $this->assertEquals('nack me', $result); + } + + public function testBatchConsume() + { + $name = $this->_uniqueName('test_batch_consume'); + $q = $this->_getProvider($name); + + $q->pushBatch(['b1', 'b2', 'b3']); + + $received = []; + $hadMessages = $q->batchConsume(function ($messages) use ($q, &$received) { + foreach($messages as $ackId => $data) + { + $received[] = $data; + $q->ack($ackId); + } + }, 10); + + $this->assertTrue($hadMessages); + $this->assertCount(3, $received); + } + + public function testBatchAck() + { + $name = $this->_uniqueName('test_batch_ack'); + $q = $this->_getProvider($name); + + $q->pushBatch(['a1', 'a2', 'a3']); + + $received = []; + $q->batchConsume(function ($messages) use ($q, &$received) { + $results = []; + foreach($messages as $ackId => $data) + { + $received[] = $data; + $results[$ackId] = true; + } + $q->batchAck($results); + }, 10); + + $this->assertCount(3, $received); + } + + public function testBatchNack() + { + $name = $this->_uniqueName('test_batch_nack'); + $q = $this->_getProvider($name); + + $q->push('nack_batch'); + + $q->batchConsume(function ($messages) use ($q) { + $results = []; + foreach($messages as $ackId => $data) + { + $results[$ackId] = false; // nack + } + $q->batchAck($results); + }, 10); + + // Nacked message should be redelivered + $redelivered = null; + $q->batchConsume(function ($messages) use ($q, &$redelivered) { + foreach($messages as $ackId => $data) + { + $redelivered = $data; + $q->ack($ackId); + } + }, 10); + + $this->assertEquals('nack_batch', $redelivered); + } + + public function testSingleAckAndNack() + { + $name = $this->_uniqueName('test_single_ack_nack'); + $q = $this->_getProvider($name); + + $q->pushBatch(['keep', 'reject']); + + $nackId = null; + $q->batchConsume(function ($messages) use ($q, &$nackId) { + foreach($messages as $ackId => $data) + { + if($data === 'keep') + { + $q->ack($ackId); + } + else + { + $q->nack($ackId); + $nackId = $ackId; + } + } + }, 10); + + // The nacked message should be redelivered + $redelivered = null; + $q->batchConsume(function ($messages) use ($q, &$redelivered) { + foreach($messages as $ackId => $data) + { + $redelivered = $data; + $q->ack($ackId); + } + }, 10); + + $this->assertEquals('reject', $redelivered); + } + + public function testSeparateTopicAndSubscription() + { + $topic = $this->_uniqueName('test_topic'); + $sub = $this->_uniqueName('test_sub'); + $q = $this->_getProvider($topic, $sub); + + $q->push('separate names'); + + $result = null; + $q->consume(function ($data) use (&$result) { + $result = $data; + return true; + }); + + $this->assertEquals('separate names', $result); + } + + public function testCreateDefaultsSubscriptionToTopic() + { + $name = $this->_uniqueName('test_default_sub'); + $q = GooglePubSubProvider::create($name); + $q->configure(new ConfigSection('', ['auto_create' => true])); + + $q->push('default sub'); + + $result = null; + $q->consume(function ($data) use (&$result) { + $result = $data; + return true; + }); + + $this->assertEquals('default sub', $result); + } + + public function testAckDeadlineConfig() + { + $name = $this->_uniqueName('test_ack_deadline'); + $q = GooglePubSubProvider::create($name); + $q->configure(new ConfigSection('', ['auto_create' => true, 'ack_deadline' => 30])); + + $q->push('with deadline'); + + $result = null; + $q->consume(function ($data) use (&$result) { + $result = $data; + return true; + }); + + $this->assertEquals('with deadline', $result); + } + + public function testMessageEncoding() + { + $name = $this->_uniqueName('test_encoding'); + $q = $this->_getProvider($name); + + $complex = ['key' => 'value', 'nested' => ['a' => 1]]; + $q->push($complex); + + $result = null; + $q->consume(function ($data) use (&$result) { + $result = $data; + return true; + }); + + $this->assertEquals('value', $result->key); + $this->assertEquals(1, $result->nested->a); + } +} \ No newline at end of file diff --git a/tests/Provider/Mock/AmqpMockProvider.php b/tests/Provider/Mock/AmqpMockProvider.php index 45be720..4259362 100644 --- a/tests/Provider/Mock/AmqpMockProvider.php +++ b/tests/Provider/Mock/AmqpMockProvider.php @@ -8,6 +8,7 @@ class AmqpMockProvider extends AmqpQueueProvider { protected $_disconnectCount = 0; protected $_unregisterHeartbeat = false; + protected $_heartbeatCheckCount = 0; protected function _getConnection($connectionMode) { @@ -34,6 +35,17 @@ public function getDisconnectCount() return $this->_disconnectCount; } + public function getHeartbeatCheckCount() + { + return $this->_heartbeatCheckCount; + } + + protected function _heartbeat($connectionMode) + { + $this->_heartbeatCheckCount++; + parent::_heartbeat($connectionMode); + } + public function disconnect($connectionMode = null) { parent::disconnect($connectionMode);