标签归档:php多线程

PHP多线程编程 – 与数据库交互实例范本

<?php
/**
 * Small static helper that opens a mysqli connection and applies a charset.
 */
class mysql{
	/**
	 * Open a mysqli connection (retrying on failure) and configure its charset.
	 *
	 * @param string $server  Host name or IP of the MySQL server.
	 * @param string $user    Login name.
	 * @param string $pwd     Password.
	 * @param string $db      Default database to select.
	 * @param int    $port    TCP port.
	 * @param string $sock    Socket / named pipe passed through to mysqli_connect().
	 * @param string $charset Connection charset; an empty string skips charset setup.
	 * @return mysqli|false   The connection, or false when every attempt failed.
	 */
	public static function getLink($server, $user, $pwd, $db, $port = 3306, $sock = '', $charset = 'utf8') {
		$link = false;
		// Up to 10 connection attempts, stopping at the first success.
		for ($attempt = 0; ($attempt < 10) && !$link; $attempt++) {
			$link = mysqli_connect($server, $user, $pwd, $db, $port, $sock);
		}

		if (!$link) {
			return $link;
		}

		// Charset setup is best-effort and only attempted once the database is selected.
		if (!@mysqli_select_db($link, $db)) {
			return $link;
		}

		$charset = trim($charset);
		// mysqli_get_server_info() requires the link; SET NAMES needs MySQL >= 4.1.
		if (($charset != '') && version_compare(@mysqli_get_server_info($link), '4.1.0', '>=')) {
			// This is a static method, so the local $link must be used (there is no $this).
			@mysqli_query($link, "SET NAMES '" . $charset . "'");
			if (function_exists('mysqli_set_charset')) {
				@mysqli_set_charset($link, $charset);
			} else {
				@mysqli_query($link, "SET CHARACTER_SET_CLIENT = '" . $charset . "'");
				@mysqli_query($link, "SET CHARACTER_SET_RESULTS = '" . $charset . "'");
			}
		}

		return $link;
	}
}

/**
 * Demo data-access object that picks up the connection from the global $link.
 */
class Test{
	public $link = false;

	/**
	 * Capture the global $link as it exists at construction time.
	 */
	public function __construct(){
		global $link;
		$this->link = $link;
	}

	/**
	 * Insert one row into table `a`.
	 *
	 * Uses a prepared statement so the value is never spliced into the SQL text.
	 *
	 * @param mixed $v Value stored in column `aname` (bound as a string).
	 * @return void
	 */
	public function insert($v){
		// No usable connection: nothing to do.
		if (!$this->link) {
			return;
		}

		$stmt = mysqli_prepare($this->link, "INSERT INTO a(aname) VALUES(?)");
		if ($stmt === false) {
			return;
		}

		// bind_param takes its arguments by reference, so a real variable is required.
		$value = (string)$v;
		mysqli_stmt_bind_param($stmt, 's', $value);
		mysqli_stmt_execute($stmt);
		mysqli_stmt_close($stmt);
	}
}

/**
 * Worker used by the Pool below; it only carries the shared object and the mutex
 * so that stacked tasks can reach them through $this->worker.
 */
class MyWorker extends Worker {
	/**
	 * @param MyShare $ms    Shared object stored on the worker as ->share.
	 * @param mixed   $mutex Mutex handle stored on the worker as ->mutex.
	 */
	public function __construct(MyShare $ms, $mutex) {
		$this->share = $ms;
		$this->mutex = $mutex;
	}
	/** Intentionally empty: the worker itself performs no work. */
	public function run() {
	}
}
/**
 * Task that opens its own database connection and inserts one random row.
 */
class MyWork extends Stackable {
	public $link = FALSE;
	public function __construct() {
	}

	/**
	 * Insert one random value while holding the worker's mutex (when one is set).
	 *
	 * Code touching shared objects is best run under mutual exclusion; the mutex
	 * is released in a finally block so a failure cannot leave it locked.
	 */
	public function run() {
		$mutex = $this->worker->mutex;
		if ($mutex)
			Mutex::lock($mutex);

		// Test reads the connection through the global $link, so it must be
		// declared global here before Test is instantiated.
		global $link;
		try {
			$link = mysql::getLink('192.168.1.168', "root", "root", "test");

			if ($link) {
				$test = new Test();
				$test->insert(mt_rand(10000,99999));

				// Each task opens its own connection; close it so connections
				// do not pile up over the lifetime of the worker thread.
				mysqli_close($link);
				$link = false;
			}
		} finally {
			if ($mutex)
				Mutex::unlock($mutex);
		}
	}
}
/**
 * Empty Stackable passed to every MyWorker as the shared object.
 */
class MyShare extends Stackable {
	public function __construct() {
	}
	/** Intentionally empty: this object is never run as a task in this script. */
	public function run() {
	}
}

// Shared object and mutex handed to every MyWorker constructor.
$myshare = new MyShare();
$mutex = Mutex::create();
// Pool of 5 MyWorker instances.
$pool = new Pool(5, "MyWorker", array($myshare,$mutex));

// Queue 50 insert tasks.
for($i = 0; $i < 50; $i++) {
	$pool->submit(new MyWork());
}

// Shut the pool down before destroying the mutex the tasks use.
$pool->shutdown();
Mutex::destroy($mutex);

PHP的多线程扩展pthreads虽然不支持在多线程之间共享资源等类型的变量,但是在每个线程中初始化资源类型是没有问题的,每个线程中就类似一个PHP脚本一样(资源类型变量不能共享,但是代码空间的代码是可以共用的)。

