Last active
September 2, 2019 11:05
-
-
Save zhouyl/764cbaf6414dd96bee9bf35dde434a75 to your computer and use it in GitHub Desktop.
pcntl multi process
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
if (!function_exists('pcntl_fork')) { | |
exit('请重新安装编译 PHP,开启 --enable-pcntl 扩展!'); | |
} | |
if (!class_exists('Redis')) { | |
exit('请安装扩展: pecl install redis'); | |
} | |
$redis = new Redis; | |
try { | |
$redis->connect('127.0.0.1', 6379); | |
} catch (Exception $e) { | |
exit('连接 redis 失败: ' . $e->getMessage()); | |
} | |
if (stripos($redis->ping(), 'PONG') === false) { | |
exit('redis 服务未运行!'); | |
} | |
function writelog($message) | |
{ | |
echo sprintf("[%s]%s\n", date('H:i:s'), $message); | |
} | |
$workers = 3; // 进程数 | |
pcntl_async_signals(true); // 启用异步信号处理 | |
interface Runner | |
{ | |
public function run(); | |
} | |
abstract class Worker implements Runner | |
{ | |
public function onSignal($signals, $callback) | |
{ | |
if (!is_array($signals)) { | |
$signals = [$signals]; | |
} | |
foreach ($signals as $signo) { | |
pcntl_signal($signo, function () use ($signo, $callback) { | |
return $callback($signo); | |
}); | |
} | |
return $this; | |
} | |
abstract public function run(); | |
} | |
class SubWorker extends Worker | |
{ | |
private $pid = 0; | |
private $workerId; | |
private $stop = false; | |
public function __construct($workerId) | |
{ | |
$this->workerId = $workerId; | |
$this->pid = posix_getpid(); | |
} | |
protected function wait() | |
{ | |
while (!$this->stop) { | |
$secs = mt_rand(100000, 999999); | |
writelog("子进程 - 即将睡眠 $secs 微秒 ... [$this->workerId][pid: $this->pid]"); | |
usleep($secs); | |
} | |
} | |
public function run() | |
{ | |
$this->onSignal( | |
[SIGTERM, SIGINT, SIGHUP, SIGQUIT], | |
function ($signo) { | |
$this->stop = true; | |
writelog("子进程 - 接收到信号 [$this->workerId][pid: $this->pid, signo=$signo]"); | |
} | |
); | |
$this->wait(); | |
exit; | |
} | |
} | |
class MasterWorker extends Worker | |
{ | |
private $workers; | |
private $pids = []; | |
public function __construct($workers = 5) | |
{ | |
$this->workers = (int) $workers; | |
} | |
protected function wait() | |
{ | |
while (count($this->pids) > 0) { | |
foreach ($this->pids as $workerId => $pid) { | |
$signo = pcntl_waitpid($pid, $status, WNOHANG); | |
if ($signo === -1 || $signo > 0) { | |
writelog("主进程 - 监测到子进程已退出 [workerId=$workerId, pid=$pid, signo=$signo]"); | |
unset($this->pids[$workerId]); | |
} | |
} | |
sleep(1); | |
} | |
} | |
// 以守护进程模式运行 | |
public function daemon() | |
{ | |
$pid = pcntl_fork(); | |
if ($pid > 0) { | |
exit; // 退出当前进程 | |
} elseif ($pid === -1) { | |
exit('服务启动失败!'); | |
} else { | |
// 子进程脱离用户终端控制,成为新的 session leader | |
posix_setsid(); | |
} | |
return $this; | |
} | |
public function run() | |
{ | |
$this->pids = []; | |
for ($workerId = 1; $workerId <= $this->workers; $workerId++) { | |
$pid = pcntl_fork(); | |
if ($pid === -1) { | |
writelog("创建子进程失败! [workerId=$workerId]"); | |
} elseif ($pid === 0) { | |
(new SubWorker($workerId))->run(); | |
} else { | |
writelog("创建子进程成功! [workerId=$workerId, pid=$pid]"); | |
$this->pids[$workerId] = $pid; | |
} | |
} | |
$this->onSignal( | |
[SIGTERM, SIGINT, SIGHUP, SIGQUIT], | |
function ($signo) { | |
writelog('主进程 - 接收到信号 [pid=' . posix_getpid() . ", signo=$signo]"); | |
// 向子进程发送同样的信号 | |
foreach ($this->pids as $workerId => $pid) { | |
posix_kill($pid, $signo); | |
} | |
} | |
); | |
$this->wait(); | |
writelog('所有子进程已退出!'); | |
return $this; | |
} | |
} | |
(new MasterWorker($workers))->daemon()->run(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment