Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Rate Limiting to Procedure Registrations #166

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
60 changes: 60 additions & 0 deletions src/Thruway/Common/LeakyBucket.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?php

namespace Thruway\Common;

use SplQueue;

/**
* Description of LeakyBucket
*
* @author Binaek
*/
class LeakyBucket
{

protected $maxRate;
protected $minTime;
//holds time of last action (past or future!)
protected $lastSchedAction;

public function __construct($maxRatePerSecond = -1)
{
$this->maxRate = -1;
$this->setMaxRate($maxRatePerSecond);
$this->lastSchedAction = time() - $this->minTime;
}

public function setMaxRate($maxRatePerSecond)
{
if ($maxRatePerSecond > 0.0) {
$this->maxRate = $maxRatePerSecond;
//milliseconds between successive calls
$this->minTime = (int) (1000.0 / $maxRatePerSecond);
}
}

public function canConsume()
{
return ($this->getTimeLeft() <= 0);
}

public function getTimeLeft()
{
$timeLeft = 0;
if ($this->maxRate > 0) {
//we are rate limited
$curTime = time();
//calculate when can we send back
$timeLeft = $this->lastSchedAction + $this->minTime - $curTime;
}
return $timeLeft;
}

public function consume()
{
if ($this->canConsume()) {
$this->lastSchedAction = time();
}
}

}
31 changes: 29 additions & 2 deletions src/Thruway/Procedure.php
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,35 @@ private function getNextRandomRegistration()
//just return this so that we don't have to run mt_rand
return $this->registrations[0];
}
//mt_rand is apparently faster than array_rand(which uses the libc generator)
return $this->registrations[mt_rand(0, count($this->registrations) - 1)];

//getting registrations with 0 unprocessed calls
$possibleRegistrations = array_filter($this->registrations, function($theRegistration, $theIndex) {
return ($theRegistration->getStatistics()['invokeQueueCount'] === 0);
}, ARRAY_FILTER_USE_BOTH);

if (count($possibleRegistrations) > 0) {
//return a random from this set
return $possibleRegistrations[mt_rand(0, count($possibleRegistrations) - 1)];
} else if (count($possibleRegistrations) === 1) {
return $possibleRegistrations[0];
} else {
//create a copy to maintain the original indexing
$possibleRegistrations = array_merge([], $this->registrations);
//sort ascending by number of unprocessed Invocations
usort($possibleRegistrations, function($registrationA, $registrationB) {
$unprocessedA = $registrationA->getStatistics()['invokeQueueCount'];
$unprocessedB = $registrationB->getStatistics()['invokeQueueCount'];
if ($unprocessedA == $unprocessedB) {
return 0;
} else if ($unprocessedA < $unprocessedB) {
return -1;
} else {
return 1;
}
});
//return the first one
return $possibleRegistrations[0];
}
}

private function getNextRoundRobinRegistration()
Expand Down
99 changes: 94 additions & 5 deletions src/Thruway/Registration.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Thruway;

use Thruway\Common\Utils;
use Thruway\Common\LeakyBucket;
use Thruway\Message\ErrorMessage;
use Thruway\Message\RegisterMessage;

Expand All @@ -19,6 +20,21 @@ class Registration
*/
private $id;

/**
* @var int
*/
private $limit;

/**
* @var LeakyBucket
*/
private $leakyBucket;

/**
* @var \SplQueue
*/
private $invokeQueue;

/**
* @var \Thruway\Session
*/
Expand Down Expand Up @@ -96,12 +112,17 @@ class Registration
*/
private $completedCallTimeTotal;

/**
* @bool
*/
private $rateLimited;

const SINGLE_REGISTRATION = 'single';
const THRUWAY_REGISTRATION = '_thruway';
const ROUNDROBIN_REGISTRATION = 'roundrobin';
const RANDOM_REGISTRATION = 'random';
const FIRST_REGISTRATION = 'first';
const LAST_REGISTRATION = 'last';
const THRUWAY_REGISTRATION = '_thruway';

/**
* Constructor
Expand All @@ -127,6 +148,11 @@ public function __construct(Session $session, $procedureName)
$this->lastIdledAt = $this->registeredAt;
$this->busyStart = null;
$this->completedCallTimeTotal = 0;

//no throtling by default
$this->limit = -1;
$this->leakyBucket = new LeakyBucket();
$this->rateLimited = false;
}

/**
Expand Down Expand Up @@ -154,6 +180,11 @@ public static function createRegistrationFromRegisterMessage(Session $session, R
$registration->setInvokeType(Registration::SINGLE_REGISTRATION);
}
}
if (isset($options->_limit) && settype($options->_limit, "integer")) {
$registration->setLimit($options->_limit);
} else {
$registration->setLimit(-1); //setting to UNLIMITED
}

return $registration;
}
Expand Down Expand Up @@ -210,14 +241,35 @@ public function setInvokeType($type)
if ($type !== Registration::SINGLE_REGISTRATION) {
$this->invokeType = $type;
$this->setAllowMultipleRegistrations(true);
}
else {
} else {
$this->invokeType = Registration::SINGLE_REGISTRATION;
$this->setAllowMultipleRegistrations(false);
}
}
}

/**
* @param int $limit The number of calls allowed per second
*/
public function setLimit($limit)
{
$this->limit = $limit;
if ($limit > 0) {
$this->rateLimited = true;
$this->invokeQueue = new \SplQueue();
$this->leakyBucket = new Common\LeakyBucket($this->limit);
}
}

