Perl中的多线程的实现一般有两种办法,而老版本的办法实际上是一种多进程的办法。 一 Thread->new 该办法是传统的老办法,它与fork很类似,新建一个进程时,会把当前内存空间的所有变量都复制一份传到新的进程里面,以实现共享数据。随着技术的发展,该办法已逐渐被取代,本文不针对该方法做深入研究。 二 ithreads 这种方法是通过新建一个新的perl interpreter来实现的。 默认情况下,所有的数据和变量是不被线程共享的。 如果想共享一个变量,需通过threads::shared来实现。在使用此方法的时候,需要注意以下三点:
my $var1 : shared = "value"; 以下是一个简单的使用perl 多线程的例子。
use strict;
use warnings;
use threads;

# Domains to "query".  One worker thread is started per entry, so adding or
# removing an entry is the only change needed to alter the thread count.
my @domain = ("", "", "", "");

# Print an "index.domain" overview of the work list on one line.
for my $index (0 .. $#domain) {
    print $index . '.' . $domain[$index] . ' ';
}
print "\n";

# checkwhois($label, $name)
#
# Thread body: burns some CPU to simulate a lookup, then reports completion.
#   $label - text identifying the worker in the output (here: the list index)
#   $name  - the domain this worker is responsible for
# Returns nothing; the result is only printed.
sub checkwhois {
    my ($label, $name) = @_;

    # Simulated work: the result is accumulated so the loop body is not a
    # useless void-context expression.
    my $sum = 0;
    for my $n (0 .. 999_999) {
        $sum += $n * $n;
    }

    print "done --$label\t\n";
    print $label . $name . " query successful! \n";
    return;
}

# Start one thread per domain, then wait for all of them.  join() blocks
# until the thread has finished and releases its resources.
my @workers = map { threads->create(\&checkwhois, "$_", $domain[$_]) }
              0 .. $#domain;
$_->join() for @workers;
这个简单的perl脚本主要是新建了4个子线程去做不同的事情,然后调用join方法等待它们执行完成并让线程自动回收。但有时,还是需要结合fork 做一些复杂的工作,下面是关于这个的另外一个demo。
use strict;
use warnings;
use English qw(-no_match_vars);
use threads;
use threads::shared;

my $items    = 20;    # number of work items to process
my $maxchild = 65;    # cap on live processes, the parent included
my $pid;

# NOTE: ': shared' variables are shared between the *threads* of one process.
# fork() gives every child a private copy, so a child cannot hand a counter
# update back to the parent.  The parent therefore owns $item and $forks and
# passes each child its item id at fork time.
my $forks : shared = 1;    # live processes, parent included
my $item  : shared = 0;    # next unassigned item id

my $myid     = 1;          # sequence number of the next child
my $main_pid = $PID;

# Unbuffer STDOUT before forking so buffered text is not duplicated in the
# children.
$| = 1;

print "startn\n";
print "$main_pid \n";

# Process([$sid])
#
# Handles one work item.
#   $sid - item id to process; when omitted, the next id is claimed from the
#          counter $item under a lock.
# Returns 1 when an item was processed, 0 when $sid is outside 0..$items-1.
sub Process {
    my ($sid) = @_;

    if (!defined $sid) {
        lock($item);
        $sid = $item++;
    }

    return 0 if $sid >= $items;

    print "Child process ($PID/$myid) start : $sid/$forks\n";
    print "$sid \n";
    sleep(1);
    print "Child process ($PID/$myid) end : $sid/$forks\n";
    return 1;
}

# A cap below 2 would leave no room for any child next to the parent.
my $cap = $maxchild < 2 ? 2 : $maxchild;

while ($item < $items) {

    # At the cap: block until one child has exited before forking again.
    if ($forks >= $cap) {
        $forks-- if wait() != -1;
        next;
    }

    # Claim the id in the parent; the child inherits it through fork().
    my $sid = $item++;

    $pid = fork;
    die "cann't fork: $!\n" unless defined $pid;

    if ($pid) {
        # Parent: account for the new child and keep distributing work.
        $forks++;
        $myid++;
        print "Starting Sub Process : ($pid/$PID)\n";
    }
    else {
        # Child: process exactly one item, then exit so it never re-enters
        # the parent's distribution loop.
        my $done = Process($sid);
        print "Child process ($PID/$myid) exit : $sid/$forks\n";
        exit($done ? 0 : 1);
    }
}