注意:以上范本程序中,Test类中使用了global的$link变量,在单独的脚本中,只要在实例化Test之前,声明了$link变量,那么Test对象内部$link就可用(因为它声明了global),但是在pthreads的多线程编程中这个默认的行为不起作用,为了让它起作用,你需要在实例化Test前,明确使用global先声明$link变量,否则Test中的$link就是空的(尽管它声明了global,但是取不到子线程的同名变量)。

初始化资源的代码也可以放入Work中,这个没有问题,需要牢记的是,资源类型无法在多线程中共享即可。

PHP多线程编程应用之多线程采集

由于要采集一些1688.com上面的产品,并且按照格式保存数据,下载所有图片,之前写过采集的程序,但是是单线程采集,产品一多,下载图片将耗费非常多的时间(不能充分利用带宽,只能一个个来,A下载完成后才能下载B),并且通过nginx或apache发起请求还常常遇到超时问题,所以这次打算使用命令行下的PHP多线程编程来干这个事,以下是程序段,备忘:

<?php
// Runtime settings for the crawl: hide errors, allow up to 7200 seconds of
// execution time and up to 1024M of memory.
ini_set("display_errors","0");
ini_set("max_execution_time","7200");
ini_set("memory_limit","1024M");

require_once dirname(__FILE__) . '/phpQuery/phpQuery.php';
require_once dirname(__FILE__) . '/phpexcel/Classes/PHPExcel.php';

//Tool
/**
 * HTTP helper shared by the crawler tasks.
 */
class CrawTool{
	/**
	 * Fetch a URL with cURL and return the response body.
	 *
	 * @param string       $url  Target URL; surrounding whitespace is ignored.
	 * @param string|array $data POST payload, only sent when $post is not false.
	 * @param bool         $post Any value other than false sends a POST request.
	 * @return string|false Response body, or false on an empty URL or any cURL error.
	 */
	public static function getDataUseCurl($url='', $data='', $post=false){
		if('' == $url){ return false; }

		// Trim once so the request URL and the HTTPS check see the same string.
		$url = trim($url);
		if('' === $url){ return false; }

		$ch = curl_init();
		if($ch === false){ return false; }

		$options = array(
			CURLOPT_URL            => $url,
			CURLOPT_HEADER         => false,
			CURLOPT_TIMEOUT        => 30,
			CURLOPT_CONNECTTIMEOUT => 10,
			CURLOPT_POST           => ($post !== false),
			CURLOPT_RETURNTRANSFER => true,
			CURLOPT_FOLLOWLOCATION => true,
			CURLOPT_MAXREDIRS      => 10,
			CURLOPT_USERAGENT      => "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0",
		);
		if(($post !== false) && ('' != $data)){
			$options[CURLOPT_POSTFIELDS] = $data;
		}
		if(stripos($url, 'https') === 0){
			// NOTE(review): certificate verification is disabled for HTTPS, as in
			// the original code. This accepts any certificate; enable verification
			// if the crawl targets allow it.
			$options[CURLOPT_SSL_VERIFYPEER] = false;
			$options[CURLOPT_SSL_VERIFYHOST] = false;
		}
		curl_setopt_array($ch, $options);

		$result = curl_exec($ch);
		$errn = curl_errno($ch);
		curl_close($ch);

		// Any non-zero cURL error code counts as failure.
		return ((int)$errn == 0) ? $result : false;
	}
}


//Get List
/**
 * Worker for the listing stage; holds the CrawList container that tasks write
 * their results into via $this->worker->crawList.
 */
class CrawListWorker extends Worker{
	/** @param CrawList $cl Result container for this worker. */
	public function __construct(CrawList $cl){
		$this->setCrawList($cl);
	}
	/** Store the result container on the worker. */
	public function setCrawList(CrawList $cl){
		$this->crawList = $cl;
	}
	/** Intentionally empty: the worker itself performs no work. */
	public function run(){}
}
/**
 * Result container for the listing stage (filled by CrawListWork, keyed by product id).
 */
class CrawList extends Stackable{
	/** Intentionally empty: used only as a container. */
	public function run(){}
}
/**
 * Task that downloads one listing page and records every product link found on it.
 */
class CrawListWork extends Stackable{
	/** @param string $url Listing page URL. */
	public function __construct($url){
		$this->setUrl($url);
	}

	/** Store the (trimmed) listing page URL. */
	public function setUrl($url){
		$this->url = trim($url);
	}

	/**
	 * Fetch the page, extract product links and store them on the worker's
	 * crawList as array("id" => ..., "url" => ...) keyed by product id.
	 * Links that do not contain a product id are skipped.
	 */
	public function run(){
		$url_return = CrawTool::getDataUseCurl($this->url);
		if($url_return === false){ return; }

		// The page is converted from GBK; iconv() returns false on failure.
		$html = iconv("gbk","utf-8",$url_return);
		if($html === false){ return; }

		if(!preg_match_all("/.*<div\s+class=\"image\">\s*<a\s+href=\"(.*)\"\s+title.*/",$html,$ls)){
			return;
		}

		foreach($ls[1] as $l){
			// Without a product id there is nothing to key the entry on.
			if(!preg_match('/http:\/\/.*offer\/([0-9]{1,16})\.html/',$l,$pid)){
				continue;
			}
			$id = $pid[1];
			$this->worker->crawList[$id] = array("id"=>$id,"url"=>$l);
		}
	}
}

//Get Product Detail
/**
 * Worker for the product-detail stage; holds the CrawItems container that tasks
 * write their results into via $this->worker->crawItems.
 */
class CrawItemWorker extends Worker{
	/** @param CrawItems $cis Result container for this worker. */
	public function __construct(CrawItems $cis){
		$this->setCrawItems($cis);
	}
	/** Store the result container on the worker. */
	public function setCrawItems(CrawItems $cis){
		$this->crawItems = $cis;
	}
	/** Intentionally empty: the worker itself performs no work. */
	public function run(){}
}
/**
 * Result container for the product-detail stage (filled by CrawItemWork, keyed by product id).
 */
