dchidelf has asked for the wisdom of the Perl Monks concerning the following question:
The test script looks like:select STDOUT; $|=1; printf "%d testpipe.pl output\n", scalar time(); while(<STDIN>) { printf "%d %s", scalar time(), $_; printf STDERR "%d YO!\n", scalar time(); } printf "%d testpipe.pl done\n", scalar time(); exit 5;
This runs the testpipe.pl process, sending "HELLO", receiving a line, sending "HI AGAIN", and then reading the rest of the output of the script. It usually crashes on one of the p->close() of the 2nd or greater iterations of the loop.use lib '.'; use ProcOpen; my $log = IO::Handle->new_from_fd(\*STDOUT, 'w'); my $logfunc = sub { my ($lvl, $msg) = @_; chomp($msg); my $loglvl = ProcOpen::logLvlStr($lvl); print $log "ProcOpen[$loglvl] $msg\n"; }; $ProcOpen::DEBUG = 1; ProcOpen::setLogFunc($logfunc); my ($in, $out, $err); for(my $i=0;$i<20;$i++) { my $p = ProcOpen::procopen(\$in, \$out, \$err, "c:\\perl\\bin\\per +l.exe", "testpipe.pl" ); print $in "HELLO\n"; printf "FIRST: %s", scalar <$out>; print $in "HI AGAIN\n"; close($in); while(<$out>) { print "OUT $_"; } print "LOOP DONE\n"; $p->close(); } print "WORKED!!\n";
#!/usr/bin/perl package ProcOpen; use strict; use warnings; use threads; use threads::shared; use Thread::Queue; use File::Temp qw / tempfile /; use Win32API::File ':ALL'; use Win32::Process qw / STILL_ACTIVE NORMAL_PRIORITY_CLASS INFINITE / +; use IO::Select; $ProcOpen::DEBUG = 0; # set to 1 to enable debug logging $ProcOpen::LOGFUNC = sub {}; @ProcOpen::LOGLVLS = qw(STDIN STDOUT STDERR ERROR INFO DEBUG); use constant { LOGSTDIN => 0, LOGSTDOUT => 1, LOGSTDERR => 2, LOGERROR => 3, LOGINFO => 4, LOGDEBUG => 5, }; sub setLogFunc { my ($func) = @_; if (ref($func) eq "CODE") { $ProcOpen::LOGFUNC = $func; return 1; } return 0; } sub logLvlStr { my ($no) = @_; if ($no >= 0 && $no <= 5) { return $ProcOpen::LOGLVLS[$no]; } return "?"; } sub prepPrivateHandle { my ($fh) = @_; my $fd = fileno $fh; return(-1) if (! defined $fd); my $osfh = FdGetOsFHandle($fd); if ($osfh == INVALID_HANDLE_VALUE) { $ProcOpen::LOGFUNC->(ProcOpen::LOGERROR, "prepPrivateHandle: F +dGetOsFHandle failed: $^E"); return(-2); } if (! SetHandleInformation($osfh, (HANDLE_FLAG_INHERIT | HANDLE_FL +AG_PROTECT_FROM_CLOSE), 0)) { $ProcOpen::LOGFUNC->(ProcOpen::LOGERROR, "prepPrivateHandle: S +etHandleInformation failed: $^E"); return(-3); } return 0; } sub hardclose { # Previously closed underlying OS fh, but proved unnecessary my (@handles) = @_; for my $h (@handles) { $h->close(); } } sub procopen { my $self = { 'closeto' => 2000, # ms to wait for process to exit after fo +rced close 'readto' => undef, # sec to wait beforing timing out read fr +om $out / $err handles }; # If first argument is a hash it contains config options if (ref($_[0]) eq 'HASH') { my $options = shift; for (keys(%$options)) { $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBU +G, "set option: $_ = $options->{$_}"); $self->{$_} = $options->{$_}; } } my ($in, $out, $err, $cmd, @args) = @_; # created a shared variable for flagging termination of the proces +s to the threads my $running = 0; my $r_ref = share($running); $self->{'running'} = $r_ref; # create shared variables for holding the temp filenames my $poutname :shared = ""; my $perrname :shared = ""; # STDOUT and STDERR become IOQueue tied handles # Create the underlying Thread::Queues my $obuff = Thread::Queue->new(); my $ebuff = Thread::Queue->new(); # Create a thread for reading each output file (STDOUT / STDERR) $self->{'outthread'} = threads->create('fhreader', \$poutname, $ +obuff, $r_ref, ProcOpen::LOGSTDOUT); $self->{'errthread'} = threads->create('fhreader', \$perrname, $ +ebuff, $r_ref, ProcOpen::LOGSTDERR); my ($pout, $perr); # open the tempfiles for STDOUT / STDERR redirection ($pout, $poutname) = File::Temp::tempfile(); ($perr, $perrname) = File::Temp::tempfile(); if ($ProcOpen::DEBUG) { $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, "STDOUT tmpfile: $pou +tname"); $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, "STDERR tmpfile: $per +rname"); } # The problem with the normal open3 implementation appears to be t +hat file descriptors # are dupped into a pseudo-process when open3 calls system(1, cmd. +..). # That "process" holds onto those descriptors, so even if we close + them when open3 # returns, our Perl process still has them open, which causes bloc +king # simulate what open3 does on Windows # * swap out the STD* file descriptors # * Spawn the program (asychronously) (use Win32::Process::Create +rather than system(1, ...)) # * swap the file descriptors back # Save copies of STDIN, STDOUT, STDERR my $saveIN = IO::Handle->new_from_fd(\*STDIN, 'r'); my $saveOUT = IO::Handle->new_from_fd(\*STDOUT, 'w'); my $saveERR = IO::Handle->new_from_fd(\*STDERR, 'w'); # create a pipe for the process STDIN pipe STDIN, my $inwrite; # prevent subprocess from inheriting our write handle # Not doing so can result in a deadlock prepPrivateHandle($inwrite); $inwrite->autoflush(1); # redirect STDOUT / STDERR (dup our tmpfile handles) # Then close our do not need the IO:File handles anymore STDOUT->fdopen($pout, 'w'); STDERR->fdopen($perr, 'w'); ProcOpen::hardclose($pout, $perr); # disable output buffering STDOUT->autoflush(1); STDERR->autoflush(1); # Start the sub-process my $fullCmd = join(" ", $cmd, @args); my $subproc; Win32::Process::Create($subproc, $cmd, $fullCmd, 1, Win32::Process +::NORMAL_PRIORITY_CLASS, ".") || die ErrorReport(); # Restore the original STDIN, STDOUT, STDERR ProcOpen::hardclose(\*STDIN, \*STDOUT, \*STDERR); # we don't want +the spawned process's STDs STDIN->fdopen($saveIN, 'r'); STDOUT->fdopen($saveOUT, 'w'); STDERR->fdopen($saveERR, 'w'); ProcOpen::hardclose($saveIN, $saveOUT, $saveERR); # close the orig +inal saves because we dup'd my $pid = $subproc->GetProcessID(); $self->{'subproc'} = $subproc; $self->{'pid'} = $pid; $ProcOpen::LOGFUNC->(ProcOpen::LOGINFO, "Started '$fullCmd' PID:$p +id"); # Make sure our reader threads have started and are waiting for ou +r signal if ($running < 2) { my $absto = time() + 4; $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, " +Waiting for readers to begin execution"); lock($running); do { cond_timedwait($running, $absto) || last; } while ($running < 2); if ($running < 2) { $ProcOpen::LOGFUNC->(ProcOpen::LOGERROR, "Subprocess reade +r threads did not start!"); } elsif ($ProcOpen::DEBUG) { $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, "Readers initiali +zed"); } } # notify the freader threads that they are ready to read $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, "Sign +alling STDOUT reader to start"); { lock($poutname); cond_signal($poutname); } $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, "Sign +alling STDERR reader to start"); { lock($perrname); cond_signal($perrname); } # create an IOQueue tied handle for each of the thread queues and +set the user's handles tie *OFH, "IOQueue", $obuff, $self; tie *EFH, "IOQueue", $ebuff, $self; $$out = \*OFH; $$err = \*EFH; # create a ProcInHandle tied handle for controlling the input stre +am and set the user handle tie *IFH, "ProcInHandle", $inwrite, $r_ref; $$in = \*IFH; $self->{'inh'} = \*IFH; bless $self; } sub close { my $self = shift; #$ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, "RC +value: " . $self->{'rc'}); if (! defined $self->{'rc'}) { $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, " +Requested close on ProcOpen Object"); my $rc; my $othr = $self->{'outthread'}; my $ethr = $self->{'errthread'}; my $subproc = $self->{'subproc'}; my $inh = $self->{'inh'}; my $pid = $self->{'pid'}; print "About to close\n"; # Is our side of the subproc STDIN pipe still open? if (defined $inh && $inh->opened()) { print "It's open\n"; close($inh); # close our subprocess stdin } $subproc->GetExitCode($rc); if ($rc == Win32::Process::STILL_ACTIVE) { $subproc->Wait($self->{'closeto'}); $subproc->GetExitCode($rc); if ($rc == Win32::Process::STILL_ACTIVE) { $ProcOpen::LOGFUNC->(ProcOpen::LOGINFO, "Sending KILL +to subprocess PID:$pid"); $subproc->Kill(0); $subproc->Wait($self->{'closeto'}); $subproc->GetExitCode($rc); if ($rc == Win32::Process::STILL_ACTIVE) { $ProcOpen::LOGFUNC->(ProcOpen::LOGINFO, "Subproces +s PID:$pid still active"); $rc = -1; } } } $self->{'rc'} = $rc; print "Join Threads $othr $ethr\n"; $othr->join(); print "part done\n"; print "Join Threads $othr $ethr\n"; $ethr->join(); print "Threads joined\n"; } return $self->{'rc'}; } sub fhreader { my ($filename, $ioQueue, $running, $logtype) = @_; # purely a worker, no reason to keep these open STDOUT->close(); STDERR->close(); STDIN->close(); { # wait for the main thread to signal us to start (filename is +ready) lock($$filename); # need to make sure the main thread knows we are waiting { lock($$running); ++$$running; cond_signal($$running); } cond_wait($$filename); } my $fh = IO::File->new($$filename, 'r'); if (! $fh) { $ProcOpen::LOGFUNC->(ProcOpen::LOGERROR, "Could not open fhrea +der file $$filename"); } else { # tail the file until the spawned process has terminated # Appending each line to the ioQueue my $loopcnt = 0; while($$running) { while(<$fh>) { $ioQueue->enqueue($_); $ProcOpen::LOGFUNC->($logtype, $_); } # Log a WAITING debug message every second (every 10 times +) $ProcOpen::DEBUG && ($loopcnt++%10==0) && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, "WAITING FOR +" . ProcOpen::logLvlStr($logtype)); select(undef,undef,undef,0.1); # sleep 0.1 seconds seek($fh, 0, 1); # clear EOF $fh->clearerr(); # clear EOF } # one more attempt to read anything written in the final 0.1 s +econds while(<$fh>) { $ioQueue->enqueue($_); $ProcOpen::LOGFUNC->($logtype, $_); } $fh->close(); unlink($$filename); $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, P +rocOpen::logLvlStr($logtype) . " CLOSED"); } # can not read anymore, so shutdown queue as well $ioQueue->end(); threads->exit(); } package ProcInHandle; use Win32API::File ':ALL'; sub TIEHANDLE { my $class = shift; my $fh = shift; my $running = shift; my $self = {}; $self->{'fh'} = $fh; bless $self, $class; } sub FILENO { my ($self) = @_; return $self->{'fh'}->fileno(); } sub WRITE { my ($self, @args) = @_; $ProcOpen::LOGFUNC->(ProcOpen::LOGSTDIN, join('',@args)); return $self->{'fh'}->write(@args); } sub PRINT { my ($self, @args) = @_; $ProcOpen::LOGFUNC->(ProcOpen::LOGSTDIN, join('',@args)); return $self->{'fh'}->print(@args); } sub PRINTF { my ($self, @args) = @_; $ProcOpen::LOGFUNC->(ProcOpen::LOGSTDIN, sprintf(@args)); return $self->{'fh'}->printf(@args); } sub CLOSE { my ($self) = @_; $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, "STDI +N Closed"); $self->{'fh'}->flush(); $self->{'fh'}->close(); return 1; } package IOQueue; sub TIEHANDLE { my $class = shift; my $buff = shift; my $procopen = shift; my $self = {}; $self->{'buff'} = $buff; $self->{'readto'} = $procopen->{'readto'}; $self->{'subproc'} = $procopen->{'subproc'}; $self->{'running'} = $procopen->{'running'}; bless $self, $class; } sub EOF { my $self = shift; my $i = $self->{'buff'}->pending(); if (! defined $i) { return 1; } return 0; } sub READLINE { my ($self) = @_; my $to = 14400; my $subproc = $self->{'subproc'}; my $run_ref = $self->{'running'}; if ($$run_ref) { # check if the process is still running my $rc; $subproc->GetExitCode($rc); if ($rc != Win32::Process::STILL_ACTIVE) { $$run_ref = 0; $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, "Subprocess has e +xited with rc:$rc"); } elsif ($ProcOpen::DEBUG) { $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, "Subprocess still + running"); } } if (defined $self->{'readto'}) { $to = $self->{'readto'}; } if (wantarray) { my @lines = (); while(my $x = $self->{'buff'}->dequeue_timed($to)) { push(@lines, $x); } return @lines; } else { my $x = $self->{'buff'}->dequeue_timed($to); return $x; } } 1;
|
|---|
| Replies are listed 'Best First'. | |
|---|---|
|
Re: Having Win32 Perl crashes using ithreads
by Athanasius (Archbishop) on Feb 16, 2016 at 13:34 UTC | |
|
Re: Having Win32 Perl crashs using ithreads
by BrowserUk (Patriarch) on Feb 16, 2016 at 14:32 UTC | |
by dchidelf (Novice) on Feb 16, 2016 at 17:14 UTC | |
|
Re: Having Win32 Perl crashs using ithreads
by marioroy (Prior) on Feb 16, 2016 at 19:48 UTC | |
by dchidelf (Novice) on Feb 18, 2016 at 21:11 UTC | |
by marioroy (Prior) on Feb 19, 2016 at 00:17 UTC | |
by dchidelf (Novice) on Feb 19, 2016 at 18:06 UTC | |
by marioroy (Prior) on Feb 19, 2016 at 22:14 UTC |