Fendaria has asked for the wisdom of the Perl Monks concerning the following question:

Currently I have a program which needs to issue 3000+ SOAP calls to process a set of data. Individually the calls take between 5-11 seconds and the performance for running this way isn't acceptible so I am looking into threading the SOAP calls. When run threaded, the calls slow down, very rarely taking over 45 seconds. (We are looking to speed up the SOAP call but for this it is basically a black box I can't touch).

My first solution was to thread the SOAP::Lite calls in process, but eventually some SOAP calls hung and would never quit, even with the timeout param to the proxy in SOAP::Lite. I attempted to use Sys::SigAction to wrap it in a timeout but apparently SOAP::Lite doesn't like that and messes with the signal handlers itself (which I am begining to doubt).

My second attempted solution moved all the SOAP::Lite code into a completely seperate process where the threads now invoke an external program and read the response back, however the soap call is still timing out. I have tried wrapping the process call in a Sys::SigAction section and when it is triggered it claims there is no handler found. The code should just be your basic timeout example from the Sys::SigAction.

So, my question is, why isn't Sys::SigAction working for the timeout? How else can I write the timeout section to have it work?

If someone can entertain why the SOAP::Lite call is hanging I'd be happy to hear that, but right now my focus is on the timeout around the external command. I believe that by issuing the SOAP call in a completely seperate process, I should be removing it from being a factor in the equation.

This is perl, v5.8.8 built for i586-linux-thread-multi running under SUSE 10. Sys::SigAction is up to date (0.10)

The error message I receive is: Signal SIGALRM received, but no signal handler set.

Few quick comments on the code

