#!/usr/bin/perl
#
# ProcOpen - open3-like subprocess launcher for Win32 Perl using ithreads.
#
# procopen() spawns a child with Win32::Process::Create, feeds the child's
# STDOUT / STDERR through two reader threads into Thread::Queue objects, and
# hands the caller three tied handles:
#   * $$in  - ProcInHandle tied handle wrapping the write end of the child's
#             STDIN pipe
#   * $$out - IOQueue tied handle fed by the STDOUT reader thread
#   * $$err - IOQueue tied handle fed by the STDERR reader thread
#
# Every interesting event is reported through $ProcOpen::LOGFUNC, which is
# called as LOGFUNC->($level, $message) with one of the LOG* constants below.
package ProcOpen;

use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;
use Win32API::File ':ALL';
use Win32::Process qw/ STILL_ACTIVE NORMAL_PRIORITY_CLASS INFINITE /;
use IO::Handle;
use IO::Select;
use Symbol ();

$ProcOpen::DEBUG   = 0;         # set to 1 to enable debug logging
$ProcOpen::LOGFUNC = sub {};    # default logger discards everything
@ProcOpen::LOGLVLS = qw(STDIN STDOUT STDERR ERROR INFO DEBUG);

# Log levels; each value is the index of its name in @ProcOpen::LOGLVLS.
use constant {
    LOGSTDIN  => 0,
    LOGSTDOUT => 1,
    LOGSTDERR => 2,
    LOGERROR  => 3,
    LOGINFO   => 4,
    LOGDEBUG  => 5,
};

# setLogFunc($coderef)
# Install $coderef as the logging callback.
# Returns 1 when the argument is a code reference, 0 (and leaves the current
# callback untouched) otherwise.
sub setLogFunc {
    my ($func) = @_;
    if (ref($func) eq "CODE") {
        $ProcOpen::LOGFUNC = $func;
        return 1;
    }
    return 0;
}

# logLvlStr($level)
# Map a numeric log level to its name from @ProcOpen::LOGLVLS.
# Returns "?" for an undefined or out-of-range level.
sub logLvlStr {
    my ($no) = @_;
    if (defined $no && $no >= 0 && $no <= $#ProcOpen::LOGLVLS) {
        return $ProcOpen::LOGLVLS[$no];
    }
    return "?";
}

# prepPrivateHandle($fh)
# Clear the INHERIT and PROTECT_FROM_CLOSE flags on the OS handle behind $fh
# so a spawned subprocess does not inherit it.
# Returns 0 on success, -1 when $fh has no fileno, -2 when the OS handle
# cannot be obtained, -3 when SetHandleInformation fails.
sub prepPrivateHandle {
    my ($fh) = @_;

    my $fd = fileno $fh;
    return(-1) if (! defined $fd);

    my $osfh = FdGetOsFHandle($fd);
    if ($osfh == INVALID_HANDLE_VALUE) {
        $ProcOpen::LOGFUNC->(ProcOpen::LOGERROR,
            "prepPrivateHandle: FdGetOsFHandle failed: $^E");
        return(-2);
    }

    if (! SetHandleInformation($osfh,
            (HANDLE_FLAG_INHERIT | HANDLE_FLAG_PROTECT_FROM_CLOSE), 0)) {
        $ProcOpen::LOGFUNC->(ProcOpen::LOGERROR,
            "prepPrivateHandle: SetHandleInformation failed: $^E");
        return(-3);
    }
    return 0;
}

# softclose(@handles)
# Normal Perl IO handle close on each handle passed.
sub softclose {
    my (@handles) = @_;
    for my $h (@handles) {
        $h->close();
    }
}

