in reply to Re^5: Proper undefine queue with multithreads
in thread Proper undefine queue with multithreads

Hi, thanks for your answer. This is exactly how I had it in the beginning. The problem is that it does not traverse all the subdirectories. I think that as soon as the queue becomes empty, the workers hit the undef entries; after that, new entries are still pushed into the queue but are ignored, because the threads have already been joined by then. Or maybe I misunderstood the logic behind adding undef to the queue. Here is an example:
# ./dpfilesearch.pl --object server:/SHARED --label '/SHARED (test)' --dir /SHARED/tmp/test --recursive --threads 1
/SHARED/tmp/test/dir7
/SHARED/tmp/test/dir7/dir1
/SHARED/tmp/test/dir7/dir2
/SHARED/tmp/test/dir7/dir3
/SHARED/tmp/test/dir7/dir4
/SHARED/tmp/test/dir7/dir5
/SHARED/tmp/test/dir7/dir6
/SHARED/tmp/test/dir7/dir7
/SHARED/tmp/test/dir7/dir8
/SHARED/tmp/test/dir7/dir9
Still pending in queue: 18
Whereas if I search directly within dir7, more subfolders are revealed.
# ./dpfilesearch.pl --object server:/SHARED --label '/SHARED (test)' --dir /SHARED/tmp/test/dir7 --recursive --threads 1
/SHARED/tmp/test/dir7/dir1
/SHARED/tmp/test/dir7/dir2
/SHARED/tmp/test/dir7/dir3
/SHARED/tmp/test/dir7/dir4
/SHARED/tmp/test/dir7/dir5
/SHARED/tmp/test/dir7/dir6
/SHARED/tmp/test/dir7/dir7
/SHARED/tmp/test/dir7/dir8
/SHARED/tmp/test/dir7/dir9
/SHARED/tmp/test/dir7/dir9/dir1
/SHARED/tmp/test/dir7/dir9/dir2
/SHARED/tmp/test/dir7/dir9/dir3
/SHARED/tmp/test/dir7/dir9/dir4
/SHARED/tmp/test/dir7/dir9/dir5
/SHARED/tmp/test/dir7/dir9/dir6
/SHARED/tmp/test/dir7/dir9/dir7
/SHARED/tmp/test/dir7/dir9/dir8
/SHARED/tmp/test/dir7/dir9/dir9
Still pending in queue: 9
It is the same if I specify more than one thread. I added a $worker->pending() call just after printing the output.
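The stranded-entries behaviour described above can be reproduced without any of the omnidb machinery. This is a minimal, hypothetical sketch (not code from the thread) showing how an undef sentinel enqueued before later work leaves everything behind it unprocessed:

```perl
use strict;
use warnings;
use Thread::Queue;

my $q = Thread::Queue->new();

# Sentinel enqueued too early: 'b' sits behind the undef, mimicking a
# worker that adds new directories after the main thread has already
# signalled shutdown.
$q->enqueue( 'a', undef, 'b' );

my @seen;
while ( defined( my $item = $q->dequeue_nb() ) ) {
    push @seen, $item;   # the loop exits at the undef; 'b' is never seen
}

print "processed: @seen\n";                     # processed: a
print "still pending: ", $q->pending(), "\n";   # still pending: 1
```

The consumer loop treats undef as "stop", so anything enqueued after the sentinel stays pending forever, which matches the "Still pending in queue: N" counts shown above.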

Re^7: Proper undefine queue with multithreads
by BrowserUk (Patriarch) on Jun 05, 2014 at 14:33 UTC

    Try this (again untested) code.

    I've yet to convince myself that there isn't a race condition here, but if there is, you'll probably find it by running it for real quicker than I will by running it mentally:

#!/usr/bin/perl -w
BEGIN {
    our( $_pathname, $_filename ) = ( $0 =~ m[(.*)/([^/]+)$] )
        ? ( $1, $2 ) : ( ".", $0 );
    push @INC, $_pathname;
};

sub usage {
    print STDERR "\nERROR: $_[0]\nUsage:\n", <<"EndOfDescription";
    $_filename
    Required Parameters:
        --filesystem 'host:dir'  Filesystem with format host:fs ex. host:/SHARED
        --label 'label'          Label
        --dir 'directory'        Directory to search
    Optional Parameters:
        --recursive              Recursive search
        --maxCount 10000         Maximum allowed item count
        --threads 10             Maximum parallel jobs
        --exclude dir            Can be specified multiple times
EndOfDescription
    exit 2
}

# -------------------------
# Required libraries
# -------------------------
use strict;
use Data::Dumper;
use Getopt::Long;
use Term::ANSIColor;
use threads;
use Thread::Queue;

# -------------------------
# Global Variables
# -------------------------
my $omnidb = '/opt/omni/bin/omnidb';
my @data :shared;
my $maxNumberOfParallelJobs = 10;
my $maxNumberOfItems = 10000;
my $itemCount = 0;
my $worker = Thread::Queue->new();
my $idle :shared = $maxNumberOfParallelJobs;

# -------------------------
# Argument handling
# -------------------------
my( $filesystem, $label, $directory, $recursive, $debug, @exclude );
Getopt::Long::Configure("pass_through");
GetOptions(
    q{filesystem=s} => \$filesystem,
    q{label=s}      => \$label,
    q{dir=s}        => \$directory,
    q{recursive!}   => \$recursive,
    q{maxCount=i}   => \$maxNumberOfItems,
    q{threads=i}    => \$maxNumberOfParallelJobs,
    q{debug!}       => \$debug,
    q{exclude=s}    => \@exclude
);
usage "Invalid argument(s)." if (grep { /^-/o } @ARGV);
my( @args ) = @ARGV;
if ( !($filesystem || $label || $directory) ) {
    usage "Not enough arguments." if (! @args );
}

sub pullDataFromDbWithDirectory {
    my $_dir = $_[0];
    if ($itemCount <= $maxNumberOfItems) {
        my @retval = grep { /^Dir|^File/ }
            qx($omnidb -filesystem $filesystem '$label' -listdir '$_dir');
        foreach my $item (@retval) {
            $itemCount++;
            (my $filename = $item) =~ s/^File\s+|^Dir\s+|\n//g;
            my $file = "$_dir/$filename";
            if (!($file ~~ @exclude)) {
                push(@data,$file);
                if ($item =~ /^Dir/) {
                    $worker->enqueue( $file );
                    print "Add $file to queue\n" if $debug;
                }
            }
        }
    }
}

sub doOperation () {
    my $ithread = threads->tid();
    while( my $folder = $worker->dequeue() ) {
        { lock $idle; --$idle }
        print "Read $folder from queue with thread $ithread\n" if $debug;
        pullDataFromDbWithDirectory($folder);
        { lock $idle; ++$idle }
    };
}

sub printData {
    foreach my $file (sort @data) {
        print "$file\n";
    }
    if ($itemCount > $maxNumberOfItems) {
        print colored ['red on_black'],
            "\nWARNING: Maximum item count of $itemCount / $maxNumberOfItems has been reached. Please adjust your filter\n";
    }
}

# -------------------------
# Main
# -------------------------
print "Exclude: " . Dumper( \@exclude ) if $debug;

my @threads = map { threads->create( \&doOperation ) } 1 .. $maxNumberOfParallelJobs;

pullDataFromDbWithDirectory( $directory );

sleep 1 until $idle <  $maxNumberOfParallelJobs;   ## Wait for (some) threads to get started
sleep 1 until $idle == $maxNumberOfParallelJobs;   ## wait until they are all done

$worker->enqueue( (undef) x $maxNumberOfParallelJobs );  ## Tell'em to stop

$_->join for @threads;

printData();

    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.

      Ok. The problem was that my $idle :shared = $maxNumberOfParallelJobs; was specified too early, as $maxNumberOfParallelJobs gets overwritten later by the command-line parameters.
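      For reference, a minimal sketch of that initialisation-order pitfall (hypothetical variable names, and no threading needed to demonstrate it): any copy of $maxNumberOfParallelJobs taken before GetOptions runs still holds the default, not the --threads value. In the real script the copy is the :shared $idle counter.

```perl
use strict;
use warnings;
use Getopt::Long;

my $maxNumberOfParallelJobs = 10;           # default

# BUG: captures the default, not the value from --threads
my $idle_too_early = $maxNumberOfParallelJobs;

local @ARGV = ( '--threads', 1 );           # simulate the command line
GetOptions( 'threads=i' => \$maxNumberOfParallelJobs );

# FIX: take the value only after option parsing has run
my $idle = $maxNumberOfParallelJobs;

print "too early: $idle_too_early, after GetOptions: $idle\n";
# too early: 10, after GetOptions: 1
```

      With $idle frozen at 10 while only 1 worker exists, the main loop's "$idle == $maxNumberOfParallelJobs" test can never behave as intended, which is consistent with the single-thread hang reported below.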

      Your solution performs well and I get the idea of how this all should work together. Thank you for your patience.

        You're most welcome.

        With regard to the "race condition" suggested here. It's a red herring!

        The description:

        1. worker thread dequeues last item
        2. main thread checks pending (false)
        3. main thread checks number of busy threads (none)
        4. main thread signals workers to end
        5. worker marks itself busy
        6. worker starts processing last item
        ...fails to consider what happens at this point:

        7. once the worker processing the last item finishes, it loops back to check for another, gets undef, exits the loop and terminates.
        8. the main thread, waiting on the join, sees it terminate and completes. Clean exit. Job done.

        There is simply no need for the over-engineered, overcomplicated solution suggested.


      It does have a race condition. Explained and solved here
        ikegami, the queue does its own locking; it doesn't make sense to add locking on top of that

      Hi, thanks for your code.

      The code you provided works well with more than one thread. But as soon as I run it with $maxNumberOfParallelJobs=1, it hangs before joining the threads and never reaches the printData() output.

      This is the output when I run the script in debug mode

      Add /SHARED/tmp/test/dir9 to queue
      Add /SHARED/tmp/test/dir8 to queue
      Add /SHARED/tmp/test/dir7 to queue
      Add /SHARED/tmp/test/dir6 to queue
      Add /SHARED/tmp/test/dir5 to queue
      Add /SHARED/tmp/test/dir4 to queue
      Add /SHARED/tmp/test/dir3 to queue
      Add /SHARED/tmp/test/dir2 to queue
      Read /SHARED/tmp/test/dir9 from queue with thread 1
      Add /SHARED/tmp/test/dir1 to queue
      Read /SHARED/tmp/test/dir8 from queue with thread 1
      Read /SHARED/tmp/test/dir7 from queue with thread 1
      Add /SHARED/tmp/test/dir7/dir9 to queue
      Add /SHARED/tmp/test/dir7/dir8 to queue
      Add /SHARED/tmp/test/dir7/dir7 to queue
      Add /SHARED/tmp/test/dir7/dir6 to queue
      Add /SHARED/tmp/test/dir7/dir5 to queue
      Add /SHARED/tmp/test/dir7/dir4 to queue
      Add /SHARED/tmp/test/dir7/dir3 to queue
      Add /SHARED/tmp/test/dir7/dir2 to queue
      Add /SHARED/tmp/test/dir7/dir1 to queue
      Read /SHARED/tmp/test/dir6 from queue with thread 1
      Read /SHARED/tmp/test/dir5 from queue with thread 1
      Read /SHARED/tmp/test/dir4 from queue with thread 1
      Read /SHARED/tmp/test/dir3 from queue with thread 1
      Read /SHARED/tmp/test/dir2 from queue with thread 1
      Add /SHARED/tmp/test/dir2/dir9 to queue
      Add /SHARED/tmp/test/dir2/dir8 to queue
      Add /SHARED/tmp/test/dir2/dir7 to queue
      Add /SHARED/tmp/test/dir2/dir6 to queue
      Add /SHARED/tmp/test/dir2/dir5 to queue
      Add /SHARED/tmp/test/dir2/dir4 to queue
      Add /SHARED/tmp/test/dir2/dir3 to queue
      Add /SHARED/tmp/test/dir2/dir2 to queue
      Add /SHARED/tmp/test/dir2/dir1 to queue
      Read /SHARED/tmp/test/dir1 from queue with thread 1
      Read /SHARED/tmp/test/dir7/dir9 from queue with thread 1
      Add /SHARED/tmp/test/dir7/dir9/dir9 to queue
      Add /SHARED/tmp/test/dir7/dir9/dir8 to queue
      Add /SHARED/tmp/test/dir7/dir9/dir7 to queue
      Add /SHARED/tmp/test/dir7/dir9/dir6 to queue
      Add /SHARED/tmp/test/dir7/dir9/dir5 to queue
      Add /SHARED/tmp/test/dir7/dir9/dir4 to queue
      Add /SHARED/tmp/test/dir7/dir9/dir3 to queue
      Add /SHARED/tmp/test/dir7/dir9/dir2 to queue
      Add /SHARED/tmp/test/dir7/dir9/dir1 to queue
      Read /SHARED/tmp/test/dir7/dir8 from queue with thread 1
      Read /SHARED/tmp/test/dir7/dir7 from queue with thread 1
      Read /SHARED/tmp/test/dir7/dir6 from queue with thread 1
      Read /SHARED/tmp/test/dir7/dir5 from queue with thread 1
      Read /SHARED/tmp/test/dir7/dir4 from queue with thread 1
      Read /SHARED/tmp/test/dir7/dir3 from queue with thread 1
      Read /SHARED/tmp/test/dir7/dir2 from queue with thread 1
      Read /SHARED/tmp/test/dir7/dir1 from queue with thread 1
      Read /SHARED/tmp/test/dir2/dir9 from queue with thread 1
      Read /SHARED/tmp/test/dir2/dir8 from queue with thread 1
      Read /SHARED/tmp/test/dir2/dir7 from queue with thread 1
      Read /SHARED/tmp/test/dir2/dir6 from queue with thread 1
      Read /SHARED/tmp/test/dir2/dir5 from queue with thread 1
      Read /SHARED/tmp/test/dir2/dir4 from queue with thread 1
      Read /SHARED/tmp/test/dir2/dir3 from queue with thread 1
      Read /SHARED/tmp/test/dir2/dir2 from queue with thread 1
      Read /SHARED/tmp/test/dir2/dir1 from queue with thread 1
      Read /SHARED/tmp/test/dir7/dir9/dir9 from queue with thread 1
      Read /SHARED/tmp/test/dir7/dir9/dir8 from queue with thread 1
      Read /SHARED/tmp/test/dir7/dir9/dir7 from queue with thread 1
      Read /SHARED/tmp/test/dir7/dir9/dir6 from queue with thread 1
      Read /SHARED/tmp/test/dir7/dir9/dir5 from queue with thread 1
      Read /SHARED/tmp/test/dir7/dir9/dir4 from queue with thread 1
      Read /SHARED/tmp/test/dir7/dir9/dir3 from queue with thread 1
      Read /SHARED/tmp/test/dir7/dir9/dir2 from queue with thread 1
      Read /SHARED/tmp/test/dir7/dir9/dir1 from queue with thread 1
      waiting here.. aborted with Ctrl+C
Re^7: Proper undefine queue with multithreads
by BrowserUk (Patriarch) on Jun 05, 2014 at 14:24 UTC
    The problem here I think is that as soon as the queue gets empty it hits the undefined entries. But after that, new entries are being pushed into the queue but get ignored because the threads were joined already at that time.

    Okay. So now we get to the crux of the real question you should have asked: How to determine when there are no more directories to be processed? (And thus, the threads can safely terminate.)

    You cannot have the main thread post the undefs once it has finished the initial population of the queue, because the worker threads will be adding to the queue as they process that initial population.

    And you cannot use the absence of queue entries (Q->pending == 0), because another thread may add another directory to the queue a millisecond later.

    This is the classic chicken & egg scenario. How can any of the worker threads know when they are finished, when at any given moment one of the other worker threads might queue a new directory to be processed?

    I don't have an answer for you yet. I'm not sure that there is an answer given the current methodology of your code.

    I'm sure that there is a better way, and I'll get back to you once I think of it.