package CM::SOAPQueue; use base qw( Utils::Thread::JobQueue ); use strict; use warnings; use threads; use threads::shared; use FindBin qw( $Bin ); use autouse 'Carp' => qw( carp croak ); use autouse 'Data::Dumper' => qw( Dumper ); use autouse 'Memoize' => qw( memoize ); use autouse 'Pod::Usage' => qw( pod2usage ); use Time::HiRes qw( gettimeofday tv_interval ); use IO::All; use IO::Select; use POSIX qw( tmpnam :errno_h ); use Utils::Thread::Arg; use Sys::SigAction qw( set_sig_handler ); ############################################################ my $external_program = "$Bin/singleSOAPCall.pl"; ############################################################ sub init { my ($self) = @_; $self->WorkerSub( \&worker ); return $self; } sub worker { my ($arg_string) = @_; my $results; my $alarm = 0; my $h = set_sig_handler( 'ALRM', sub { $alarm = 1; }, {safe => 1} ); my $start = [gettimeofday]; my $arg = Utils::Thread::Arg::decode($arg_string); my $tmp_file = tmpnam; $arg->Input > io($tmp_file); # print STDERR $arg->Input; my $command_line = "$external_program --xml=$tmp_file"; print "Starting soap thread_id:" . threads->self->tid . " of arg:" . $arg->ID . "\n" if $::verbose > 4; my $pid; #timeout a system call: eval { alarm(2); # will force a timeout my $pid = open( COMMAND, "$command_line 2>&1 |" ) or die "Couldn't launch $command_line: $!\n"; threads->yield; my $select = IO::Select->new(*COMMAND); SELECT_LOOP: while (1) { if ( $select->can_read ) { READ_LOOP: while (<COMMAND>) { next READ_LOOP if /^\s*$/; $results .= $_; } last SELECT_LOOP; } ## end if ( $select->can_read) threads->yield; } ## end while (1) $select->remove(*COMMAND); close(COMMAND); alarm(0); }; #signal handler is reset when $h goes out of scope alarm(0); if ($@ or $alarm) { print "Timeout!\n"; # we timed out } if ( defined $pid ) { if ( kill 0 => $pid ) { # it is alive kill -9 => $pid; } elsif ( $! == EPERM ) { # changed uid } elsif ( $! == ESRCH ) { # is deceased, or zombie } else { # odd, couldn't check on status } } ## end if ( defined $pid ) my $duration = tv_interval( $start, [gettimeofday] ); unlink($tmp_file) or die "Unable to unlink $tmp_file\n"; $arg->Duration( $arg->Duration() + $duration ); print "SOAP took $duration on thread_id:" . threads->self->tid . " of arg:" . $arg->ID . "\n" if $::verbose > 4; $arg->Output($results); return Utils::Thread::Arg::encode($arg); } ## end sub worker 1;
package Utils::Thread::JobQueue; use base qw( _Inheritable ); use strict; use warnings; use 5.8.0; use threads; use threads::shared; use Thread::Queue; my $counter : shared = 0; my @running : shared; my $job_number : shared; use Class::MethodMaker [ scalar => [{-type => "Thread::Queue"}, qw/ q_in q_out /], scalar => [qw/ Name WorkerSize WorkerSub _ID Debug OutputWaitLock +/], scalar => [qw/ _running_lock _job_number_lock /], array => [qw/ _WorkerThreads /],]; our @DEFAULT_ARGS = ( Debug => 0 ); ################################################################ sub init { my ($self) = shift; my $rlock : shared = 0; # running lock my $jlock : shared = 0; # job id lock $self->_running_lock( \$rlock ); $self->_job_number_lock( \$jlock ); $self->q_in( Thread::Queue->new ); $self->q_out( Thread::Queue->new ); lock($counter); $self->_ID( $counter++ ); my $id = $self->_ID; lock($rlock); $rlock = 0; lock($jlock); $jlock = 0; return $self; } ## end sub init sub start { my ($self) = @_; my $rlock = $self->_running_lock; lock($rlock); $$rlock = 1; print "Starting: ", $self->Name, "\n" if $self->Debug; my $size = $self->WorkerSize - 1; if ( not $self->_WorkerThreads_isset or $size < $self->_WorkerThreads_count ) { my $low = 0; $low = $self->_WorkerThreads_count if $self->_WorkerThreads_isset; for ( $low .. $size ) { my $thread = threads->create( $self->_threadWorkerFactory ); #$thread->detach; $self->_WorkerThreads_push($thread); } } ## end if ( not $self->_WorkerThreads_isset... return $self; } ## end sub start sub stop { my ($self) = @_; print "Stoping: ", $self->Name, "\n" if $self->Debug; my $rlock = $self->_running_lock; lock($rlock); $$rlock = 0; if ( $self->_WorkerThreads_count > 0 ) { for my $thread ( $self->_WorkerThreads ) { print "Waiting Thread: ", $thread->tid, " on ", $self->Name, "\n" if $self->Debug; $thread->join; } $self->_WorkerThreads_reset; } ## end if ( $self->_WorkerThreads_count... return $self; } ## end sub stop sub _threadWorkerFactory { my ($self) = shift; return sub { print "Started Thread: ", threads->self->tid, " on ", $self->Name, "\n" if $self->Debug; while (1) { my $arg = $self->q_in->dequeue_nb; if ( defined $arg and $arg ne "" ) { my $number; { my $jlock = $self->_job_number_lock; lock($jlock); $number = ${$jlock}++; } print "Starting Job: $number on ", $self->Name, " on thread ", threads->self->tid, "\n" if $self->Debug; my $result = $self->WorkerSub->($arg); print "Done Job: $number on ", $self->Name, " on thread ", threads->self->tid, "\n" if $self->Debug; $self->q_out->enqueue($result); } ## end if ( defined $arg and ... threads->yield; my $rlock = $self->_running_lock; if ( not $$rlock ) { print "Done Thread: ", threads->self->tid, " on ", $self->Name, "\n" if $self->Debug; return 1; } } ## end while (1) }; } ## end sub _threadWorkerFactory 1;

Thanks for your help, this has been baffling me for over a week now.

Fendaria