# hardclose(@handles)
# Not only close the regular Perl IO handle, but close the underlying Win32
# handle as well.
# This is primarily useful for closing the handles we had open when creating
# a new thread, forcing the handle closed in the entire process, even though
# the ithread holds a copy of the perl handle (can't use :shared on the
# handle, might yet be another way though).
sub hardclose {
    my (@handles) = @_;
    for my $h (@handles) {
        my $fd = fileno $h;
        if (defined $fd && $fd >= 0) {
            my $osfh = FdGetOsFHandle($fd);    # get the OS native file handle
            if (! CloseHandle($osfh)) {
                $ProcOpen::LOGFUNC->(ProcOpen::LOGERROR,
                    "ProcOpen::hardclose: CloseHandle failed: $^E");
            }
            else {
                $h->close();
            }
        }
        else {
            $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG,
                "ProcOpen::hardclose: Failed getting fileno for $h")
                if $ProcOpen::DEBUG;
        }
    }
}

# procopen([\%options,] \$in, \$out, \$err, $cmd, @args)
#
# Spawn "$cmd @args" and connect it to three tied handles that are stored
# through the scalar references $in, $out and $err.
#
# Options (optional leading hash reference):
#   closeto - ms to wait for the process to exit after a forced close
#   readto  - sec to wait before timing out a read from the $out / $err
#             handles (undef selects the IOQueue default)
#
# Returns a ProcOpen object on success, undef when the process could not be
# created. Dies when a pipe cannot be created or the reader threads fail to
# start within 4 seconds.
sub procopen {
    my $self = {
        'closeto' => 2000,     # ms to wait for process to exit after forced close
        'readto'  => undef,    # sec to wait before timing out read from $out / $err handles
    };

    # If first argument is a hash it contains config options
    if (ref($_[0]) eq 'HASH') {
        my $options = shift;
        for my $key (keys(%$options)) {
            $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG,
                "set option: $key = "
                . (defined $options->{$key} ? $options->{$key} : ''))
                if $ProcOpen::DEBUG;
            $self->{$key} = $options->{$key};
        }
    }

    my ($in, $out, $err, $cmd, @args) = @_;

    # create a shared variable for flagging reader thread count to the main
    # thread 0->2, then termination of the subprocess ->0
    my $running :shared = 0;
    $self->{'running'} = \$running;

    # create shared variable for synchronizing start of readers
    # (set to 1 before the broadcast so readers can tell a real wake-up from
    # a spurious one)
    my $goreaders :shared = 0;

    # $out and $err become IOQueue tied handles
    # Create the underlying Thread::Queues
    my $obuff = Thread::Queue->new();
    my $ebuff = Thread::Queue->new();

    # The subprocess STDOUT and STDERR become pipes connected to reader
    # threads. Those threads will feed the IOQueue Thread::Queues.
    # Nothing has been redirected or spawned yet, so failing here is clean.
    pipe(my $outread, my $outwrite)
        or die "ProcOpen: pipe for subprocess STDOUT failed: $!\n";
    pipe(my $errread, my $errwrite)
        or die "ProcOpen: pipe for subprocess STDERR failed: $!\n";

    # prevent subprocess from inheriting our read handles
    prepPrivateHandle($outread);
    prepPrivateHandle($errread);

    # Create a thread for reading each output file (STDOUT / STDERR)
    $self->{'outthread'} = threads->create(\&fhreader, \$goreaders,
        \$running, $outread, $obuff, ProcOpen::LOGSTDOUT);
    $self->{'errthread'} = threads->create(\&fhreader, \$goreaders,
        \$running, $errread, $ebuff, ProcOpen::LOGSTDERR);

    # Make sure our reader threads have started and are waiting for our
    # signal. This gives them a chance to do any prep (close handles) before
    # we spawn the subproc.
    {
        lock($running);
        if ($running < 2) {
            # there is no reason the threads should take more than a second
            # or two to initialize, so make sure we don't sit around waiting
            # forever
            my $absto = time() + 4;    # 4 second timeout
            $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG,
                "Waiting for readers to begin execution")
                if $ProcOpen::DEBUG;

            # A real while loop: `last` inside a do {} while block would
            # leave the enclosing bare block and skip the check below.
            while ($running < 2) {
                last if !cond_timedwait($running, $absto);
            }

            if ($running < 2) {
                # should be rare, not worth graceful recovery. probably??
                die "ProcOpen: Subprocess reader threads did not start!\n";
            }
            elsif ($ProcOpen::DEBUG) {
                # readers are now waiting for our signal
                $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG,
                    "Readers initialized");
            }
        }
    }

    # The problem with the normal open3 implementation appears to be that
    # file descriptors are dupped into a pseudo-process when open3 calls
    # system(1, cmd...). That "process" holds onto those descriptors, so even
    # if we close them when open3 returns, our Perl process still has them
    # open, which causes blocking.

    # Simulate what open3 does on Windows
    #  * swap out the STD* file descriptors
    #  * Spawn the program (asynchronously) (use Win32::Process::Create
    #    rather than system(1, ...))
    #  * swap the file descriptors back

    # Save copies of STDIN, STDOUT, STDERR
    my $saveIN  = IO::Handle->new_from_fd(\*STDIN,  'r');
    my $saveOUT = IO::Handle->new_from_fd(\*STDOUT, 'w');
    my $saveERR = IO::Handle->new_from_fd(\*STDERR, 'w');

    # create a pipe for the process STDIN
    # NOTE(review): the result of this pipe() is not checked; recovering here
    # would also have to restore STDIN and release the waiting readers.
    pipe STDIN, my $inwrite;

    # prevent subprocess from inheriting our write handle
    # Not doing so can result in a deadlock
    prepPrivateHandle($inwrite);
    $inwrite->autoflush(1);

    # redirect STDOUT / STDERR (dup our pipe write handles)
    # Then close ours, do not need the original handles anymore
    STDOUT->fdopen($outwrite, 'w');
    STDERR->fdopen($errwrite, 'w');

    # Need hardclose because our threads hold copies of the handles and
    # prevent the refcnt on the handles from hitting zero and closing the OS
    # handle. hardclose will close the OS handle.
    ProcOpen::hardclose($outwrite, $errwrite);

    # disable output buffering
    STDOUT->autoflush(1);
    STDERR->autoflush(1);

    # Start the sub-process. Errors are only recorded here and logged after
    # the STD* handles are restored.
    my $fullCmd = join(" ", $cmd, @args);
    my $subproc;
    my $wpc_err = "";
    $running = Win32::Process::Create($subproc, $cmd, $fullCmd, 1,
        Win32::Process::NORMAL_PRIORITY_CLASS, ".");
    if (! $running) {
        $wpc_err = Win32::FormatMessage( Win32::GetLastError() );
    }

    # Restore the original STDIN, STDOUT, STDERR
    ProcOpen::softclose(\*STDIN, \*STDOUT, \*STDERR);    # we don't want the spawned process's STDs
    STDIN->fdopen($saveIN,   'r');
    STDOUT->fdopen($saveOUT, 'w');
    STDERR->fdopen($saveERR, 'w');
    ProcOpen::softclose($saveIN, $saveOUT, $saveERR);    # close the original saves because we dup'd

    # notify the reader threads that they are ready to read
    $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, "Signalling readers to start")
        if $ProcOpen::DEBUG;
    {
        lock($goreaders);
        $goreaders = 1;
        cond_broadcast($goreaders);
    }

    if (! $running) {
        $ProcOpen::LOGFUNC->(ProcOpen::LOGERROR,
            "Win32::Process::Create: $wpc_err");
        $ProcOpen::LOGFUNC->(ProcOpen::LOGERROR, "Failed to start $fullCmd");
        ProcOpen::softclose($inwrite);
        $self->{'outthread'}->join();
        $self->{'errthread'}->join();
        return undef;
    }

    my $pid = $subproc->GetProcessID();
    $self->{'subproc'} = $subproc;
    $self->{'pid'}     = $pid;
    $ProcOpen::LOGFUNC->(ProcOpen::LOGINFO, "Started '$fullCmd' PID:$pid");

    # create an IOQueue tied handle for each of the thread queues and set the
    # user's handles. Each call gets fresh anonymous globs so that several
    # ProcOpen objects can be alive at once without re-tying each other's
    # handles.
    my $ofh = Symbol::gensym();
    my $efh = Symbol::gensym();
    tie *{$ofh}, "IOQueue", $obuff, $self;
    tie *{$efh}, "IOQueue", $ebuff, $self;
    $self->{'ofh'} = $$out = $ofh;
    $self->{'efh'} = $$err = $efh;

    # create a ProcInHandle tied handle for controlling the input stream and
    # set the user handle
    my $ifh = Symbol::gensym();
    tie *{$ifh}, "ProcInHandle", $inwrite;
    $self->{'ifh'} = $$in = $ifh;

    return bless $self, __PACKAGE__;
}

