BrowserUk is the expert on threads here, so I would follow his advice and use Thread::Queue (advantage: a whole class of possible errors goes away when you use a tried-and-tested module for the queue). Then, if you still have the same or other problems, open a new question here with your new code.

You could also try to get more information about what is happening. For example, let every new thread open its own logfile and write to it the time it started, plus the times just before and just after it takes an item from the queue.
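A minimal sketch of that logging idea, assuming a worker loop shaped like the one in your code. The sub name `worker`, the temporary log directory and the `thread-N.log` naming are all hypothetical; `Time::HiRes::time` supplies sub-second timestamps. Autoflush matters here: if the program hangs, buffered log lines would otherwise never reach the file.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use IO::Handle;                  # for autoflush on lexical filehandles
use File::Spec;
use File::Temp qw( tempdir );
use Time::HiRes qw( time );      # sub-second timestamps

use constant NTHREADS => 3;      # illustrative value

# Hypothetical log location; a fresh temporary directory keeps this self-contained.
my $logdir = tempdir();

sub worker {
    my ( $Q ) = @_;
    my $tid   = threads->tid;
    my $path  = File::Spec->catfile( $logdir, "thread-$tid.log" );
    open my $log, '>', $path or die "Cannot open $path: $!";
    $log->autoflush( 1 );        # every line reaches the file at once, even if we hang later

    printf {$log} "%.6f started\n", time;
    while ( 1 ) {
        printf {$log} "%.6f waiting for an item\n", time;
        my $item = $Q->dequeue;                  # blocks until something is queued
        last unless defined $item;               # undef is the "stop" signal
        printf {$log} "%.6f got item: %s\n", time, $item;
    }
    printf {$log} "%.6f exiting\n", time;
    close $log;
    return $path;
}

my $Q       = Thread::Queue->new;
my @threads = map { threads->create( \&worker, $Q ) } 1 .. NTHREADS;

$Q->enqueue( "line $_" ) for 1 .. 10;            # stand-in for lines read from a file
$Q->enqueue( (undef) x NTHREADS );               # one stop signal per thread

my @logs = map { $_->join } @threads;            # each thread returns its log path
print "$_\n" for @logs;
```

Comparing the "waiting" and "got item" timestamps across the files shows which threads sat idle and for how long.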

By the way, is your sub getnewsline unfinished code? It seems to be missing something:
You use binary access because of large files. But as long as the individual lines are not too long, the size of the file doesn't matter, since you only read one line at a time. Problems arise only if you also read binary files or files with really (really really) long lines. And in both of those cases your code as it stands would run into the same problems as a simple line read, because it just simulates a simple line read. You would have to add code that stops reading once the line exceeds some specific length.
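To make "stop reading once the line exceeds some length" concrete, here is a minimal sketch, assuming a plain byte-oriented filehandle. The sub name `read_line_capped` and the cap of 8 bytes are hypothetical. Reading one byte at a time with `read` is slow but easy to check; a real version would read in larger blocks and buffer.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical helper: read up to and including the next "\n", but never
# more than $max bytes. Returns undef at end of file. A line longer than
# $max comes back in several pieces, and only the last piece ends in "\n".
sub read_line_capped {
    my ( $fh, $max ) = @_;
    my $buf = '';
    while ( length( $buf ) < $max ) {
        my $got = read( $fh, my $c, 1 );
        die "read failed: $!" unless defined $got;
        last if $got == 0;            # end of file
        $buf .= $c;
        last if $c eq "\n";           # end of line
    }
    return length( $buf ) ? $buf : undef;
}

# Usage example against an in-memory file: one normal line, one "too long" line.
my $data = "short\n" . ( 'x' x 10 ) . "\n";
open my $fh, '<', \$data or die $!;
my @pieces;
while ( defined( my $piece = read_line_capped( $fh, 8 ) ) ) {
    push @pieces, $piece;
}
print scalar( @pieces ), " pieces\n";
```

The caller can tell a truncated line from a complete one by checking whether the returned piece ends in "\n".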

Re^4: Perl && threads && queues - how to make all this work together
by xaero123 (Novice) on Feb 07, 2010 at 23:43 UTC
    jethro - thanks, I had thought about it, but not as deeply as I needed to! :)

    BrowserUk - I am very pleased that an expert in threading is trying to help me! Can you tell me more about some lines of your code? At first glance I thought your code placed all the lines of the file in the queue at once. But when I changed printf "%3d: (%10d, %10d) :%s", $tid, $pos, $size, $line; to printf "%3d: (%10d, %10d, %10d) :%s", $tid, $pos, $size, $Q->pending, $line; I saw that my conclusion was wrong. So can you explain how the code in the "while( !eof FILE ) {" block interacts with the code in the thread sub in just the right way, given that it is placed outside the sub? Or is the trick that you placed the join only after the while loop, rather than straight after the 'create' call?

    Yes, I made a mistake when I wrote my own sub for reading lines with binary access; I had simply forgotten about the 'tell' function. As for why 'abs($threadscnt-1)': I was taught to build at least rudimentary 'fool-proofing' of input into all my programs.
      So, can you explain - how code in block "while( !eof FILE ) {" interacts with code in thread sub in just the right way?

      Just step through the main line code and see what happens.

      1. my $Q = Thread::Queue->new;

        Creates a new queue.

      2. my @threads = map threads->create( \&thread, $Q ), 1 .. NTHREADS;

        Spawns NTHREADS threads, passing each of them the handle to the queue.

      3.     while( my $line = $Q->dequeue ) {

        The threads enter a loop and try to get a line from the queue. There's nothing in the queue yet, so they block.

      4. while( !eof FILE ) {

        We haven't read all the file yet, so

      5.     sleep 0.001 while $Q->pending;

        We haven't put anything in the queue, so pending() returns false (0).

      6. for( 1 .. NTHREADS ) { $Q->enqueue( scalar <FILE> ); lock $pos; $pos = tell FILE; }

        Push 1 line onto the queue for each thread, updating $pos.

      7. while( !eof FILE ) {

        We still haven't read the whole file, so...

      8. sleep 0.001 while $Q->pending;

        There are now (up to) 30 lines in the queue, so we sleep for 0.001 seconds.

        And we'll keep sleeping until the last thread has pulled the last line from the queue and pending returns zero.

        And that'll keep repeating until we've read and queued all the lines.

      9. $Q->enqueue( (undef) x NTHREADS );

        Then we queue enough undefs to terminate the loops in all the threads. All that's left to do is:

      10. $_->join for @threads;

        Wait for all the threads to terminate, and we're done.

      All the interaction is controlled entirely by the Thread::Queue module. And that's a well-tested core module, so we needn't concern ourselves with the details.
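For what it's worth, newer releases of Thread::Queue than the one current when this thread was written can do the throttling and the shutdown themselves (check `perldoc Thread::Queue` on your installation first). Setting `$Q->limit` makes `enqueue` block while that many items are pending, and `$Q->end` makes `dequeue` return undef to the waiting threads once the remaining items are used up. A sketch of the same producer/worker shape under that assumption; the in-memory input, the `worker` name and the per-thread counting are illustrative only.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;     # assumes a release that provides limit() and end()

use constant NTHREADS => 4;    # illustrative value

sub worker {
    my ( $Q ) = @_;
    my $count = 0;
    # dequeue() blocks while the queue is empty; after end() it returns
    # undef once the remaining items are used up, which ends the loop.
    while ( defined( my $line = $Q->dequeue ) ) {
        $count++;                          # stand-in for real per-line work
    }
    return $count;
}

my $Q = Thread::Queue->new;
$Q->limit = NTHREADS;                      # enqueue() blocks while NTHREADS items are pending

my @threads = map { threads->create( \&worker, $Q ) } 1 .. NTHREADS;

# Stand-in for the input file: 100 lines in an in-memory filehandle.
my $data = join '', map { "line $_\n" } 1 .. 100;
open my $fh, '<', \$data or die $!;
while ( my $line = <$fh> ) {
    $Q->enqueue( $line );                  # no sleep/pending polling needed
}
close $fh;

$Q->end;                                   # replaces the (undef) x NTHREADS sentinels

my $total = 0;
for my $thr ( @threads ) {
    my ( $n ) = $thr->join;
    $total += $n;
}
print "processed $total lines\n";
```

Blocking inside `enqueue` removes the 1 ms polling loop, and `end` removes the need to count sentinels against the number of threads.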

      The only other cross-thread interaction is the value of $pos, and that is usually inaccurate. The value a thread sees reflects the file position at the moment the thread accesses it, which will mostly be quite different from the position at the moment the line that thread is processing was read, because that line spent some time sitting in the queue.

      I assumed that this was only in your sample code as a mechanism of tracking progress. As such, it served the purpose of demonstrating that DIY locking often doesn't achieve the goal you set out to achieve.

      If it is important for the threads to know the file position associated with each line, then you should pass that value along with the line via the queue. E.g.:

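      #! perl -slw
      use strict;
      use threads;
      use Thread::Queue;
      use Time::HiRes qw[ sleep ];

      use constant NTHREADS => 30;

      open FILE, '<', $ARGV[ 0 ] or die $!;
      my $size = -s FILE;

      sub thread {
          my $Q = shift;
          my $tid = threads->tid;
          ## An undef item is the signal to stop
          while( defined( my $item = $Q->dequeue ) ) {
              ## Each item is "position$;line"; the limit of 2 keeps the line intact
              my( $pos, $line ) = split $;, $item, 2;
              printf "%3d: (%10d, %10d) :%s", $tid, $pos, $size, $line;
              sleep rand .5;
          }
      }

      my $Q = Thread::Queue->new;
      my @threads = map threads->create( \&thread, $Q ), 1 .. NTHREADS;

      while( !eof FILE ) {
          ## Wait until the threads have taken everything already queued
          sleep 0.001 while $Q->pending;
          for( 1 .. NTHREADS ) {
              last if eof FILE;   ## don't queue an empty item at end of file
              ## tell() is evaluated before the read, so it is the line's start position
              $Q->enqueue( join $;, tell( FILE ), scalar <FILE> );
          }
      }

      $Q->enqueue( (undef) x NTHREADS );
      $_->join for @threads;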

      If you run that, you'll see that the pos reflects the true position in the file from which each line was read. Note that the position is read before the line. Note also that because $pos is no longer shared, the need to lock it goes away, and the code runs far more efficiently. The code got both simpler and more efficient.
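One caveat when packing two values into one queue item: `split` without a limit would cut a line that happens to contain `$;` (chr 28) into extra fields. A minimal, thread-free sketch of the round trip, using hypothetical helper names `pack_item` and `unpack_item`; it also shows that calling `tell` before the read yields the offset at which each line starts.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical helpers for carrying (position, line) through a string-only queue.
sub pack_item {
    my ( $pos, $line ) = @_;
    return join $;, $pos, $line;
}

sub unpack_item {
    my ( $item ) = @_;
    # Limit of 2: everything after the first separator belongs to the line,
    # even if the line itself contains $; (chr 28).
    return split /\Q$;\E/, $item, 2;
}

my $data = "alpha\nbeta\ngamma\n";
open my $fh, '<', \$data or die $!;

my @items;
until ( eof $fh ) {
    # Arguments are evaluated left to right, so tell() runs before the read.
    push @items, pack_item( tell( $fh ), scalar <$fh> );
}

for my $item ( @items ) {
    my ( $pos, $line ) = unpack_item( $item );
    printf "%3d: %s", $pos, $line;
}
```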


      Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
      "Science is about questioning the status quo. Questioning authority".
      In the absence of evidence, opinion is indistinguishable from prejudice.
        sleep 0.001 while $Q->pending;

      == As long as there is something in the queue, do nothing

      for( 1 .. NTHREADS ) { $Q->enqueue( scalar <FILE> ); ... }

      == read exactly NTHREADS (i.e. 30) lines from the file and put them in the queue.

      And then we are back to doing nothing as long as there is something in the queue.

      As you can see, the call to 'pending' stops the loop from reading more than the NTHREADS lines it is allowed each time the queue has emptied.

      The join at the end just waits for the threads to exit after they have been signalled to stop. Before that, the 'undef's pushed onto the queue make sure that every thread gets such a signal: a thread exits as soon as it dequeues an undef, so putting exactly NTHREADS 'undef's into the queue guarantees that each thread gets its own STOP signal.