Last active
March 15, 2019 14:56
-
-
Save lyrixx/4a51e881da89d7bb54bc72d6a081df8c to your computer and use it in GitHub Desktop.
PHP async queue (Process (or not))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
class Queue | |
{ | |
private $size; | |
private $onJobStart; | |
private $isJobRunning; | |
private $onJobFinish; | |
private $queued; | |
private $running; | |
public function __construct(callable $onJobStart, callable $isJobRunning, callable $onJobFinish, int $size = 5) | |
{ | |
$this->onJobStart = $onJobStart; | |
$this->isJobRunning = $isJobRunning; | |
$this->onJobFinish = $onJobFinish; | |
$this->size = $size; | |
$this->queued = new \SplObjectStorage(); | |
$this->running = new \SplObjectStorage(); | |
} | |
public function add($job, $data) | |
{ | |
$this->queued->attach($job, $data); | |
} | |
public function run() | |
{ | |
loop: | |
if (!$this->queued->count() && !$this->running->count()) { | |
return; | |
} | |
$this->sortQueue(); | |
foreach ($this->running as $job) { | |
$data = $this->running[$job]; | |
if (!($this->isJobRunning)($job, $this, $data)) { | |
$this->running->detach($job); | |
($this->onJobFinish)($job, $this, $data); | |
} | |
} | |
usleep(10000); | |
goto loop; | |
} | |
private function sortQueue() | |
{ | |
while ($this->running->count() < $this->size && $this->queued->count()) { | |
$this->queued->rewind(); | |
$job = $this->queued->current(); | |
$data = $this->queued[$job]; | |
$this->queued->detach($job); | |
$this->running->attach($job, $data); | |
($this->onJobStart)($job, $this, $data); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment