package CM::SOAPQueue;
use base qw( Utils::Thread::JobQueue );

use strict;
use warnings;
use threads;
use threads::shared;

use FindBin qw( $Bin );
use autouse 'Carp'         => qw( carp croak );
use autouse 'Data::Dumper' => qw( Dumper );
use autouse 'Memoize'      => qw( memoize );
use autouse 'Pod::Usage'   => qw( pod2usage );
use Time::HiRes qw( gettimeofday tv_interval );
use IO::All;
use IO::Select;
use POSIX qw( tmpnam :errno_h );
use Utils::Thread::Arg;
use Sys::SigAction qw( set_sig_handler );

############################################################
# Helper script launched once per job, and the number of seconds it is
# given before the ALRM handler aborts the call.
my $external_program = "$Bin/singleSOAPCall.pl";
my $timeout_seconds  = 2;
############################################################

# init($self)
#
# Registers worker() as the WorkerSub of this queue and returns $self.
# NOTE(review): this does not call the parent class's init itself;
# presumably the _Inheritable constructor takes care of that -- confirm.
sub init {
    my ($self) = @_;
    $self->WorkerSub( \&worker );
    return $self;
}

# worker($arg_string)
#
# Runs one job:
#   1. decodes $arg_string with Utils::Thread::Arg::decode,
#   2. writes the job's Input to a temporary file,
#   3. runs $external_program --xml=<tmpfile> and collects every
#      non-blank line of its combined stdout/stderr,
#   4. stores the collected text in Output, adds the elapsed wall-clock
#      time to Duration, and returns the job re-encoded with
#      Utils::Thread::Arg::encode.
#
# The external call is guarded by alarm($timeout_seconds); when the alarm
# fires the read is abandoned and the child is sent SIGKILL.
# Dies if the temporary file cannot be removed.
sub worker {
    my ($arg_string) = @_;

    my $results;
    my $alarm = 0;

    # The handler has to die: merely setting the flag would leave the
    # blocking read below running until the child exits on its own.
    # The handler is uninstalled when $h goes out of scope.
    my $h = set_sig_handler(
        'ALRM',
        sub {
            $alarm = 1;
            die "SOAP call timed out\n";
        },
        { safe => 1 }
    );

    my $start    = [gettimeofday];
    my $arg      = Utils::Thread::Arg::decode($arg_string);
    my $tmp_file = tmpnam;
    $arg->Input > io($tmp_file);

    # print STDERR $arg->Input;
    my $command_line = "$external_program --xml=$tmp_file";
    print "Starting soap thread_id:"
        . threads->self->tid
        . " of arg:"
        . $arg->ID . "\n"
        if $::verbose > 4;

    # Declared outside the eval so the cleanup code below can see them.
    my $pid;
    my $command_fh;
    my $child_reaped = 0;

    # timeout a system call:
    eval {
        alarm($timeout_seconds);    # will force a timeout

        # The string form is deliberate: the shell performs the 2>&1.
        $pid = open( $command_fh, '-|', "$command_line 2>&1" )
            or die "Couldn't launch $command_line: $!\n";
        threads->yield;

        my $select = IO::Select->new($command_fh);
      SELECT_LOOP:
        while (1) {
            if ( $select->can_read ) {
              READ_LOOP:
                while ( my $line = <$command_fh> ) {
                    next READ_LOOP if $line =~ /^\s*$/;
                    $results .= $line;
                }
                last SELECT_LOOP;
            }
            threads->yield;
        }
        $select->remove($command_fh);

        # close() on a piped handle waits for the child, so from here on
        # there is nothing left to kill.
        close($command_fh);
        $child_reaped = 1;
        alarm(0);
    };
    my $error = $@;    # capture before anything else can clobber $@
    alarm(0);

    if ( $error or $alarm ) {
        print "Timeout!\n";    # we timed out
    }

    if ( defined $pid and not $child_reaped ) {
        if ( kill 0 => $pid ) {

            # it is alive; a positive signal targets the process itself
            # (a negative one would address a process group instead)
            kill KILL => $pid;
        }
        elsif ( $! == EPERM ) {

            # changed uid
        }
        elsif ( $! == ESRCH ) {

            # is deceased, or zombie
        }
        else {

            # odd, couldn't check on status
        }

        # Reap the child; this waits for it if it could not be killed.
        close($command_fh);
    }

    my $duration = tv_interval( $start, [gettimeofday] );
    unlink($tmp_file) or die "Unable to unlink $tmp_file\n";
    $arg->Duration( $arg->Duration() + $duration );
    print "SOAP took $duration on thread_id:"
        . threads->self->tid
        . " of arg:"
        . $arg->ID . "\n"
        if $::verbose > 4;

    $arg->Output($results);
    return Utils::Thread::Arg::encode($arg);
}    ## end sub worker

