Skip to content

Commit

Permalink
add spec tests
Browse files Browse the repository at this point in the history
  • Loading branch information
fibula committed Jul 19, 2017
1 parent c88b011 commit f8be56a
Show file tree
Hide file tree
Showing 17 changed files with 588 additions and 17 deletions.
171 changes: 161 additions & 10 deletions AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

use Interop\Queue\PsrConnectionFactory;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Connection\AMQPLazyConnection;
use PhpAmqpLib\Connection\AMQPLazySocketConnection;
use PhpAmqpLib\Connection\AMQPSocketConnection;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class AmqpConnectionFactory implements PsrConnectionFactory
Expand All @@ -19,10 +22,35 @@ class AmqpConnectionFactory implements PsrConnectionFactory
private $connection;

/**
* @param array $config
* The config could be an array, string DSN or null. In case of null it will attempt to connect to localhost with default credentials.
*
* [
* 'host' => 'amqp.host The host to connect too. Note: Max 1024 characters.',
* 'port' => 'amqp.port Port on the host.',
* 'vhost' => 'amqp.vhost The virtual host on the host. Note: Max 128 characters.',
* 'user' => 'amqp.user The user name to use. Note: Max 128 characters.',
* 'pass' => 'amqp.password Password. Note: Max 128 characters.',
* 'lazy' => 'the connection will be performed as later as possible, if the option set to true',
* 'stream' => 'stream or socket connection',
* ]
*
* or
*
* amqp://user:pass@host:10000/vhost?lazy=true&socket=true
*
* @param array|string $config
*/
public function __construct(array $config = [])
public function __construct($config = 'amqp://')
{
if (empty($config) || 'amqp://' === $config) {
$config = [];
} elseif (is_string($config)) {
$config = $this->parseDsn($config);
} elseif (is_array($config)) {
} else {
throw new \LogicException('The config must be either an array of options, a DSN string or null');
}

$this->config = array_replace($this->defaultConfig(), $config);
}

Expand All @@ -40,29 +68,152 @@ public function createContext()
private function establishConnection()
{
if (false == $this->connection) {
$this->connection = new AMQPStreamConnection(
$this->config['host'],
$this->config['port'],
$this->config['user'],
$this->config['pass'],
$this->config['vhost']
);
if ($this->config['stream']) {
if ($this->config['lazy']) {
$con = new AMQPLazyConnection(
$this->config['host'],
$this->config['port'],
$this->config['user'],
$this->config['pass'],
$this->config['vhost'],
$this->config['insist'],
$this->config['login_method'],
$this->config['login_response'],
$this->config['locale'],
$this->config['connection_timeout'],
$this->config['read_write_timeout'],
null,
$this->config['keepalive'],
$this->config['heartbeat']
);
} else {
$con = new AMQPStreamConnection(
$this->config['host'],
$this->config['port'],
$this->config['user'],
$this->config['pass'],
$this->config['vhost'],
$this->config['insist'],
$this->config['login_method'],
$this->config['login_response'],
$this->config['locale'],
$this->config['connection_timeout'],
$this->config['read_write_timeout'],
null,
$this->config['keepalive'],
$this->config['heartbeat']
);
}
} else {
if ($this->config['lazy']) {
$con = new AMQPLazySocketConnection(
$this->config['host'],
$this->config['port'],
$this->config['user'],
$this->config['pass'],
$this->config['vhost'],
$this->config['insist'],
$this->config['login_method'],
$this->config['login_response'],
$this->config['locale'],
$this->config['read_timeout'],
$this->config['keepalive'],
$this->config['write_timeout'],
$this->config['heartbeat']
);
} else {
$con = new AMQPSocketConnection(
$this->config['host'],
$this->config['port'],
$this->config['user'],
$this->config['pass'],
$this->config['vhost'],
$this->config['insist'],
$this->config['login_method'],
$this->config['login_response'],
$this->config['locale'],
$this->config['read_timeout'],
$this->config['keepalive'],
$this->config['write_timeout'],
$this->config['heartbeat']
);
}
}

$this->connection = $con;
}

return $this->connection;
}

/**
* @param string $dsn
*
* @return array
*/
private function parseDsn($dsn)
{
$dsnConfig = parse_url($dsn);
if (false === $dsnConfig) {
throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
}

$dsnConfig = array_replace([
'scheme' => null,
'host' => null,
'port' => null,
'user' => null,
'pass' => null,
'path' => null,
'query' => null,
], $dsnConfig);

if ('amqp' !== $dsnConfig['scheme']) {
throw new \LogicException(sprintf('The given DSN scheme "%s" is not supported. Could be "amqp" only.', $dsnConfig['scheme']));
}

if ($dsnConfig['query']) {
$query = [];
parse_str($dsnConfig['query'], $query);

$dsnConfig = array_replace($query, $dsnConfig);
}

$dsnConfig['vhost'] = ltrim($dsnConfig['path'], '/');

unset($dsnConfig['scheme'], $dsnConfig['query'], $dsnConfig['fragment'], $dsnConfig['path']);

$config = array_replace($this->defaultConfig(), $dsnConfig);
$config = array_map(function ($value) {
return urldecode($value);
}, $config);

return $config;
}