/**
* Get the Limit per second on this registrations
* @return int
*/
public function getLimit()
{
return $this->limit;
}

/**
* Process call
*
Expand All @@ -244,8 +296,44 @@ public function processCall(Call $call)
}
$this->invocationCount++;
$this->lastCallStartedAt = new \DateTime();
if ($this->rateLimited) {
if ($this->leakyBucket->canConsume()) {
$this->leakyBucket->consume();
$this->getSession()->sendMessage($call->getInvocationMessage());
} else {
$this->invokeQueue->enqueue($call->getInvocationMessage());
if ($this->invokeQueue->count() === 1) {
//start the timer if I am the first addition to the queue
$this->session->getLoop()->addTimer($this->leakyBucket->getTimeLeft() / 1000, $this);
}
}
} else {
$this->getSession()->sendMessage($call->getInvocationMessage());
}
}

/**
* Get whether rate limited
*
* @return bool
*/
public function isRateLimited(){
return $this->rateLimited;
}

$this->getSession()->sendMessage($call->getInvocationMessage());
/**
* Process Invocation Queue
*
* Using __invoke magic method
*
* @param none
*/
public function __invoke()
{
$this->getSession()->sendMessage($this->invokeQueue->dequeue());
if ($this->invokeQueue->count() > 0) {
$this->session->getLoop()->addTimer($this->leakyBucket->getTimeLeft() / 1000, $this);
}
}

/**
Expand Down Expand Up @@ -386,7 +474,8 @@ public function getStatistics()
'busyStart' => $this->busyStart,
'lastIdledAt' => $this->lastIdledAt,
'lastCallStartedAt' => $this->lastCallStartedAt,
'completedCallTimeTotal' => $this->completedCallTimeTotal
'completedCallTimeTotal' => $this->completedCallTimeTotal,
'invokeQueueCount' => $this->rateLimited ? $this->invokeQueue->count() : 0
];
}

Expand Down
42 changes: 36 additions & 6 deletions tests/Unit/RegistrationTest.php
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
<?php

require_once __DIR__ . '/../bootstrap.php';

class RegistrationTest extends PHPUnit_Framework_TestCase
class RegistrationTest extends Thruway\TestCase
{

/**
* @var \Thruway\Session
*/
private $_calleeSession;

/**
* @var \Thruway\Session
*/
private $_callerSession;

/**
* @var \Thruway\Registration
*/
Expand All @@ -20,7 +24,7 @@ public function setup()
{
$this->_calleeSession = new \Thruway\Session(new \Thruway\Transport\DummyTransport());
$this->_callerSession = new \Thruway\Session(new \Thruway\Transport\DummyTransport());
$this->_registration = new \Thruway\Registration($this->_calleeSession, 'test_procedure');
$this->_registration = new \Thruway\Registration($this->_calleeSession, 'test_procedure');
}

public function testMakingCallIncrementsCallCount()
Expand All @@ -30,9 +34,7 @@ public function testMakingCallIncrementsCallCount()
$this->assertEquals(0, $this->_registration->getCurrentCallCount());

$callMsg = new \Thruway\Message\CallMessage(
\Thruway\Common\Utils::getUniqueId(),
new \stdClass(),
'test_procedure'
\Thruway\Common\Utils::getUniqueId(), new \stdClass(), 'test_procedure'
);


Expand All @@ -42,8 +44,35 @@ public function testMakingCallIncrementsCallCount()
$this->_registration->processCall($call);

$this->assertEquals(1, $this->_registration->getCurrentCallCount());
}

public function testRateLimitedRegistration()
{
$procedure = new \Thruway\Procedure('rate.limit.procedure');

$callerSession = new \Thruway\Session(new Thruway\Transport\DummyTransport());
$calleeSession = new \Thruway\Session(new Thruway\Transport\DummyTransport());

$calleeSession->setLoop(\React\EventLoop\Factory::create());

$throttledRegisterMsg = new \Thruway\Message\RegisterMessage(
\Thruway\Common\Utils::getUniqueId(), [ "_limit" => 1], 'rate.limit.procedure'
);

$procedure->processRegister($calleeSession, $throttledRegisterMsg);

$this->assertEquals(1, count($procedure->getRegistrations()));

$this->assertTrue($procedure->getRegistrations()[0]->isRateLimited());

$callMsg = new \Thruway\Message\CallMessage(
\Thruway\Common\Utils::getUniqueId(), new \stdClass(), 'rate.limit.procedure'
);
$call = new \Thruway\Call($callerSession, $callMsg, $procedure);

$procedure->getRegistrations()[0]->processCall($call);

$this->assertEquals(1, $procedure->getRegistrations()[0]->getStatistics()['invokeQueueCount']);
}

/**
Expand All @@ -53,4 +82,5 @@ public function testRemoveCall()
{
$this->assertTrue(true);
}
}

}