phpPthread多线程(六)Pool类线程池Pool对象是多个Worker对象的容器,同时也是它们的控制器,对Worker功能更⾼抽象。
⽐如Worker是河,⽽线程是运⾏在河⾥的船。Pool则是管理着多条河。
<?php
//继承Collectable垃圾收集类,好让Pool::collect进⾏收集
class Sql extends Collectable {
private $sql = '';
private $data = array();
public function __construct($sql) {
$this->sql = $sql;
}
public function run() {
$db = $this->worker->getDb();
$res = mysql_query($this->sql, $db);
$tmp = array();
while($row = mysql_fetch_assoc($res)) {
//这⾥不能使⽤$this->data[] = $row;这种⽅式。
$tmp[] = $row;
}
$this->data = $tmp;
//这⾥⼯作完后,设置为垃圾
//在Pool::collect中isGarbage()判断时则为真
$this->setGarbage();
}
public function getData() {
return $this->data;
}
}
class SqlWorker extends Worker {
protected static $db = null;
public function getDb() {
if(!self::$db) {
self::$db = mysql_connect('127.0.0.1', 'root', '');
mysql_select_db('test', self::$db);
}
return self::$db;
}
}
//这⾥创建5个Worker对象的Pool线程池
$pool = new Pool(5, 'SqlWorker');
//我们创建20个Sql线程对象,并提交到Pool的Worker中
$sqls = array();
for($ix = 0; $ix < 20; ++$ix) {
$sql = new Sql("select * from test order by id limit {$ix},1");
$sqls[] = $sql;
/
/$pool->submit($sql);不要在这⾥submit
}
//注意,这⾥循环提交$sql
//如果把$pool->submit放到前⾯的for循环内,会出现⼀个错误
//第⼀个Sql对象的sql语句会跟最后⼀个相同,导致结果出现问题
foreach($sqls as $sql) {
//这⾥的submit⽅法有问题,它会修改$sqls
//导致第⼀个Sql对象与最后⼀个相同
//不知是不是BUG
$pool->submit($sql);
}
/
/等待队列中执⾏完成
$pool->shutdown();
$ret = array();
foreach($sqls as $sql) {
$ret[] = $sql->getData();
}
file_put_contents('', var_export($ret, true));
//回收已完成的对象
$pool->collect(function($sql){
return $sql->isGarbage();
});
0 =>
array (
0 =>
array (
'id' => '20',
'name' => 'mmm',    ),
),
1 =>
array (
0 =>
array (
'id' => '2',
'name' => '222',    ),
),
2 =>
array (
0 =>
array (
'id' => '3',
'name' => '333',    ),
),
3 =>
array (
0 =>
array (
'id' => '4',
'name' => '444',    ),
),
4 =>
array (
0 =>
array (
'id' => '5',
'name' => '555',    ),
),
5 =>
array (
0 =>
array (
'id' => '6',
'name' => '666',    ),
),
6 =>
array (
0 =>
array (
'id' => '7',
'name' => '777',    ),
),
7 =>
array (
0 =>
array (
'id' => '8',
'name' => '888',    ),
)
,
8 =>
array (
0 =>
array (
'id' => '9',
'name' => '999',    ),
),
9 =>
array (
0 =>
array (
'id' => '10',
'name' => 'aaa',    ),
),
10 =>
array (
0 =>
'id' => '11',
'name' => 'bbb',
),
),
11 =>
array (
0 =>
array (
'id' => '12',
'name' => 'ccc',
),
),
12 =>
array (
0 =>
array (
'id' => '13',
'name' => 'ddd',
),php支持多线程吗
),
13 =>
array (
0 =>
array (
'id' => '14',
'name' => 'eee',
)
,
),
14 =>
array (
0 =>
array (
'id' => '15',
'name' => 'fff',
),
),
15 =>
array (
0 =>
array (
'id' => '16',
'name' => 'ggg',
),
),
16 =>
array (
0 =>
array (
'id' => '17',
'name' => 'vvv',
),
),
17 =>
array (
0 =>
array (
'id' => '18',
'name' => 'hhh',
),
)
,
18 =>
array (
0 =>
array (
'id' => '19',
'name' => 'nnn',
),
),
19 =>
array (
0 =>
array (
'id' => '20',
'name' => 'mmm',
),
),
)
从结果可以看出,第⼀条记录跟最后⼀条是相同的,再没有pool->submit之前$sqls数组中的对象都是正确的,submit之后第⼀个对象的数据就改变了,不知道是不是pthreads的BUG。
上述代码我们通过创建⼀个包含5个SqlWorker对象的pool,然后创建20个Sql对象加⼊到pool中。
当然我们的Sql类并不⼀定⾮要继承⾃Collectable类,我们也可⾃定义判断什么时候可回收。<?php
//继承Threaded类,Threaded提供了隐式的线程安全机制
//这个对象中的所有操作都是线程安全的
class MyWork extends Threaded {
private $name = '';
private $do = false;
private $data = '';
public function __construct($name) {
$this->name = $name;
}
public function run() {
$this->data = "{$this->name} in thread [" . $this->worker->getName() . "] \r\n";
//通过do来判断是否完成
/
/如果为true,则让Pool::collect回收
$this->do = true;
}
public function isDo() {
return  $this->do;
}
public function getData() {
return $this->data;
}
}
class MyWorker extends Worker {
public static $name = 0;
public function __construct() {
self::$name++;
}
public function run() {
}
public function getName() {
return self::$name;
}
}
$pool = new Pool(5, 'MyWorker');
$works = array();
for($ix = 0; $ix < 20; ++$ix) {
$work = new MyWork($ix);
$works[] = $work;
}
foreach($works as $work) {
$pool->submit($work);
}
$pool->shutdown();
foreach($works as $work) {
echo $work->getData();
}
//回收已完成的对象
$pool->collect(function($work){
//我们通过⾃定义函数isDo来判断对象是否执⾏完毕
return $work->isDo();
});
执⾏结果如下:
in thread [5]
in thread [2]
in thread [3]
in thread [4]
in thread [5]
in thread [1]
in thread [2]
in thread [3]
in thread [4]
in thread [5]
in thread [1]
in thread [2]
in thread [3]
in thread [4]
in thread [5]
in thread [1]
in thread [2]
in thread [3]
in thread [4]
in thread [5]
第⼀条记录为什么会有问题,前⾯我已经说过了。这⾥我们看20个MyWork对象,它们顺序的加⼊到5个MyWorker对象中,如果第⼀条记录没有问题的话,它们分别加⼊到1,2,3,4,5的MyWorker中,然后远⾏run⽅法。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。