Swoole从⼊门到⼊⼟(23)——多进程[进程池ProcessPool] Swoole提供的进程池为Process\Pool,基于 Swoole\Server 的 Manager 管理进程模块实现。可管理多个⼯作进程。该模块的核⼼功能为进程管理,相⽐ Process 实现多进程,Process\Pool 更加简单,封装层次更⾼,开发者⽆需编写过多代码即可实现进程管理功能,配合
Co\Server 可以创建纯协程风格的,能利⽤多核 CPU 的服务端程序。同样,我们同过⼀段代码引出本节的详细讨论:
$workerNum = 10;
$pool = new Swoole\Process\Pool($workerNum);
$pool->on("WorkerStart", function ($pool, $workerId) {
echo "Worker#{$workerId} is started\n";
$redis = new Redis();
$redis->pconnect('127.0.0.1', 6379);
$key = "key1";
while (true) {
$msg = $redis->brpop($key, 2);
if ( $msg == null) continue;
var_dump($msg);
}
});
$pool->on("WorkerStop", function ($pool, $workerId) {
echo "Worker#{$workerId} is stopped\n";
});
$pool->start();
现在让我们⼀起了解,这个类为我们提供了哪些成员。
常量
成员函数
1) __construct():构造⽅法
Swoole\Process\Pool::__construct(int $worker_num, int $ipc_type = 0, int $msgqueue_key = 0, bool $enable_coroutine = false);
$worker_num:⼯作进程的数量(当⼀个进程退出后,Pool会及时拉取另⼀个进程进⾏补充)
$ipc_type:进程间通信的模式【默认为 0 表⽰不使⽤任何进程间通信特性】,这个参数需要注意如下事项:
- 设置为 0 时必须设置 onWorkerStart 回调,并且必须在 onWorkerStart 中实现循环逻辑,当 onWorker
Start 函数退出时⼯作进程会⽴即退出,之后会由 Manager 进程重新拉起进程;
- 设置为 SWOOLE_IPC_MSGQUEUE 表⽰使⽤系统消息队列通信,可设置 $msgqueue_key 指定消息队列的 KEY,未设置消息队列 KEY,将申请私有队列;
- 设置为 SWOOLE_IPC_SOCKET 表⽰使⽤ Socket 进⾏通信,需要使⽤ listen ⽅法指定监听的地址和端⼝;
- 设置为 SWOOLE_IPC_UNIXSOCK 表⽰使⽤ unixSocket 进⾏通信,协程模式下使⽤,强烈推荐⽤此种⽅式进程间通讯;
- 使⽤⾮ 0 设置时,必须设置 onMessage 回调,onWorkerStart 变更为可选。
$msgqueue_key:消息列队的key
$enable_coroutine:是否开启协程⽀持【使⽤协程后将⽆法设置 onMessage 回调】
⽰例1:
$pool = new Swoole\Process\Pool(1, SWOOLE_IPC_NONE, 0, true);
$pool->on('workerStart', function (Swoole\Process\Pool $pool, int $workerId) {
while (true) {
Co::sleep(0.5);
echo "hello world\n";
}
});
$pool->start();
⽰例2:开启协程后 Swoole 会禁⽌设置 onMessage 事件回调,需要进程间通讯的话需要将第⼆个设置为 SWOOLE_IPC_UNIXSOCK 表⽰使⽤ unixSocket 进⾏通信,然后使⽤ $pool->getProcess()->exportSocket() 导出 Coroutine\Socket 对象,实现 Worker 进程间通信。
进程通信方式$pool = new Swoole\Process\Pool(2, SWOOLE_IPC_UNIXSOCK, 0, true);
$pool->on('workerStart', function (Swoole\Process\Pool $pool, int $workerId) {
$process = $pool->getProcess(0);
$socket = $process->exportSocket();
if ($workerId == 0) {
echo$socket->recv();
$socket->send("hello proc1\n");
echo "proc0 stop\n";
} else {
$socket->send("hello proc0\n");
echo$socket->recv();
echo "proc1 stop\n";
$pool->shutdown();
}
});
$pool->start();
2) set():设置参数
Swoole\Process\Pool->set(array$settings)
可以使⽤ enable_coroutine 来控制是否启⽤协程,和构造函数的第四个参数作⽤⼀致。
Swoole\Process\Pool->set(['enable_coroutine' => true]);
3) on():设置进程池回调函数
Swoole\Process\Pool->on(string$event, callable $function);
$event:指定事件
$function:回调函数
事件说明:
- onWorkerStart:⼦进程启动,回调函数格式:
function onWorkerStart(Swoole\Process\Pool $pool, int $workerId) {
echo "Worker#{$workerId} is started\n";
}
- onWorkerStop:⼦进程结束,回调函数同onWorkerStart。
- onMessage:消息接收,收到外部投递的消息。⼀次连接只能投递⼀次消息,类似于 PHP-FPM 的短连接机制。回调函数格式如下:function onMessage(Swoole\Process\Pool $pool, string$data) {
var_dump($data);
}
4) listen():监听 SOCKET,必须在 $ipc_mode = SWOOLE_IPC_SOCKET 时才能使⽤。
Swoole\Process\Pool->listen(string$host, int $port = 0, int $backlog = 2048): bool
$host:监听的地址【⽀持 TCP 和 unixSocket 两种类型。127.0.0.1 表⽰监听 TCP 地址,需要指定 $port。unix:/tmp/php.sock 监听unixSocket 地址。】
$port:监听的端⼝【在 TCP 模式下需要指定。】
$backlog:监听的队列长度
返回值:成功监听返回 true、监听失败返回 false,可调⽤ swoole_errno 获取错误码。监听失败后,调⽤ start 时会⽴即返回 false。
注意:向监听端⼝发送数据时,客户端必须在请求前增加 4 字节、⽹络字节序的长度值。协议格式为:packet = htonl(strlen(data)) + data;
⽰例:
$pool->listen('127.0.0.1', 8089);
$pool->listen('unix:/tmp/php.sock');
5) write():向对端写⼊数据,必须在 $ipc_mode 为 SWOOLE_IPC_SOCKET 时才能使⽤。
Swoole\Process\Pool->write(string$data): bool
$data:写⼊的数据内容【可多次调⽤ write,底层会在 onMessage 函数退出后将数据全部写⼊ socket 中,并 close 连接】
说明:此⽅法为内存操作,没有 IO 消耗,发送数据操作是同步阻塞 IO
⽰例:
/**** 服务端 ****/
$pool = new Swoole\Process\Pool(2, SWOOLE_IPC_SOCKET);
$pool->on("Message", function ($pool, $message) {
echo "Message: {$message}\n";
$pool->write("hello ");
$pool->write("world ");
$pool->write("\n");
});
$pool->listen('127.0.0.1', 8089);
$pool->start();
/**** 客户端 ****/
$fp = stream_socket_client("tcp://127.0.0.1:8089", $errno, $errstr) or die("error: $errstr\n");
$msg = json_encode(['data' => 'hello', 'uid' => 1991]);
fwrite($fp, pack('N', strlen($msg)) . $msg);
sleep(1);
//将显⽰ hello world\n
$data = fread($fp, 8192);
var_dump(substr($data, 4, unpack('N', substr($data, 0, 4))[1]));
fclose($fp);
6) start():启动⼯作进程
Swoole\Process\Pool->start(): bool
说明1:
启动成功,当前进程进⼊ wait 状态,管理⼯作进程;
启动失败,返回 false,可使⽤ swoole_errno 获取错误码。
说明2:关于进程管理:
- 某个⼯作进程遇到致命错误、主动退出时管理器会进⾏回收,避免出现僵⼫进程
- ⼯作进程退出后,管理器会⾃动拉起、创建⼀个新的⼯作进程
- 主进程收到 SIGTERM 信号时将停⽌ fork 新进程,并 kill 所有正在运⾏的⼯作进程
- 主进程收到 SIGUSR1 信号时将将逐个 kill 正在运⾏的⼯作进程,并重新启动新的⼯作进程
说明3:信号处理:
底层仅设置了主进程(管理进程)的信号处理,并未对 Worker ⼯作进程设置信号,需要开发者⾃⾏实现信号的监听。
- ⼯作进程为异步模式,请使⽤ Swoole\Process::signal 监听信号
- ⼯作进程为同步模式,请使⽤ pcntl_signal 和 pcntl_signal_dispatch 监听信号
在⼯作进程中应当监听 SIGTERM 信号,当主进程需要终⽌该进程时,会向此进程发送 SIGTERM 信号。如果⼯作进程未监听 SIGTERM 信号,底层会强⾏终⽌当前进程,造成部分逻辑丢失。
⽰例:
$pool->on("WorkerStart", function ($pool, $workerId) {
$running = true;
pcntl_signal(SIGTERM, function () use (&$running) {
$running = false;
});
echo "Worker#{$workerId} is started\n";
$redis = new Redis();
$redis->pconnect('127.0.0.1', 6379);
$key = "key1";
while ($running) {
$msg = $redis->brpop($key);
pcntl_signal_dispatch();
if ( $msg == null) continue;
var_dump($msg);
}
});
7) shutdown():终⽌⼯作进程
Swoole\Process\Pool->shutdown(): bool
8) getProcess():获取当前⼯作进程对象。返回 Swoole\Process 对象。
Swoole\Process\Pool->getProcess(int $worker_id): Swoole\Process;
$worker_id:指定获取 worker 【可选参数,默认当前 worker】
注意:
必须在 start 之后,在⼯作进程的 onWorkerStart 或其他回调函数中调⽤;
返回的 Process 对象是单例模式,在⼯作进程中重复调⽤ getProcess() 将返回同⼀个对象。
⽰例:
$workerNum = 10;
$pool = new Swoole\Process\Pool($workerNum);
$pool->on("WorkerStart", function ($pool, $workerId) {
$process = $pool->getProcess();
$process->exec("/bin/sh", ["ls", '-l']);
});
$pool->on("WorkerStop", function ($pool, $workerId) {
echo "Worker#{$workerId} is stopped\n";
});
$pool->start();
Swoole提供的进程池成员简练,使⽤⽅便。以上就是进程池的所有内容,下⼀节我们将讨论进程管理器Manager:)
--------------------------- 我是可爱的分割线 ----------------------------
最后博主借地宣传⼀下,招新了,这是⼀个⾯向漳州青少年信息学/软件设计的学习⼩组,有意向的同学点击链接,联系我吧。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论