package Utils::Thread::JobQueue; use base qw( _Inheritable ); use strict; use warnings; use 5.8.0; use threads; use threads::shared; use Thread::Queue; my $counter : shared = 0; my @running : shared; my $job_number : shared; use Class::MethodMaker [ scalar => [{-type => "Thread::Queue"}, qw/ q_in q_out /], scalar => [qw/ Name WorkerSize WorkerSub _ID Debug OutputWaitLock /], scalar => [qw/ _running_lock _job_number_lock /], array => [qw/ _WorkerThreads /],]; our @DEFAULT_ARGS = ( Debug => 0 ); ################################################################ sub init { my ($self) = shift; my $rlock : shared = 0; # running lock my $jlock : shared = 0; # job id lock $self->_running_lock( \$rlock ); $self->_job_number_lock( \$jlock ); $self->q_in( Thread::Queue->new ); $self->q_out( Thread::Queue->new ); lock($counter); $self->_ID( $counter++ ); my $id = $self->_ID; lock($rlock); $rlock = 0; lock($jlock); $jlock = 0; return $self; } ## end sub init sub start { my ($self) = @_; my $rlock = $self->_running_lock; lock($rlock); $$rlock = 1; print "Starting: ", $self->Name, "\n" if $self->Debug; my $size = $self->WorkerSize - 1; if ( not $self->_WorkerThreads_isset or $size < $self->_WorkerThreads_count ) { my $low = 0; $low = $self->_WorkerThreads_count if $self->_WorkerThreads_isset; for ( $low .. $size ) { my $thread = threads->create( $self->_threadWorkerFactory ); #$thread->detach; $self->_WorkerThreads_push($thread); } } ## end if ( not $self->_WorkerThreads_isset... return $self; } ## end sub start sub stop { my ($self) = @_; print "Stoping: ", $self->Name, "\n" if $self->Debug; my $rlock = $self->_running_lock; lock($rlock); $$rlock = 0; if ( $self->_WorkerThreads_count > 0 ) { for my $thread ( $self->_WorkerThreads ) { print "Waiting Thread: ", $thread->tid, " on ", $self->Name, "\n" if $self->Debug; $thread->join; } $self->_WorkerThreads_reset; } ## end if ( $self->_WorkerThreads_count... return $self; } ## end sub stop sub _threadWorkerFactory { my ($self) = shift; return sub { print "Started Thread: ", threads->self->tid, " on ", $self->Name, "\n" if $self->Debug; while (1) { my $arg = $self->q_in->dequeue_nb; if ( defined $arg and $arg ne "" ) { my $number; { my $jlock = $self->_job_number_lock; lock($jlock); $number = ${$jlock}++; } print "Starting Job: $number on ", $self->Name, " on thread ", threads->self->tid, "\n" if $self->Debug; my $result = $self->WorkerSub->($arg); print "Done Job: $number on ", $self->Name, " on thread ", threads->self->tid, "\n" if $self->Debug; $self->q_out->enqueue($result); } ## end if ( defined $arg and ... threads->yield; my $rlock = $self->_running_lock; if ( not $$rlock ) { print "Done Thread: ", threads->self->tid, " on ", $self->Name, "\n" if $self->Debug; return 1; } } ## end while (1) }; } ## end sub _threadWorkerFactory 1;