in reply to Re^6: Proper undefine queue with multithreads
in thread Proper undefine queue with multithreads

Try this (again untested) code.

I've yet to convince myself that there isn't a race condition here, but if there is, you'll probably find it by running the code for real more quickly than I will by running it mentally:

#!/usr/bin/perl -w

BEGIN {
    our( $_pathname, $_filename ) =
        ( $0 =~ m[(.*)/([^/]+)$] ) ? ( $1, $2 ) : ( ".", $0 );
    push @INC, $_pathname;
};

sub usage {
    print STDERR "\nERROR: $_[0]\nUsage:\n", <<"EndOfDescription";
$_filename

    Required Parameters:
        --filesystem 'host:dir'   Filesystem with format host:fs ex. host:/SHARED
        --label 'label'           Label
        --dir 'directory'         Directory to search

    Optional Parameters:
        --recursive               Recursive search
        --maxCount 10000          Maximum allowed item count
        --threads 10              Maximum parallel jobs
        --exclude dir             Can be specified multiple times
EndOfDescription
    exit 2
}

# -------------------------
# Required libraries
# -------------------------
use strict;
use Data::Dumper;
use Getopt::Long;
use Term::ANSIColor;
use threads;
use threads::shared;    # for :shared variables and lock()
use Thread::Queue;

# -------------------------
# Global Variables
# -------------------------
my $omnidb = '/opt/omni/bin/omnidb';

my @data :shared;
my $maxNumberOfParallelJobs = 10;
my $maxNumberOfItems = 10000;
my $itemCount = 0;

my $worker = Thread::Queue->new();
my $idle :shared = $maxNumberOfParallelJobs;

# -------------------------
# Argument handling
# -------------------------
my( $filesystem, $label, $directory, $recursive, $debug, @exclude );

Getopt::Long::Configure("pass_through");
GetOptions(
    q{filesystem=s} => \$filesystem,
    q{label=s}      => \$label,
    q{dir=s}        => \$directory,
    q{recursive!}   => \$recursive,
    q{maxCount=i}   => \$maxNumberOfItems,
    q{threads=i}    => \$maxNumberOfParallelJobs,
    q{debug!}       => \$debug,
    q{exclude=s}    => \@exclude
);
usage "Invalid argument(s)." if (grep {/^-/o } @ARGV);

my( @args ) = @ARGV;
if ( !($filesystem || $label || $directory) ) {
    usage "Not enough arguments." if (! @args );
}

sub pullDataFromDbWithDirectory {
    my $_dir = $_[0];
    if ($itemCount <= $maxNumberOfItems) {
        my @retval = grep { /^Dir|^File/ }
            qx($omnidb -filesystem $filesystem '$label' -listdir '$_dir');
        foreach my $item (@retval) {
            $itemCount++;
            (my $filename = $item) =~ s/^File\s+|^Dir\s+|\n//g;
            my $file = "$_dir/$filename";
            if (!($file ~~ @exclude)) {
                push(@data,$file);
                if ($item =~ /^Dir/) {
                    $worker->enqueue( $file );   # subdirectories go back on the queue
                    print "Add $file to queue\n" if $debug;
                }
            }
        }
    }
}

sub doOperation () {
    my $ithread = threads->tid();
    while( my $folder = $worker->dequeue() ) {
        { lock $idle; --$idle }    # mark this thread busy
        print "Read $folder from queue with thread $ithread\n" if $debug;
        pullDataFromDbWithDirectory($folder);
        { lock $idle; ++$idle }    # mark it idle again
    };
}

sub printData {
    foreach my $file (sort @data) {
        print "$file\n";
    }
    if ($itemCount > $maxNumberOfItems) {
        print colored ['red on_black'],
            "\nWARNING: Maximum item count of $itemCount / $maxNumberOfItems has been reached. Please adjust your filter\n";
    }
}

# -------------------------
# Main
# -------------------------
print "Exclude: " . Dumper( \@exclude ) if $debug;

my @threads = map{
    threads->create( \&doOperation )
} 1 .. $maxNumberOfParallelJobs;

pullDataFromDbWithDirectory( $directory );

sleep 1 until $idle <  $maxNumberOfParallelJobs; ## Wait for (some) threads to get started
sleep 1 until $idle == $maxNumberOfParallelJobs; ## wait until they are all done

$worker->enqueue( (undef) x $maxNumberOfParallelJobs ); ## Tell'em to stop

$_->join for @threads;

printData();
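A hypothetical invocation matching the usage text above (host, label and paths are made up; the script name is whatever you saved it as):

    ./omnidb_list.pl --filesystem 'host:/SHARED' --label 'mylabel' --dir '/SHARED/tmp/test' --threads 10 --debug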

With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
"Science is about questioning the status quo. Questioning authority".
In the absence of evidence, opinion is indistinguishable from prejudice.

Replies are listed 'Best First'.
Re^8: Proper undefine queue with multithreads
by sanc (Initiate) on Jun 06, 2014 at 07:22 UTC

    OK. The problem was that my $idle :shared = $maxNumberOfParallelJobs; was declared too early: $maxNumberOfParallelJobs gets overwritten later by the command-line parameters.
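    A minimal sketch of that fix, just reordering the posted code (no new mechanism): declare the shared counter after GetOptions() has had its chance to overwrite the default:

        my $maxNumberOfParallelJobs = 10;   # default, may be overwritten below

        GetOptions(
            q{threads=i} => \$maxNumberOfParallelJobs,
            # ... other options as in the posted code ...
        );

        # only now does $maxNumberOfParallelJobs hold its final value
        my $idle :shared = $maxNumberOfParallelJobs;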

    Your solution performs well and I get the idea of how this all should work together. Thank you for your patience.

      You're most welcome.

      With regard to the "race condition" suggested here: it's a red herring!

      The description:

      1. worker thread dequeues the last item
      2. main thread checks pending (false)
      3. main thread checks the number of busy threads (none)
      4. main thread signals the workers to end
      5. worker marks itself busy
      6. worker starts processing the last item

      fails to consider what happens at this point:

      7. once the worker processing the last item finishes, it loops back to check for another item, gets undef, exits the loop and terminates.
      8. and the main thread, waiting on the join, sees it terminate and completes. Clean exit. Job done. (The sketch below traces that exit path.)
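
      For reference, a minimal sketch of the exit path in steps 7 and 8, using the names from the posted code:

          # worker: once the stop markers are reached, dequeue() returns undef,
          # the while condition fails, and the thread falls off the end of doOperation()
          while( my $folder = $worker->dequeue() ) {
              ...   # process $folder as in the posted code
          }

          # main: each terminated worker satisfies its join(), and the program completes
          $_->join for @threads;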

      There is simply no need for the over-engineered, overcomplicated solution suggested.



        You missed the point. It shouldn't terminate in step 7!

        When I said it dequeued the last item, I meant the last item currently in the queue. There could be millions more items to process, but they're not going to be processed. I've updated the linked post for clarification.
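
        To make the scenario concrete, a sketch of the interleaving being described (comments only; names from the posted code, 'dirA' is a hypothetical directory):

            # queue: [ dirA ]                        $idle == $maxNumberOfParallelJobs
            # worker: dequeue() returns dirA      -> queue empty, $idle not yet decremented
            # main:   sees all workers idle       -> enqueues (undef) x $maxNumberOfParallelJobs
            # worker: { lock $idle; --$idle }, processes dirA, and enqueues dirA's
            #         subdirectories *behind* the undefs
            # workers: each dequeues an undef and exits; those subdirectories are never processed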

Re^8: Proper undefine queue with multithreads
by ikegami (Patriarch) on Jun 05, 2014 at 18:54 UTC
    It does have a race condition. Explained and solved here
      ikegami, the queue does its own locking; it doesn't make sense to add locking on top of that.
        It's not locking long enough. There's a step-by-step demonstration of the race condition in the linked post.
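        The linked post isn't reproduced here, so purely as an illustration of "locking long enough": one way to close this kind of window is to count outstanding work items rather than idle threads, decrementing only after an item has been fully processed. A sketch under those assumptions (process() and $numWorkers are hypothetical), not necessarily the linked solution:

            use threads;
            use threads::shared;
            use Thread::Queue;

            my $queue = Thread::Queue->new();
            my $outstanding :shared = 0;    # items enqueued but not yet fully processed

            sub add_work {                  # count the item before it becomes visible
                my( $item ) = @_;
                { lock $outstanding; ++$outstanding; }
                $queue->enqueue( $item );
            }

            sub worker {
                while( defined( my $item = $queue->dequeue() ) ) {
                    process( $item );       # may call add_work() for subdirectories
                    lock $outstanding;      # item stays counted until here
                    cond_signal( $outstanding ) if --$outstanding == 0;
                }
            }

            # main: seed with add_work( $directory ), start the workers, then wait.
            # No window: a dequeued-but-unfinished item still holds the count above zero.
            { lock $outstanding; cond_wait( $outstanding ) while $outstanding > 0; }
            $queue->enqueue( (undef) x $numWorkers );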
Re^8: Proper undefine queue with multithreads
by sanc (Initiate) on Jun 06, 2014 at 06:57 UTC

    Hi, thanks for your code.

    The code you provided works well with more than one thread. But as soon as I run it with $maxNumberOfParallelJobs=1, it hangs before joining the threads and never displays the output of printData().

    This is the output when I run the script in debug mode:

        Add /SHARED/tmp/test/dir9 to queue
        Add /SHARED/tmp/test/dir8 to queue
        Add /SHARED/tmp/test/dir7 to queue
        Add /SHARED/tmp/test/dir6 to queue
        Add /SHARED/tmp/test/dir5 to queue
        Add /SHARED/tmp/test/dir4 to queue
        Add /SHARED/tmp/test/dir3 to queue
        Add /SHARED/tmp/test/dir2 to queue
        Read /SHARED/tmp/test/dir9 from queue with thread 1
        Add /SHARED/tmp/test/dir1 to queue
        Read /SHARED/tmp/test/dir8 from queue with thread 1
        Read /SHARED/tmp/test/dir7 from queue with thread 1
        Add /SHARED/tmp/test/dir7/dir9 to queue
        Add /SHARED/tmp/test/dir7/dir8 to queue
        Add /SHARED/tmp/test/dir7/dir7 to queue
        Add /SHARED/tmp/test/dir7/dir6 to queue
        Add /SHARED/tmp/test/dir7/dir5 to queue
        Add /SHARED/tmp/test/dir7/dir4 to queue
        Add /SHARED/tmp/test/dir7/dir3 to queue
        Add /SHARED/tmp/test/dir7/dir2 to queue
        Add /SHARED/tmp/test/dir7/dir1 to queue
        Read /SHARED/tmp/test/dir6 from queue with thread 1
        Read /SHARED/tmp/test/dir5 from queue with thread 1
        Read /SHARED/tmp/test/dir4 from queue with thread 1
        Read /SHARED/tmp/test/dir3 from queue with thread 1
        Read /SHARED/tmp/test/dir2 from queue with thread 1
        Add /SHARED/tmp/test/dir2/dir9 to queue
        Add /SHARED/tmp/test/dir2/dir8 to queue
        Add /SHARED/tmp/test/dir2/dir7 to queue
        Add /SHARED/tmp/test/dir2/dir6 to queue
        Add /SHARED/tmp/test/dir2/dir5 to queue
        Add /SHARED/tmp/test/dir2/dir4 to queue
        Add /SHARED/tmp/test/dir2/dir3 to queue
        Add /SHARED/tmp/test/dir2/dir2 to queue
        Add /SHARED/tmp/test/dir2/dir1 to queue
        Read /SHARED/tmp/test/dir1 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9 from queue with thread 1
        Add /SHARED/tmp/test/dir7/dir9/dir9 to queue
        Add /SHARED/tmp/test/dir7/dir9/dir8 to queue
        Add /SHARED/tmp/test/dir7/dir9/dir7 to queue
        Add /SHARED/tmp/test/dir7/dir9/dir6 to queue
        Add /SHARED/tmp/test/dir7/dir9/dir5 to queue
        Add /SHARED/tmp/test/dir7/dir9/dir4 to queue
        Add /SHARED/tmp/test/dir7/dir9/dir3 to queue
        Add /SHARED/tmp/test/dir7/dir9/dir2 to queue
        Add /SHARED/tmp/test/dir7/dir9/dir1 to queue
        Read /SHARED/tmp/test/dir7/dir8 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir7 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir6 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir5 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir4 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir3 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir2 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir1 from queue with thread 1
        Read /SHARED/tmp/test/dir2/dir9 from queue with thread 1
        Read /SHARED/tmp/test/dir2/dir8 from queue with thread 1
        Read /SHARED/tmp/test/dir2/dir7 from queue with thread 1
        Read /SHARED/tmp/test/dir2/dir6 from queue with thread 1
        Read /SHARED/tmp/test/dir2/dir5 from queue with thread 1
        Read /SHARED/tmp/test/dir2/dir4 from queue with thread 1
        Read /SHARED/tmp/test/dir2/dir3 from queue with thread 1
        Read /SHARED/tmp/test/dir2/dir2 from queue with thread 1
        Read /SHARED/tmp/test/dir2/dir1 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9/dir9 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9/dir8 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9/dir7 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9/dir6 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9/dir5 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9/dir4 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9/dir3 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9/dir2 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9/dir1 from queue with thread 1

    waiting here.. aborted with Ctrl+C
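
    This ties in with the $idle initialisation issue described in the sibling reply: with --threads 1, the shared counter was already initialised to the default of 10 before GetOptions() reduced $maxNumberOfParallelJobs to 1, so the first wait in the posted main section can never complete:

        sleep 1 until $idle < $maxNumberOfParallelJobs;   # $idle is 10 (or 9 while the single
                                                          # worker is busy); never < 1, so this
                                                          # loop never exits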