So, can you explain - how code in block "while( !eof FILE ) {" interacts with code in thread sub in just the right way?
Just step through the main line code and see what happens.
Creates a new queue.
Spawns some threads, passing each one the handle to the queue.
The threads enter a loop and try to get a line from the queue. There's nothing in the queue yet, so they block.
We haven't read all the file yet, so we enter the while( !eof FILE ) loop.
We haven't put anything in the queue yet, so pending() returns false (0) and we don't sleep.
for( 1 .. NTHREADS ) { $Q->enqueue( scalar <FILE> ); lock $pos; $pos = tell FILE; }
Push 1 line onto the queue for each thread, updating $pos.
We still haven't read the whole file, so we go around the while loop again.
There are now 30 lines in the queue, so pending() returns true and we sleep for 0.001 seconds.
And we'll keep sleeping until the last thread has pulled the last line from the queue and pending returns zero.
And that'll keep repeating until we've read and queued all the lines.
Then we queue enough undef's to terminate the loops in all the threads. All that's left to do is:
Wait for all the threads to terminate, and we're done.
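The steps above can be condensed into a small sketch that swaps the file for an in-memory list, so the handshake can be watched in isolation. The names worker and run_demo, and the thread count of 4, are assumptions made for the illustration and are not part of the original sample. It needs a perl built with thread support.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use Time::HiRes qw[ sleep ];

use constant NTHREADS => 4;    # assumption: a small count for the demo

# Worker: blocks in dequeue() until an item arrives; an undef item ends the loop.
sub worker {
    my $Q = shift;
    my $count = 0;
    while( defined( my $item = $Q->dequeue ) ) {
        ++$count;
    }
    return $count;
}

# Hypothetical driver: feeds @items to the workers in batches of NTHREADS
# and returns the total number of items the workers saw.
sub run_demo {
    my @items = @_;
    my $Q = Thread::Queue->new;
    my @threads = map threads->create( \&worker, $Q ), 1 .. NTHREADS;

    while( @items ) {
        sleep 0.001 while $Q->pending;                 # wait until the queue drains
        $Q->enqueue( splice @items, 0, NTHREADS );     # queue the next batch
    }
    $Q->enqueue( (undef) x NTHREADS );                 # one terminator per worker

    my $total = 0;
    for my $thr ( @threads ) {
        my( $n ) = $thr->join;                         # threads were created in list context
        $total += $n;
    }
    return $total;
}

print "processed ", run_demo( map "line $_", 1 .. 10 ), " items\n";
```

Each worker leaves its loop on the first undef it dequeues, so it consumes exactly one terminator, and because the queue is FIFO every real item is handed out before any terminator is.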
All the interaction is controlled entirely by the Thread::Queue module. And that's a well-tested core module, so we needn't concern ourselves with the details.
The only other cross-thread interaction is the value of $pos. And that is usually inaccurate: the value a thread sees reflects the file position at the moment the thread reads $pos, which will mostly be quite different from the position at the time the line it is processing was read, because the line spent some time sitting in the queue.
I assumed that this was only in your sample code as a mechanism of tracking progress. As such, it served the purpose of demonstrating that DIY locking often doesn't achieve the goal you set out to achieve.
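To make the staleness concrete, here is a deterministic, single-threaded sketch. It reads from an in-memory filehandle, records the true position of each line as it is read, and then compares that with what a consumer would find in $pos once it gets around to dequeuing. The helper name stale_positions and the plain (unshared) $pos standing in for the shared variable are assumptions for the illustration.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Hypothetical helper: returns one [ line, position at read time, $pos at dequeue time ]
# triple per line of $text.
sub stale_positions {
    my $text = shift;
    open my $fh, '<', \$text or die $!;

    my $Q   = Thread::Queue->new;
    my $pos = 0;                      # stands in for the shared $pos
    my @read_at;

    while( !eof $fh ) {
        push @read_at, tell $fh;      # where this line really starts
        $Q->enqueue( scalar <$fh> );
        $pos = tell $fh;              # what the original sample stored
    }

    my @report;
    while( defined( my $line = $Q->dequeue_nb ) ) {
        # By the time the line is dequeued, $pos has long since moved on
        push @report, [ $line, shift @read_at, $pos ];
    }
    return @report;
}

printf "read at %2d, \$pos says %2d: %s", @{ $_ }[ 1, 2, 0 ]
    for stale_positions( "alpha\nbeta\ngamma\n" );
```

Here the producer finishes before anything is dequeued, so every line reports the end-of-file position. With real threads the gap varies from run to run, but the mismatch is the same in kind.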
If it is important for the threads to know the file position associated with each line, then you should pass the value with the line via the queue. E.g.:
    #! perl -slw
    use strict;
    use threads;
    use Thread::Queue;
    use Time::HiRes qw[ sleep ];

    use constant NTHREADS => 30;

    open FILE, '<', $ARGV[ 0 ] or die $!;
    my $size = -s FILE;

    sub thread {
        my $Q = shift;
        my $tid = threads->tid;
        ## Each item is the position and the line joined with $;
        ## An undef item yields an empty list and ends the loop
        while( my( $pos, $line ) = split $;, $Q->dequeue ) {
            printf "%3d: (%10d, %10d) :%s", $tid, $pos, $size, $line;
            sleep rand .5;
        }
    }

    my $Q = Thread::Queue->new;
    my @threads = map threads->create( \&thread, $Q ), 1 .. NTHREADS;

    while( !eof FILE ) {
        ## Wait until the threads have emptied the queue
        sleep 0.001 while $Q->pending;
        ## tell() is evaluated before the line is read
        $Q->enqueue( join $;, tell( FILE ), scalar <FILE> ) for 1 .. NTHREADS
    }

    ## One undef per thread to terminate its loop
    $Q->enqueue( (undef) x NTHREADS );
    $_->join for @threads;
If you run that, you'll see that the pos reflects the true position within the file from which the line was read. Note that the position is read before the line. Note also that because $pos is no longer shared, the need for locking it goes away, and so the code runs far more efficiently. The code got both simpler and more efficient.
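The claim that each position travels correctly with its line can be checked without threads at all. The sketch below packs each position and line with $; as above, unpacks them again, and then seeks back to confirm that the recorded position really is where the line starts. The helper names pack_item, unpack_item and check_positions are hypothetical. The limit of 2 on split is an added precaution, not in the code above, against lines that happen to contain the $; character.

```perl
use strict;
use warnings;

# Hypothetical helpers, named for this illustration only
sub pack_item   { join $;, @_ }              # ( $pos, $line ) -> one string
sub unpack_item { split $;, $_[ 0 ], 2 }     # limit 2 keeps any $; inside the line intact

# Returns ( number of positions that check out, number of lines read )
sub check_positions {
    my $text = shift;
    open my $fh, '<', \$text or die $!;

    my @items;
    while( !eof $fh ) {
        my $pos = tell $fh;                  # position is taken before the line is read
        push @items, pack_item( $pos, scalar <$fh> );
    }

    my $ok = 0;
    for my $item ( @items ) {
        my( $pos, $line ) = unpack_item( $item );
        seek $fh, $pos, 0;                   # go back to the recorded position
        ++$ok if <$fh> eq $line;             # the same line should be read again
    }
    return ( $ok, scalar @items );
}

printf "%d of %d positions point at the start of their line\n",
    check_positions( "alpha\nbeta\ngamma\n" );
```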
In reply to Re^5: Perl && threads && queues - how to make all this work together
by BrowserUk
in thread Perl && threads && queues - how to make all this work together
by xaero123