+- read side of pipe
/
---------- ----------
STDIN --> | reader | -pipe-> | writer | --> STDOUT
---------- ----------
\
+- write side of pipe
####
use strict;
use warnings;
use IO::Pipely qw(pipely);
use POE qw(Wheel::ReadWrite);
my ($read_pipe, $write_pipe) = pipely();
# reader sends output to the write pipe
my $reader = POE::Session->create(
args => [ $write_pipe ],
inline_states => {
_start => sub {
$_[HEAP]->{wheel} = POE::Wheel::ReadWrite->new(
InputHandle => \*STDIN,
OutputHandle => $_[ARG0],
InputEvent => 'got_input',
ErrorEvent => 'got_input_error',
);
},
got_input => \&handle_input,
got_input_error => \&handle_input_error,
process_next_input => \&process_next_input,
}
);
# writer reads input from the read pipe
my $writer = POE::Session->create(
args => [$read_pipe],
inline_states => {
_start => sub {
$_[HEAP]->{wheel} = POE::Wheel::ReadWrite->new(
InputHandle => $_[ARG0],
OutputHandle => \*STDOUT,
InputEvent => 'got_input',
ErrorEvent => 'got_input_error',
);
},
got_input => \&handle_input,
got_input_error => \&handle_input_error,
process_next_input => \&process_next_input,
}
);
POE::Kernel->run();
exit;
sub handle_input
{
my ( $kernel, $heap, $input ) = @_[KERNEL, HEAP, ARG0];
print $heap->{wheel}->put( $input );
$heap->{wheel}->flush();
$heap->{wheel}->pause_input();
$kernel->yield( 'process_next_input' );
}
sub handle_input_error
{
my ($kernel, $heap) = @_[KERNEL, HEAP];
$kernel->yield( 'process_next_input' );
delete $heap->{wheel};
}
sub process_next_input
{
my ($kernel, $heap) = @_[KERNEL, HEAP];
$heap->{wheel}->resume_input() if $heap->{wheel};
}
####
ombibulous> perl cat.pl < test.csv
00001,2,3,4
0a,b,c,d
0foo,bar,baz
^C
ombibulous>