in reply to Non-blocking Reads from Pipe Filehandle

I'm trying to do this as simply as possible, ... I'd like to avoid having to use ..., threads, ...

Shame, because that's by far the easiest way to do it. The structure of your main loop is almost exactly as you envisioned it and it just works:

#! perl -slw use strict; use threads; use Thread::Queue; sub pipeCommand { my $cmd = shift; my $Q = new Thread::Queue; async{ my $pid = open my $pipe, $cmd or die $!; $Q->enqueue( $_ ) while <$pipe>; $Q->enqueue( undef ); }->detach; return $Q; } my $pipe = pipeCommand( 'perl -le"$|++;print localtime().q[: some text] and sleep 1 for 1 +.. 10" |' ) or die; while( 1 ) { if( $pipe->pending ) { my $line = $pipe->dequeue or last; chomp( $line ); ## Do stuff with $line printf "Got: '%s'\n", $line; } else { ## Do something else print 'Tum te tum'; Win32::Sleep 500; } } __END__ C:\test>621058-t Tum te tum Got: 'Thu Jun 14 01:43:13 2007: some text' Tum te tum Got: 'Thu Jun 14 01:43:14 2007: some text' Tum te tum Tum te tum Got: 'Thu Jun 14 01:43:15 2007: some text' Tum te tum Tum te tum Got: 'Thu Jun 14 01:43:16 2007: some text' Tum te tum Tum te tum Got: 'Thu Jun 14 01:43:17 2007: some text' Tum te tum Tum te tum Got: 'Thu Jun 14 01:43:18 2007: some text' Tum te tum Tum te tum Tum te tum Got: 'Thu Jun 14 01:43:19 2007: some text' Tum te tum Got: 'Thu Jun 14 01:43:20 2007: some text' Tum te tum Tum te tum Got: 'Thu Jun 14 01:43:21 2007: some text' Tum te tum Tum te tum Got: 'Thu Jun 14 01:43:22 2007: some text' Tum te tum Tum te tum

To my knowledge, there is no way to put a win32 anonymous pipe into non-blocking mode. There is an obscure reference that's suggests it might be possible to set the read-end non-blocking using SetNamedPipeHandleState if it has been opened using ReadFile() (See the section on PIPE_NOWAIT and the reference to LAN Manager but I have never made this work, so WaitMultipleObjects() probably isn't going to help.

I did assist demerphq with reference to IO::Pipe a year or so ago, and succeeded in getting PeekNamedPipe to work with that, which allows you to poll the pipe. (See the bit that says the call will also work with anonymous pipes.)

I've tried that with a normal pipe handle, but can't make it work (yet!). The call to PeekNamedPipe() isn't giving any errors, but it's not telling me there is data available either. I seem to remember having this problem last time initially, but I can't remember how I fixed it. I no longer have the code, but I'll message demerphq and hope that he still has it.

But even if it can be made to work, it is hardly less complicated:

#! perl -slw use strict; use Win32API::File qw[ GetOsFHandle ]; use Win32::API::Prototype; ApiLink( 'Kernel32', q[ BOOL PeekNamedPipe( HANDLE hNamedPipe, LPVOID lpBuffer, DWORD nBufferSize, LPDWORD lpBytesRead, DWORD *lpTotalBytesAvail, LPDWORD lpBytesLeftThisMessage ) ] ) or die $^E; my $cmd = 'perl -le"$|++;print localtime().q[: some text] and sleep 1 +for 1..10" |'; my $pid = open my $pipe, $cmd or die $!; warn $pid; my $pHandle = GetOsFHandle( $pipe ); warn $pHandle; while( 1 ) { my $cAvail = 0; if( ( PeekNamedPipe( $pHandle, 0, 0, 0, $cAvail, 0 ) or warn $^E ) + and $cAvail ) { defined( my $line = <$pipe> ) or last; chomp( $line ); ## Do stuff with $line printf "Got: '%s'\n", $line; } else { ## Do something else print 'Tum te tum'; Win32::Sleep 500; } }

So, a couple of choices. One easy that works now. One less easy that only might work. Your choice.

Also, maybe the code I wrote for IO::Pipe made it into the 5.9.x releases. Maybe you could use bleed perl an that?


Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
"Science is about questioning the status quo. Questioning authority".
In the absence of evidence, opinion is indistinguishable from prejudice.
"Too many [] have been sedated by an oppressive environment of political correctness and risk aversion."

