Thanks I added calls to my IOQueue close functions in the ProcOpen::close and also untied the variables as you suggested.

Since I got around the crashes I have also replaced the temp files used for the process STDOUT/STDERR with pipes, so that simplifies some of the processing (don't need to tail the files, can just read from the pipe until it closes). All seems well with it.

Here is the "final" module if anyone is interested.
#!/usr/bin/perl package ProcOpen; use strict; use warnings; use threads; use threads::shared; use Thread::Queue; use Win32API::File ':ALL'; use Win32::Process qw / STILL_ACTIVE NORMAL_PRIORITY_CLASS INFINITE / +; use IO::Select; $ProcOpen::DEBUG = 0; # set to 1 to enable debug logging $ProcOpen::LOGFUNC = sub {}; @ProcOpen::LOGLVLS = qw(STDIN STDOUT STDERR ERROR INFO DEBUG); use constant { LOGSTDIN => 0, LOGSTDOUT => 1, LOGSTDERR => 2, LOGERROR => 3, LOGINFO => 4, LOGDEBUG => 5, }; sub setLogFunc { my ($func) = @_; if (ref($func) eq "CODE") { $ProcOpen::LOGFUNC = $func; return 1; } return 0; } sub logLvlStr { my ($no) = @_; if ($no >= 0 && $no <= $#ProcOpen::LOGLVLS) { return $ProcOpen::LOGLVLS[$no]; } return "?"; } sub prepPrivateHandle { my ($fh) = @_; my $fd = fileno $fh; return(-1) if (! defined $fd); my $osfh = FdGetOsFHandle($fd); if ($osfh == INVALID_HANDLE_VALUE) { $ProcOpen::LOGFUNC->(ProcOpen::LOGERROR, "prepPrivateHandle: F +dGetOsFHandle failed: $^E"); return(-2); } if (! SetHandleInformation($osfh, (HANDLE_FLAG_INHERIT | HANDLE_FL +AG_PROTECT_FROM_CLOSE), 0)) { $ProcOpen::LOGFUNC->(ProcOpen::LOGERROR, "prepPrivateHandle: S +etHandleInformation failed: $^E"); return(-3); } return 0; } sub softclose { # normal Perl IO Handle close on each handle passed my (@handles) = @_; for my $h (@handles) { $h->close(); } } sub hardclose { # not only close the regular Perl IO Handle, but close the underly +ing Win32 Handle as well # This is primarily useful for closing the handles we had open whe +n creating a new thread # forcing the handle closed in the entire process, even though the + ithread holds a copy of # the perl handle (can't use :shared on the handle, might yet be +another way though) my (@handles) = @_; for my $h (@handles) { my $fd = fileno $h; if (defined $fd && $fd >= 0) { my $osfh = FdGetOsFHandle($fd); # get the OS native file +handle if (! CloseHandle($osfh)) { $ProcOpen::LOGFUNC->(ProcOpen::LOGERROR, "ProcOpen::ha +rcdlose: CloseHandle failed: $^E"); } else { $h->close(); } } else { $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, "ProcOpen::ha +rdclose: Failed getting fileno for $h"); } } } sub procopen { my $self = { 'closeto' => 2000, # ms to wait for process to exit after fo +rced close 'readto' => undef, # sec to wait beforing timing out read fr +om $out / $err handles }; # If first argument is a hash it contains config options if (ref($_[0]) eq 'HASH') { my $options = shift; for (keys(%$options)) { $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBU +G, "set option: $_ = $options->{$_}"); $self->{$_} = $options->{$_}; } } my ($in, $out, $err, $cmd, @args) = @_; # created a shared variable for flagging reader thread count to th +e main thread 0->2 # then termination of the subprocess ->0 my $running :shared = 0; $self->{'running'} = \$running; # create shared variables for synchronizing start of readers my $goreaders :shared = 0; # $out and $err become IOQueue tied handles # Create the underlying Thread::Queues my $obuff = Thread::Queue->new(); my $ebuff = Thread::Queue->new(); # The subprocess STDOUT and STDERR become pipes connected to reade +r threads # Those threads will feed the IOQueue Thread::Queues pipe my $outread, my $outwrite; pipe my $errread, my $errwrite; # prevent subprocess from inheriting our read handles prepPrivateHandle($outread); prepPrivateHandle($errread); # Create a thread for reading each output file (STDOUT / STDERR) $self->{'outthread'} = threads->create('fhreader', \$goreaders, \$ +running, $outread, $obuff, ProcOpen::LOGSTDOUT); $self->{'errthread'} = threads->create('fhreader', \$goreaders, \$ +running, $errread, $ebuff, ProcOpen::LOGSTDERR); # Make sure our reader threads have started and are waiting for ou +r signal # This gives them a chance to do any prep (close handles) before w +e spawn the subproc { lock($running); if ($running < 2) { # there is no reason the threads should take more than a s +econd or two to initialize # So make sure we don't sit around waiting forever my $absto = time() + 4; # 4 second timeout $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBU +G, "Waiting for readers to begin execution"); do { cond_timedwait($running, $absto) || last; } while ($running < 2); if ($running < 2) { # should be rare, not worth graceful recovery. probab +ly?? die "ProcOpen: Subprocess reader threads did not start +!\n"; } elsif ($ProcOpen::DEBUG) { $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, "Readers init +ialized"); # and waiting for our signal } } } # The problem with the normal open3 implementation appears to be t +hat file descriptors # are dupped into a pseudo-process when open3 calls system(1, cmd. +..). # That "process" holds onto those descriptors, so even if we close + them when open3 # returns, our Perl process still has them open, which causes bloc +king. # Simulate what open3 does on Windows # * swap out the STD* file descriptors # * Spawn the program (asychronously) (use Win32::Process::Create +rather than system(1, ...)) # * swap the file descriptors back # Save copies of STDIN, STDOUT, STDERR my $saveIN = IO::Handle->new_from_fd(\*STDIN, 'r'); my $saveOUT = IO::Handle->new_from_fd(\*STDOUT, 'w'); my $saveERR = IO::Handle->new_from_fd(\*STDERR, 'w'); # create a pipe for the process STDIN pipe STDIN, my $inwrite; # prevent subprocess from inheriting our write handle # Not doing so can result in a deadlock prepPrivateHandle($inwrite); $inwrite->autoflush(1); # redirect STDOUT / STDERR (dup our tmpfile handles) # Then close ours, do not need the IO:File handles anymore STDOUT->fdopen($outwrite, 'w'); STDERR->fdopen($errwrite, 'w'); # Need hardclose because our threads hold copies of the handles an +d prevent # the refcnt on the handles from hitting zero and closing the OS H +andle # hardclose will close the OS Handle ProcOpen::hardclose($outwrite, $errwrite); # disable output buffering STDOUT->autoflush(1); STDERR->autoflush(1); # Start the sub-process my $fullCmd = join(" ", $cmd, @args); my $subproc; my $wpc_err = ""; $running = Win32::Process::Create($subproc, $cmd, $fullCmd, 1, Win +32::Process::NORMAL_PRIORITY_CLASS, "."); if (! $running) { $wpc_err = Win32::FormatMessage( Win32::GetLastError() ); } # Restore the original STDIN, STDOUT, STDERR ProcOpen::softclose(\*STDIN, \*STDOUT, \*STDERR); # we don't want +the spawned process's STDs STDIN->fdopen($saveIN, 'r'); STDOUT->fdopen($saveOUT, 'w'); STDERR->fdopen($saveERR, 'w'); ProcOpen::softclose($saveIN, $saveOUT, $saveERR); # close the orig +inal saves because we dup'd # notify the freader threads that they are ready to read $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, "Sign +alling readers to start"); { lock($goreaders); cond_broadcast($goreaders); } if (! $running) { $ProcOpen::LOGFUNC->(ProcOpen::LOGERROR, "Win32::Process::Crea +te: $wpc_err"); $ProcOpen::LOGFUNC->(ProcOpen::LOGERROR, "Failed to start $ful +lCmd"); ProcOpen::softclose($inwrite); $self->{'outthread'}->join(); $self->{'errthread'}->join(); return undef; } my $pid = $subproc->GetProcessID(); $self->{'subproc'} = $subproc; $self->{'pid'} = $pid; $ProcOpen::LOGFUNC->(ProcOpen::LOGINFO, "Started '$fullCmd' PID:$p +id"); # create an IOQueue tied handle for each of the thread queues and +set the user's handles tie *OFH, "IOQueue", $obuff, $self; tie *EFH, "IOQueue", $ebuff, $self; $self->{'ofh'} = $$out = \*OFH; $self->{'efh'} = $$err = \*EFH; # create a ProcInHandle tied handle for controlling the input stre +am and set the user handle tie *IFH, "ProcInHandle", $inwrite; $self->{'ifh'} = $$in = \*IFH; bless $self; } sub close { my $self = shift; if (! defined $self->{'rc'}) { $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, " +Requested close on ProcOpen Object"); my $rc; my $othr = $self->{'outthread'}; my $ethr = $self->{'errthread'}; my $subproc = $self->{'subproc'}; my $ifh = $self->{'ifh'}; my $ofh = $self->{'ofh'}; my $efh = $self->{'efh'}; my $pid = $self->{'pid'}; # Is our side of the subproc STDIN pipe still open? if (defined $ifh && $ifh->opened()) { close($ifh); # close our subprocess stdin untie *{$ifh}; } $subproc->GetExitCode($rc); if ($rc == Win32::Process::STILL_ACTIVE) { $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBU +G, "Subproc $pid still running. Waiting."); $subproc->Wait($self->{'closeto'}); $subproc->GetExitCode($rc); if ($rc == Win32::Process::STILL_ACTIVE) { $ProcOpen::LOGFUNC->(ProcOpen::LOGINFO, "Sending KILL +to subprocess PID:$pid"); $subproc->Kill(0); $subproc->Wait($self->{'closeto'}); $subproc->GetExitCode($rc); if ($rc == Win32::Process::STILL_ACTIVE) { $ProcOpen::LOGFUNC->(ProcOpen::LOGINFO, "Subproces +s PID:$pid still active"); $rc = -1; } } } $self->{'rc'} = $rc; $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, " +Subproc $pid exit status: $rc"); close($ofh); close($efh); untie *{$ofh}; untie *{$efh}; $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, " +Wait for reader threads to terminate"); $othr->join(); $ethr->join(); $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, " +ProcOpen::close complete"); } return $self->{'rc'}; } sub fhreader { my ($start, $running, $reader, $ioQueue, $logtype) = @_; # purely a worker, no reason to keep these open # Besides they cause IO blocking / deadlocks STDOUT->close(); STDERR->close(); STDIN->close(); # wait for the main thread to signal us to start { lock($$start); # need to make sure the main thread knows we are waiting # increment $running, so it knows when 2 readers are ready { lock($$running); ++$$running; cond_signal($$running); } cond_wait($$start); # wait to start } return (0) if (!$running); # Read from the pipe that is connected to the process output # Appending each line to the ioQueue while(<$reader>) { $ioQueue->enqueue($_); $ProcOpen::LOGFUNC->($logtype, $_); } $reader->close(); $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, ProcO +pen::logLvlStr($logtype) . " CLOSED"); # can not read anymore, so shutdown queue as well $ioQueue->end(); return (0); } package ProcInHandle; use Win32API::File ':ALL'; sub TIEHANDLE { my $class = shift; my $fh = shift; my $self = {}; $self->{'fh'} = $fh; bless $self, $class; } sub FILENO { my ($self) = @_; return $self->{'fh'}->fileno(); } sub WRITE { my ($self, @args) = @_; $ProcOpen::LOGFUNC->(ProcOpen::LOGSTDIN, join('',@args)); return $self->{'fh'}->write(@args); } sub PRINT { my ($self, @args) = @_; $ProcOpen::LOGFUNC->(ProcOpen::LOGSTDIN, join('',@args)); return $self->{'fh'}->print(@args); } sub PRINTF { my ($self, @args) = @_; $ProcOpen::LOGFUNC->(ProcOpen::LOGSTDIN, sprintf(@args)); return $self->{'fh'}->printf(@args); } sub CLOSE { my ($self) = @_; $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, "STDI +N Closed"); $self->{'fh'}->flush(); $self->{'fh'}->close(); return 1; } package IOQueue; sub TIEHANDLE { my $class = shift; my $buff = shift; my $procopen = shift; my $self = {}; $self->{'buff'} = $buff; $self->{'readto'} = $procopen->{'readto'}; $self->{'subproc'} = $procopen->{'subproc'}; $self->{'running'} = $procopen->{'running'}; bless $self, $class; } sub EOF { my $self = shift; my $i = $self->{'buff'}->pending(); if (! defined $i) { return 1; } return 0; } sub READLINE { my ($self) = @_; my $to = 14400; my $subproc = $self->{'subproc'}; my $run_ref = $self->{'running'}; my $rc = Win32::Process::STILL_ACTIVE; if ($$run_ref) { # check if the process is still running $subproc->GetExitCode($rc); if ($rc != Win32::Process::STILL_ACTIVE) { $$run_ref = 0; $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, "Subprocess has e +xited with rc:$rc"); } elsif ($ProcOpen::DEBUG) { $ProcOpen::LOGFUNC->(ProcOpen::LOGDEBUG, "Subprocess still + running"); } } if (defined $self->{'readto'}) { $to = $self->{'readto'}; } my $quitinTime = time() + $to; if (wantarray) { my @lines = (); my $x; # get everything that doesn't require blocking while($x = $self->{'buff'}->dequeue_nb()) { push(@lines, $x); } if ($to > 0) { # if we are willing to block # wait in 1 second intervals until the readto or the subpr +oc terminates while($quitinTime > time()) { if ($x = $self->{'buff'}->dequeue_timed(1)) { push(@lines, $x); while($x = $self->{'buff'}->dequeue_nb()) { push(@lines, $x); } } # $rc was set above, we want to block for one iteratio +n after termination # of the subproc so we pick up late written data if ($rc != Win32::Process::STILL_ACTIVE) { $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen: +:LOGDEBUG, "Subprocess has exited with rc:$rc"); $$run_ref = 0; last; } # check for subproc termination again $subproc->GetExitCode($rc); } } return @lines; } else { my $x = $self->{'buff'}->dequeue_nb(); if ((! $x) && ($to > 0)) { while($quitinTime > time()) { $x = $self->{'buff'}->dequeue_timed(1); last if ($x); if ($rc != Win32::Process::STILL_ACTIVE) { $ProcOpen::DEBUG && $ProcOpen::LOGFUNC->(ProcOpen: +:LOGDEBUG, "Subprocess has exited with rc:$rc"); $$run_ref = 0; last; } $subproc->GetExitCode($rc); } } return $x; } } sub CLOSE { $_[0] = {}; bless $_[0]; } 1;

In reply to Re^4: Having Win32 Perl crashs using ithreads by dchidelf
in thread Having Win32 Perl crashs using ithreads by dchidelf

Title:
Use:  <p> text here (a paragraph) </p>
and:  <code> code here </code>
to format your post, it's "PerlMonks-approved HTML":



  • Posts are HTML formatted. Put <p> </p> tags around your paragraphs. Put <code> </code> tags around your code and data!
  • Titles consisting of a single word are discouraged, and in most cases are disallowed outright.
  • Read Where should I post X? if you're not absolutely sure you're posting in the right place.
  • Please read these before you post! —
  • Posts may use any of the Perl Monks Approved HTML tags:
    a, abbr, b, big, blockquote, br, caption, center, col, colgroup, dd, del, details, div, dl, dt, em, font, h1, h2, h3, h4, h5, h6, hr, i, ins, li, ol, p, pre, readmore, small, span, spoiler, strike, strong, sub, summary, sup, table, tbody, td, tfoot, th, thead, tr, tt, u, ul, wbr
  • You may need to use entities for some characters, as follows. (Exception: Within code tags, you can put the characters literally.)
            For:     Use:
    & &amp;
    < &lt;
    > &gt;
    [ &#91;
    ] &#93;
  • Link using PerlMonks shortcuts! What shortcuts can I use for linking?
  • See Writeup Formatting Tips and other pages linked from there for more info.