class CrawItems extends Stackable{
	/** Intentionally empty: used only as a container. */
	public function run(){}
}
/**
 * Task that scrapes one product page (plus its lazily loaded description) and
 * stores the extracted fields on the worker's crawItems container.
 */
class CrawItemWork extends Stackable{
	public $url = '';

	/** @param string $url Product page URL; surrounding whitespace is trimmed. */
	public function __construct($url){
		$this->setUrl(trim($url));
	}

	/** Store the product page URL. */
	public function setUrl($url){
		$this->url = $url;
	}

	/**
	 * Scrape the product page.
	 *
	 * @return false|null false when the URL has no product id or the page cannot
	 *                    be fetched/converted; otherwise nothing is returned and
	 *                    the result is stored in $this->worker->crawItems[$id].
	 */
	public function run(){
		// The product id is the numeric part of ".../offer/<id>.html".
		if(!preg_match('/http:\/\/.*offer\/([0-9]{1,16})\.html/',$this->url,$pid)){
			return false;
		}
		$id = $pid[1];

		$url_return = CrawTool::getDataUseCurl($this->url);
		if($url_return === false){ return false; }
		$rd = iconv("gbk","utf-8",$url_return);
		if($rd === false){ return false; }

		// pq() queries the most recently created phpQuery document.
		phpQuery::newDocument($rd);

		// URL of the lazily loaded description block
		$data_tfs_url = pq("#desc-lazyload-container")->attr("data-tfs-url");
		$jiage        = pq("td.price-title")->next()->find("span.value")->text();
		$mingchen     = pq("#mod-detail-hd h1")->text();

		// Result key => label of the feature cell whose next sibling holds the value.
		$featureLabels = array(
			"zhongliang"       => "重量",
			"huohao"           => "货号",
			"xiaoshouxuliehao" => "销售序列号",
			"caizhi"           => "材质",
			"fengge"           => "风格",
			"pinpai"           => "品牌",
			"yangshi"          => "样式",
			"zaoxing"          => "造型",
			"chuligongyi"      => "处理工艺",
		);
		$features = array();
		foreach($featureLabels as $key=>$label){
			$features[$key] = pq('td.de-feature:contains("'.$label.'")')->next()->text();
		}

		// Both image lists always exist, so consumers can iterate them unconditionally.
		$tupian = array("main"=>array(), "des"=>array());
		if(preg_match_all("/\"original\"\:\"(http\:\/\/[^<>]+\.jpg)\"/",$rd,$main_imgs)){
			$tupian["main"] = $main_imgs[1];
		}

		$yanse = array();
		if(preg_match_all("/<span\s+class=\"image\"\s+title=\"([^<>]+)\"\s+data-imgs=/",$rd,$colors)){
			$yanse = $colors[1];
		}

		// The description response wraps its HTML in single quotes; pull it out.
		$thtmll = '';
		$ss = CrawTool::getDataUseCurl($data_tfs_url);
		if(($ss !== false) && preg_match("/.*\'(.*)\'.*/", $ss, $r)){
			$converted = iconv("gbk","utf-8",$r[1]);
			if($converted !== false){
				$thtmll = $converted;
			}
		}

		$dzhongliang = $dchicun = $ddiandu = $dcailiao = $dbaozhuang = '';
		if($thtmll !== ''){
			phpQuery::newDocument($thtmll);
			// Locate the spec table inside the description
			$des = pq('table:contains("重量")');

			$dzhongliang = $des->find("td:contains('重量')")->next()->text();
			$dchicun     = $des->find("td:contains('尺寸')")->next()->text();
			$ddiandu     = $des->find("td:contains('电镀')")->next()->text();
			$dcailiao    = $des->find("td:contains('材料')")->next()->text();
			$dbaozhuang  = $des->find("td:contains('包装')")->next()->text();

			if(preg_match_all("/<img[^<]+src=\"([^<\"]+)\"[^<]+\/>/",$thtmll,$imgs)){
				foreach(array_unique($imgs[1]) as $iurl){
					// Drop the query string from each image URL.
					$tupian["des"][] = preg_replace("/\?.*/",'',$iurl);
				}
			}
		}

		// The weight from the description table wins over the feature list.
		if(trim($dzhongliang) != ''){
			$features["zhongliang"] = $dzhongliang;
		}

		$data = array(
			"id"=>$id,
			"url"=>$this->url,
			"jiage"=>$jiage,
			"yanse"=>$yanse,
			"mingchen"=>$mingchen,
			"zhongliang"=>$features["zhongliang"],
			"huohao"=>$features["huohao"],
			"xiaoshouxuliehao"=>$features["xiaoshouxuliehao"],
			"caizhi"=>$features["caizhi"],
			"fengge"=>$features["fengge"],
			"pinpai"=>$features["pinpai"],
			"yangshi"=>$features["yangshi"],
			"zaoxing"=>$features["zaoxing"],
			"chuligongyi"=>$features["chuligongyi"],
			"dchicun"=>$dchicun,
			"ddiandu"=>$ddiandu,
			"dcailiao"=>$dcailiao,
			"dbaozhuang"=>$dbaozhuang,
			"tupian"=>$tupian
		);

		$this->worker->crawItems[$id]=$data;
	}
}

//Download Images
/**
 * Worker for the image-download stage; carries no state of its own.
 */
class CrawImageWorker extends Worker{
	/** Intentionally empty: the worker itself performs no work. */
	public function run(){}
}

/**
 * Task that downloads one image to a local file, skipping files that already exist.
 */
