原文链接:https://www.cnblogs.com/jkko123/tag/php%E7%9A%84%E5%A4%9A%E7%BA%BF%E7%A8%8BPthread/
线程,有时称为轻量级进程,是程序执行的最小单元。线程是进程中的一个实体,是被系统独立调度和分派的基本单位,线程自己不拥有系统资源,它与同属一个进程的其它线程共享进程所拥有的全部资源。一个线程可以创建和撤消另一个线程,同一进程中的多个线程之间可以并发执行。每一个程序都至少有一个线程,那就是程序本身,通常称为主线程。线程是程序中一个单一的顺序控制流程。 在单个程序中同时运行多个线程完成不同的工作,称为多线程。
一、基本试用
<?php
//实现多线程必须继承Thread类
class test extends Thread {
public function __construct($arg){
$this->arg = $arg;
}
//当调用start方法时,该对象的run方法中的代码将在独立线程中异步执行。
public function run(){
if($this->arg){
printf("Hello %s\n", $this->arg);
}
}
}
$thread = new test("World");
if($thread->start()) {
//join方法的作用是让当前主线程等待该线程执行完毕
//确认被join的线程执行结束,和线程执行顺序没关系。
//也就是当主线程需要子线程的处理结果,主线程需要等待子线程执行完毕
//拿到子线程的结果,然后处理后续代码。
$thread->join();
}
?>
二、Worker和Threaded
Worker对象与Threaded对象的关系有点像是,船在河中运行,一条河里有很多条船,而河也不止一条。不同的船运行在特定的环境下,比如大吨位的船是无法运行在河床浅的河中。船是可以随时变化的,而河的环境确相对持久。我们通过stack方法把对象加入到Worker中,在对象的run方法中通过对worker的访问来获取信息。
<?php
//Worker是具有持久化上下文(执行环境)的线程对象
//Worker对象start()后,会执行run()方法,run()方法执行完毕,线程也不会消亡
class MySqlWorker extends Worker {
private $name = '';
private $db = null;
public function __construct($name) {
$this->name = $name;
}
public function run() {
$this->db = mysql_connect('127.0.0.1', 'root', '');
mysql_select_db('test', $this->db);
}
public function getDb() {
return $this->db;
}
}
//Stackable是Threaded的一个别称,直到pthreads v.2.0.0
class Query extends Threaded {
private $sql = '';
private $data = array();
public function __construct($sql) {
$this->sql = $sql;
}
public function run() {
//访问线程工作对象
$db = $this->worker->getDb();
$res = mysql_query($this->sql, $db);
$tmp = array();
while($row = mysql_fetch_assoc($res)) {
//这里不能使用$this->data[] = $row;这种方式。
$tmp[] = $row;
}
$this->data = $tmp;
}
public function getData() {
return $this->data;
}
}
$mysqlWork = new MySqlWorker('mysqlWork');
$query1 = new Query('select * from test order by id limit 0,2');
$query2 = new Query('select * from test order by id limit 2,2');
//通过Worker的stack方法,我们把对象加入到Worker中
//会激活Worker执行对象的run()方法。
//说白了就是会执行$query1,$query2的run()方法。
$mysqlWork->stack($query1);
$mysqlWork->stack($query2);
$mysqlWork->start();
//执行完Worker中的对象后,关闭Worker。
//如果把这段代码放到$query1->getData()和$query2->getData()之后
//则会输出两个空数组,那该方法的作用有可能是等待Worker中对象执行完毕,类似join方法()。
$mysqlWork->shutdown();
var_dump($query1->getData());
var_dump($query2->getData());
三、Mutex 互斥量
当我们用多线程操作同一个资源时,在同一时间内只能有一个线程能够对资源进行操作,这时就需要用到互斥量了。比如我们对同一个文件进行读写操作时。当第一个线程给互斥量加锁后,如果在操作期间,其他线程再次给互斥量加锁,会导致线程进入阻塞状态,直到互斥量被解锁。这就很好的保护了文件在同一时间内只能被一个线程操作。如果不加锁,那么对文件的操作结果是不可预知的,因为同一时间内有很多线程同时操作文件,无法判断先后顺序。
<?php
class Add extends Thread {
private $name = '';
private $res = null;
private $mutex = null;
public function __construct($name, $res, $mutex = null) {
$this->name = $name;
$this->res = $res;
$this->mutex = $mutex;
}
public function run() {
if($this->mutex) {
//给互斥量加锁
Mutex::lock($this->mutex);
}
//从文件中获取数据
$data = trim(fgets($this->res));
$data = intval($data);
++$data;
//重置文件指针到开始处
fseek($this->res, 0);
//写入数据
fwrite($this->res, $data);
//重置文件指针
fseek($this->res, 0);
echo "Thread {$this->name} add {$data} \r\n";
if($this->mutex) {
//给互斥量解锁
Mutex::unlock($this->mutex);
}
}
}
$fp = fopen('./add.txt', 'r+');
//创建互斥量,立即加锁
$mutex = Mutex::create(true);
$threads = array();
for($ix = 0; $ix < 20; ++$ix) {
$thread = new Add($ix, $fp, $mutex);
$thread->start();
$threads[] = $thread;
}
Mutex::unlock($mutex);
foreach($threads as $thread) {
$thread->join();
}
//销毁互斥量
Mutex::destroy($mutex);
四、内存共享
有些时候我们希望在多个线程中共享一些需要的数据,我们可以使用shmop扩展。
<?php
class Count extends Thread {
private $name = '';
public function __construct($name) {
$this->name = $name;
}
public function run() {
//在Linux下可以使用sysvshm的扩展, shm_等函数
//共享内存段的key
$shmKey = 123;
//创建共享内存段
$shmId = shmop_open($shmKey, 'c', 0777, 64);
//读取共享内存数据
$data = trim(shmop_read($shmId, 0, 64));
$data = intval($data);
++$data;
shmop_write($shmId, $data, 0);
echo "thread {$this->name} data {$data} \r\n";
//删除关闭共享内存段
shmop_delete($shmId);
shmop_close($shmId);
}
}
$threads = array();
for($ix = 0; $ix < 10; ++$ix) {
$thread = new Count($ix);
$thread->start();
$threads[] = $thread;
}
foreach($threads as $thread) {
$thread->join();
}
五、线程同步
有些时候我们不希望线程调用start()后就立刻执行,在处理完我们的业务逻辑后在需要的时候让线程执行。
<?php
class Sync extends Thread {
private $name = '';
public function __construct($name) {
$this->name = $name;
}
public function run() {
//让线程进入等待状态
$this->synchronized(function($self){
$self->wait();
}, $this);
echo "thread {$this->name} run... \r\n";
}
}
//我们创建10个线程
$threads = array();
for($ix = 0; $ix < 10; ++$ix) {
$thread = new Sync($ix);
$thread->start();
$threads[] = $thread;
}
$num = 1;
while(true) {
if($num > 5) {
//当$num大于5时,我们才唤醒线程让它们执行
foreach($threads as $thread) {
$thread->synchronized(function($self){
$self->notify();
}, $thread);
}
break;
}
//这里我们处理我们需要的代码
//这时候线程是处在等待状态的
echo "wait... \r\n";
sleep(3);
++$num;
}
foreach($threads as $thread) {
$thread->join();
}
echo "end... \r\n";
六、线程池
Pool对象是多个Worker对象的容器,同时也是它们的控制器,对Worker功能更高抽象。
比如Worker是河,而线程是运行在河里的船。Pool则是管理着多条河。
<?php
//继承Threaded类,Threaded提供了隐式的线程安全机制
//这个对象中的所有操作都是线程安全的
class MyWork extends Threaded {
private $name = '';
private $do = false;
private $data = '';
public function __construct($name) {
$this->name = $name;
}
public function run() {
$this->data = "{$this->name} run... in thread [" . $this->worker->getName() . "] \r\n";
//通过do来判断是否完成
//如果为true,则让Pool::collect回收
$this->do = true;
}
public function isDo() {
return $this->do;
}
public function getData() {
return $this->data;
}
}
class MyWorker extends Worker {
public static $name = 0;
public function __construct() {
self::$name++;
}
public function run() {
}
public function getName() {
return self::$name;
}
}
$pool = new Pool(5, 'MyWorker');
$works = array();
for($ix = 0; $ix < 20; ++$ix) {
$work = new MyWork($ix);
$works[] = $work;
}
foreach($works as $work) {
$pool->submit($work);
}
$pool->shutdown();
foreach($works as $work) {
echo $work->getData();
}
//回收已完成的对象
$pool->collect(function($work){
//我们通过自定义函数isDo来判断对象是否执行完毕
return $work->isDo();
});
|
请发表评论