package IOFan;

use strict;
use feature qw[ state ];
use warnings;
use threads;
use threads::shared;

use Carp qw( croak );
use Scalar::Util qw( looks_like_number );

require Exporter;
our @ISA    = 'Exporter';
our @EXPORT = 'fan';

# Indices of the two ends of each pipe pair.
use constant { RDR => 0, WTR => 1 };

# Output locks.  They are declared at file scope so that they become shared
# when the module is loaded, i.e. before any thread is cloned.  A
# 'state ... :shared' variable inside the subs would first be executed inside
# already-cloned threads, leaving every thread locking its own private copy.
my $stdout_lock :shared;
my $stderr_lock :shared;

# tprint( LIST )
# Print LIST to STDOUT, prefixed with the calling thread's id and followed by
# a newline, while holding the STDOUT lock.
sub tprint {
    my $tid = threads->tid;
    lock $stdout_lock;
    print "$tid: ", @_, "\n";
}

# twarn( LIST )
# Same as tprint(), but writes to STDERR under the STDERR lock.
sub twarn {
    my $tid = threads->tid;
    lock $stderr_lock;
    print STDERR "$tid: ", @_, "\n";
}

# _autoflush( $fh )
# Turn on autoflush for $fh without disturbing the currently selected handle.
sub _autoflush {
    my( $fh ) = @_;
    my $previous = select $fh;
    $| = 1;
    select $previous;
    return;
}

# worker( $in, $out, $code, @foreign )
# Thread body: read lines from $in, run $code with the line in $_, and write
# the resulting $_ to $out.
#
# @foreign (optional) lists this thread's copies of pipe ends that belong to
# other threads.  They are closed first: a pipe only reports EOF once every
# copy of its write end has been closed, and each thread is created with its
# own copy of every handle that was open at the time.
sub worker {
    my( $in, $out, $code, @foreign ) = @_;

    twarn( 'Started' );

    for my $fh ( @foreign ) {
        close $fh;
    }

    local $_;
    while( <$in> ) {
        $code->();
        print {$out} $_;
    }

    close $in;
    close $out;
    twarn( 'ended' );
}

# fan { CODE } $nThreads, $in, $out
#
# Distribute the lines of $in (default STDIN) round-robin over $nThreads
# worker threads, run CODE on each line (the line is in $_), and write the
# results to $out (default STDOUT) in the original line order.
#
# The merge step reads one line from each worker in the same round-robin
# order the lines were handed out, so CODE must leave exactly one line in $_
# for every input line.
#
# Croaks if $nThreads is not a number >= 1; dies if a pipe cannot be created.
sub fan (&@) {
    my( $code, $nThreads, $in, $out ) = @_;

    croak 'fan: thread count must be a number >= 1'
        unless defined $nThreads
            && looks_like_number( $nThreads )
            && $nThreads >= 1;
    $nThreads = int $nThreads;

    $out = \*STDOUT unless defined $out;
    $in  = \*STDIN  unless defined $in;

    my $last = $nThreads - 1;

    # One pipe per worker in each direction; write ends are unbuffered.
    my( @fanout, @fanin );
    for my $set ( \@fanout, \@fanin ) {
        for my $k ( 0 .. $last ) {
            pipe( $set->[ $k ][RDR], $set->[ $k ][WTR] ) or die $!;
            _autoflush( $set->[ $k ][WTR] );
        }
    }

    my @workers = map {
        my $k = $_;

        # Everything except worker $k's own input read end and output
        # write end is foreign to it.
        my @foreign = (
            ( map { $_->[WTR] } @fanout ),
            ( map { $_->[RDR] } @fanin ),
            ( map  { ( $fanout[ $_ ][RDR], $fanin[ $_ ][WTR] ) }
              grep { $_ != $k } 0 .. $last ),
        );

        threads->create(
            \&worker, $fanout[ $k ][RDR], $fanin[ $k ][WTR], $code, @foreign
        );
    } 0 .. $last;

    # The main thread never reads the fan-out pipes nor writes the fan-in ones.
    for my $k ( 0 .. $last ) {
        close $fanout[ $k ][RDR];
        close $fanin[ $k ][WTR];
    }

    # Reader thread: hand the input lines out round-robin, starting with
    # worker 1 (wrapping to 0), then close the write ends to signal EOF.
    my $reader = async {
        twarn( 'reader started' );
        my $slot = 0;
        while( my $line = <$in> ) {
            $slot = ( $slot + 1 ) % $nThreads;
            print { $fanout[ $slot ][WTR] } $line;
        }
        close $fanout[ $_ ][WTR] for 0 .. $last;
        twarn( 'reader done' );
    };

    # The reader owns its own copies of the fan-out write ends; release the
    # main thread's copies so that the workers can see EOF.
    for my $k ( 0 .. $last ) {
        close $fanout[ $k ][WTR];
    }

    # Writer (this thread): collect one line per worker in the same order the
    # reader handed them out.  The first EOF means the input is exhausted.
    twarn( 'writer started' );
    my $slot = 0;
    while( 1 ) {
        $slot = ( $slot + 1 ) % $nThreads;
        my $line = readline $fanin[ $slot ][RDR];
        last unless defined $line;
        print {$out} $line;
    }
    close $fanin[ $_ ][RDR] for 0 .. $last;

    # Reap every thread so the process never exits with threads still live.
    $reader->join;
    $_->join for @workers;

    twarn( 'writer done' );
}

1;

=pod

=head1 NAME

IOFan - fan the lines of a stream out to worker threads and merge the results in order

=head1 SYNOPSIS

The driver script that accompanied the module (the thread count is taken
from the command line):

    #! perl -sw
    use strict;
    use IOFan;

    fan{ s[a][A]g; } $ARGV[0];

=head1 DESCRIPTION

C<fan> is exported by default.  It takes a code block, a thread count and,
optionally, an input and an output file handle (defaulting to STDIN and
STDOUT).  Each input line is placed in C<$_>, passed through the block in one
of the worker threads, and written to the output handle in input order.

=cut