Starting ID 1, command echo foobar
Error starting ID 1: Illegal seek at ./graphpipe.pl line 47.
PID: 6461
Starting ID 2, command sed -e s/foo/bar/
Error starting ID 2: Illegal seek at ./graphpipe.pl line 47.
PID: 6462
Starting ID 3, command sed -e s/bar/foo/
Error starting ID 3: Illegal seek at ./graphpipe.pl line 47.
PID: 6463
Starting ID 4, command tee /home/alex/allbar
Error starting ID 4: Illegal seek at ./graphpipe.pl line 47.
PID: 6464
Starting ID 5, command tee /home/alex/allfoo
Error starting ID 5: Illegal seek at ./graphpipe.pl line 47.
PID: 6465
Exited: ID = 1, command = echo foobar
Reading STDOUT of 1
Writing data to STDIN of 2
Use of uninitialized value within %child_inputs in ref-to-glob cast at ./graphpipe.pl line 82.
Can't use string ("") as a symbol ref while "strict refs" in use at ./graphpipe.pl line 82.
####
{
"Edges" : [
{
"in" : [
1,
"STDOUT"
],
"out" : 2
},
{
"in" : [
1,
"STDOUT"
],
"out" : 3
},
{
"in" : [
2,
"STDOUT"
],
"out" : 4
},
{
"in" : [
3,
"STDOUT"
],
"out" : 5
}
],
"Nodes" : [
{
"command" : [
"echo",
"foobar"
],
"id" : 1
},
{
"command" : [
"sed",
"-e",
"s/foo/bar/"
],
"id" : 2
},
{
"command" : [
"sed",
"-e",
"s/bar/foo/"
],
"id" : 3
},
{
"command" : [
"tee",
"/home/alex/allbar"
],
"id" : 4
},
{
"command" : [
"tee",
"/home/alex/allfoo"
],
"id" : 5
}
]
}
####
#!/usr/bin/env perl
use strict;
use warnings;
use feature qw/:5.10/;
use IO::Handle;
use JSON;
use Parallel::Jobs qw/start_job watch_jobs/;
my $graph_filename = shift( @ARGV );
open( my $graph_file, '<', $graph_filename )
or die "Could not open JSON graph description $graph_filename!";
my $graph_data = decode_json( join( '', <$graph_file> ) );
close( $graph_file );
my %edges;
my %pids_to_ids;
my %ids_to_commands;
my %child_inputs;
my %child_outputs;
my %child_errors;
my %used;
for ( @{ $graph_data->{Edges} } ) {
# it's $edges{readfrom}{streamtype} = [ writeto1, writeto2, ... ]
push( @{ $edges{ $_->{in}[0] }{ $_->{in}[1] } }, $_->{out} );
# Make sure we aren't trying to cram two streams into one STDIN
if ( ++$used{ $_->{out} } > 1 ) {
die 'Only one input per node!';
}
}
for ( @{ $graph_data->{Nodes} } ) {
say "Starting ID $_->{id}, command @{$_->{command}}";
$ids_to_commands{$_->{id}} = $_->{command};
my $pid;
eval {
$pid = start_job(
{
stdin_handle => $child_inputs{$_->{id}},
stdout_capture => 1,
stderr_capture => 1
},
@{ $_->{command} }
);
die $! if $!;
};
if ( $@ ) {
# $@ already has a newline
print "Error starting ID $_->{id}: $@";
}
say "PID: $pid";
$pids_to_ids{$pid} = $_->{id};
}
# Close all dangling STDINs so that the relevant processes get EOF
# instead of hanging forever
for ( grep { ! $used{$_} } keys( %child_inputs ) ) {
close( $child_inputs{$_} );
}
# While there are still events
while ( my ($pid, $event, $data) = Parallel::Jobs::watch_jobs() ) {
if ( $event eq 'EXIT' ) {
say( "Exited: ID = $pids_to_ids{$pid},",
" command = @{ $ids_to_commands{$pids_to_ids{$pid}} }"
);
} else {
handle_event( $pids_to_ids{$pid}, $event, $data );
}
}
exit( 0 );
sub handle_event {
my ( $id, $event, $data ) = @_;
say "Reading $event of $id";
if ( @{ $edges{$id}->{$event} } ) {
for ( @{ $edges{$id}->{$event} } ) {
say "Writing data to STDIN of $_";
print { $child_inputs{$_} } $data;
}
} else {
say "No consumers, sending to STDOUT.";
print "$event: $data";
}
return;
}