sanc has asked for the wisdom of the Perl Monks concerning the following question:

Hi, I have the following code that creates workers processing a queue. My problem is knowing when to undefine the queue. The only solution I found was to create an array that holds all threads that are idling. When the array count equals the thread count, I undefine the queue and join the threads. I am sure there are better / cleaner ways to do that. Any help would be appreciated.

    sub pullDataFromDbWithDirectory {
        my $_dir = $_[0];
        if ($itemCount <= $maxNumberOfItems) {
            my @retval = grep { /^Dir|^File/ } qx($omnidb -filesystem $filesystem '$label' -listdir '$_dir');
            foreach my $item (@retval) {
                $itemCount++;
                (my $filename = $item) =~ s/^File\s+|^Dir\s+|\n//g;
                my $file = "$_dir/$filename";
                push(@data,$file);
                if ($item =~ /^Dir/) {
                    $worker->enqueue($file);
                    print "Add $file to queue\n" if $debug;
                }
            }
        }
    }

    sub doOperation () {
        my $ithread = threads->tid();
        do {
            my $folder = $worker->dequeue();
            print "Read $folder from queue with thread $ithread\n" if $debug;
            pullDataFromDbWithDirectory($folder);
        } while ($worker->pending());
        push(@IDLE_THREADS,$ithread);
    }

This is the main section:

    my @threads = map threads->create(\&doOperation), 1 .. $maxNumberOfParallelJobs;
    pullDataFromDbWithDirectory($directory);
    sleep 0.01 while (scalar @IDLE_THREADS < $maxNumberOfParallelJobs);
    $worker->enqueue((undef) x $maxNumberOfParallelJobs);
    $_->join for @threads;
    printData();

Re: Proper undefine queue with multithreads
by BrowserUk (Patriarch) on Jun 03, 2014 at 13:15 UTC

    Your queue handling is fatally flawed.

    Why are you exiting your queue reading loop if there is nothing pending?

    do { ... } while ($worker->pending());

    If, for example, some other process decides to hammer the disk drive, your queue population code might stall waiting for access to the disk. At that point your queue will empty and all your worker threads will terminate, even though there are still files to be read from disk.
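
    To make the failure mode concrete, here is a minimal sketch that reproduces it with a single worker. The item names and the @seen array are made up for the illustration, and the producer "stall" is simulated by simply joining the worker before the remaining items are queued.

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

my $queue = Thread::Queue->new();
my @seen :shared;    # items the worker actually processed (illustration only)

# Same loop shape as in the question: leave as soon as nothing is pending.
sub worker {
    do {
        my $item = $queue->dequeue();    # blocks until an item is available
        lock(@seen);
        push @seen, $item;
    } while ( $queue->pending() );
}

my $thr = threads->create( \&worker );

$queue->enqueue('dir1');    # the producer delivers one item ...
$thr->join();               # ... then "stalls"; the worker finds the queue empty and exits

$queue->enqueue( 'dir2', 'dir3' );    # arrives too late, no worker is left

printf "processed: %s\n", join( ', ', @seen );
printf "left in queue: %d\n", $queue->pending();
```

    Only dir1 gets processed; dir2 and dir3 stay in the queue because the only worker has already returned.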

    Also, why are you calling pullDataFromDbWithDirectory() both in your main thread and in all your worker threads? That's very confused.


    The normal way to do this is:

    1. Have the queue-reading loops in your worker threads terminate when they see undef:

          sub doOperation () {
              my $ithread = threads->tid();
              while( my $folder = $worker->dequeue() ) {
                  print "Read $folder from queue with thread $ithread\n" if $debug;
                  ## Do something useful here...
              }
          }

    2. Start your threads before you populate the queue:

      They will all block on the dequeue() until something is made available.

    3. Populate your queue from your main thread (*ONLY*);

      The threads will start doing work as soon as your main thread gives them something to work on.

    4. Once the main thread has finished populating the queue, it then queues one undef per worker thread to cause the worker loops to terminate and thus the worker threads to end.

      You are queuing undefs, but you're not doing it until you've already seen that your threads have ended, at which point it serves no purpose.

    5. Finally, loop over the thread handles calling join() to ensure all the threads have finished before you exit the program.

    This way, the whole process becomes self-managing and you don't have to poll to count threads.
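
    Put together, the five steps might look roughly like the sketch below. The pool size, the item names and the @done array are placeholders, not anything from the original code. The loop tests defined() rather than plain truth, a small deviation from the snippet in step 1, so that an item such as "0" would not end a worker early.

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

my $WORKERS = 4;                       # assumed pool size
my $queue   = Thread::Queue->new();
my @done :shared;                      # results collected from all workers

# Step 1: the worker loop ends when it dequeues undef.
sub worker {
    my $tid = threads->tid();
    while ( defined( my $item = $queue->dequeue() ) ) {
        my $result = "thread $tid handled $item";    # placeholder for real work
        lock(@done);
        push @done, $result;
    }
}

# Step 2: start the workers first; they block in dequeue().
my @threads = map { threads->create( \&worker ) } 1 .. $WORKERS;

# Step 3: populate the queue from the main thread only.
$queue->enqueue("item$_") for 1 .. 20;

# Step 4: one undef per worker tells each loop to finish.
$queue->enqueue( (undef) x $WORKERS );

# Step 5: wait for every worker before exiting.
$_->join() for @threads;

printf "%d items processed\n", scalar @done;
```

    Because the queue is first-in, first-out, every real item is dequeued before any of the undefs, so no polling or idle-thread bookkeeping is needed.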


    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
      Thank you for your valuable input. I am not reading a filesystem but issuing an omnidb command, which queries the HP Data Protector database for backed-up files. The problem here is that it does not support recursive listing, which means I have to run the omnidb command for each directory found in the previous output. That said, I cannot populate the queue completely up front. As querying the db takes a lot of time, I try to do that in parallel. Would that still be doable with the normal way you just explained? Would you mind showing a small example? Thanks a lot
        Would that still be doable with the normal way you just explained?

        Yes. But you're going to have to explain what output you get from this "omnidb" command; because I've never heard of it.

        Would you mind showing a small example?

        Given that your example code is incomplete, which leaves us none the wiser as to where and how a whole bunch of global variables used by pullDataFromDbWithDirectory() -- eg. $itemCount, $maxNumberOfItems, $omnidb, $filesystem, $label, @data -- are initialised/used, it would require gobs of time and guesswork to try and construct a working example.

        Post your full code.

        Post a sample output from the command.

        Explain what you are going to do with the list of files you are building in @data.

        Then maybe we stand a chance of helping you.
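
        In the meantime, here is a rough sketch of one way the recursive case could be structured. It is not based on real omnidb output: list_dir() and %FAKE_DB are hypothetical stand-ins for the command, and add_work(), $outstanding and the pool size are likewise invented for the illustration. The idea is to count directories that are queued but not yet fully listed; whichever worker brings that count to zero queues the undefs.

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

my $WORKERS = 4;                       # assumed pool size
my $queue   = Thread::Queue->new();
my @data :shared;                      # every path found
my $outstanding :shared = 0;           # dirs queued but not yet fully listed

# Hypothetical stand-in for the omnidb listing; the real output may differ.
my %FAKE_DB = (
    '/root'     => [ 'Dir a', 'Dir b', 'File x' ],
    '/root/a'   => [ 'File 1', 'Dir c' ],
    '/root/b'   => [ 'File 2' ],
    '/root/a/c' => [ 'File 3', 'File 4' ],
);
sub list_dir { my ($dir) = @_; return @{ $FAKE_DB{$dir} || [] } }

# Count the directory as outstanding *before* it becomes visible in the queue.
sub add_work {
    my ($dir) = @_;
    { lock($outstanding); $outstanding++; }
    $queue->enqueue($dir);
}

sub worker {
    while ( defined( my $dir = $queue->dequeue() ) ) {
        for my $entry ( list_dir($dir) ) {
            my ( $type, $name ) = split ' ', $entry, 2;
            my $path = "$dir/$name";
            { lock(@data); push @data, $path; }
            add_work($path) if $type eq 'Dir';    # children counted before parent is done
        }
        lock($outstanding);
        # This directory is finished; if nothing else is outstanding, stop all workers.
        $queue->enqueue( (undef) x $WORKERS ) if --$outstanding == 0;
    }
}

my @threads = map { threads->create( \&worker ) } 1 .. $WORKERS;
add_work('/root');                     # seed the queue with the start directory
$_->join() for @threads;

print "$_\n" for sort @data;
```

        Since a directory's children are counted before the directory itself is marked finished, the counter can only reach zero once the whole tree has been listed. An item limit such as $maxNumberOfItems is left out of the sketch.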


        BTW: Ignore/downvote both existing and any further replies you get from sundialsvc4. He has no working knowledge of using Perl's threading and has a history of posting long, rambling, always useless, often dangerous "suggestions".

        He even knows this: " Clearly, none of my suggestions would apply to this particular case. ", but he continues to waste everyone's time by posting these useless, over-generic replies on subjects of which he has been proven, time & time again, to have no first-hand knowledge.

        Why? I think the poor ol' thing is getting so senile that he genuinely forgets that he's only regurgitating things he's read rather than his own experiences. Sadly, whilst he seems to be able to retrieve odd snippets of generally good advice; he always seems to forget the correct context, rendering them useless.