# $obj->close()
#
# Shut the subprocess down and collect its exit code:
#   * close our end of the child's STDIN if it is still open
#   * wait up to 'closeto' ms for the child to exit, Kill it if it has not,
#     and wait 'closeto' ms once more
#   * close and untie the output handles, then join both reader threads
#
# Returns the exit code, or -1 when the child is still active after the
# Kill. The result is cached, so repeated calls return the same value.
sub close {
    my $self = shift;

    if (! defined $self->{'rc'}) {
        $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG,
            "Requested close on ProcOpen Object")
            if $ProcOpen::DEBUG;

        my $rc;
        my $othr    = $self->{'outthread'};
        my $ethr    = $self->{'errthread'};
        my $subproc = $self->{'subproc'};
        my $ifh     = $self->{'ifh'};
        my $ofh     = $self->{'ofh'};
        my $efh     = $self->{'efh'};
        my $pid     = $self->{'pid'};

        # Is our side of the subproc STDIN pipe still open?
        # (CORE::close is spelled out because this package defines its own
        # close method.)
        if (defined $ifh && $ifh->opened()) {
            CORE::close($ifh);    # close our subprocess stdin
            untie *{$ifh};
        }

        $subproc->GetExitCode($rc);
        if ($rc == Win32::Process::STILL_ACTIVE) {
            $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG,
                "Subproc $pid still running. Waiting.")
                if $ProcOpen::DEBUG;
            $subproc->Wait($self->{'closeto'});
            $subproc->GetExitCode($rc);

            if ($rc == Win32::Process::STILL_ACTIVE) {
                $ProcOpen::LOGFUNC->(ProcOpen::LOGINFO,
                    "Sending KILL to subprocess PID:$pid");
                $subproc->Kill(0);
                $subproc->Wait($self->{'closeto'});
                $subproc->GetExitCode($rc);

                if ($rc == Win32::Process::STILL_ACTIVE) {
                    $ProcOpen::LOGFUNC->(ProcOpen::LOGINFO,
                        "Subprocess PID:$pid still active");
                    $rc = -1;
                }
            }
        }

        $self->{'rc'} = $rc;
        $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG,
            "Subproc $pid exit status: $rc")
            if $ProcOpen::DEBUG;

        CORE::close($ofh);
        CORE::close($efh);
        untie *{$ofh};
        untie *{$efh};

        # NOTE(review): the readers only finish once their pipe reaches EOF;
        # if the child is still active (rc -1) these joins may block.
        $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG,
            "Wait for reader threads to terminate")
            if $ProcOpen::DEBUG;
        $othr->join();
        $ethr->join();
        $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, "ProcOpen::close complete")
            if $ProcOpen::DEBUG;
    }

    return $self->{'rc'};
}