# Reap every remaining child so none is left behind as a zombie.
1 while wait() != -1;
该实例使用了fork 和共享数据等比较高级的用法。 在本文最后,给一个比较实用的perl 多线程的例子:上传文件到ftp文件服务器。
use strict;
use warnings;
use File::Copy;
use File::stat;
use File::Find;
use IO::Handle;
use Net::FTP;
use threads;
use threads::shared;

################################################################################
############################## Configuration ##################################
################################################################################

# Maximum number of upload threads running at the same time.
my $maxthread = 20;

# Counters updated by the worker threads (always under lock()).
my $CurrentThreads  : shared = 0;    # threads currently inside ProcessFile
my $total_files     : shared = 0;    # files handed to ProcessFile
my $processed_files : shared = 0;    # files uploaded successfully
my $skipped_files   : shared = 0;    # files skipped or given up on
my $ftp_retrytimes  : shared = 3;    # upload attempts per file

# 1 while every file so far was uploaded, -1 once any file has failed.
my $g_isAllFiles_uploadSuccess : shared = 1;

# Source path => formatted mtime recorded when the file was first seen.
my %lookup : shared;

my $ftp_server  = "";
my $ftp_dir     = "";
my $ftp_uid     = "";
my $ftp_pw      = "";
my $ftp_timeout = 1800;
my $ftp_debug   = 0;

my @src_dir_files        = ();
my @src_dir_NameListFile = ();

# Paths containing any of these substrings are not uploaded.  Empty entries
# are ignored (an empty substring would match every path).
my @wc_exclude = ("_vti", ".lob", "\\bak", "\\data", "");

my $logFileName = 'upload.log';
my $log_cnt     = 0;
my $span        = 0;
my $start_date  = TimeString(time);
print $start_date . "\n";

my $g_uploadSuccess = 1;
my $g_strLastError  = "";

# Top directory of the tree File::Find is currently walking; set by
# UploadTrees() and read by ProcessFiles() to derive the remote path.
my $g_topdir = '';

################################################################################
################ Convert between "\"(backlash) and "/" ########################
################################################################################

# Returns a copy of $s with every backslash replaced by a forward slash.
sub BacklashToLash {
    my ($s) = @_;
    return $s unless defined $s;
    $s =~ s{\\}{/}g;
    return $s;
}

# Returns a copy of $s with every forward slash replaced by a backslash.
sub LashToBacklash {
    my ($s) = @_;
    return $s unless defined $s;
    $s =~ s{/}{\\}g;
    return $s;
}

################################################################################
####################### format the time strings ###############################
################################################################################

# Formats an epoch time as "YYYY-MM-DD hh:mm:ss" in local time.
sub TimeString {
    my ($tm) = @_;
    my ($sec, $min, $hour, $mday, $mon, $year) = localtime($tm);
    return sprintf("%04d-%02d-%02d %02d:%02d:%02d",
        $year + 1900, $mon + 1, $mday, $hour, $min, $sec);
}

# Formats an epoch time as "YYYY-MM-DD_hh_mm" in local time.
sub ShortTimeString {
    my ($tm) = @_;
    my ($sec, $min, $hour, $mday, $mon, $year) = localtime($tm);
    return sprintf("%04d-%02d-%02d_%02d_%02d",
        $year + 1900, $mon + 1, $mday, $hour, $min);
}

# Splits a date in the fixed-width format "2009-03-29 09:09:51" into
# (year, month, day, hour, minute, seconds) by character position.
sub ScanDate {
    my ($date) = @_;
    my $year    = substr($date, 0,  4);
    my $month   = substr($date, 5,  2);
    my $day     = substr($date, 8,  2);
    my $hour    = substr($date, 11, 2);
    my $minute  = substr($date, 14, 2);
    my $seconds = substr($date, 17, 2);
    return ($year, $month, $day, $hour, $minute, $seconds);
}

################################################################################
############### get the directory of current file name ########################
################################################################################

# Returns everything before the last backslash of $s, or '' when $s contains
# no backslash (rindex() returns -1 then, which must not reach substr()).
sub GetDirFromFileName {
    my ($s) = @_;
    my $pos = rindex($s, "\\");
    return '' if $pos < 0;
    return substr($s, 0, $pos);
}

################################################################################
######################## log method to log files ##############################
################################################################################

# Lexical log handle; undef while the log file is closed.
my $HLOG;

# Appends "[timestamp] $text" to the log file, opening it on first use.
sub LOG {
    my ($text) = @_;
    my $time = TimeString(time);
    OpenLogFile() unless defined $HLOG;
    $log_cnt++;
    print {$HLOG} "[$time] $text\n";
    return;
}

