So, can you explain - how code in block "while( !eof FILE ) {" interacts with code in thread sub in just the right way?
Just step through the main line code and see what happens.
- my $Q = Thread::Queue->new;
Creates a new queue.
- my @threads = map threads->create( \&thread, $Q ), 1 .. NTHREADS;
Spawns some threads, passing the handle to the queue
- while( my $line = $Q->dequeue ) {
The threads enter a loop and try to get a line from the queue. There's nothing in the queue yet, so they block.
- while( !eof FILE ) {
We haven't read all the file yet, so
- sleep 0.001 while $Q->pending;
We haven't put anything in the queue, so pending() returns false (0).
for( 1 .. NTHREADS ) {
$Q->enqueue( scalar <FILE> );
lock $pos;
$pos = tell FILE;
}
Push 1 line onto the queue for each thread, updating $pos.
- while( !eof FILE ) {
We still haven't read the whole file, so...
- sleep 0.001 while $Q->pending;
The are now 30 lines in the queue, so we sleep 0.001.
And we'll keep sleeping until the last thread has pulled the last line from the queue and pending returns zero.
And that'll keep repeating until we've read and queued all the lines.
- $Q->enqueue( (undef) x NTHREADS );
Then we queue enough undef's to terminate the loops in all the threads. All that's left to do is:
- $_->join for @threads;
Wait for all the threads to terminate. and we're done.
All the interaction is controlled entirely by the Thread::Queue module. And that's a well-tested core module, so we needn't concern ourselves with the details.
The only other cross-thread interaction is the value of $pos. And that is usually inaccurate, because the value it contains will reflect the file position at the point the threads access it, which will mostly be entirely different to its value at the time the line that thread is processing was read. Because it spent some time sitting in the queue.
I assumed that this was only in your sample code as a mechanism of tracking progress. As such, it served the purpose of demonstrating that DIY locking often doesn't achieve the goal you set out to achieve.
If it is important for the threads to know the file position associated with each line, then you should pass the value with the line via the queue. Eg.
#! perl -slw
use strict;
use threads;
use Thread::Queue;
use Time::HiRes qw[ sleep ];
use constant NTHREADS => 30;
open FILE, '<', $ARGV[ 0 ] or die $!;
my $size = -s FILE;
sub thread {
my $Q = shift;
my $tid = threads->tid;
while( my( $pos, $line ) = split $;, $Q->dequeue ) {
printf "%3d: (%10d, %10d) :%s", $tid, $pos, $size, $line;
sleep rand .5;
}
}
my $Q = Thread::Queue->new;
my @threads = map threads->create( \&thread, $Q ), 1 .. NTHREADS;
while( !eof FILE ) {
sleep 0.001 while $Q->pending;
$Q->enqueue( join $;, tell( FILE ), scalar <FILE> ) for 1 .. NTHRE
+ADS
}
$Q->enqueue( (undef) x NTHREADS );
$_->join for @threads;
If you run that, you'll see that the pos reflects the true position within the file from which the line was read. Note that the position is read before the line. Note also that $pos is no longer shared, the need for locking it goes away, and so the code runs far more efficiently. The code got both simpler and more efficient.
Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
"Science is about questioning the status quo. Questioning authority".
In the absence of evidence, opinion is indistinguishable from prejudice.
| [reply] [d/l] [select] |
sleep 0.001 while $Q->pending;
== As long as there is something in the queue, do nothing
for( 1 .. NTHREADS ) {
$Q->enqueue( scalar <FILE> );
...
}
== read exactly NTHREADS (i.e. 30) lines from the file and put them in the queue.
And then we are back to doing nothing as long as there is something in the queue
As you can see the call to 'pending' stops the loop from reading more than the NTHREADS lines it is allowed after the queue was empty
The join at the end is just to wait for the threads to exit after they have been signaled to exit. Before that the 'undef's pushed on the queue make sure that all the threads get such a signal. By putting NTHREADS of 'undef's into the queue it is made sure that every threads gets its STOP signal
| [reply] [d/l] [select] |