# fhreader(\$start, \$running, $reader, $ioQueue, $logtype)
#
# Thread entry point. Announces itself by incrementing $$running, waits for
# the main thread to flag and broadcast on $$start, then copies every line
# read from $reader into $ioQueue (logging each one at level $logtype) until
# EOF, and finally ends the queue. Always returns 0.
sub fhreader {
    my ($start, $running, $reader, $ioQueue, $logtype) = @_;

    # purely a worker, no reason to keep these open
    # Besides they cause IO blocking / deadlocks
    STDOUT->close();
    STDERR->close();
    STDIN->close();

    # wait for the main thread to signal us to start
    {
        lock($$start);

        # need to make sure the main thread knows we are waiting
        # increment $running, so it knows when 2 readers are ready
        {
            lock($$running);
            ++$$running;
            cond_signal($$running);
        }

        # $$start is held from before the increment until cond_wait releases
        # it, so the main thread cannot broadcast before we are waiting.
        # The predicate guards against spurious wake-ups.
        cond_wait($$start) until $$start;
    }

    # The original code had `return (0) if (!$running)` here; $running is a
    # reference and therefore always true, so that check never fired. It is
    # intentionally not replaced by a test of $$running: the pipe is read to
    # EOF in every case so the queue is always ended and no output is lost.

    # Read from the pipe that is connected to the process output,
    # appending each line to the ioQueue
    while (my $line = <$reader>) {
        $ioQueue->enqueue($line);
        $ProcOpen::LOGFUNC->($logtype, $line);
    }

    $reader->close();
    $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG,
        ProcOpen::logLvlStr($logtype) . " CLOSED")
        if $ProcOpen::DEBUG;

    # can not read anymore, so shutdown queue as well
    $ioQueue->end();
    return (0);
}