/**
* @return array
*/
private function defaultConfig()
{
return [
'stream' => true,
'lazy' => true,
'host' => 'localhost',
'port' => 5672,
'vhost' => '/',
'user' => 'guest',
'pass' => 'guest',
'vhost' => '/',
'insist' => false,
'login_method' => 'AMQPLAIN',
'login_response' => null,
'locale' => 'en_US',
'read_timeout' => 3,
'keepalive' => false,
'write_timeout' => 3,
'heartbeat' => 0,
'connection_timeout' => 3.0,
'read_write_timeout' => 3.0,
];
}
}
27 changes: 25 additions & 2 deletions AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Interop\Queue\InvalidDestinationException;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrDestination;
use Interop\Queue\PsrQueue;
use Interop\Queue\PsrTopic;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AbstractConnection;
Expand Down Expand Up @@ -37,7 +38,7 @@ public function __construct(AbstractConnection $connection)
*
* @return AmqpMessage
*/
public function createMessage($body = null, array $properties = [], array $headers = [])
public function createMessage($body = '', array $properties = [], array $headers = [])
{
return new AmqpMessage($body, $properties, $headers);
}
Expand Down Expand Up @@ -69,7 +70,17 @@ public function createTopic($name)
*/
public function createConsumer(PsrDestination $destination)
{
InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class);
$destination instanceof PsrTopic
? InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpTopic::class)
: InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class)
;

if ($destination instanceof AmqpTopic) {
$queue = $this->createTemporaryQueue();
$this->bind($destination, $queue);

return new AmqpConsumer($this->getChannel(), $queue);
}

return new AmqpConsumer($this->getChannel(), $destination);
}
Expand Down Expand Up @@ -189,6 +200,18 @@ public function bind(PsrDestination $source, PsrDestination $target)
}
}

/**
* Purge all messages from the given queue.
*
* @param PsrQueue $queue
*/
public function purge(PsrQueue $queue)
{
InvalidDestinationException::assertDestinationInstanceOf($queue, AmqpQueue::class);

$this->getChannel()->queue_purge($queue->getQueueName());
}

public function close()
{
if ($this->channel) {
Expand Down
8 changes: 4 additions & 4 deletions AmqpMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
class AmqpMessage implements PsrMessage
{
/**
* @var string|null
* @var string
*/
private $body;

Expand Down Expand Up @@ -51,7 +51,7 @@ class AmqpMessage implements PsrMessage
* @param array $properties
* @param array $headers
*/
public function __construct($body = null, array $properties = [], array $headers = [])
public function __construct($body = '', array $properties = [], array $headers = [])
{
$this->body = $body;
$this->properties = $properties;
Expand All @@ -60,15 +60,15 @@ public function __construct($body = null, array $properties = [], array $headers
}

/**
* @return null|string
* @return string
*/
public function getBody()
{
return $this->body;
}

/**
* @param string|null $body
* @param string $body
*/
public function setBody($body)
{
Expand Down
20 changes: 20 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
The MIT License (MIT)
Copyright (c) 2017 Paul McLaren

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished
to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
14 changes: 14 additions & 0 deletions Tests/Spec/AmqpConnectionFactoryTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

namespace Enqueue\AmqpLib\Tests\Spec;

use Enqueue\AmqpLib\AmqpConnectionFactory;
use Interop\Queue\Spec\PsrConnectionFactorySpec;

class AmqpConnectionFactoryTest extends PsrConnectionFactorySpec
{
protected function createConnectionFactory()
{
return new AmqpConnectionFactory();
}
}
24 changes: 24 additions & 0 deletions Tests/Spec/AmqpContextTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php
namespace Enqueue\AmqpLib\Tests\Spec;

use Enqueue\AmqpLib\AmqpContext;
use Interop\Queue\Spec\PsrContextSpec;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AbstractConnection;

class AmqpContextTest extends PsrContextSpec
{
protected function createContext()
{
$channel = $this->createMock(AMQPChannel::class);

$con = $this->createMock(AbstractConnection::class);
$con
->expects($this->any())
->method('channel')
->willReturn($channel)
;

return new AmqpContext($con);
}
}
17 changes: 17 additions & 0 deletions Tests/Spec/AmqpMessageTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

namespace Enqueue\AmqpLib\Tests\Spec;

use Enqueue\AmqpLib\AmqpMessage;
use Interop\Queue\Spec\PsrMessageSpec;

class AmqpMessageTest extends PsrMessageSpec
{
/**
* {@inheritdoc}
*/
protected function createMessage()
{
return new AmqpMessage();
}
}
14 changes: 14 additions & 0 deletions Tests/Spec/AmqpQueueTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

namespace Enqueue\AmqpLib\Tests\Spec;

use Enqueue\AmqpLib\AmqpQueue;
use Interop\Queue\Spec\PsrQueueSpec;

class AmqpQueueTest extends PsrQueueSpec
{
protected function createQueue()
{
return new AmqpQueue(self::EXPECTED_QUEUE_NAME);
}
}
Loading

0 comments on commit f8be56a

Please sign in to comment.