in reply to Re^3: Perl && threads && queues - how to make all this work together
in thread Perl && threads && queues - how to make all this work together

jethro - thanks, I've thought about it, but not as deep as needed! :) BrowserUk - I am very pleased that an expert in threading tries to help me! But can you tell more about some lines of your code: At a first look I thought that you code is placing all lines of file in queue in one time. But when I changed printf "%3d: (%10d, %10d) :%s", $tid, $pos, $size, $line; to printf "%3d: (%10d, %10d, %10d) :%s", $tid, $pos, $size, $Q->pending, $line; I saw that I wasn't right in my conclusion. So, can you explain - how code in block "while( !eof FILE ) {" interacts with code in thread sub in just the right way? It is placed outside the sub. Or the trick is that you placed the join procedure only after the while loop, not just after the 'create' sub? Yes, I made a mistake when wrote my own proc with reading a lines with binary access, I've just forgot about the 'tell' function. Why 'abs($threadscnt-1)'? Because I was taught to make all my programs with even though dawns of 'fool-proof' input.

Replies are listed 'Best First'.
Re^5: Perl && threads && queues - how to make all this work together
by BrowserUk (Patriarch) on Feb 08, 2010 at 01:18 UTC
    So, can you explain - how code in block "while( !eof FILE ) {" interacts with code in thread sub in just the right way?

    Just step through the main line code and see what happens.

    1. my $Q = Thread::Queue->new;

      Creates a new queue.

    2. my @threads = map threads->create( \&thread, $Q ), 1 .. NTHREADS;

      Spawns some threads, passing the handle to the queue

    3.     while( my $line = $Q->dequeue ) {

      The threads enter a loop and try to get a line from the queue. There's nothing in the queue yet, so they block.

    4. while( !eof FILE ) {

      We haven't read all the file yet, so

    5.     sleep 0.001 while $Q->pending;

      We haven't put anything in the queue, so pending() returns false (0).

    6. for( 1 .. NTHREADS ) { $Q->enqueue( scalar <FILE> ); lock $pos; $pos = tell FILE; }

      Push 1 line onto the queue for each thread, updating $pos.

    7. while( !eof FILE ) {

      We still haven't read the whole file, so...

    8. sleep 0.001 while $Q->pending;

      The are now 30 lines in the queue, so we sleep 0.001.

      And we'll keep sleeping until the last thread has pulled the last line from the queue and pending returns zero.

      And that'll keep repeating until we've read and queued all the lines.

    9. $Q->enqueue( (undef) x NTHREADS );

      Then we queue enough undef's to terminate the loops in all the threads. All that's left to do is:

    10. $_->join for @threads;

      Wait for all the threads to terminate. and we're done.

    All the interaction is controlled entirely by the Thread::Queue module. And that's a well-tested core module, so we needn't concern ourselves with the details.

    The only other cross-thread interaction is the value of $pos. And that is usually inaccurate, because the value it contains will reflect the file position at the point the threads access it, which will mostly be entirely different to its value at the time the line that thread is processing was read. Because it spent some time sitting in the queue.

    I assumed that this was only in your sample code as a mechanism of tracking progress. As such, it served the purpose of demonstrating that DIY locking often doesn't achieve the goal you set out to achieve.

    If it is important for the threads to know the file position associated with each line, then you should pass the value with the line via the queue. Eg.

    #! perl -slw use strict; use threads; use Thread::Queue; use Time::HiRes qw[ sleep ]; use constant NTHREADS => 30; open FILE, '<', $ARGV[ 0 ] or die $!; my $size = -s FILE; sub thread { my $Q = shift; my $tid = threads->tid; while( my( $pos, $line ) = split $;, $Q->dequeue ) { printf "%3d: (%10d, %10d) :%s", $tid, $pos, $size, $line; sleep rand .5; } } my $Q = Thread::Queue->new; my @threads = map threads->create( \&thread, $Q ), 1 .. NTHREADS; while( !eof FILE ) { sleep 0.001 while $Q->pending; $Q->enqueue( join $;, tell( FILE ), scalar <FILE> ) for 1 .. NTHRE +ADS } $Q->enqueue( (undef) x NTHREADS ); $_->join for @threads;

    If you run that, you'll see that the pos reflects the true position within the file from which the line was read. Note that the position is read before the line. Note also that $pos is no longer shared, the need for locking it goes away, and so the code runs far more efficiently. The code got both simpler and more efficient.


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
Re^5: Perl && threads && queues - how to make all this work together
by jethro (Monsignor) on Feb 08, 2010 at 01:22 UTC
      sleep 0.001 while $Q->pending;

    == As long as there is something in the queue, do nothing

    for( 1 .. NTHREADS ) { $Q->enqueue( scalar <FILE> ); ... }

    == read exactly NTHREADS (i.e. 30) lines from the file and put them in the queue.

    And then we are back to doing nothing as long as there is something in the queue

    As you can see the call to 'pending' stops the loop from reading more than the NTHREADS lines it is allowed after the queue was empty

    The join at the end is just to wait for the threads to exit after they have been signaled to exit. Before that the 'undef's pushed on the queue make sure that all the threads get such a signal. By putting NTHREADS of 'undef's into the queue it is made sure that every threads gets its STOP signal