package Thread::Simple; use 5.008; use strict; use threads; use warnings; use Carp; use Thread::Queue; our $VERSION = '0.02'; sub new{ my $class= shift; my $self = {@_}; # Take all parms passed in as hashref. # Expect WORKER=>&subref, N=>$int $self->{REQUEST_QUEUE} = new Thread::Queue; # Get work from here $self->{RESULT_QUEUE} = new Thread::Queue; # Worker Places results here $self->{STATS_QUEUE} = new Thread::Queue; # Worker Places thread Staistics here $self->{CLASS} = $class; $self->{STATE} = "RUNNING"; bless $self, $class; $self->{WORKER} and setworker($self); return $self; } sub setworker{ my ($self,$workref,@params) = @_; $self->{WORKER} = $workref if $workref; if ( @params ){ # Make it empty, if undef $self->{WORK_PARAMS} = \@params ; }else{ $self->{WORK_PARAMS} ||= []; } for (0..$self->{N} - 1 ){ $self->{THISTHREADNBR} = $_; $self->{CONTROL_QUEUE}[$_] = new Thread::Queue; # Send CONTROL messages to worker push @{$self->{THREADPOOL}} ,threads->create(\&_internal_worker, $self) ; } } sub Pause_workers{ my ($self, $msg,$autoresume,$delay) = @_; die "Misuse of PAUSE - already Paused" if $self->{STATE} eq "PAUSED"; $_->enqueue($msg) for @{ $self->{CONTROL_QUEUE} }; $self->{REQUEST_QUEUE}->insert(0, (undef) x $self->{N} ); # Tell them to look at CTRL $self->{STATE} = "PAUSED"; sleep $delay if $delay; $self->Resume_workers() if $autoresume; } sub Resume_workers{ my $self = shift; die "Misuse of RESUME - we are NOT Paused" unless $self->{STATE} eq "PAUSED"; $_->enqueue(undef) for @{ $self->{CONTROL_QUEUE} }; # Resumes workers $self->{STATE} = "RUNNING"; } sub _internal_worker{ my $self=shift; while ($_ = $self->getwork()){ $self->pushresult ( $self->{WORKER}->( $_, @{$self->{WORK_PARAMS}} ) ); } } sub feedwork{ my $self = shift; $self->{REQUEST_QUEUE}->enqueue (@_); } sub getwork{ my $self = shift; my $currentwork = $self->GetNextITEM("REQUEST_QUEUE"); if (defined $currentwork){ # There is work to be done .. $self->{WORKDONE}++; return $currentwork ; } # No current work - look for CONTROL msgs my $control_work = $self->{CONTROL_QUEUE}[$self->{THISTHREADNBR}]->dequeue_nb; # Non Blk if (defined $control_work){ $self->{CONTROLMSGS}++; #Interpret/process the current message here #Then wait for next message, before proceeding $control_work = $self->{CONTROL_QUEUE}[$self->{THISTHREADNBR}]->dequeue; # Blk; return $self->getwork(); # Recurse } # Main thread is telling us to quit # enqueued item MUST BE SIMPLE SCALAR (No aref etc) $self->{STATS_QUEUE}->enqueue ("$self->{THISTHREADNBR},$self->{WORKDONE},$self->{CONTROLMSGS}"); return undef;# Tell worker to quit } sub pushresult{ my $self = shift; $self->{RESULT_QUEUE}->enqueue (@_); } sub GetNextITEM{ my $self = shift; my $itemname = shift; # What queue to pull from... my $blocking = shift; defined $blocking or $blocking= 1; # Default is to BLOCK. Set to 0 to NOT blk $blocking and return $self->{ $itemname }->dequeue; return $self->{ $itemname }->dequeue_nb; # NON-Blocking (undef at eof) } sub Quit_And_Get_Results{ my $self = shift; $self->feedwork( (undef) x $self->{N} ); # Tell them to quit $_->join for @{$self->{THREADPOOL}}; # Wait for kids return undef unless wantarray; my @results ; push @results, $_ while $_=$self->GetNextITEM("RESULT_QUEUE",0);# Use NON-BLK return @results; } sub print_stats{ my $self = shift; my @results ; push @results, $_ while $_=$self->GetNextITEM("STATS_QUEUE",0);# Use NON-BLK print "STATS: Thread $_\n" for sort @results; my $count = scalar @results; my (@freq,$totaltrans); for (@results){ my ($thread,$wrk,$ctrl)=split( /,/ , $_); $freq[ $wrk ]++ ; $totaltrans += $wrk; } my ($cumul, $cumul_th); for my $idx (0..$#freq) { next unless $freq[$idx]; $cumul_th += $freq[$idx]; $cumul += $freq[$idx] * $idx; printf "%3d = %3.1f%% Did %5d (%5d Cumul). %3d Threads did %3.1f %% of Trans.)\n", $freq[$idx] ,$freq[$idx]*100/$count , $idx ,$cumul, $cumul_th, $cumul *100/$totaltrans; } } 1; __END__ =head1 NAME Thread::Simple - A simple thread-pool implementation =head1 SYNOPSIS use Thread::Simple; =head1 DESCRIPTION C provides a simple thread-pool implementaion without external dependencies outside core modules. Jobs can be submitted to and handled by multi-threaded `workers' managed by the pool. The processing model is that a series of individual units of data needs to be processed, in parallel. The result of processing (if present) can be returned, and will be collected. This module hides the "thread" and Thread::Queue" objects that are used in it's implementation. The caller/user sees only a highly simplified interface. =head1 SYNOPSIS use Thread::Simple; my $t = new Thread::Simple (N=>5, WORKER=> \&worksub); # 5 threads while( ) { $t->feedwork( $_ ); } # Get and process results for ( $t->Quit_And_Get_Results() ){ print "$_\n" ; # Or whatever you want to do with them } sub worksub{ # This is called by each thread my $work = shift; # Get one unit of work .. process the $work return "Some (optional) scalar result of processing $work"; } =head1 Selected METHODs new(N=>, WORKER=>, ) Creates a Thread::Simple object. feedwork( $work_to_process) Adds to the queue of work to be done. Quit_And_Get_Results() Returns an array of results produced. The following methods are optional, for more control over processing: print_stats() can be called after Quit_And_Get_Results, to print out how much work each thread did (for manual optimization of the N param). Pause_workers() and Resume_workers can temporarily suspend/resume work. =head1 BUGS #Note: # If you see the error message: # Attempt to free unreferenced scalar: SV ..., Perl interpreter: ... during global destruction. # is due to a bug in threads. This bug is fixed in threads 1.63 =head1 AUTHOR NetWallah =head1 COPYRIGHT AND LICENSE Copyleft 2008 This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself. =cut