# ProcInHandle - tied handle wrapping the write end of the subprocess STDIN
# pipe. Everything written is also reported to $ProcOpen::LOGFUNC at level
# LOGSTDIN.
package ProcInHandle;

use Win32API::File ':ALL';

# tie *FH, "ProcInHandle", $fh  -- $fh is the real handle to write to.
sub TIEHANDLE {
    my $class = shift;
    my $fh    = shift;
    my $self  = {};
    $self->{'fh'} = $fh;
    return bless $self, $class;
}

# fileno of the wrapped handle.
sub FILENO {
    my ($self) = @_;
    return $self->{'fh'}->fileno();
}

# WRITE($buf, $len, $offset): log and write the selected part of $buf.
sub WRITE {
    my ($self, @args) = @_;
    my ($buf, $len, $offset) = @args;

    # Log the bytes that are written, using the same defaults the wrapped
    # handle's write() applies, instead of gluing length/offset to the data.
    $offset = 0            unless $offset;
    $len    = length($buf) unless defined $len;
    $ProcOpen::LOGFUNC->(ProcOpen::LOGSTDIN,
        defined $buf ? substr($buf, $offset, $len) : '');

    return $self->{'fh'}->write(@args);
}

# PRINT(@list): log the concatenated list and print it.
sub PRINT {
    my ($self, @args) = @_;
    $ProcOpen::LOGFUNC->(ProcOpen::LOGSTDIN, join('', @args));
    return $self->{'fh'}->print(@args);
}

# PRINTF($format, @list): log the formatted string and printf it.
sub PRINTF {
    my ($self, $format, @args) = @_;

    # sprintf's first argument is taken in scalar context, so the format has
    # to be passed separately; sprintf(@list) would format the element count.
    $ProcOpen::LOGFUNC->(ProcOpen::LOGSTDIN, sprintf($format, @args));
    return $self->{'fh'}->printf($format, @args);
}

# CLOSE: flush and close the wrapped handle. Always returns 1.
sub CLOSE {
    my ($self) = @_;
    $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, "STDIN Closed")
        if $ProcOpen::DEBUG;
    $self->{'fh'}->flush();
    $self->{'fh'}->close();
    return 1;
}

# IOQueue - read-only tied handle that serves lines out of a Thread::Queue
# filled by a ProcOpen reader thread.
package IOQueue;

# tie *FH, "IOQueue", $queue, $procopen
# Copies 'readto', 'subproc' and 'running' out of the ProcOpen hash.
sub TIEHANDLE {
    my $class    = shift;
    my $buff     = shift;
    my $procopen = shift;
    my $self     = {};
    $self->{'buff'}    = $buff;
    $self->{'readto'}  = $procopen->{'readto'};
    $self->{'subproc'} = $procopen->{'subproc'};
    $self->{'running'} = $procopen->{'running'};
    return bless $self, $class;
}

