in reply to Re^5: Using kernel-space threads with Perl (order)
in thread Using kernel-space threads with Perl
> Having the parent read in the data and hand off each piece to the appropriate thread(s) (I'm guessing via Thread::Queue might be a good way) is the most general method that springs to my mind.
Something like this maybe?:
    #! perl -slw
    use strict;
    use threads;
    use threads::shared;
    use Thread::Queue;

    my $sem : shared;                   ## serialises output from the workers

    sub worker {
        my $Q = shift;
        while( my $line = $Q->dequeue ) {
            $line =~ s[a][A]g;          ## the "processing"
            lock $sem;
            print $line;
        }
    }

    our $T //= 4;                       ## default 4 workers; -T=n overrides
    my $n = $T - 1;

    my @Qs = map Thread::Queue->new, 0 .. $n;
    my @threads = map{
        threads->create( \&worker, $Qs[ $_ ] )
    } 0 .. $n;

    ## round-robin the input records across the per-thread queues
    my $i = 0;
    while( <> ) {
        $Qs[ $i ]->enqueue( $_ );
        $i = ( $i + 1 ) % $T;
    }

    $Qs[ $_ ]->enqueue( undef ) for 0 .. $n;    ## tell the workers to finish
    $_->join for @threads;

    __END__
    junk71 3.5GB >nul
Problem: That will take ~4 hours to process a 3.5 GB file. And that's with the output redirected to nul so there is no competition for the disk head.
> I'd probably do something similar except using processes and simple pipes, as I've often done.
So something like this, but using processes instead of threads perhaps?:
    ## IOFan.pm
    package IOFan;
    use strict;
    use feature qw[ state ];
    use warnings;
    use threads;
    use threads::shared;

    require Exporter;
    our @ISA    = 'Exporter';
    our @EXPORT = 'fan';

    ## thread-safe print/warn helpers, tagged with the thread id
    sub tprint {
        my $tid = threads->tid;
        state $sem :shared;
        lock $sem;
        print "$tid: ", @_, "\n";
    }
    sub twarn {
        my $tid = threads->tid;
        state $sem :shared;
        lock $sem;
        print STDERR "$tid: ", @_, "\n";
    }

    ## each worker reads records from its own input pipe, applies the
    ## callback to $_, and writes the result to its output pipe
    sub worker {
        twarn 'Started'; sleep 1;
        my( $in, $out, $code ) = @_;
        while( <$in> ) {
            $code->();
            print $out $_;
        }
        close $in;
        close $out;
        twarn 'ended';
    }

    use constant { RDR => 0, WTR => 1 };

    sub fan (&@) {
        my( $code, $nThreads, $in, $out ) = @_;
        my $n = $nThreads - 1;
        $out = \*STDOUT unless defined $out;
        $in  = \*STDIN  unless defined $in;

        ## one pipe pair per worker in each direction, all unbuffered
        my( @fanout, @fanin );
        pipe $fanout[ $_ ][RDR], $fanout[ $_ ][WTR] or die $! for 0 .. $n;
        do{ my $std = select $_->[WTR]; $|++; select $std } for @fanout;
        pipe $fanin[ $_ ][RDR], $fanin[ $_ ][WTR] or die $! for 0 .. $n;
        do{ my $std = select $_->[WTR]; $|++; select $std } for @fanin;

        my @threads = map{
            threads->create( \&worker, $fanout[ $_ ][RDR], $fanin[ $_ ][WTR], $code )
        } 0 .. $n;
        close $fanout[ $_ ][RDR] for 0 .. $n;
        close $fanin[ $_ ][WTR]  for 0 .. $n;

        ## reader thread: round-robin the input records across the workers
        async {
            twarn 'reader started';
            my $i = 0;
            while( <$in> ) {
                print { $fanout[ $i = ++$i % $nThreads ][WTR] } $_;
            }
            sleep 1;
            close $fanout[ $_ ][WTR] for 0 .. $n;
            twarn 'reader done';
        }->detach;

        ## main thread: collect the results round-robin and write them out
        twarn 'writer started';
        my $i = 0;
        while( readline( $fanin[ $i = ++$i % $nThreads ][RDR] ) ) {
            print $out $_;
        }
        close $fanin[ $_ ][RDR] for 0 .. $n;
        twarn 'writer done';
    }
    1;

    ------------

    #! perl -sw
    use strict;
    use IOFan;

    fan{ s[a][A]g; } $ARGV[0];
This fares better and only takes ~15 minutes to process the 3.5GB.
But all that effort is for naught as this:
    C:\test>perl -pe"s[a][A]" phrases.txt >nul
processes the same 3.5GB in exactly the same way, but in less than 2 minutes.
Now the "processing" in all these examples is pretty light, just a single pass of each record, but it serves to highlight the scale of the overhead involved in distributing the records. And the scale that the processing would need to involve to make either of these distribution mechanisms viable.
The pipes mechanism is more efficient than the shared queues, and should be pretty much the same between processes as it is between threads, so I doubt there is much to be gained by going that route.
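For illustration only, here is a rough, untested sketch of how the fork-and-pipes version might look. It assumes a POSIX-style fork(); on Win32, where the timings above were taken, fork is emulated with threads anyway, so it wouldn't measure anything different. The worker count and the s[a][A]g "processing" are just placeholders, and no attempt is made to serialise the children's output:

    #! perl -w
    ## Rough sketch of a fork/pipe fan-out analogous to IOFan above,
    ## using processes instead of threads.
    use strict;

    my $nProcs = 4;
    my @writers;

    for my $i ( 0 .. $nProcs - 1 ) {
        pipe my $rdr, my $wtr or die $!;
        my $pid = fork;
        die "fork: $!" unless defined $pid;
        if( $pid == 0 ) {                   ## child: process records from its pipe
            close $wtr;
            close $_ for @writers;          ## drop inherited write ends of earlier pipes
            while( <$rdr> ) {
                s[a][A]g;
                print;                      ## output interleaving is NOT serialised here
            }
            exit 0;
        }
        close $rdr;                         ## parent keeps only the write end, unbuffered
        { my $std = select $wtr; $| = 1; select $std }
        push @writers, $wtr;
    }

    ## parent: round-robin the input records across the children
    my $i = 0;
    while( <> ) {
        print { $writers[ $i ] } $_;
        $i = ( $i + 1 ) % $nProcs;
    }

    close $_ for @writers;                  ## EOF to the children
    wait for 1 .. $nProcs;                  ## reap them

The plumbing is the same shape as the threaded version; all that changes is what moves each record between the reader and the workers, which is why I wouldn't expect the numbers to be very different.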
Maybe you have some mechanism in mind that will radically alter the overheads equation, but there is an awful lot of in-the-mind's-eye expertise around here, and having spent a long time trying to find a good way to do this very thing, I'd really like to be educated by someone who's actually made it work.