eternaleye has asked for the wisdom of the Perl Monks concerning the following question:

I recently decided that it would be useful to be able to treat the standard input, output, and error streams of command-line programs as edges on a digraph. I have written a short program that does this, but I am running into a persistent problem: regardless of which method I use to manage the streams of the child processes, they die immediately after spawning. I have tried the following combinations of modules:

EV (for IO watchers) and Forks::Super::open3
AnyEvent and Forks::Super::open3
AnyEvent and IPC::Open3
Parallel::Jobs

So far I'm liking the syntax of Parallel::Jobs the most, but it still has the same problem of processes dying immediately. Well, enough talking.

EDIT: I forgot to mention that I straced it; it seems to be getting -ESPIPE (illegal seek) on an llseek.
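
For readers unfamiliar with ESPIPE: it is the errno returned when something tries to seek on a handle that has no file position, such as a pipe. The following is a minimal sketch that reproduces the errno in isolation. It is not a claim about what Parallel::Jobs does internally, and the variable names are made up for illustration. On a typical Linux system the message text should be "Illegal seek".

```perl
use strict;
use warnings;
use Errno qw(ESPIPE);

# Pipes have no file position, so seeking on one is expected to fail.
pipe( my $reader, my $writer ) or die "pipe failed: $!";

my $seek_ok    = seek( $reader, 0, 0 );    # whence 0 is SEEK_SET
my $seek_errno = $! + 0;                   # numeric errno, saved right after the call
my $seek_text  = "$!";                     # message text for the same errno

print "seek on a pipe failed: $seek_text\n" unless $seek_ok;
print "errno is ESPIPE\n" if $seek_errno == ESPIPE;
```

Note that the errno is saved immediately after the failing call, because later calls may overwrite it.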

Here's the output I'm getting:
Starting ID 1, command echo foobar
Error starting ID 1: Illegal seek at ./graphpipe.pl line 47.
PID: 6461
Starting ID 2, command sed -e s/foo/bar/
Error starting ID 2: Illegal seek at ./graphpipe.pl line 47.
PID: 6462
Starting ID 3, command sed -e s/bar/foo/
Error starting ID 3: Illegal seek at ./graphpipe.pl line 47.
PID: 6463
Starting ID 4, command tee /home/alex/allbar
Error starting ID 4: Illegal seek at ./graphpipe.pl line 47.
PID: 6464
Starting ID 5, command tee /home/alex/allfoo
Error starting ID 5: Illegal seek at ./graphpipe.pl line 47.
PID: 6465
Exited: ID = 1, command = echo foobar
Reading STDOUT of 1
Writing data to STDIN of 2
Use of uninitialized value within %child_inputs in ref-to-glob cast at ./graphpipe.pl line 82.
Can't use string ("") as a symbol ref while "strict refs" in use at ./graphpipe.pl line 82.

Here's the JSON I'm using for input:
{
    "Edges" : [
        { "in" : [ 1, "STDOUT" ], "out" : 2 },
        { "in" : [ 1, "STDOUT" ], "out" : 3 },
        { "in" : [ 2, "STDOUT" ], "out" : 4 },
        { "in" : [ 3, "STDOUT" ], "out" : 5 }
    ],
    "Nodes" : [
        { "command" : [ "echo", "foobar" ], "id" : 1 },
        { "command" : [ "sed", "-e", "s/foo/bar/" ], "id" : 2 },
        { "command" : [ "sed", "-e", "s/bar/foo/" ], "id" : 3 },
        { "command" : [ "tee", "/home/alex/allbar" ], "id" : 4 },
        { "command" : [ "tee", "/home/alex/allfoo" ], "id" : 5 }
    ]
}

And here's my script:
EDIT: Made changes according to jwkrahn's feedback (Nov 30 @ 11:40 PST8PDT)
#!/usr/bin/env perl
use strict;
use warnings;
use feature qw/:5.10/;

use IO::Handle;
use JSON;
use Parallel::Jobs qw/start_job watch_jobs/;

my $graph_filename = shift( @ARGV );
open( my $graph_file, '<', $graph_filename )
    or die "Could not open JSON graph description $graph_filename!";
my $graph_data = decode_json( join( '', <$graph_file> ) );
close( $graph_file );

my %edges;
my %pids_to_ids;
my %ids_to_commands;
my %child_inputs;
my %child_outputs;
my %child_errors;

my %used;
for ( @{ $graph_data->{Edges} } ) {
    # it's $edges{readfrom}{streamtype} = [ writeto1, writeto2, ... ]
    push( @{ $edges{ $_->{in}[0] }{ $_->{in}[1] } }, $_->{out} );
    # Make sure we aren't trying to cram two streams into one STDIN
    if ( ++$used{ $_->{out} } > 1 ) {
        die 'Only one input per node!';
    }
}

for ( @{ $graph_data->{Nodes} } ) {
    say "Starting ID $_->{id}, command @{$_->{command}}";
    $ids_to_commands{$_->{id}} = $_->{command};
    my $pid;
    eval {
        $pid = start_job(
            {
                stdin_handle   => $child_inputs{$_->{id}},
                stdout_capture => 1,
                stderr_capture => 1
            },
            @{ $_->{command} }
        );
        die $! if $!;
    };
    if ( $@ ) {
        # $@ already has a newline
        print "Error starting ID $_->{id}: $@";
    }
    say "PID: $pid";
    $pids_to_ids{$pid} = $_->{id};
}

# Close all dangling STDINs so that the relevant processes get EOF
# instead of hanging forever
for ( grep { ! $used{$_} } keys( %child_inputs ) ) {
    close( $child_inputs{$_} );
}

# While there are still events
while ( my ($pid, $event, $data) = Parallel::Jobs::watch_jobs() ) {
    if ( $event eq 'EXIT' ) {
        say( "Exited: ID = $pids_to_ids{$pid},",
             " command = @{ $ids_to_commands{$pids_to_ids{$pid}} }" );
    }
    else {
        handle_event( $pids_to_ids{$pid}, $event, $data );
    }
}

exit( 0 );

sub handle_event {
    my ( $id, $event, $data ) = @_;
    say "Reading $event of $id";
    if ( @{ $edges{$id}->{$event} } ) {
        for ( @{ $edges{$id}->{$event} } ) {
            say "Writing data to STDIN of $_";
            print { $child_inputs{$_} } $data;
        }
    }
    else {
        say "No consumers, sending to STDOUT.";
        print "$event: $data";
    }
    return;
}

Re: Difficulty handling std{in,out,err} of child processes
by jwkrahn (Abbot) on Dec 01, 2010 at 07:02 UTC
    my %used;
    for ( @{ $graph_data->{Edges} } ) {
        $edges{$_->{in}->[0]} //= {};
        $edges{$_->{in}->[0]}->{$_->{in}->[1]} //= [];
        push( @{ $edges{$_->{in}->[0]}->{$_->{in}->[1]} }, $_->{out} );
        $used{$_->{out}} //= 0;
        $used{$_->{out}}++;
        if ( grep { $used{$_} > 1 } keys( %used ) ) {
            die( 'Only one input per node!' );
        }
    }

    Perl has a feature called autovivification, which means that the above could be simplified to:

    my %used;
    for ( @{ $graph_data->{Edges} } ) {
        push @{ $edges{ $_->{in}[0] }{ $_->{in}[1] } }, $_->{out};
        if ( ++$used{ $_->{out} } > 1 ) {
            die 'Only one input per node!';
        }
    }
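
To see autovivification in isolation, here is a small self-contained sketch. The edge list is hard-coded (a cut-down stand-in for the decoded JSON, not the poster's actual input), and it starts from a completely empty %edges hash.

```perl
use strict;
use warnings;

my %edges;    # starts out empty: no inner hashes or arrays exist yet

# Illustrative stand-in for $graph_data->{Edges}
my @edge_list = (
    { in => [ 1, 'STDOUT' ], out => 2 },
    { in => [ 1, 'STDOUT' ], out => 3 },
    { in => [ 2, 'STDOUT' ], out => 4 },
);

for my $edge (@edge_list) {
    # No //= needed: dereferencing with @{ ... } creates the inner
    # hash and the array reference the first time each key is used.
    push @{ $edges{ $edge->{in}[0] }{ $edge->{in}[1] } }, $edge->{out};
}

print "Consumers of 1/STDOUT: @{ $edges{1}{STDOUT} }\n";
print "Consumers of 2/STDOUT: @{ $edges{2}{STDOUT} }\n";
```

The @{ ... } wrapper is what gives push an array to work on. On Perl versions of that era, writing push $edges{...}{...}, ... without the wrapper is one common way to get a compile-time complaint that the first argument must be an array rather than a hash element.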


    eval {
        ...
    };
    if ( $! ) {
        say "Error starting ID $_->{id}: $!";
    }

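    The $! variable does nothing useful there. It holds the errno of the last failed system call and is not cleared on success, so it says nothing about whether the eval failed. You need to use the $@ variable instead.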
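
A minimal sketch of why testing $! after an eval can mislead: a leftover errno from an earlier, unrelated failure survives a perfectly successful eval. The path below is a made-up one that is assumed not to exist, and the variable names are illustrative only.

```perl
use strict;
use warnings;

# A failed system call sets $! ...
my $opened = open( my $fh, '<', '/nonexistent/path/used/only/for/this/demo' );
my $errno_after_failure = $! + 0;

# ... and later code that succeeds does not clear it.
my $eval_ok = eval { my $sum = 1 + 1; 1 };
my $errno_after_eval = $! + 0;

print "open failed: $!\n" unless $opened;
print "eval succeeded and \$\@ is empty\n" if $eval_ok && $@ eq '';
print "but \$! is still set: $!\n" if $errno_after_eval;
```

The eval here cannot fail, yet a check of the form if ( $! ) would still report an error. A check on $@, or on the eval's return value, would not.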

      Re autovivification: I tried first without the //= lines, but push kept spitting out an error about the first argument needing to be an array or array reference and not a hash element.

      Also, $! _is_ giving the 'Illegal seek' message; I just tried with $@ and it merely said:
      Error starting ID 5:
      i.e., without any error message at all. (If I change the if as well, the line is missing completely.)

        If start_job() is setting $! when it fails, then you need to die at that point with $! in the error message, and that will show up in $@ at the end of the eval.
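
A sketch of that pattern follows. start_job_stub is a hypothetical stand-in, not the real Parallel::Jobs::start_job: it is assumed to return a false value and leave the reason in $! when it fails. Whether the real function behaves that way should be checked against its documentation.

```perl
use strict;
use warnings;

# Hypothetical stand-in for start_job(): returns nothing on failure
# and leaves the reason in $!.
sub start_job_stub {
    my ($path) = @_;
    open( my $fh, '<', $path ) or return;
    return $fh;
}

my $started = eval {
    # Die at the point of failure, while $! still describes that failure.
    my $handle = start_job_stub('/nonexistent/path/used/only/for/this/demo')
        or die "start failed: $!\n";
    1;
};

# Copy $@ right away; later evals would overwrite it.
my $error = $@;
print "Error starting job: $error" unless $started;
```

Because the die happens only when the call itself reports failure, a stale $! left over from earlier code never triggers it, and the message in $@ carries the errno text from the right moment.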

        Hm, with your suggestion on the %used section it's working. Not sure why I was getting the error I was.