# EOF: true once pending() on the queue returns undef, which Thread::Queue
# documents as "queue ended and no items left".
sub EOF {
    my $self = shift;
    my $i = $self->{'buff'}->pending();
    if (! defined $i) {
        return 1;
    }
    return 0;
}

# READLINE
#
# Scalar context: return the next queued line, waiting up to 'readto'
# seconds (default 14400) for one to arrive; undef when none arrived.
# List context: return everything queued, continuing to collect until the
# timeout expires, the subprocess has terminated, or the queue has ended.
# A 'readto' of 0 never blocks.
sub READLINE {
    my ($self) = @_;

    my $to      = 14400;    # default read timeout in seconds
    my $buff    = $self->{'buff'};
    my $subproc = $self->{'subproc'};
    my $run_ref = $self->{'running'};
    my $rc      = Win32::Process::STILL_ACTIVE;

    if ($$run_ref) {
        # check if the process is still running
        $subproc->GetExitCode($rc);
        if ($rc != Win32::Process::STILL_ACTIVE) {
            $$run_ref = 0;
            $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG,
                "Subprocess has exited with rc:$rc")
                if $ProcOpen::DEBUG;
        }
        elsif ($ProcOpen::DEBUG) {
            $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG,
                "Subprocess still running");
        }
    }

    if (defined $self->{'readto'}) {
        $to = $self->{'readto'};
    }
    my $quitinTime = time() + $to;

    if (wantarray) {
        my @lines = ();

        # get everything that doesn't require blocking
        # (tested with defined so a final "0" line is not mistaken for
        # "nothing available")
        while (defined(my $x = $buff->dequeue_nb())) {
            push(@lines, $x);
        }

        if ($to > 0) {    # if we are willing to block
            # wait in 1 second intervals until the readto or the subproc
            # terminates
            while ($quitinTime > time()) {
                my $x = $buff->dequeue_timed(1);
                if (defined $x) {
                    push(@lines, $x);
                    while (defined($x = $buff->dequeue_nb())) {
                        push(@lines, $x);
                    }
                }

                # $rc was set above, we want to block for one iteration
                # after termination of the subproc so we pick up late
                # written data
                if ($rc != Win32::Process::STILL_ACTIVE) {
                    $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG,
                        "Subprocess has exited with rc:$rc")
                        if $ProcOpen::DEBUG;
                    $$run_ref = 0;
                    last;
                }

                # queue ended and drained: nothing more can ever arrive, and
                # dequeue_timed would no longer wait, so stop instead of
                # spinning until the timeout
                last if !defined $buff->pending();

                # check for subproc termination again
                $subproc->GetExitCode($rc);
            }
        }
        return @lines;
    }
    else {
        my $x = $buff->dequeue_nb();
        if ((! defined $x) && ($to > 0)) {
            while ($quitinTime > time()) {
                $x = $buff->dequeue_timed(1);
                last if (defined $x);

                if ($rc != Win32::Process::STILL_ACTIVE) {
                    $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG,
                        "Subprocess has exited with rc:$rc")
                        if $ProcOpen::DEBUG;
                    $$run_ref = 0;
                    last;
                }

                # queue ended and drained: see the list-context branch
                last if !defined $buff->pending();

                $subproc->GetExitCode($rc);
            }
        }
        return $x;
    }
}

# CLOSE: replace the first argument with a fresh, empty IOQueue object.
# NOTE(review): this assigns through @_; whether that is meant to reset the
# tie object itself should be confirmed by the author.
sub CLOSE {
    $_[0] = {};
    bless $_[0], __PACKAGE__;
}

1;
In reply to Re^4: Having Win32 Perl crashs using ithreads
by dchidelf
in thread Having Win32 Perl crashs using ithreads
by dchidelf
| For: | Use:     |
|------|----------|
| `&`  | `&amp;`  |
| `<`  | `&lt;`   |
| `>`  | `&gt;`   |
| `[`  | `&#91;`  |
| `]`  | `&#93;`  |