# (Re)opens the log file in append mode.  The handle is autoflushed so each
# line reaches the file as it is written.
sub OpenLogFile {
    CloseLogFile();
    open($HLOG, '>>', $logFileName)
        or die("Open file error: $logFileName: $!\n");
    $HLOG->autoflush(1);
    return;
}

# Closes the log file if it is open.
sub CloseLogFile {
    if (defined $HLOG) {
        close($HLOG);
        undef $HLOG;
    }
    return;
}

# Forces pending log output to disk by closing and reopening the file.
sub FlushLogFile {
    CloseLogFile();
    OpenLogFile();
    return;
}

################################################################################
######################## Process File method ##############################
################################################################################

# ProcessFile($src, $dst, $dstdir)
#
# Thread body: uploads one local file to the FTP server.
#   $srcThread    - local path of the file
#   $dstThread    - remote path of the file
#   $dstdirThread - remote directory, created (recursively) before the upload
# Makes up to $ftp_retrytimes attempts, each on a fresh connection.  Updates
# the shared counters; after the last failed attempt the file is counted as
# skipped and $g_isAllFiles_uploadSuccess is set to -1.
sub ProcessFile {
    my ($srcThread, $dstThread, $dstdirThread) = @_;

    { lock($CurrentThreads); $CurrentThreads++; }

    {
        lock($total_files);
        $total_files++;
        LOG("Processing $total_files \"$srcThread\" ");
    }

    my $need_upload = 0;
    my $st          = stat($srcThread);
    if (!$st) {
        # The file vanished or is unreadable: count it as skipped below.
        LOG("Can't stat \"$srcThread\": $!");
    }
    else {
        my $t2 = TimeString($st->mtime);
        lock(%lookup);
        $lookup{$srcThread} = $t2 if !defined $lookup{$srcThread};

        # NOTE(review): the original "changed since last upload" test was a
        # stub that always evaluated true, so every readable file is uploaded.
        $need_upload = 1;
    }

    if ($need_upload) {
        my $bPutResult = -1;    # stays negative until an attempt succeeds

        for my $nProcessIndex (1 .. $ftp_retrytimes) {
            my $ftp = Net::FTP->new($ftp_server,
                Debug => $ftp_debug, Timeout => $ftp_timeout);
            if (!$ftp) {
                $g_strLastError =
                    "Can't connect to the FTP server, the reason: " . $@;
                LOG("$g_strLastError\n");
                next;
            }

            # Every connection must authenticate before it may transfer.
            if (!$ftp->login($ftp_uid, $ftp_pw)) {
                $g_strLastError =
                    "Can't login to the FTP server, the reason: "
                    . $ftp->message;
                LOG("$g_strLastError\n");
                $ftp->quit();
                next;
            }

            $ftp->binary;
            LOG("The $nProcessIndex time to try upload file from \"$srcThread\" to \"$dstThread\". Current total thread number is $CurrentThreads");

            $ftp->mkdir($dstdirThread, 1);
            my $ok     = $ftp->put($srcThread, $dstThread);
            my $reason = $ok ? '' : $ftp->message;
            $ftp->quit();

            if ($ok) {
                LOG("The $nProcessIndex time to try upload file SUCCEED from \"$srcThread\" to \"$dstThread\"");
                { lock($processed_files); $processed_files++; }
                $bPutResult = 0;
                last;
            }

            LOG("The $nProcessIndex time to try upload file FAILED from \"$srcThread\" to \"$dstThread\" (des-dir : \"$dstdirThread\").");
            LOG("The reason is $reason \n") if defined $reason && length $reason;
        }

        if ($bPutResult < 0) {
            # Every attempt failed: give up on this file.
            {
                lock($skipped_files);
                $skipped_files++;
            }
            {
                lock($g_isAllFiles_uploadSuccess);
                $g_isAllFiles_uploadSuccess = -1;
            }
        }
    }
    else {
        lock($skipped_files);
        $skipped_files++;
    }

    { lock($CurrentThreads); $CurrentThreads--; }
    return;
}

# Joins every worker thread that has already finished, releasing it.
sub ReapFinishedThreads {
    for my $thread (threads->list(threads::joinable)) {
        $thread->join();
    }
    return;
}

