sub runAll { my $self = shift; my $doneThreadsLock : shared = 1; my $testQueueLock : shared = 1; my @workers; for (1..$self->{'maxtasks'}) { push @workers, threads::async { $SIG{'INT'} = sub { my $tid = threads->tid(); # Tell user we've been terminated if (exists $self->{'runningTasks'}->{$tid}) { my $worker_task = $self->{'runningTasks'}->{$tid}; my $exec_name = getExecName(join(" ",@{$worker_task->{'command'}})); my $pid = $worker_task->{'pid'}; kill(-12, $pid); # print "Worker $tid >> Waiting$exec_name pid: $pid\n"; waitpid($pid, 0); # print "Worker $tid >> Done $exec_name pid: $pid\n"; $worker_task->{'endTime'} = time(); } { lock $doneThreadsLock; $self->{'doneThreads'}->enqueue($tid); } print "INT: Worker $tid exiting\n"; threads->exit(); }; my $tid = threads->tid(); my $rc; my $continueProcessing = 1; my $worker_task; while ($continueProcessing) { { lock $testQueueLock; $worker_task = $self->{'testQueue'}->dequeue(); } if(!$worker_task) { $continueProcessing = 0; next; } my $exec_name = getExecName(join(" ",@{$worker_task->{'command'}})); chdir ($worker_task->{'workdir'}); print "Running: $exec_name\n"; $worker_task->{'startTime'} = time(); $self->{'runningTasks'}->{$tid} = $worker_task; my $cmd = join(" ", @{$worker_task->{'command'}}); my $pid = fork; # if we are child process, pid will be 0 otherwise we are the master if ($pid == 0) { # print "pid = 0, $exec_name running as child process.\n"; my $logfile = FileHandle->new; $logfile->open("> $worker_task->{'log'}"); $logfile->autoflush(1); open(STDOUT, '>&=' . $logfile->fileno); open(STDERR, '>&=' . $logfile->fileno); select STDERR; $| = 1; # make unbuffered select STDOUT; $| = 1; # make unbuffered $logfile->close; # print "Execing child process $exec_name pid($pid).\n"; { lock $doneThreadsLock; $self->{'doneThreads'}->enqueue($tid); } setpgrp; exec("$cmd"); exit(1); } $worker_task->{'pid'} = $pid; my $child_status; # print "Waiting for $exec_name $pid to exit\n"; while (waitpid($pid, POSIX::WNOHANG) != -1) { $child_status = $?; sleep(1); } # print "Done waiting for $exec_name $pid to exit\n"; $worker_task->{'endTime'} = time(); $worker_task->{'status'} = $child_status; # print "Done waiting for $exec_name $pid to exit status ($child_status)\n"; } { lock $doneThreadsLock; $self->{'doneThreads'}->enqueue($tid); } threads->exit(); }; } print "all tasks queued, Now waiting for tasks to exit before queuing more.\n"; { lock $testQueueLock; $self->{'testQueue'}->enqueue(undef) for @workers; } my $threads_exited = 0; my $stop_done = 0; while ($threads_exited < $self->{'maxtasks'}) { my $tid; { lock $doneThreadsLock; $tid = $self->{'doneThreads'}->dequeue_nb(); } if (defined($tid)) { threads->object($tid)->join(); $threads_exited++; print "threads_exited($threads_exited) maxtasks:($self->{'maxtasks'}).\n"; } else { if ($self->{stop} && !$stop_done) { $stop_done = 1; foreach my $thr (threads->list()) { $thr->kill('INT'); } } else { sleep(1); } } } }