1;

####
package Utils::Thread::JobQueue;
use base qw( _Inheritable );

use strict;
use warnings;
use 5.8.0;
use threads;
use threads::shared;
use Thread::Queue;

my $counter : shared = 0;    # source of per-queue _ID values
my @running : shared;
my $job_number : shared;

use Class::MethodMaker [
    scalar => [ { -type => "Thread::Queue" }, qw/ q_in q_out / ],
    scalar => [qw/ Name WorkerSize WorkerSub _ID Debug OutputWaitLock /],
    scalar => [qw/ _running_lock _job_number_lock /],
    array  => [qw/ _WorkerThreads /],
];

our @DEFAULT_ARGS = ( Debug => 0 );

################################################################

# init($self)
#
# Creates the shared "running" flag and job counter (both start at 0),
# the input and output Thread::Queue objects, and assigns the next value
# of the shared $counter as this queue's _ID.  Returns $self.
sub init {
    my ($self) = shift;

    my $rlock : shared = 0;    # running lock
    my $jlock : shared = 0;    # job id lock
    $self->_running_lock( \$rlock );
    $self->_job_number_lock( \$jlock );

    $self->q_in( Thread::Queue->new );
    $self->q_out( Thread::Queue->new );

    {
        lock($counter);
        $self->_ID( $counter++ );
    }

    return $self;
}    ## end sub init

# start($self)
#
# Sets the running flag and creates worker threads until WorkerSize of
# them exist.  Threads that already exist are kept, so calling start()
# again only adds the missing ones.  Returns $self.
sub start {
    my ($self) = @_;

    my $rlock = $self->_running_lock;
    lock($rlock);
    $$rlock = 1;

    print "Starting: ", $self->Name, "\n" if $self->Debug;

    my $size = $self->WorkerSize - 1;
    my $low
        = $self->_WorkerThreads_isset ? $self->_WorkerThreads_count : 0;

    # The range is empty when enough threads are already running.
    for ( $low .. $size ) {
        my $thread = threads->create( $self->_threadWorkerFactory );

        #$thread->detach;
        $self->_WorkerThreads_push($thread);
    }

    return $self;
}    ## end sub start

# stop($self)
#
# Clears the running flag, joins every worker thread and resets the
# thread list.  Returns $self.
sub stop {
    my ($self) = @_;

    print "Stoping: ", $self->Name, "\n" if $self->Debug;

    my $rlock = $self->_running_lock;
    lock($rlock);
    $$rlock = 0;

    if ( $self->_WorkerThreads_count > 0 ) {
        for my $thread ( $self->_WorkerThreads ) {
            print "Waiting Thread: ", $thread->tid, " on ", $self->Name, "\n"
                if $self->Debug;
            $thread->join;
        }
        $self->_WorkerThreads_reset;
    }

    return $self;
}    ## end sub stop

# _threadWorkerFactory($self)
#
# Returns the code reference each worker thread runs.  The closure polls
# q_in without blocking; every defined, non-empty item is given a job
# number, passed to WorkerSub, and the result is enqueued on q_out.
# After each pass it yields, and it returns 1 once the running flag has
# been cleared.
sub _threadWorkerFactory {
    my ($self) = shift;

    return sub {
        print "Started Thread: ", threads->self->tid, " on ", $self->Name,
            "\n"
            if $self->Debug;

        while (1) {
            my $arg = $self->q_in->dequeue_nb;
            if ( defined $arg and $arg ne "" ) {
                my $number;
                {
                    # The shared scalar behind _job_number_lock is both
                    # the lock and the counter.
                    my $jlock = $self->_job_number_lock;
                    lock($jlock);
                    $number = ${$jlock}++;
                }

                print "Starting Job: $number on ", $self->Name,
                    " on thread ", threads->self->tid, "\n"
                    if $self->Debug;

                my $result = $self->WorkerSub->($arg);

                print "Done Job: $number on ", $self->Name, " on thread ",
                    threads->self->tid, "\n"
                    if $self->Debug;

                $self->q_out->enqueue($result);
            }
            threads->yield;

            my $rlock = $self->_running_lock;
            if ( not $$rlock ) {
                print "Done Thread: ", threads->self->tid, " on ",
                    $self->Name, "\n"
                    if $self->Debug;
                return 1;
            }
        }    ## end while (1)
    };
}    ## end sub _threadWorkerFactory

1;