Open
Description
Use PostgreSQL LISTEN/NOTIFY to wake the worker instead of querying the database every `timeout` seconds.
My rough version:
<?php
namespace application\modules\queue\components\drivers;
use yii\queue\db\Queue;
/**
 * DB queue driver that wakes its worker through PostgreSQL LISTEN/NOTIFY
 * instead of querying the queue table on every timeout tick.
 *
 * Every push() emits a NOTIFY on the {@see $listener} channel; a worker
 * started in repeat mode LISTENs on that channel and only reserves jobs after
 * a notification arrives (plus one initial pass at start-up).
 *
 * NOTE(review): there is no periodic fallback poll. Jobs that become
 * reservable without a fresh push (presumably delayed jobs and retries) would
 * wait for the next notification — confirm whether that is acceptable.
 */
class Db extends Queue
{
    /**
     * @var string name of the notification channel used for LISTEN/NOTIFY.
     * It is interpolated into SQL as a bare identifier, so it must come from
     * trusted configuration only.
     */
    public $listener = 'queue';

    /**
     * Runs the worker.
     *
     * When $repeat is falsy the call is delegated unchanged to the parent
     * driver. Otherwise the connection subscribes to the notification channel
     * and the worker processes jobs whenever a notification is received.
     *
     * @param bool $repeat whether to keep listening after the queue is drained.
     * @param int $timeout maximum number of seconds to block while waiting for
     * a notification before re-checking whether the worker may continue.
     * Values below 1 are treated as 1 so the loop never spins.
     * @return mixed whatever runWorker() (or the parent run()) returns.
     */
    public function run($repeat, $timeout = 0)
    {
        if (!$repeat) {
            return parent::run($repeat, $timeout);
        }

        $this->db->createCommand("LISTEN {$this->listener}")->execute();

        // pgsqlGetNotify() blocks for up to this many milliseconds and returns
        // as soon as a notification arrives, so a longer wait adds no latency;
        // the one-second floor prevents a busy loop with the default timeout 0.
        $waitMs = max(1, (int) $timeout) * 1000;

        return $this->runWorker(function (callable $canContinue) use ($waitMs) {
            // Initial pass: jobs pushed before LISTEN was issued produced no
            // notification for this connection.
            $this->drainQueue($canContinue);

            while ($canContinue()) {
                if ($this->db->pdo->pgsqlGetNotify(\PDO::FETCH_ASSOC, $waitMs)) {
                    $this->drainQueue($canContinue);
                }
            }
        });
    }

    /**
     * Pushes a job through the parent driver and notifies listening workers.
     *
     * @param mixed $job the job to enqueue (passed to the parent unchanged).
     * @return mixed the id returned by the parent push().
     */
    public function push($job)
    {
        $id = parent::push($job);
        $this->notify($id);

        return $id;
    }

    /**
     * Sends a NOTIFY on the {@see $listener} channel.
     *
     * @param mixed $payload value to send; it is serialize()d and transmitted
     * as the notification payload.
     */
    public function notify($payload)
    {
        // Quote through the connection rather than hand-written quotes, so a
        // payload containing quote characters cannot break or alter the SQL.
        $message = $this->db->quoteValue(serialize($payload));
        $this->db->createCommand("NOTIFY {$this->listener}, {$message}")->execute();
    }

    /**
     * Reserves and handles jobs until none can be reserved or the worker is
     * asked to stop.
     *
     * A job whose handler returns false is not released, and processing moves
     * on to the next job instead of stopping at the first failure.
     * NOTE(review): this assumes reserve() does not hand the same unreleased
     * job back immediately — confirm against the parent driver.
     *
     * @param callable $canContinue callback supplied by runWorker(); returns
     * false once the worker should stop.
     */
    private function drainQueue(callable $canContinue)
    {
        while ($canContinue() && ($payload = $this->reserve())) {
            $handled = $this->handleMessage(
                $payload['id'],
                $payload['job'],
                $payload['ttr'],
                $payload['attempt']
            );

            if ($handled) {
                $this->release($payload);
            }
        }
    }
}