From f8be56a4f1cb4d0fbc6b4ab258e8bdae7899d25a Mon Sep 17 00:00:00 2001 From: Paul McLaren Date: Wed, 19 Jul 2017 10:30:16 +0300 Subject: [PATCH] add spec tests --- AmqpConnectionFactory.php | 171 +++++++++++++++++- AmqpContext.php | 27 ++- AmqpMessage.php | 8 +- LICENSE | 20 ++ Tests/Spec/AmqpConnectionFactoryTest.php | 14 ++ Tests/Spec/AmqpContextTest.php | 24 +++ Tests/Spec/AmqpMessageTest.php | 17 ++ Tests/Spec/AmqpQueueTest.php | 14 ++ .../AmqpSendToAndReceiveFromQueueTest.php | 38 ++++ .../AmqpSendToAndReceiveFromTopicTest.php | 39 ++++ ...mqpSendToAndReceiveNoWaitFromQueueTest.php | 38 ++++ ...mqpSendToAndReceiveNoWaitFromTopicTest.php | 39 ++++ ...AmqpSendToTopicAndReceiveFromQueueTest.php | 55 ++++++ ...ndToTopicAndReceiveNoWaitFromQueueTest.php | 55 ++++++ Tests/Spec/AmqpTopicTest.php | 14 ++ composer.json | 2 +- phpunit.xml.dist | 30 +++ 17 files changed, 588 insertions(+), 17 deletions(-) create mode 100644 LICENSE create mode 100644 Tests/Spec/AmqpConnectionFactoryTest.php create mode 100644 Tests/Spec/AmqpContextTest.php create mode 100644 Tests/Spec/AmqpMessageTest.php create mode 100644 Tests/Spec/AmqpQueueTest.php create mode 100644 Tests/Spec/AmqpSendToAndReceiveFromQueueTest.php create mode 100644 Tests/Spec/AmqpSendToAndReceiveFromTopicTest.php create mode 100644 Tests/Spec/AmqpSendToAndReceiveNoWaitFromQueueTest.php create mode 100644 Tests/Spec/AmqpSendToAndReceiveNoWaitFromTopicTest.php create mode 100644 Tests/Spec/AmqpSendToTopicAndReceiveFromQueueTest.php create mode 100644 Tests/Spec/AmqpSendToTopicAndReceiveNoWaitFromQueueTest.php create mode 100644 Tests/Spec/AmqpTopicTest.php create mode 100644 phpunit.xml.dist diff --git a/AmqpConnectionFactory.php b/AmqpConnectionFactory.php index a6a4590..7c1aefa 100644 --- a/AmqpConnectionFactory.php +++ b/AmqpConnectionFactory.php @@ -4,6 +4,9 @@ use Interop\Queue\PsrConnectionFactory; use PhpAmqpLib\Connection\AbstractConnection; +use PhpAmqpLib\Connection\AMQPLazyConnection; +use PhpAmqpLib\Connection\AMQPLazySocketConnection; +use PhpAmqpLib\Connection\AMQPSocketConnection; use PhpAmqpLib\Connection\AMQPStreamConnection; class AmqpConnectionFactory implements PsrConnectionFactory @@ -19,10 +22,35 @@ class AmqpConnectionFactory implements PsrConnectionFactory private $connection; /** - * @param array $config + * The config could be an array, string DSN or null. In case of null it will attempt to connect to localhost with default credentials. + * + * [ + * 'host' => 'amqp.host The host to connect too. Note: Max 1024 characters.', + * 'port' => 'amqp.port Port on the host.', + * 'vhost' => 'amqp.vhost The virtual host on the host. Note: Max 128 characters.', + * 'user' => 'amqp.user The user name to use. Note: Max 128 characters.', + * 'pass' => 'amqp.password Password. Note: Max 128 characters.', + * 'lazy' => 'the connection will be performed as later as possible, if the option set to true', + * 'stream' => 'stream or socket connection', + * ] + * + * or + * + * amqp://user:pass@host:10000/vhost?lazy=true&socket=true + * + * @param array|string $config */ - public function __construct(array $config = []) + public function __construct($config = 'amqp://') { + if (empty($config) || 'amqp://' === $config) { + $config = []; + } elseif (is_string($config)) { + $config = $this->parseDsn($config); + } elseif (is_array($config)) { + } else { + throw new \LogicException('The config must be either an array of options, a DSN string or null'); + } + $this->config = array_replace($this->defaultConfig(), $config); } @@ -40,29 +68,152 @@ public function createContext() private function establishConnection() { if (false == $this->connection) { - $this->connection = new AMQPStreamConnection( - $this->config['host'], - $this->config['port'], - $this->config['user'], - $this->config['pass'], - $this->config['vhost'] - ); + if ($this->config['stream']) { + if ($this->config['lazy']) { + $con = new AMQPLazyConnection( + $this->config['host'], + $this->config['port'], + $this->config['user'], + $this->config['pass'], + $this->config['vhost'], + $this->config['insist'], + $this->config['login_method'], + $this->config['login_response'], + $this->config['locale'], + $this->config['connection_timeout'], + $this->config['read_write_timeout'], + null, + $this->config['keepalive'], + $this->config['heartbeat'] + ); + } else { + $con = new AMQPStreamConnection( + $this->config['host'], + $this->config['port'], + $this->config['user'], + $this->config['pass'], + $this->config['vhost'], + $this->config['insist'], + $this->config['login_method'], + $this->config['login_response'], + $this->config['locale'], + $this->config['connection_timeout'], + $this->config['read_write_timeout'], + null, + $this->config['keepalive'], + $this->config['heartbeat'] + ); + } + } else { + if ($this->config['lazy']) { + $con = new AMQPLazySocketConnection( + $this->config['host'], + $this->config['port'], + $this->config['user'], + $this->config['pass'], + $this->config['vhost'], + $this->config['insist'], + $this->config['login_method'], + $this->config['login_response'], + $this->config['locale'], + $this->config['read_timeout'], + $this->config['keepalive'], + $this->config['write_timeout'], + $this->config['heartbeat'] + ); + } else { + $con = new AMQPSocketConnection( + $this->config['host'], + $this->config['port'], + $this->config['user'], + $this->config['pass'], + $this->config['vhost'], + $this->config['insist'], + $this->config['login_method'], + $this->config['login_response'], + $this->config['locale'], + $this->config['read_timeout'], + $this->config['keepalive'], + $this->config['write_timeout'], + $this->config['heartbeat'] + ); + } + } + + $this->connection = $con; } return $this->connection; } + /** + * @param string $dsn + * + * @return array + */ + private function parseDsn($dsn) + { + $dsnConfig = parse_url($dsn); + if (false === $dsnConfig) { + throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn)); + } + + $dsnConfig = array_replace([ + 'scheme' => null, + 'host' => null, + 'port' => null, + 'user' => null, + 'pass' => null, + 'path' => null, + 'query' => null, + ], $dsnConfig); + + if ('amqp' !== $dsnConfig['scheme']) { + throw new \LogicException(sprintf('The given DSN scheme "%s" is not supported. Could be "amqp" only.', $dsnConfig['scheme'])); + } + + if ($dsnConfig['query']) { + $query = []; + parse_str($dsnConfig['query'], $query); + + $dsnConfig = array_replace($query, $dsnConfig); + } + + $dsnConfig['vhost'] = ltrim($dsnConfig['path'], '/'); + + unset($dsnConfig['scheme'], $dsnConfig['query'], $dsnConfig['fragment'], $dsnConfig['path']); + + $config = array_replace($this->defaultConfig(), $dsnConfig); + $config = array_map(function ($value) { + return urldecode($value); + }, $config); + + return $config; + } + /** * @return array */ private function defaultConfig() { return [ + 'stream' => true, + 'lazy' => true, 'host' => 'localhost', 'port' => 5672, - 'vhost' => '/', 'user' => 'guest', 'pass' => 'guest', + 'vhost' => '/', + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'read_timeout' => 3, + 'keepalive' => false, + 'write_timeout' => 3, + 'heartbeat' => 0, + 'connection_timeout' => 3.0, + 'read_write_timeout' => 3.0, ]; } } diff --git a/AmqpContext.php b/AmqpContext.php index 375d574..3376bd5 100644 --- a/AmqpContext.php +++ b/AmqpContext.php @@ -6,6 +6,7 @@ use Interop\Queue\InvalidDestinationException; use Interop\Queue\PsrContext; use Interop\Queue\PsrDestination; +use Interop\Queue\PsrQueue; use Interop\Queue\PsrTopic; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; @@ -37,7 +38,7 @@ public function __construct(AbstractConnection $connection) * * @return AmqpMessage */ - public function createMessage($body = null, array $properties = [], array $headers = []) + public function createMessage($body = '', array $properties = [], array $headers = []) { return new AmqpMessage($body, $properties, $headers); } @@ -69,7 +70,17 @@ public function createTopic($name) */ public function createConsumer(PsrDestination $destination) { - InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class); + $destination instanceof PsrTopic + ? InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpTopic::class) + : InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class) + ; + + if ($destination instanceof AmqpTopic) { + $queue = $this->createTemporaryQueue(); + $this->bind($destination, $queue); + + return new AmqpConsumer($this->getChannel(), $queue); + } return new AmqpConsumer($this->getChannel(), $destination); } @@ -189,6 +200,18 @@ public function bind(PsrDestination $source, PsrDestination $target) } } + /** + * Purge all messages from the given queue. + * + * @param PsrQueue $queue + */ + public function purge(PsrQueue $queue) + { + InvalidDestinationException::assertDestinationInstanceOf($queue, AmqpQueue::class); + + $this->getChannel()->queue_purge($queue->getQueueName()); + } + public function close() { if ($this->channel) { diff --git a/AmqpMessage.php b/AmqpMessage.php index 13f8fcd..f391d73 100644 --- a/AmqpMessage.php +++ b/AmqpMessage.php @@ -7,7 +7,7 @@ class AmqpMessage implements PsrMessage { /** - * @var string|null + * @var string */ private $body; @@ -51,7 +51,7 @@ class AmqpMessage implements PsrMessage * @param array $properties * @param array $headers */ - public function __construct($body = null, array $properties = [], array $headers = []) + public function __construct($body = '', array $properties = [], array $headers = []) { $this->body = $body; $this->properties = $properties; @@ -60,7 +60,7 @@ public function __construct($body = null, array $properties = [], array $headers } /** - * @return null|string + * @return string */ public function getBody() { @@ -68,7 +68,7 @@ public function getBody() } /** - * @param string|null $body + * @param string $body */ public function setBody($body) { diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..6815011 --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) +Copyright (c) 2017 Paul McLaren + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished +to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/Tests/Spec/AmqpConnectionFactoryTest.php b/Tests/Spec/AmqpConnectionFactoryTest.php new file mode 100644 index 0000000..ebc3b8a --- /dev/null +++ b/Tests/Spec/AmqpConnectionFactoryTest.php @@ -0,0 +1,14 @@ +createMock(AMQPChannel::class); + + $con = $this->createMock(AbstractConnection::class); + $con + ->expects($this->any()) + ->method('channel') + ->willReturn($channel) + ; + + return new AmqpContext($con); + } +} diff --git a/Tests/Spec/AmqpMessageTest.php b/Tests/Spec/AmqpMessageTest.php new file mode 100644 index 0000000..57b93cb --- /dev/null +++ b/Tests/Spec/AmqpMessageTest.php @@ -0,0 +1,17 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purge($queue); + + return $queue; + } +} diff --git a/Tests/Spec/AmqpSendToAndReceiveFromTopicTest.php b/Tests/Spec/AmqpSendToAndReceiveFromTopicTest.php new file mode 100644 index 0000000..dfce6cc --- /dev/null +++ b/Tests/Spec/AmqpSendToAndReceiveFromTopicTest.php @@ -0,0 +1,39 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createTopic(PsrContext $context, $topicName) + { + $topic = $context->createTopic($topicName); + $topic->setType('fanout'); + $topic->setDurable(true); + $context->declareTopic($topic); + + return $topic; + } +} diff --git a/Tests/Spec/AmqpSendToAndReceiveNoWaitFromQueueTest.php b/Tests/Spec/AmqpSendToAndReceiveNoWaitFromQueueTest.php new file mode 100644 index 0000000..b4db35c --- /dev/null +++ b/Tests/Spec/AmqpSendToAndReceiveNoWaitFromQueueTest.php @@ -0,0 +1,38 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purge($queue); + + return $queue; + } +} diff --git a/Tests/Spec/AmqpSendToAndReceiveNoWaitFromTopicTest.php b/Tests/Spec/AmqpSendToAndReceiveNoWaitFromTopicTest.php new file mode 100644 index 0000000..a50fc4c --- /dev/null +++ b/Tests/Spec/AmqpSendToAndReceiveNoWaitFromTopicTest.php @@ -0,0 +1,39 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createTopic(PsrContext $context, $topicName) + { + $topic = $context->createTopic($topicName); + $topic->setType('fanout'); + $topic->setDurable(true); + $context->declareTopic($topic); + + return $topic; + } +} diff --git a/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueTest.php b/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueTest.php new file mode 100644 index 0000000..928edaa --- /dev/null +++ b/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueTest.php @@ -0,0 +1,55 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purge($queue); + + $context->bind($context->createTopic($queueName), $queue); + + return $queue; + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createTopic(PsrContext $context, $topicName) + { + $topic = $context->createTopic($topicName); + $topic->setType('fanout'); + $topic->setDurable(true); + $context->declareTopic($topic); + + return $topic; + } +} diff --git a/Tests/Spec/AmqpSendToTopicAndReceiveNoWaitFromQueueTest.php b/Tests/Spec/AmqpSendToTopicAndReceiveNoWaitFromQueueTest.php new file mode 100644 index 0000000..6b4b190 --- /dev/null +++ b/Tests/Spec/AmqpSendToTopicAndReceiveNoWaitFromQueueTest.php @@ -0,0 +1,55 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purge($queue); + + $context->bind($context->createTopic($queueName), $queue); + + return $queue; + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createTopic(PsrContext $context, $topicName) + { + $topic = $context->createTopic($topicName); + $topic->setType('fanout'); + $topic->setDurable(true); + $context->declareTopic($topic); + + return $topic; + } +} diff --git a/Tests/Spec/AmqpTopicTest.php b/Tests/Spec/AmqpTopicTest.php new file mode 100644 index 0000000..89717f0 --- /dev/null +++ b/Tests/Spec/AmqpTopicTest.php @@ -0,0 +1,14 @@ +=5.6", - "php-amqplib/php-amqplib": "^2.6", + "php-amqplib/php-amqplib": "^2.7@dev", "queue-interop/queue-interop": "^0.5@dev", "psr/log": "^1" }, diff --git a/phpunit.xml.dist b/phpunit.xml.dist new file mode 100644 index 0000000..f6b8b17 --- /dev/null +++ b/phpunit.xml.dist @@ -0,0 +1,30 @@ + + + + + + + ./Tests + + + + + + . + + ./vendor + ./Tests + + + +