# File::Find "wanted" callback: starts one upload thread per regular file.
# Reads $File::Find::dir / $File::Find::name and the current top directory
# ($g_topdir), skips excluded paths (pruning excluded directories), maps the
# local path below the top directory onto $ftp_dir, and waits while
# $maxthread workers are already running.
sub ProcessFiles {
    my $srcdir  = LashToBacklash($File::Find::dir);
    my $srcpath = LashToBacklash($File::Find::name);
    my $base    = LashToBacklash($g_topdir);

    foreach my $exclude (@wc_exclude) {
        next if !defined $exclude || $exclude eq '';
        if (index($srcpath, $exclude) > -1) {
            $File::Find::prune = 1 if -d $srcpath;
            return;
        }
    }

    return if -d $srcpath;

    # Replace the leading top directory with the remote base directory.
    my $dstdir  = $srcdir;
    my $dstpath = $srcpath;
    $dstdir  =~ s{\A\Q$base\E}{$ftp_dir}i;
    $dstpath =~ s{\A\Q$base\E}{$ftp_dir}i;
    $dstdir  = BacklashToLash($dstdir);
    $dstpath = BacklashToLash($dstpath);

    # Throttle: wait until fewer than $maxthread workers are running.
    ReapFinishedThreads();
    while (scalar(threads->list(threads::running)) >= $maxthread) {
        LOG("-sleep 1 second");
        sleep 1;
        ReapFinishedThreads();
    }

    # Exactly one thread per file.  The thread is not detached so that it can
    # be joined later.
    my $thread = threads->create(\&ProcessFile, $srcpath, $dstpath, $dstdir);
    if (!$thread) {
        LOG("Can't create a thread for \"$srcpath\"");
        {
            lock($skipped_files);
            $skipped_files++;
        }
        {
            lock($g_isAllFiles_uploadSuccess);
            $g_isAllFiles_uploadSuccess = -1;
        }
    }
    return;
}

# Walks each directory in @dirs, uploading its files, then waits for every
# worker thread.  no_chdir keeps the process working directory fixed while
# worker threads may be using relative paths.
sub UploadTrees {
    my (@dirs) = @_;

    for my $top (@dirs) {
        $g_topdir = $top;
        find({ wanted => \&ProcessFiles, no_chdir => 1 }, $top);
    }

    for my $thread (threads->list()) {
        print("Joining thread\n");
        $thread->join();
    }
    return;
}

################################################################################
######################## Main GOES HERE ###############################
################################################################################

# step 1: try to login the ftp.
$start_date = time();
LOG("Connecting to the ftp server($ftp_server)");
my $ftp = Net::FTP->new($ftp_server,
    Debug => $ftp_debug, Timeout => $ftp_timeout);

if (!$ftp) {
    $g_strLastError = "Can't connect to the FTP server, the reason: " . $@;
    LOG("$g_strLastError\n");
    $g_uploadSuccess = -1;
}
elsif (!$ftp->login($ftp_uid, $ftp_pw)) {
    $g_strLastError =
        "Can't login to the FTP server, the reason: " . $ftp->message;
    LOG("$g_strLastError\n");
    $g_uploadSuccess = -1;
    $ftp->quit();
}
else {
    LOG("Connect ftp server successful!");
    $ftp->quit();

    # step 2: upload the files
    LOG("Start to upload files in directory(@src_dir_files)");
    UploadTrees(@src_dir_files);
    LOG("The directoty(@src_dir_files) have been completed. The result: ");

    # step 3: the name-list files are uploaded only when step 2 fully succeeded
    if ($g_isAllFiles_uploadSuccess > 0) {
        LOG("+==================================================================+");
        LOG("Start to upload files in directory(@src_dir_NameListFile)");
        UploadTrees(@src_dir_NameListFile);
        LOG("The directoty(@src_dir_NameListFile) have been completed. The result: ");
        LOG("The directory (@src_dir_NameListFile) has been completed.");
        LOG("+==================================================================+");
    }
    else {
        LOG("+==================================================================+");
        LOG("These files will not be upload for directory(@src_dir_files) failed.");
        LOG("+==================================================================+");
    }

    # step 4: log time and totals
    $g_uploadSuccess = -1 if $g_isAllFiles_uploadSuccess < 0;
    $span = time() - $start_date;
    my $verdict = $g_uploadSuccess > 0
        ? "Upload succeed!"
        : "Upload finished with failures!";
    LOG("$verdict \nTime:$span second. the total files is $total_files.  \nSucceed are $processed_files and skipped are $skipped_files.\n");
}

CloseLogFile();