class CrawImageWork extends Stackable{
	/**
	 * The method was previously misspelled "__contruct" and therefore never ran;
	 * the defaults keep argument-less construction working as before.
	 *
	 * @param string $from Source image URL.
	 * @param string $to   Destination file path.
	 */
	public function __construct($from = '', $to = ''){
		$this->setFrom($from);
		$this->setTo($to);
	}

	/** Set the source image URL. */
	public function setFrom($f){
		$this->from = $f;
	}

	/** Set the destination file path. */
	public function setTo($t){
		$this->to = $t;
	}

	/**
	 * Download an image and write it to disk.
	 *
	 * @param string $url      Image URL; must end in .gif, .jpg, .png or .bmp.
	 * @param string $filename Destination path; defaults to "<unix time><ext>".
	 * @return string|false    The file name written, or FALSE when the URL is
	 *                         empty or unsupported, or the download/write fails.
	 */
	public function grabImage($url, $filename=""){
		if($url == ""){return FALSE;}

		// Extension = everything from the last dot; compared case-insensitively.
		$extt = strrchr($url, ".");
		$ext = strtolower($extt);
		if(!in_array($ext, array(".gif", ".jpg", ".png", ".bmp"), true)){
			echo $url." not support.";return FALSE;
		}

		if($filename == ""){
			$filename = time()."$extt";
		}

		// Do not leave an empty file behind when the download fails: run() skips
		// files that already exist, so the image would never be retried.
		$img = file_get_contents($url);
		if($img === false || $img === ''){
			return FALSE;
		}

		// Overwrite rather than append, so an existing file is never corrupted.
		if(file_put_contents($filename, $img) === false){
			return FALSE;
		}

		return $filename;
	}

	/** Download $this->from to $this->to unless the target already exists. */
	public function run(){
		if(!file_exists($this->to)){
			$this->grabImage($this->from, $this->to);
		}
	}
}

///////////////////////////////////////////////////////////////
$start = microtime(true);

// Validate the command line before touching $argv[1].
if((int)$argc < 2){
	echo "## \$argv[1] miss. exit...\n";
	exit;
}

$url = $argv[1];
// Expected form: http://<shop>.1688.com/.../offerlist_<n>.htm
if(!preg_match('/http:\/\/(.*)\.1688\.com.*offerlist_(.*)\.htm$/',$url,$pmt)){
	echo "## \$argv[1] is not a 1688.com offerlist url. exit...\n";
	exit(1);
}
$shop = $pmt[1];                        // shop name
$offlist = "offerlist_".$pmt[2];        // catalogue number

// Catalogue listing URL
$shop_all_url = "http://".$shop.".1688.com/page/".$offlist.".htm";
$base = dirname(__FILE__);
$new_base = $base. "/".$shop."_".$offlist;

echo "\n\n## Start to Get '".$shop_all_url."'\n";

// Output directory for this shop/catalogue.
if(!is_dir($new_base) && !mkdir($new_base) && !is_dir($new_base)){
	echo "## Can not create '".$new_base."'. exit...\n";
	exit(1);
}

// Load the first listing page (converted from GBK) to read the page count.
$html = iconv("gbk","utf-8",file_get_contents($shop_all_url."?pageNum=1"));
phpQuery::newDocument($html);

// Number of listing pages
$page_total = (int)pq("li > em.page-count")->text();
echo "## Total ".$page_total." pages.\n";

// Stage 1: collect product links from every listing page (5 workers).
echo "## Start to get List...\n";
$craw_list = new CrawList();
$pool = new Pool(5,"CrawListWorker",array($craw_list));

for($i=1;$i<=$page_total;$i++){
	$pool->submit(new CrawListWork($shop_all_url."?pageNum=".$i));
}
$pool->shutdown();
unset($pool);

// Stage 2: scrape each product page (10 workers).
echo "## Start to get Items...\n";
$craw_items = new CrawItems();
$pool_items = new Pool(10,"CrawItemWorker",array($craw_items));
$tt = 0;
foreach($craw_list as $i=>$clt){
	//if($tt>5) break;
	$pool_items->submit(new CrawItemWork($clt['url']));
	$tt++;
}
$pool_items->shutdown();
unset($pool_items);

// Image URLs listed here are skipped (empty by default).
$excude = array();
echo "## Start to get Images...\n";
// Stage 3: download every product image (10 workers).
$imgs_pool = new Pool(10,"CrawImageWorker",array());

$jj=1;
foreach($craw_items as $idx=>$data){
	//if($jj>3){ break; }
	$iidx = $data["id"];
	// One directory per product id.
	$to = $new_base."/".$iidx;
	@mkdir($to,0777,true);

	// "main" images are saved as main_<n>.jpg, description images as des_<n>.jpg.
	foreach(array("main", "des") as $kind){
		// A product may have no images of a given kind; treat a missing list as empty.
		$urls = isset($data["tupian"][$kind]) ? $data["tupian"][$kind] : array();
		$i=0;
		foreach($urls as $igs){
			if(in_array($igs,$excude)){ continue; }
			$target = $to."/".$kind."_".$i.".jpg";

			$c = new CrawImageWork($igs,$target);
			// Set source and target explicitly so the task is fully initialised
			// regardless of what the constructor does.
			$c->setFrom($igs);
			$c->setTo($target);

			$imgs_pool->submit($c);
			unset($c);
			$i++;
		}
	}

	$jj++;
}
$imgs_pool->shutdown();
unset($imgs_pool);

// Build the workbook
$objPHPExcel = new PHPExcel();
$docProps = $objPHPExcel->getProperties();
$docProps->setCreator("vfeelit@qq.com");
$docProps->setLastModifiedBy("vfeelit@qq.com");

