xaero123 has asked for the wisdom of the Perl Monks concerning the following question:

Hello, Monks! I am new to the Perl threading model, and I've run into a problem using threads and sharing data among them. My aim is to build a queue of data to work with, read from a file in blocks. The block size equals the thread count, so first we fill an array with a new block of data. Then the threads shift lines of data from the array, work with them, and check whether the queue array is empty. If it is, they trigger a refill of the array with fresh data. Also, I don't want to use ready-made classes like Thread::Queue; I need to do this using only threads and threads::shared. Here is my code (I used binary access to be able to handle very large files):
use threads;
use threads::shared;

my @block : shared;   # block of data
my $pos : shared;     # offset in data-file here
my $threadscnt=30;    # count of threads

open INFH, '<', $path.'list.txt' or die("Cannot open list.txt : $!\n"); # file with input data
$listsize = -s INFH;
binmode(INFH);
$pos=0;

for(0..abs($threadscnt-1)){
    $trarr[$_]=threads->create(\&smeacts, $_);
}
for(@trarr){
    $_->join;
}

sub smeacts($) {
    my $num = shift(@_);
    until(($pos>=$listsize)and((scalar @block)==0)){ # until we reached the end of datafile
        if (@block){
            {
                lock(@block);
                $sline = shift(@block);
            } # get line of data and lock block from other threads to prevent errors
            print "from $num: $sline($pos,$listsize)\n";
            sleep(rand(5)); # emulate some work with data
        }
        if ((scalar @block)==0){
            async{ getnewblock(); } # get new block if necessary
        }
    }
}
print 'end.';

sub getnewsline() {
    my $oneline='';
    my $tmp='';
    while((read(INFH, $tmp, 1))and(not(($tmp eq "\n")or($tmp eq "\r")))){ # binary reading line of data
        $oneline .= $tmp;
        $pos++;
    }
    $pos++ if ($tmp eq "\n"); # for unix new-line format
    if ($tmp eq "\r"){ # for win new-line format
        seek(INFH, 1, 1);
        $pos += 2;
    }
    return $oneline;
}

sub getnewblock() {
    my $i=0;
    my $tmp = getnewsline();
    while(($i<=abs($threadscnt-1))and($tmp)){
        push(@block, $tmp);
        $i++;
        $tmp = getnewsline();
    }
    return;
}
But when I run it, sometimes I get endless output, or strange output in the console, like:
from 1: foo(45,999)
from 2: bar(48,999)
from 3: foo(45,999)
...
from 8: a(1050,999)
Why does this happen? Using async and lock didn't fix the problem, as far as I can see. I think the problem is in the shared access, or in several threads accessing a line at the same time... But sometimes this program gives me perfect output, just as I expected, and sometimes some 'madness'. For info, I am using ActivePerl 5.10.1. Remember that sometimes it gives the CORRECT output, but if you run more tests you will see what I mean! Now, what can you correct or suggest to fix this instability? Sorry for my mistakes in English; it is not my native language.

Re: Perl && threads && queues - how to make all this work together
by BrowserUk (Patriarch) on Feb 06, 2010 at 01:41 UTC

    If you want to know how to create a shared queue, look at the code in Thread::Queue and see how they do it. Theirs works. Yours doesn't.
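    To illustrate the general technique (Thread::Queue is itself built on threads::shared primitives), here is a minimal sketch of a hand-rolled queue using only threads and threads::shared, as the original question asked. It is not Thread::Queue's actual code, and the sub names enqueue_item and dequeue_item are hypothetical. The key points are that every access to the shared array happens under lock(), and that consumers sleep in cond_wait() instead of polling; undef is assumed to be the end-of-work marker.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

# Hypothetical hand-rolled queue: a shared array guarded by lock(),
# with cond_wait/cond_broadcast so consumers sleep instead of busy-polling.
my @queue :shared;

sub enqueue_item {
    my @items = @_;
    lock(@queue);                  # held until this sub returns
    push @queue, @items;
    cond_broadcast(@queue);        # wake every consumer waiting in cond_wait
}

sub dequeue_item {
    lock(@queue);
    cond_wait(@queue) until @queue;   # releases the lock while waiting
    return shift @queue;              # undef means "no more work"
}

my $nthreads = 4;
my @results :shared;

my @workers = map {
    threads->create(sub {
        while (defined(my $item = dequeue_item())) {
            lock(@results);
            push @results, $item * 2;   # stand-in for real work
        }
    });
} 1 .. $nthreads;

enqueue_item(1 .. 20);
enqueue_item((undef) x $nthreads);   # one end marker per worker
$_->join for @workers;

print scalar(@results), " items processed\n";
```

    Because the end markers are queued after all the real items, each worker consumes exactly one undef and exits only once the real work is gone.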

    If you bothered to format your code properly, other people might be encouraged to read it.


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
Re: Perl && threads && queues - how to make all this work together
by jethro (Monsignor) on Feb 06, 2010 at 01:51 UTC

    You don't lock @block in getnewblock(). It seems to me that there could be a lot of getnewblock jobs running in parallel with each other, and in parallel with a read access.
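    A minimal sketch of that fix, assuming whichever thread finds the block empty does the refill: the emptiness test, the refill and the shift all run under a single lock(@block), so no two threads can refill at once and no thread can shift between another thread's test and refill. The shared @source array is a hypothetical stand-in for the data file, and next_item and BLOCK_SIZE are illustrative names.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

use constant BLOCK_SIZE => 5;

# Hypothetical stand-in for the data file.  It is only touched while
# @block is locked, so only one thread at a time reads from it.
my @source :shared = map "line $_", 1 .. 23;
my @block  :shared;
my @seen   :shared;

# Take one item, refilling the block first if it is empty.  Test, refill
# and shift happen under the same lock, which closes the race windows.
sub next_item {
    lock(@block);
    if (!@block) {
        for (1 .. BLOCK_SIZE) {
            last unless @source;
            push @block, shift @source;
        }
    }
    return shift @block;     # undef once the source is exhausted
}

my @workers = map {
    threads->create(sub {
        while (defined(my $line = next_item())) {
            lock(@seen);
            push @seen, $line;
        }
    });
} 1 .. 4;
$_->join for @workers;

printf "%d lines processed\n", scalar(@seen);
```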

      I tried placing a lock in the async block and in the getnewblock() procedure, but the result was the same =( I also tried using Thread::Queue instead of an array, but there were also problems like the ones I described in my first post. Do you have any other ideas about how to make this code work correctly?

        Your code is so full of

        • errors:

          eg. $pos is a shared variable accessed and modified by every thread, but you never lock it.

        • misconceptions:
          (I used binary access to be able to operate very large files)

          You do not need to use binary access in order to handle huge files in Perl (nor in any other language I'm aware of).

          Which makes all the laborious (and mostly broken) effort you went to to write your own line handling:

          1. slow: reading huge files char by char will take a very long time.
          2. Completely unnecessary.
        • and other weirdness:

          Why abs($threadscnt-1)?

        it is doubtful that it would ever work reliably.
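        A minimal sketch of the first two points, assuming a single reader thread owns the file handle (read_block is a hypothetical name): ordinary line input with readline copes with files of any size, since Perl buffers the file and only the current line is held in a variable; tell gives the same byte offset the hand-written code tracked in $pos; and the shared $pos is only written under lock. The :raw layer is used only so that offsets stay byte-exact on Windows; it is not what makes large files work.

```perl
use strict;
use warnings;
use threads;          # load before threads::shared so sharing is real
use threads::shared;
use File::Temp qw(tempfile);

my $pos :shared = 0;  # byte offset of the next unread line

# Read up to $n lines with the ordinary line-input operator.
sub read_block {
    my ($fh, $n) = @_;
    my @lines;
    while (@lines < $n and defined(my $line = readline($fh))) {
        $line =~ s/\r?\n\z//;        # accept both "\n" and "\r\n" endings
        push @lines, $line;
        lock($pos);                  # $pos is shared, so update it under a lock
        $pos = tell($fh);
    }
    return @lines;
}

# Demo on a small temporary file with mixed line endings.
my ($out, $name) = tempfile(UNLINK => 1);
binmode($out);
print {$out} "foo\r\nbar\nbaz";
close($out);

open(my $in, '<:raw', $name) or die "Cannot open $name: $!";
my @first = read_block($in, 2);
print "@first (pos=$pos)\n";    # foo bar (pos=9)
my @rest  = read_block($in, 2);
print "@rest (pos=$pos)\n";     # baz (pos=12)
```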

        You don't say why you wish to avoid using Thread::Queue, but your understanding of Perl, much less your understanding of threading, isn't sufficient to allow you to consider writing your own shared data handling.

        By way of encouragement, this code does pretty much exactly what your code attempts to do:

        #! perl -slw
        use strict;
        use threads qw[ yield ];
        use threads::shared;
        use Thread::Queue;
        use Time::HiRes qw[ sleep ];

        use constant NTHREADS => 30;

        my $pos :shared = 0;

        open FILE, '<', $ARGV[ 0 ] or die $!;
        my $size = -s FILE;

        sub thread {
            my $Q = shift;
            my $tid = threads->tid;
            while( my $line = $Q->dequeue ) {
                printf "%3d: (%10d, %10d) :%s", $tid, $pos, $size, $line;
                sleep rand 5;
            }
        }

        my $Q = Thread::Queue->new;
        my @threads = map threads->create( \&thread, $Q ), 1 .. NTHREADS;

        while( !eof FILE ) {
            sleep 0.001 while $Q->pending;
            for( 1 .. NTHREADS ) {
                $Q->enqueue( scalar <FILE> );
                lock $pos;
                $pos = tell FILE;
            }
        }

        $Q->enqueue( (undef) x NTHREADS );
        $_->join for @threads;

        It's clear, clean and simple. And works. (Though it is of dubious value, but you wrote the spec!)

        If the idea of threading your code is to allow you to process your huge file more quickly, that probably isn't going to work unless you spend an inordinate amount of time processing each line. And if that's the case, unless you're using hardware with 16 or more cores, using 30 threads is unlikely to be an optimum strategy.


        BrowserUk is the threads expert here, so I would follow his advice and use Thread::Queue (advantage: a lot of possible errors go away if you use the tried and tested module for the queue). Then open a new question here with your new code if you have the same or other problems.

        You could also try to get some more information about what is happening. For example, let every new thread open its own log file and write to it the time it started, and the times just before and just after it takes an item from the queue.
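        A sketch of that logging idea, assuming the Thread::Queue-based design suggested above: each worker writes timestamped lines to its own file before and after every dequeue, so a hang or a duplicated item shows up in the logs. The use of the system temp directory and the worker-<pid>-<tid>.log file names are assumptions for illustration.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use IO::Handle;                 # for autoflush on lexical handles
use File::Spec;
use Time::HiRes qw(time);

# Assumption: one log per thread in the system temp directory,
# named worker-<pid>-<tid>.log (a hypothetical naming scheme).
my $logdir = File::Spec->tmpdir;
my $pid    = $$;

sub worker {
    my ($q)  = @_;
    my $tid  = threads->tid;
    my $file = File::Spec->catfile($logdir, "worker-$pid-$tid.log");
    open(my $log, '>', $file) or die "Cannot open $file: $!";
    $log->autoflush(1);         # keep the log current even if the thread hangs
    printf {$log} "%.6f start\n", time;
    while (1) {
        printf {$log} "%.6f before dequeue\n", time;
        my $item = $q->dequeue;
        printf {$log} "%.6f after dequeue: %s\n", time, defined $item ? $item : 'undef';
        last unless defined $item;
        # ... real work on $item would go here ...
    }
    close($log);
}

my $q       = Thread::Queue->new;
my @workers = map { threads->create(\&worker, $q) } 1 .. 3;
$q->enqueue($_) for 1 .. 6;
$q->enqueue((undef) x @workers);   # one end marker per worker
$_->join for @workers;
print "logs written to $logdir\n";
```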

        By the way, is your sub getnewsline unfinished code? It seems to be missing something:
        You use binary access because of large files. But as long as the text lines are not too long, the size of the file won't matter, since you only read one line at a time. Only if you also read binary files, or files with really (really really) long lines, would there be problems. But in both of these cases your code as it stands would run into the same problems as a simple line read, because it just simulates a simple line read. You would have to add code to stop reading a line once it exceeds some specific length.
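        A minimal sketch of such a cap, assuming the handle is opened with :raw so byte offsets are exact (read_capped_line is a hypothetical name): it reads at most $max bytes, and if that chunk contains a newline before its last byte, it seeks back to just after the newline. A line longer than $max comes back in several pieces on successive calls instead of being read into memory all at once.

```perl
use strict;
use warnings;
use File::Temp qw(tempfile);

# Hypothetical helper: return the next line, but never more than $max
# bytes of it.  Call in scalar context; returns undef at end of file.
sub read_capped_line {
    my ($fh, $max) = @_;
    my $start = tell($fh);
    my $got = read($fh, my $buf, $max);
    return undef unless $got;              # EOF (or read error)
    my $nl = index($buf, "\n");
    if ($nl >= 0 and $nl < $got - 1) {
        # We read past the end of the line: rewind to just after "\n".
        seek($fh, $start + $nl + 1, 0) or die "seek: $!";
        $buf = substr($buf, 0, $nl + 1);
    }
    return $buf;
}

# Demo: a short line, a 10-byte line, and a last line without "\n".
my ($out, $name) = tempfile(UNLINK => 1);
binmode($out);
print {$out} "short\n", "x" x 10, "\nend";
close($out);

open(my $in, '<:raw', $name) or die "Cannot open $name: $!";
my @pieces;
while (defined(my $piece = read_capped_line($in, 8))) {
    push @pieces, $piece;
}
print scalar(@pieces), " pieces\n";
```

        With a cap of 8 bytes the ten-x line is returned as "xxxxxxxx" followed by "xx\n", so the caller can detect an over-long line by the missing trailing newline and decide whether to skip or reassemble it.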

Re: Perl && threads && queues - how to make all this work together
by ikegami (Patriarch) on Feb 08, 2010 at 05:43 UTC

    Here is my code(I used binary access to be able to operate very large files):

    What do you mean by binary access? I don't see a binmode which would ensure that raw bytes are obtained (which is what is usually meant by binary mode).

    Update: bah, I'm blind.

      blind, maybe; can't tell from here. the real problem as i see it is that you know more about Perl than most monks around here. but you express that higher knowledge by treating (many) other monks as if they are st00Pid. the not-so-thinly-veiled derision in your initial comment is unbecoming of someone who could (should?) offer more constructive comments. if the less-experienced monks in attendance really piss you off so much, would it not be a less stressful use of your time to do something else? take up yoga; meditate; run a marathon; but don't be a duckhead.

        Given what I saw of his code at the time, I couldn't understand what he said. I asked what he meant, giving my interpretation so he could correct it or let me know what I missed.

        How is that derisive or treating someone (other than myself) as stupid?