sub qxx {
my( $cmd, @args ) = @_;
my( $rout, $rerr, $wout, $werr, $proch );
open my $stdout, '>&', \*STDOUT or tcroak( $! );
pipe $rout, $wout or tcroak( $! );
binmode $rout, ':crlf';
open STDOUT, '>&', $wout or tcroak( $! );
open my $stderr, '>&', \*STDERR or tcroak( $! );
pipe $rerr, $werr or tcroak( $! );
binmode $rerr, ':crlf';
open STDERR, '>&', $werr or tcroak( $! );
Win32::Process::Create(
$proch, $cmd, join( ' ', $cmd, @args ),
1, NORMAL_PRIORITY_CLASS, "."
) or tcroak( $^E );
close $_ for $wout, $werr;
open STDOUT, '>&', $stdout or tcroak( $! );
open STDERR, '>&', $stderr or tcroak( $! );
return $proch->GetProcessID(), $rout, $rerr;
}
####
#! perl -slw
use strict;
use Data::Dump qw[ pp ];
use Win32::Process qw( NORMAL_PRIORITY_CLASS );
use Time::HiRes qw[ sleep ];
use threads;
use threads::shared;
use Thread::Queue;
our $TIMEOUT //= 10;
our $THREADS //= 16;
our $PROCESSES //= 100;
open DEBUGOUT, '>', 'CON' or die $!;
select DEBUGOUT; $|++; select STDOUT;
my $semDEBUGOUT :shared;
sub tprint{
my $tid = threads->tid;
lock $semDEBUGOUT;
printf DEBUGOUT "%3d: %s\n", $tid, @_;
}
sub tcroak{
tprint( @_ );
die scalar caller();
};
sub qxx {
my( $cmd, @args ) = @_;
my( $rout, $rerr, $wout, $werr, $proch );
open my $stdout, '>&', \*STDOUT or tcroak( $! );
pipe $rout, $wout or tcroak( $! );
binmode $rout, ':crlf';
open STDOUT, '>&', $wout or tcroak( $! );
open my $stderr, '>&', \*STDERR or tcroak( $! );
pipe $rerr, $werr or tcroak( $! );
binmode $rerr, ':crlf';
open STDERR, '>&', $werr or tcroak( $! );
Win32::Process::Create(
$proch, $cmd, join( ' ', $cmd, @args ),
1, NORMAL_PRIORITY_CLASS, "."
) or tcroak( $^E );
close $_ for $wout, $werr;
open STDOUT, '>&', $stdout or tcroak( $! );
open STDERR, '>&', $stderr or tcroak( $! );
return $proch->GetProcessID(), $rout, $rerr;
}
sub worker {
my $Q = shift;
while( defined( my $time = $Q->dequeue ) ) {
my $pid :shared;
my( $th ) = threads->create( sub {
my( $rout, $rerr );
( $pid, $rout, $rerr ) = qxx(
'c:/perl64/bin/perl.exe',
qq[ -le"warn qq/a warning\n/; sleep 1, print for 1 .. $time; print \$\$, ' done'"]
);
tcroak 'Failed to exec' unless $pid;
my $out = join( '', <$rout> ) // '';
my $err = join( '', <$rerr> ) // '';
return $out, $err;
} );
sleep 1;
$th->join && tcroak "Process failed to start" unless $pid;
tprint "Started pid $pid for $time seconds";
my $t = 0;
sleep 0.1 while kill( 0, $pid ) and $t++ < ( 10 * $TIMEOUT );
if( kill( 9, $pid ) ) {
tprint "$pid timed out; killed";
}
else {
tprint "$pid completed sucessfully";
}
my( $out, $err ) = $th->join;
tprint "pid $pid returned:\nout:'$out'\nerr:'$err'";
## Check result;
}
}
my $Q = new Thread::Queue;
my @workers = map async( \&worker, $Q ), 1 .. $THREADS;
$Q->enqueue( map int( rand 2 * $TIMEOUT ), 1 .. $PROCESSES );
$Q->enqueue( (undef) x $THREADS );
$_->join for @workers;
####
c:\test>857569-2 -THREADS=4 -PROCESSES=6 -TIMEOUT=3
1: Started pid 4724 for 5 seconds
2: Started pid 4992 for 3 seconds
3: Started pid 3088 for 4 seconds
4: Started pid 4456 for 3 seconds
1: 4724 timed out; killed
1: pid 4724 returned:
out:''
err:'a warning
'
2: 4992 completed sucessfully
3: 3088 timed out; killed
2: pid 4992 returned:
out:'1
2
3
4992 done
'
err:'a warning
'
4: 4456 completed sucessfully
3: pid 3088 returned:
out:''
err:'a warning
'
4: pid 4456 returned:
out:'1
2
3
4456 done
'
err:'a warning
'
1: Started pid 1924 for 4 seconds
2: Started pid 4784 for 2 seconds
1: 1924 timed out; killed
1: pid 1924 returned:
out:''
err:'a warning
'
2: 4784 completed sucessfully
2: pid 4784 returned:
out:'1
2
4784 done
'
err:'a warning
'