Replies are listed 'Best First'.
Re^2: Non-blocking Reads from Pipe Filehandle
by ikegami (Patriarch) on Sep 30, 2008 at 23:23 UTC

    Given

    Win32::API->Import( 'mydll', 'BOOL SomeFunction( LPDWORD lpCount )', );

    you'd call the function as follows:

    my $nCount = 5; SomeFunction($nCount);

    Win32::API properly packs nCount into a DWORD and passes a pointer to that packed DWORD to the DLL function.

    But what if lpCount can be NULL? Shouldn't be a problem. Scalars already have a mechanism to identify whether they contain a string or not. Just pass undef, right?

    SomeFunction(undef);

    Wrong! Win32::API does no defined check. It simply does pack('L', undef) and passes a pointer to that.

    That means we need to do our own pointer management. In turn, that means we need to do our own packing and unpacking for pointed data.

    use strict; use warnings; use Win32::API qw( ); use Win32API::File qw( GetOsFHandle INVALID_HANDLE_VALUE ); use Time::HiRes qw( sleep ); use constant ERROR_BROKEN_PIPE => 109; sub get_pv { unpack 'L!', pack 'P', $_[0] } BEGIN { # BOOL WINAPI PeekNamedPipe( # __in HANDLE hNamedPipe, # __out_opt LPVOID lpBuffer, # __in DWORD nBufferSize, # __out_opt LPDWORD lpBytesRead, # __out_opt LPDWORD lpTotalBytesAvail, # __out_opt LPDWORD lpBytesLeftThisMessage # ) my $f = Win32::API->new('kernel32', 'PeekNamedPipe', 'LLLLLL', 'L') or die $^E; sub PeekNamedPipe { my $nBytesRead; my $nTotalBytesAvail; my $nBytesLeftThisMessage; $nBytesRead = pack('L!', $_[3]) if defined $_[3]; $nTotalBytesAvail = pack('L!', $_[4]) if defined $_[4]; $nBytesLeftThisMessage = pack('L!', $_[5]) if defined $_[5]; my $rv = $f->Call( $_[0], get_pv($_[1]), $_[2], get_pv($nBytesRead), get_pv($nTotalBytesAvail), get_pv($nBytesLeftThisMessage), ); $_[3] = unpack('L!', $nBytesRead ) if defined $_[3]; $_[4] = unpack('L!', $nTotalBytesAvail ) if defined $_[4]; $_[5] = unpack('L!', $nBytesLeftThisMessage) if defined $_[5]; return $rv; } } my $cmd = 'perl -le"$|++;print localtime().q[: some text] and sleep 1 +for 1..10" |'; my $pid = open my $pipe, $cmd or die $!; ( my $pHandle = GetOsFHandle( $pipe ) ) != INVALID_HANDLE_VALUE or die $^E; print("Handle: $pHandle\n"); my $buf = ''; for (;;) { my $avail = 0; if ( !PeekNamedPipe( $pHandle, undef, 0, undef, $avail, undef ) ) { last if $^E == ERROR_BROKEN_PIPE; die $^E; } print("Avail: $avail"); print(" (+" . length($buf) . ")") if length($buf); print("\n"); if ( $avail ) { sysread($pipe, $buf, $avail, length($buf) ) or die $!; while ( $buf =~ s/^(.*)\n// ) { my $line = $1; ## Do stuff with $line print( "Got: $line\n" ); } } else { print( "Zzzz\n" ); sleep(0.500); } }

    I had to change two things:

    • I got rid of buffered I/O. <$pipe> and read($pipe, ...) don't mix with select and PeekNamedPipe.
    • PeekNamedPipe was noticing the EOF condition first, so I moved the loop exit there.
    Handle: 1980 Avail: 0 Zzzz Avail: 37 Got: Tue Sep 30 19:23:32 2008: some text Avail: 0 Zzzz Avail: 0 Zzzz Avail: 37 Got: Tue Sep 30 19:23:33 2008: some text Avail: 0 Zzzz Avail: 0 Zzzz Avail: 37 Got: Tue Sep 30 19:23:34 2008: some text Avail: 0 Zzzz Avail: 0 Zzzz Avail: 37 Got: Tue Sep 30 19:23:35 2008: some text Avail: 0 Zzzz Avail: 0 Zzzz Avail: 37 Got: Tue Sep 30 19:23:36 2008: some text Avail: 0 Zzzz Avail: 0 Zzzz Avail: 37 Got: Tue Sep 30 19:23:37 2008: some text Avail: 0 Zzzz Avail: 0 Zzzz Avail: 37 Got: Tue Sep 30 19:23:38 2008: some text Avail: 0 Zzzz Avail: 0 Zzzz Avail: 37 Got: Tue Sep 30 19:23:39 2008: some text Avail: 0 Zzzz Avail: 0 Zzzz Avail: 37 Got: Tue Sep 30 19:23:40 2008: some text Avail: 0 Zzzz Avail: 0 Zzzz Avail: 37 Got: Tue Sep 30 19:23:41 2008: some text Avail: 0 Zzzz Avail: 0 Zzzz
      Wrong! Win32::API does no defined check.

      Hm, Maybe you're right, but it doesn't seem to be necessary. With your insight about buffering & EOF and a little wrapping, this seems to work quite nicely:

      #! perl -slw use strict; use Win32API::File qw[ GetOsFHandle ]; use Win32::API::Prototype; ApiLink( 'Kernel32', q[ BOOL PeekNamedPipe( HANDLE hNamedPipe, LPVOID lpBuffer, DWORD nBufferSize, LPDWORD lpBytesRead, DWORD *lpTotalBytesAvail, LPDWORD lpBytesLeftThisMessage ) ] ) or die $^E; sub readlineMaybe { my $fh = shift; my $osfh = GetOsFHandle( $fh ) or die $^E; my( $bufsize, $buffer, $cAvail, $read ) = ( 1024, chr(0)x1024, 0, +0 ); PeekNamedPipe( $osfh, $buffer, $bufsize, $read, $cAvail, 0 ) or $^E == 109 or die $^E; return if $^E == 109; my $eolPos = 1+index $buffer, $/; return '' unless $eolPos; sysread( $fh, $buffer, $eolPos ) or die $!; return $buffer; } my $cmd = 'perl -le"$|++;print localtime().q[: some text] and sleep 1 for 1..1 +0" |'; my $pid = open my $pipe, $cmd or die $!; while( defined( my $line = readlineMaybe( $pipe ) ) ) { Win32::Sleep( 100 ) and next unless $line; chomp $line; chop $line; ## Annoying! print "Got: '$line'"; } __END__ c:\test>buk-pipe.pl Got: 'Wed Oct 1 02:01:21 2008: some text' Got: 'Wed Oct 1 02:01:22 2008: some text' Got: 'Wed Oct 1 02:01:23 2008: some text' Got: 'Wed Oct 1 02:01:24 2008: some text' Got: 'Wed Oct 1 02:01:25 2008: some text' Got: 'Wed Oct 1 02:01:26 2008: some text' Got: 'Wed Oct 1 02:01:27 2008: some text' Got: 'Wed Oct 1 02:01:28 2008: some text' Got: 'Wed Oct 1 02:01:29 2008: some text' Got: 'Wed Oct 1 02:01:30 2008: some text'

      The only annoying thing is the need for that chop in addition to chomp.


      Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
      "Science is about questioning the status quo. Questioning authority".
      In the absence of evidence, opinion is indistinguishable from prejudice.

        Hm, Maybe you're right, but it doesn't seem to be necessary

        It stopped your program from working for over a year. It's forcing you to have to OS do unnecessary and possible unwanted work. It's definitely a necessity even though though it could be fudged this time.

Re^2: Non-blocking Reads from Pipe Filehandle
by cbudin (Initiate) on Jun 15, 2007 at 02:01 UTC
    Thank you for your insightful and comprehensive reply! I've tried out your test script on my system and it works great, and I think that I will try to incorporate the concepts that it illustrates. I'm not familiar with the threads package, but I will read up on it.

    One question, just for clarification: the $pipe variable declared in the pipeCommand() subroutine is different from the $pipe variable created in the main body of the code, correct?

    You have convinced me that threads is the way to go in this situation.

    cbudin

      Thank you for your insightful and comprehensive reply! .... I'm not familiar with the threads package, but I will read up on it.

      Your welcome.

      Be careful what you read, look carefully at any dates and/or version numbers. threads had a pretty traumatic birth and was for a long time unstable and definitely not for use in production environments. Anything you read relating to threads prior to perl 5.8.4 will probably reflect that.

      Actually, for this kind of do-this-blocking-operation-but-don't-make-me-wait, they worked pretty well even before then. Many of the bugs in 5.8.2 & 5.8.3 seemed to be as much to do with closures as it do with threading--and closures can still cause the occasional problem in non-threaded code--but threadind and shared variable cloning conspired to exaserbate those problems.

      In my opinion, they have been stable enough for use since 5.8.4. Of course, no one is guarenteeing them totally bug free, but then bugs occasionally turn up in the regex engine, and Math::Big* packages, and perlIO and almost any other area of perl you care to name. Few people would suggest that the 1000's of pieces of code using those facilities should be taken out of production, pending guarentees about the bug-free nature.

      the $pipe variable declared in the pipeCommand() subroutine is different from the $pipe variable created in the main body of the code,

      Physically, yes.

      Different scopes (the mys), different types--one is a filehandle, the other actually a blessed reference to a Thread::Queue object--and they actually exist in different threads.

      Conceptually, maybe not.

      At the inner level, the filehandle $pipe, is the source of input from the anonymous pipe. Ie. the output from the command.

      At the outer level, the queue object $pipe, is the source of input from the anonymous pipe. Ie. the output from the command.

      Under the scrutiny of your peer review would I change one of them? Yes, probably. The inner would probably be changed to $pipeFH.

      But then, the odds are that you prefer lower_with_underscore indentifiers to CamelCase, so you'll probably change all the names anyway.

      You might also feel that the name of the subroutine should reflect the nature of what's inside it more. Maybe something like pipe_asych_command_via_shared_queue().

      Or maybe, you feel that the mechanisms don't matter at the outer level and should be encapsulated. Hence a better names might be

      my $cmdOutput = openCommandAsInput( ) ... if( $cmdOutput->pending ) { my $line = $cmdPutput->dequeue() or last; ...

      Then again, the existing method names tend to reflect the queue-ish nature of the ITC mechanism, and at the level of use, they could be a distraction. Hence, for your application it might be better to wrap the whole thing into an object and alias the method names with more familiar concepts

      my $cmd_fh = open_pipe_command( $cmd ); ... if( $cmd_fh->can_read ) { my $line = $cmd_fh->readline or last; ...

      Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
      "Science is about questioning the status quo. Questioning authority".
      In the absence of evidence, opinion is indistinguishable from prejudice.