#! perl -slw
use strict;
use threads;
use threads::shared;
use Thread::Queue;

# Serialises the workers' writes to STDOUT.
my $sem : shared;

# Thread body: take lines from the queue $Q until an undef sentinel arrives,
# replace every 'a' with 'A' in each line and print it while holding $sem.
sub worker {
    my $Q = shift;

    # Test definedness, not truth: a line such as "0" with no trailing
    # newline is false and would otherwise stop this worker early, leaving
    # whatever is still queued behind it unprocessed.
    while( defined( my $line = $Q->dequeue ) ) {
        $line =~ s[a][A]g;
        lock $sem;
        print $line;
    }
}

# Number of worker threads. With -s on the #! line a -T=N command-line
# switch sets $T; otherwise default to 4.
our $T //= 4;
my $n = $T -1;

# One queue and one thread per worker.
my @Qs      = map Thread::Queue->new, 0 .. $n;
my @threads = map{ threads->create( \&worker, $Qs[ $_ ] ) } 0 .. $n;

# Deal the input lines out to the queues round-robin.
my $i = 0;
while( <> ) {
    $Qs[ $i ]->enqueue( $_ );
    $i = ( $i + 1 ) % $T;
}

# One undef per queue tells its worker to finish; then wait for all of them.
$Qs[ $_ ]->enqueue( undef ) for 0 .. $n;
$_->join for @threads;
__END__
junk71 3.5GB >nul
####
package IOFan;
use strict;
use feature qw[ state ];
use warnings;
use threads;
use threads::shared;
require Exporter;

our @ISA    = 'Exporter';
our @EXPORT = 'fan';

# Output locks, one per stream. They are declared at file scope so that they
# are shared when the module is loaded, i.e. before fan() creates any
# thread, and every thread therefore locks the same variable.
# NOTE(review): the previous per-sub "state $sem :shared" appears to be
# shared only when the sub first runs, which in fan() is after the threads
# have been cloned -- confirm against the threads::shared documentation.
my $stdout_sem :shared;
my $stderr_sem :shared;

# Print the arguments to STDOUT as one line prefixed with the calling
# thread's id, holding the STDOUT lock for the duration of the print.
sub tprint {
    my $tid = threads->tid;
    lock $stdout_sem;
    print "$tid: ", @_, "\n";
}

# Same as tprint, but writes to STDERR under the STDERR lock.
sub twarn {
    my $tid = threads->tid;
    lock $stderr_sem;
    print STDERR "$tid: ", @_, "\n";
}

# Thread body used by fan().
#   $in   - handle to read lines from
#   $out  - handle to write the processed lines to
#   $code - code ref called once per line with the line in $_; whatever it
#           leaves in $_ is what gets written
# Both handles are closed once $in reaches end of file.
sub worker {
    twarn 'Started';
    sleep 1;    # NOTE(review): purpose of this delay is not evident here -- confirm
    my( $in, $out, $code ) = @_;
    while( <$in> ) {
        $code->();
        print $out $_;
    }
    close $in;
    close $out;
    twarn 'ended';
}

# Indexes of the read and write ends within each [ reader, writer ] pipe pair.
use constant { RDR => 0, WTR => 1 };

# fan { CODE } $nThreads [, $in [, $out ]]
#
# Reads lines from $in (default STDIN), hands them round-robin to $nThreads
# worker threads through one pipe per worker, and collects the processed
# lines from a second set of pipes in the same round-robin sequence,
# printing them to $out (default STDOUT). CODE is run by the workers with
# the current line in $_.
#
# Dies with $! if a pipe cannot be created.
sub fan (&@) {
    my( $code, $nThreads, $in, $out ) = @_;
    my $n = $nThreads -1;
    $out = \*STDOUT unless defined $out;
    $in  = \*STDIN  unless defined $in;

    my( @fanout, @fanin );

    # fanout: distributor -> worker pipes, write ends set to autoflush.
    pipe $fanout[ $_ ][RDR], $fanout[ $_ ][WTR] or die $! for 0 .. $n;
    do{ my $std = select $_->[WTR]; $|++; select $std } for @fanout;

    # fanin: worker -> collector pipes, write ends set to autoflush.
    pipe $fanin[ $_ ][RDR], $fanin[ $_ ][WTR] or die $! for 0 .. $n;
    do{ my $std = select $_->[WTR]; $|++; select $std } for @fanin;

    # Worker i reads from fanout pipe i and writes to fanin pipe i.
    my @threads = map{
        threads->create( \&worker, $fanout[ $_ ][RDR], $fanin[ $_ ][WTR], $code )
    } 0 .. $n;

    # This thread no longer uses the worker-side ends.
    close $fanout[ $_ ][RDR] for 0 .. $n;
    close $fanin[ $_ ][WTR] for 0 .. $n;

    # Distributor: a detached thread that deals input lines out to the
    # workers. The index is incremented before use, so the first line goes
    # to pipe 1 (pipe 0 when there is only one worker); the collector below
    # steps through the same sequence.
    async {
        twarn 'reader started';
        my $i = 0;
        while( <$in> ) {
            print { $fanout[ $i = ++$i%$nThreads ][WTR] } $_;
        }
        sleep 1;    # NOTE(review): purpose of this delay is not evident here -- confirm
        close $fanout[ $_ ][WTR] for 0 .. $n;
        twarn 'reader done';
    }->detach;

    # Collector: read one line from each worker in turn and stop at the
    # first pipe that reports end of file.
    twarn 'writer started';
    my $i = 0;
    while( readline( $fanin[ $i = ++$i % $nThreads ][RDR] ) ) {
        print $out $_;
    }
    close $fanin[ $_ ][RDR] for 0 .. $n;

    # Reap the workers; they were previously left neither joined nor detached.
    $_->join for @threads;

    twarn 'writer done';
}

1;
------------
#! perl -sw
use strict;
use IOFan;

# Replace every 'a' with 'A' on STDIN -> STDOUT using $ARGV[0] worker threads.
fan{ s[a][A]g; } $ARGV[0];
####
C:\test>perl -pe"s[a][A]" phrases.txt >nul