#! perl -slw
use strict;
use threads;
use threads::shared;
use Thread::Queue;
my $sem : shared;
# Thread body: drain one queue, upper-casing every 'a' in each line and
# printing the result.
#   $Q - the Thread::Queue this worker reads from; an undef item marks the
#        end of the work.
# Printing is serialised across workers by locking the shared $sem.
sub worker {
    my $Q = shift;
    # Test definedness rather than truth: a final input line of "0" with no
    # trailing newline is a false value and would otherwise end this worker
    # early, dropping that line and anything queued behind it.
    while( defined( my $line = $Q->dequeue ) ) {
        $line =~ s[a][A]g;
        lock $sem;    # released at the end of this loop iteration
        print $line;
    }
}
# Worker count: -T=N on the command line (the -s switch on the shebang line
# turns that into $main::T); 4 when not given.
our $T //= 4;
my @slots = 0 .. $T - 1;

# One private queue per worker, then one worker thread per queue.
my @Qs;
push @Qs, Thread::Queue->new for @slots;

my @threads;
for my $slot ( @slots ) {
    push @threads, threads->create( \&worker, $Qs[ $slot ] );
}

# Deal the input lines out to the queues in round-robin order.
my $next = 0;
while( defined( my $line = <> ) ) {
    $Qs[ $next ]->enqueue( $line );
    ++$next;
    $next %= $T;
}

# An undef on each queue tells its worker to stop; then wait for them all.
$_->enqueue( undef ) for @Qs;
for my $thread ( @threads ) {
    $thread->join;
}
__END__
junk71 3.5GB >nul
####
package IOFan;
use strict;
use feature qw[ state ];
use warnings;
use threads;
use threads::shared;
require Exporter;
our @ISA = 'Exporter';
our @EXPORT = 'fan';
# Print the arguments as a single line, prefixed with the calling thread's
# id ("<tid>: ..."), to the currently selected output handle.
#
# The lock is a lexical in an enclosing block so that it is shared once,
# when this file is loaded, and every thread spawned afterwards locks the
# same variable.  A `state ... :shared` variable inside the sub is only
# shared on the sub's first call; when that first call happens in threads
# that are already running, each thread ends up locking its own copy and
# the output is not serialised at all.
{
    my $print_lock :shared;

    sub tprint {
        my $tid = threads->tid;
        lock $print_lock;    # held until tprint returns
        print "$tid: ", @_, "\n";
    }
}
# Print the arguments to STDERR as a single line, prefixed with the calling
# thread's id ("<tid>: ...").
#
# The lock is a lexical in an enclosing block so that it is shared once,
# when this file is loaded, and every thread spawned afterwards locks the
# same variable.  A `state ... :shared` variable inside the sub is only
# shared on the sub's first call; the threads started by fan() make that
# first call after they have been created, so each one would lock its own
# copy and the diagnostics could interleave.
{
    my $warn_lock :shared;

    sub twarn {
        my $tid = threads->tid;
        lock $warn_lock;    # held until twarn returns
        print STDERR "$tid: ", @_, "\n";
    }
}
# Thread body used by fan(): copy lines from one handle to another, letting
# a callback edit each line on the way through.
#   $in   - handle to read lines from
#   $out  - handle the (possibly modified) lines are written to
#   $code - code ref called with no arguments for each line; the line is in
#           $_ and whatever $_ holds afterwards is what gets written
# Both handles are closed once $in is exhausted.  Start and finish are
# reported through twarn.
sub worker {
    my( $in, $out, $code ) = @_;

    twarn( 'Started' );
    sleep 1;    # NOTE(review): reason for this delay is not visible here

    while( defined( $_ = readline( $in ) ) ) {
        $code->();
        print {$out} $_;
    }

    close $in;
    close $out;
    twarn( 'ended' );
}
use constant { RDR =>0, WTR => 1 };
# fan BLOCK NTHREADS [, IN [, OUT ]]
#
# Run BLOCK over every line of IN using NTHREADS worker threads and write
# the results to OUT.
#   $code     - block run once per line, with the line in $_
#   $nThreads - number of worker threads; must be at least 1
#   $in       - handle to read from (STDIN when not given)
#   $out      - handle to write to  (STDOUT when not given)
#
# Each worker gets two pipes: a "fanout" pipe carrying lines to it and a
# "fanin" pipe carrying its results back.  A detached reader thread deals
# the input lines out round-robin, and the calling thread collects results
# in the same round-robin order.  This relies on BLOCK producing exactly one
# output line per input line; the collecting loop stops at the first fanin
# pipe that has nothing more to give.
#
# Croaks if NTHREADS is missing or less than 1.
sub fan (&@) {
my( $code, $nThreads, $in, $out ) = @_;
# Fail up front with a clear message.  Without this check a missing or
# zero thread count only surfaces later as "Illegal modulus zero", partly
# inside the detached reader thread.
unless( defined $nThreads and $nThreads >= 1 ) {
    require Carp;
    Carp::croak( 'fan: thread count must be a number >= 1' );
}
my $n = $nThreads -1;
$out = \*STDOUT unless defined $out;
$in = \*STDIN unless defined $in;
my( @fanout, @fanin );
# One pipe per worker in each direction; the write ends are switched to
# autoflush ($|) via the usual select dance.
pipe $fanout[ $_ ][RDR], $fanout[ $_ ][WTR] or die $! for 0 .. $n;
do{ my $std = select $_->[WTR]; $|++; select $std } for @fanout;
pipe $fanin[ $_ ][RDR], $fanin[ $_ ][WTR] or die $! for 0 .. $n;
do{ my $std = select $_->[WTR]; $|++; select $std } for @fanin;
# Worker $_ reads from its fanout pipe and writes to its fanin pipe.
my @threads = map{ threads->create(
\&worker, $fanout[ $_ ][RDR], $fanin[ $_ ][WTR], $code
) } 0 .. $n;
# This thread no longer needs the worker-side ends of the pipes.
close $fanout[ $_ ][RDR] for 0 .. $n;
close $fanin[ $_ ][WTR] for 0 .. $n;
# Reader: deal input lines to the workers round-robin.  The index is
# advanced before use, so the first line goes to worker 1 % $nThreads.
async {
twarn 'reader started';
my $i = 0;
while( <$in> ) {
print { $fanout[ $i = ++$i%$nThreads ][WTR] } $_;
}
sleep 1;    # NOTE(review): reason for this delay is not visible here
close $fanout[ $_ ][WTR] for 0 .. $n;
twarn 'reader done';
}->detach;
# Writer: pull one result line from each worker in turn, using the same
# index sequence as the reader.  readline() as a while condition assigns
# to $_ and tests definedness, so the loop ends at the first exhausted pipe.
twarn 'writer started';
my $i = 0;
while( readline( $fanin[ $i = ++$i % $nThreads ][RDR] ) ) {
print $out $_;
}
close $fanin[ $_ ][RDR] for 0 .. $n;
# NOTE(review): the threads kept in @threads are neither joined nor
# detached before returning - confirm that is intended.
twarn 'writer done';
}
1;
------------
#! perl -sw
use strict;
use IOFan;
# Upper-case every 'a' on each input line, using as many worker threads as
# the first command-line argument says.
my $thread_count = $ARGV[0];
fan { s[a][A]g } $thread_count;
####
C:\test>perl -pe"s[a][A]" phrases.txt >nul