Starting ID 1, command echo foobar Error starting ID 1: Illegal seek at ./graphpipe.pl line 47. PID: 6461 Starting ID 2, command sed -e s/foo/bar/ Error starting ID 2: Illegal seek at ./graphpipe.pl line 47. PID: 6462 Starting ID 3, command sed -e s/bar/foo/ Error starting ID 3: Illegal seek at ./graphpipe.pl line 47. PID: 6463 Starting ID 4, command tee /home/alex/allbar Error starting ID 4: Illegal seek at ./graphpipe.pl line 47. PID: 6464 Starting ID 5, command tee /home/alex/allfoo Error starting ID 5: Illegal seek at ./graphpipe.pl line 47. PID: 6465 Exited: ID = 1, command = echo foobar Reading STDOUT of 1 Writing data to STDIN of 2 Use of uninitialized value within %child_inputs in ref-to-glob cast at ./graphpipe.pl line 82. Can't use string ("") as a symbol ref while "strict refs" in use at ./graphpipe.pl line 82. #### { "Edges" : [ { "in" : [ 1, "STDOUT" ], "out" : 2 }, { "in" : [ 1, "STDOUT" ], "out" : 3 }, { "in" : [ 2, "STDOUT" ], "out" : 4 }, { "in" : [ 3, "STDOUT" ], "out" : 5 } ], "Nodes" : [ { "command" : [ "echo", "foobar" ], "id" : 1 }, { "command" : [ "sed", "-e", "s/foo/bar/" ], "id" : 2 }, { "command" : [ "sed", "-e", "s/bar/foo/" ], "id" : 3 }, { "command" : [ "tee", "/home/alex/allbar" ], "id" : 4 }, { "command" : [ "tee", "/home/alex/allfoo" ], "id" : 5 } ] } #### #!/usr/bin/env perl use strict; use warnings; use feature qw/:5.10/; use IO::Handle; use JSON; use Parallel::Jobs qw/start_job watch_jobs/; my $graph_filename = shift( @ARGV ); open( my $graph_file, '<', $graph_filename ) or die "Could not open JSON graph description $graph_filename!"; my $graph_data = decode_json( join( '', <$graph_file> ) ); close( $graph_file ); my %edges; my %pids_to_ids; my %ids_to_commands; my %child_inputs; my %child_outputs; my %child_errors; my %used; for ( @{ $graph_data->{Edges} } ) { # it's $edges{readfrom}{streamtype} = [ writeto1, writeto2, ... ] push( @{ $edges{ $_->{in}[0] }{ $_->{in}[1] } }, $_->{out} ); # Make sure we aren't trying to cram two streams into one STDIN if ( ++$used{ $_->{out} } > 1 ) { die 'Only one input per node!'; } } for ( @{ $graph_data->{Nodes} } ) { say "Starting ID $_->{id}, command @{$_->{command}}"; $ids_to_commands{$_->{id}} = $_->{command}; my $pid; eval { $pid = start_job( { stdin_handle => $child_inputs{$_->{id}}, stdout_capture => 1, stderr_capture => 1 }, @{ $_->{command} } ); die $! if $!; }; if ( $@ ) { # $@ already has a newline print "Error starting ID $_->{id}: $@"; } say "PID: $pid"; $pids_to_ids{$pid} = $_->{id}; } # Close all dangling STDINs so that the relevant processes get EOF # instead of hanging forever for ( grep { ! $used{$_} } keys( %child_inputs ) ) { close( $child_inputs{$_} ); } # While there are still events while ( my ($pid, $event, $data) = Parallel::Jobs::watch_jobs() ) { if ( $event eq 'EXIT' ) { say( "Exited: ID = $pids_to_ids{$pid},", " command = @{ $ids_to_commands{$pids_to_ids{$pid}} }" ); } else { handle_event( $pids_to_ids{$pid}, $event, $data ); } } exit( 0 ); sub handle_event { my ( $id, $event, $data ) = @_; say "Reading $event of $id"; if ( @{ $edges{$id}->{$event} } ) { for ( @{ $edges{$id}->{$event} } ) { say "Writing data to STDIN of $_"; print { $child_inputs{$_} } $data; } } else { say "No consumers, sending to STDOUT."; print "$event: $data"; } return; }