$objPHPExcel->setActiveSheetIndex(0);
$objActSheet = $objPHPExcel->getActiveSheet();
$objActSheet->setTitle('采集表');

// Columns used by the sheet: A through T
$colmns = range('A', 'T');

// Column widths: 20 by default, wider for the name (B) and image-URL (T) columns
foreach($colmns as $cls){
	$width = 20;
	if($cls === 'B'){
		$width = 60;
	}elseif($cls === 'T'){
		$width = 120;
	}
	$objActSheet->getColumnDimension($cls)->setWidth($width);
}

// Header row: column letter => caption
$captions = array(
	'A' => '编号',
	'B' => '名称',
	'C' => '图片',
	'D' => '颜色',
	'E' => '价格',
	'F' => '重量',
	'G' => '货号',
	'H' => '销售序列号',
	'I' => '材质',
	'J' => '风格',
	'K' => '品牌',
	'L' => '样式',
	'M' => '造型',
	'N' => '处理工艺',
	'O' => '尺寸',
	'P' => '电镀',
	'Q' => '材料',
	'R' => '包装',
	'S' => '链接',
	'T' => '图片',
);
foreach($captions as $letter => $caption){
	$objActSheet->setCellValue($letter.'1', $caption);
}

// Data rows start at row 2 (row 1 is the header).
$jj=2;
foreach($craw_items as $idx=>$data){
	//if($jj>3){ break; }
	$iidx = $data["id"];
	$to = $new_base."/".$iidx;
	@mkdir($to,0777,true);

	// A product may lack an image list or colour list; treat a missing list as empty.
	$mainList  = isset($data["tupian"]["main"]) ? $data["tupian"]["main"] : array();
	$desList   = isset($data["tupian"]["des"]) ? $data["tupian"]["des"] : array();
	$colorList = isset($data["yanse"]) ? $data["yanse"] : array();

	$i=0; $the_main_pic=''; $mainimgs='';
	foreach($mainList as $igs){
		if(in_array($igs,$excude)){ continue; }
		if($i == 0){
			// The first main image is the one embedded in the sheet.
			$the_main_pic = $to."/main_".$i.".jpg";
		}
		$mainimgs .= $igs."\r\n\r\n";
		$i++;
	}
	$i=0; $desimgs='';
	foreach($desList as $igs){
		if(in_array($igs,$excude)){ continue; }
		$desimgs .= $igs."\r\n\r\n";
		$i++;
	}

	// Row height
	$objActSheet->getRowDimension($jj)->setRowHeight(95);

	// Cell alignment: top-left with wrapping for every used column
	foreach($colmns as $c){
		$align = $objActSheet->getStyle($c.$jj)->getAlignment();
		$align->setHorizontal(PHPExcel_Style_Alignment::HORIZONTAL_LEFT);
		$align->setVertical(PHPExcel_Style_Alignment::VERTICAL_TOP);
		$align->setWrapText(TRUE);
	}

	$objActSheet->getStyle('A'.$jj)->getNumberFormat()->setFormatCode(PHPExcel_Style_NumberFormat::FORMAT_NUMBER);
	$objActSheet->setCellValue('A'.$jj,$data["id"]);

	$objActSheet->setCellValue('B'.$jj,$data["mingchen"]);

	// Main image
	$fsig = $the_main_pic;
	if(file_exists($fsig)){
		// Embed the downloaded picture in column C
		$objDrawing = new PHPExcel_Worksheet_Drawing();
		$objDrawing->setName('Product Image');
		$objDrawing->setDescription('');
		$objDrawing->setPath($fsig);
		$objDrawing->setWidth(120);								// width in pixels
		$objDrawing->setCoordinates('C'.$jj); 					// anchor cell
		$objDrawing->setWorksheet($objActSheet);
	}else{
		$objActSheet->setCellValue('C'.$jj,"主图没有下载");
	}

	// One colour per line
	$colors = '';
	foreach($colorList as $color){
		$colors .= $color."\r\n";
	}
	$objActSheet->setCellValue('D'.$jj,$colors);

	// Remaining plain-text columns: column letter => key in $data
	$plainCells = array(
		'E' => "jiage",
		'F' => "zhongliang",
		'G' => "huohao",
		'H' => "xiaoshouxuliehao",
		'I' => "caizhi",
		'J' => "fengge",
		'K' => "pinpai",
		'L' => "yangshi",
		'M' => "zaoxing",
		'N' => "chuligongyi",
		'O' => "dchicun",
		'P' => "ddiandu",
		'Q' => "dcailiao",
		'R' => "dbaozhuang",
		'S' => "url",
	);
	foreach($plainCells as $letter => $key){
		$objActSheet->setCellValue($letter.$jj,$data[$key]);
	}
	// All image URLs: main images first, then description images
	$objActSheet->setCellValue('T'.$jj,$mainimgs."\r\n\r\n".$desimgs);

	$jj++;
}
// Write the workbook next to the script as <shop>_<offlist>.xls using the 'Excel5' writer.
$objWriter = PHPExcel_IOFactory::createWriter($objPHPExcel,'Excel5');
$objWriter->save($base."/".$shop."_".$offlist.".xls");
	
// Report the total wall-clock time in seconds.
$end = microtime(true);
echo "## Time Use:".($end-$start)."\n\n\n";

采集产品链接时开了5个线程,采集产品信息时开了10个线程,下载图片时开了10个线程。 这里是模拟浏览器的正常访问采集,所以需要设置一下php.ini文件的user_agent值。由于浏览器本身就是一个多线程程序,它可以同时发起多个连接,所以服务器端是绝对要允许同一个客户端开多个链接,所以我们可以模拟浏览器访问,但是线程数一多就可能被识别为非浏览器访问,那可能就会被堵塞。

测试:

/usr/local/php-5.5.15/bin/php craw.php "http://xxx.1688.com/page/offerlist_xxx.htm"


## Start to Get 'http://xxx.1688.com/page/offerlist_xxx.htm'
## Total 2 pages.
## Start to get List...
## Start to get Items...
## Start to get Images...
## Time Use:306.9495668411255


一共采集了40个产品,包括下载所有图片,共用了306秒(5分钟),如果线程数开大点估计时间会更加少(还取决于下载数据,毕竟图片多)。如果是顺序采集(单线程),至少30分钟吧。这个多线程编程的效果非常明显。

原创文章,转载务必保留出处。
永久链接:http://blog.ifeeline.com/1188.html

PHP多线程编程 之 Stackable详解

在Worker中,需要用到Stackable类型的对象,不过这个类型到底是什么东西,文档并没有详细说明。

/usr/local/php-5.5.15/bin/php --rc Threaded
Class [ <internal:pthreads> <iterateable> class Threaded implements Traversable, Countable ] {
...
/usr/local/php-5.5.15/bin/php --rc Stackable
Class [ <internal:pthreads> <iterateable> class Threaded implements Traversable, Countable ] {

查看Threaded和Stackable类的反射输出,发现它们是一模一样的。所以Stackable是Threaded的别名。

用一个例子验证一下:

// Demo: export the reflection of a class and of its alias to compare the two.
class a{}
class_alias("a","b");
ReflectionClass::export("a");
ReflectionClass::export("b");

输出:

/usr/local/php-5.5.15/bin/php rr.php
Class [ <user> class a ] {
  @@ /root/rr.php 2-2
...
Class [ <user> class a ] {
  @@ /root/rr.php 2-2

继承Threaded的类需要实现run方法,除非这个类不是用来运行代码的(作为其它类的通用父类,比如Thread和Worker就是)。

看一个实例:

/**
 * Demo task: reports whether a worker reference is available, then dumps itself.
 */
class Work extends Stackable {
        public function run() {
                // Report the failure case first, the normal case otherwise.
                if (!$this->worker) {
                        printf("failed to get worker something is wrong ...\n");
                } else {
                        printf("Running in %s\n", __CLASS__);
                }
                print_r($this);
        }
}
/**
 * Demo worker with a counter that is incremented once in the constructor.
 */
class ExampleWorker extends Worker {
        public $count = 0;
        public function __construct() { $this->count += 1; }
        /** Intentionally empty: the worker itself performs no work. */
        public function run() {}
}
// Start the worker, stack one task on it, then shut the worker down.
$worker = new ExampleWorker();
$work = new Work();

$worker->start();
$worker->stack($work);

$worker->shutdown();

例子中,在Stackable类Work内,调用了$this->worker判断Worker是否已经设置。先查看输出:

/usr/local/php-5.5.15/bin/php ts.php
Running in Work
Work Object
(
    [worker] => ExampleWorker Object
        (
            [count] => 1
        )

)

Work对象中有一个worker的属性指向Worker对象,这是何时设置的?官方文档有简单说明,大概是说Stackable对象被Worker压栈后在Stackable内可以调用Worker对应的方法,但是具体如何用,没有说明。通过实例可以推导,在Worker对象调用stack方法时,被压入的Stackable对象被修改了,它为之增加了一个worker属性,并引用到自己(Stackable::worker = $this)。

原创文章,转载务必保留出处。
永久链接:http://blog.ifeeline.com/1132.html

PHP多线程编程 之 入门(官方文档)

Getting Started 入门

First thing is first, here’s a mantra, repeat it:
Efficient Multi-Threading is about using minimum threads to do maximum work.
高效的多线程就是用最少的线程完成最多的工作。
When setting out in the world of multi-threading, one might be tempted to just throw threads at something and expect that it will perform better. This is rarely the case in the real world. Efficient use of Multi-Threading relies on your ability to minimize on resources in order to maximize on your hardware. In this tutorial, I take a mundane task – fetching an external web page – and tutor on the best ways to multi-thread that task in the real world.
刚踏入多线程的世界时,人们很容易想当然地往问题上堆线程,并期望性能因此变好。在现实中这种情况很少见。高效使用多线程取决于你用最少的资源把硬件利用到极致的能力。在这个教程中,我以一个普通的任务(获取外部网页)为例,讲解在现实中对该任务使用多线程的最佳方式。
The Simplest of All Worlds
In the simplest of all worlds, we create a Thread in order to fetch content asynchronously.
在最简单的情况下,我们创建一个Thread来异步获取内容。

/**
 * Thread that downloads a URL and exposes the raw response.
 */
class WebRequest extends Thread {
    public $url;
    public $response;
     
    /** @param string $url URL to download. */
    public function __construct($url){
        $this->url = $url;
    }
     
    /** Store the result of file_get_contents() for the URL in $this->response. */
    public function run() {
        $this->response = file_get_contents($this->url);
    }
}
 
// Start the download in a thread, join it, then dump the raw response.
$request = new WebRequest("http://pthreads.org");
 
if ($request->start()) {
     
    /* do some work */
     
    /* ensure we have data */
    $request->join();
     
    /* we can now manipulate the response */
    var_dump($request->response);
}

The problem with the above implementation is that the process is left to deal with the data, which is a big waste of resources. Can you see why ?
以上实现的问题是,数据的处理被留给了主进程去完成,这是对资源的很大浪费。你能看出为什么吗?
The Best of Both Worlds
A savvy designer will notice that the WebRequest Thread could and should perform manipulation of the data, such that the processing of the response can also be performed asynchronously, not just the download of data.
精明的设计师会注意到WebRequest可以和应该执行数据的操作,这样响应处理也可以异步执行,而不仅是数据下载。

/**
 * Thread that downloads a URL and also turns the response into usable data.
 */
class WebRequest extends Thread {
    public $url;
    public $data;
     
    /** @param string $url URL to download. */
    public function __construct($url){
        $this->url = $url;
    }
     
    /**
     * Download the URL and store the processed result in $this->data.
     * $this->data stays unset when the download fails.
     */
    public function run() {
        $response = file_get_contents($this->url);
        // Compare against false explicitly: a truthiness test would also
        // discard valid bodies such as "" or "0".
        if ($response !== false) {
            /* process response into useable data */
             
            $this->data = array($response);
        }
    }
}
 
// Start the download in a thread, join it, then dump the processed data.
$request = new WebRequest("http://pthreads.org");
 
if ($request->start()) {
     
    /* do some work */
     
    /* ensure we have data */
    $request->join();
     
    /* we can now manipulate the response */
    var_dump($request->data);
}

The example above makes much more efficient use of Multi-Threading, omitted is the logic that takes care of parsing the data down into a useable (more importantly, shareable) form.
以上的例子更加高效地使用了多线程,其中省略的是把数据解析成可用(更重要的是可共享)形式的那部分逻辑。

注意:废话一大堆,无非是想说在另一个线程中完成解析数据的逻辑。而这个被认为是浪费,真是十分勉强。它作为所谓的Getting Started,实在是跑题太远。

永久链接:http://blog.ifeeline.com/1123.html

PHP多线程编程 之 worker测试

<?php
/**
 * Demo task: sleeps one second, then prints the current thread id and a timestamp.
 */
class S extends Stackable{
	public function run(){
		usleep(1000000);
		$threadId = Thread::getCurrentThreadID();
		$stamp = microtime(true);
		echo "S Stackable Run -->\t".$threadId."--".$stamp."\n";
	}
}
/**
 * Second demo task: sleeps one second, then prints the current thread id and a timestamp.
 */
class SS extends Stackable{
	public function run(){
		usleep(1000000);
		$threadId = Thread::getCurrentThreadID();
		$stamp = microtime(true);
		echo "SS Stackable Run -->\t".$threadId."--".$stamp."\n";
	}
}
/**
 * Demo worker: prints the current thread id and a timestamp when it runs.
 */
class T extends Worker{
	public function run(){
		$threadId = Thread::getCurrentThreadID();
		$stamp = microtime(true);
		echo "Worker Run -->\t\t".$threadId."--".$stamp."\n";
	}
}

// Start the worker, then stack two tasks on it.
$t = new T();
$t->start();

$s = new S();
$ss = new SS();

$t->stack($s);
$t->stack($ss);

// shutdown() must be called at the end: it waits for the child thread to finish;
// without it a segmentation fault occurs.
// The cause is that the child thread references objects of the main thread, which
// are already destroyed once the main thread has exited.
$t->shutdown();

// Printed by the main thread after the worker has been shut down.
echo "Super -->\t\t".Thread::getCurrentThreadID()."--".microtime(true)."\n";

输出:

/usr/local/php-5.5.15/bin/php tw.php
Worker Run -->		140129030506240--1406547417.7107
S Stackable Run -->	140129030506240--1406547418.7123
SS Stackable Run -->	140129030506240--1406547419.7295
Super -->		140129225074624--1406547419.73

从输出结果看,Worker的run方法和Stackable对象的run方法都在同一个线程中执行,多个Stackable对象重用线程,按顺序执行(看时间可以验证)。

把以上代码的shutdown()语句去掉,运行结果:

/usr/local/php-5.5.15/bin/php tw.php
Worker Run -->		140428491556608--1406547565.413
Super -->		140428686124992--1406547565.4134
S Stackable Run -->	140428491556608--1406547566.414
Segmentation fault

这里可见,Super在第二行就输出了,接着它的线程就结束了,而Stackable对象的线程引用了Super的线程创建的对象,Super线程结束意味着它的对象被销毁,而Stackable对象的线程还在试图获取已经被销毁的对象,段错误就这样产生了。

原创文章,转载务必保留出处。
永久链接:http://blog.ifeeline.com/1115.html

PHP多线程编程 – 实例之Fetch

实例来自PHP的PECL扩展包pthreads-2.0.7中的examples。

<?php
/**
 * Plain object with one public field, used to show objects stored on a thread.
 */
class TestObject {
	public $val;
}

/**
 * Thread that sets several kinds of members on itself (scalar, array, object,
 * serialized object, stream resource), notifies the waiting process, and then
 * waits until the process notifies it back.
 */
class Fetching extends Thread {
	public function run(){
		echo "Begin Fetching run method: ".Thread::getCurrentThreadId()."\n";
		/*
		* of course ...
		*/
		$this->sym = 10245;
		$this->arr = array(
			"1", "2", "3"
		);
		
		/*
		* objects do work, no preparation needed ...
		* read/write objects isn't finalized ..
		* so do the dance to make it work ...
		*/
		$obj = new TestObject();
		$obj->val = "testval";
		$this->obj = $obj;
		
		/*
		* will always work
		*/
		$this->objs = serialize($this->obj);
		
		/*
		* nooooooo
		*/
		$this->res = fopen("php://stdout", "w");
		
		/*
		* tell the waiting process we have created symbols and fetch will succeed
		*
		* NOTE(review): if this notify() runs before the main thread has entered
		* wait(), the wake-up appears to be lost and both threads end up blocked
		* (see the first sample output in the article) -- confirm.
		*/

		$this->synchronized(function(){
			echo "Begin ".Thread::getCurrentThreadId()." notify.\n"; 
		    	$this->notify();
			echo "End ".Thread::getCurrentThreadId()." notify.\n";
		});
		
		/* wait for the process to be finished with the stream */
		$this->synchronized(function(){
		    echo "Begin ".Thread::getCurrentThreadId()." wait.\n";
			$this->wait();
			echo "End ".Thread::getCurrentThreadId()." wait.\n";
		});
		echo "End Thread run method: ".Thread::getCurrentThreadId()."\n";
	}
}

$thread = new Fetching();

$thread->start();

// Block the main thread until the Fetching thread calls notify().
// NOTE(review): there is no "already notified" flag, so a notify() issued before
// this wait() starts is presumably missed -- confirm.
$thread->synchronized(function($me){
	echo "Begin ".Thread::getCurrentThreadId()." wait.\n";
    $me->wait();
	echo "End ".Thread::getCurrentThreadId()." wait.\n";
}, $thread);
/*
* we just got notified that there are symbols waiting
*/
// Read each member back from the thread object and dump the truthy ones.
foreach(array("sym", "arr", "obj", "objs", "res") as $symbol){
	printf("\$thread->%s: ", $symbol);	
	$fetched = $thread->$symbol;
	if ($fetched) {
		switch($symbol){
			/*
			* manual unserialize
			*/
			case "objs":
				var_dump(unserialize($fetched));
			break;
			
			default: var_dump($fetched);
		}
	}
	printf("\n");
}

/* notify the thread so it can destroy resource */
$thread->synchronized(function($me){
    $me->notify();
}, $thread);

为了能观察到输出,我添加了一些输出语句。以下是输出:

/usr/local/php-5.5.15/bin/php Fetch.php
Begin Fetching run method: 140107248678656
Begin 	140107248678656 notify.
End  	140107248678656 notify.
Begin 140107248678656 wait.
Begin 140107443247040 wait.

一旦线程对象的start()方法执行,那么它的run()方法就会马上运行,这里可以看到run()方法所在的线程ID是140107248678656,它一直运行到它的最后遇到wait()时才被堵塞,这个过程中,线程ID保持不变,这个说明这段代码是在同一个线程中。

紧接着运行如下这段代码:

// Main-thread side: wait on the thread object until it is notified.
$thread->synchronized(function($me){
	echo "Begin ".Thread::getCurrentThreadId()." wait.\n";
    $me->wait();
	echo "End ".Thread::getCurrentThreadId()." wait.\n";
}, $thread);

这段代码让当前线程堵塞(Begin 140107443247040 wait.)。同时这段代码和全局文件处于同一个线程中,所以它不会继续执行以下代码,这个时候实际两个线程都堵塞了,所以它会一直堵塞下去。

另外一种情况是不会出现堵塞的情况,以上这段代码如果在run()方法的notify执行之前被执行了,那么主线程就可以被唤醒:

/usr/local/php-5.5.15/bin/php Fetch.php
Begin Fetching run method: 139715053770496
Begin 139715248338880 wait.
Begin 139715053770496 notify.
End 139715053770496 notify.
End 139715248338880 wait.
Begin 139715053770496 wait.
$thread->sym: int(10245)

$thread->arr: array(3) {
  [0]=>
  string(1) "1"
  [1]=>
  string(1) "2"
  [2]=>
  string(1) "3"
}

$thread->obj: object(TestObject)#2 (1) {
  ["val"]=>
  string(7) "testval"
}

$thread->objs: object(TestObject)#2 (1) {
  ["val"]=>
  string(7) "testval"
}

$thread->res: resource(4) of type (stream)

End 139715053770496 wait.
End Fetching run method: 139715053770496

同一个线程中的代码,如果被堵塞,那么之后的代码将不会被执行,直到它被重新唤醒。线程对象的run()方法在一个独立的线程空间中执行,全局代码也处于一个独立的线程空间中。

永久链接: http://blog.ifeeline.com/1111.html

PHP多线程编程 – 实例之Benchmark

实例来自PHP的PECL扩展包pthreads-2.0.7中的examples。

/**
 * Thread with an empty body, so the benchmark measures thread start-up only.
 */
class T extends Thread {
	public function run() {}
}

// Threads started per round (argv[1], default 100) and sample bound (argv[2], default 5).
$max = empty($argv[1]) ? 100 : $argv[1];
$sample = empty($argv[2]) ? 5 : $argv[2];

printf("Start(%d) ...", $max);
// Rounds run for $it = 0 .. $sample inclusive.
for ($it = 0; $it <= $sample; $it++) {
    $s = microtime(true);
    /* begin test */
    $ts = [];
    for ($started = 0; $started < $max; $started++) {
        $t = new T();
        $t->start();
        $ts[] = $t;
    }
    // Drop the references to the started threads before stopping the clock.
    $ts = [];
    /* end test */

    // Threads per second for this round (TPS)
    $elapsed = microtime(true) - $s;
    $ti [] = $max / $elapsed;
    printf(".");
}

// Average TPS over all rounds.
printf(" %.3f tps\n", array_sum($ti) / count($ti));

每次开启$max个线程,共执行$sample+1轮(do...while先执行循环体,再判断$it++ < $sample)。通过这个脚本测试系统多线程编程的TPS:用启动$max个线程所需的时间,反推1秒内能启动的线程数。结果如下:

/usr/local/php-5.5.15/bin/php Benchmark.php 200 2
Start(200) ...... 523.638 tps
/usr/local/php-5.5.15/bin/php Benchmark.php 500 2
Start(500) ...... 218.953 tps
/usr/local/php-5.5.15/bin/php Benchmark.php 1000 2
Start(1000) ...... 139.939 tps

永久链接: http://blog.ifeeline.com/1107.html