in reply to Re^4: Proper undefine queue with multithreads
in thread Proper undefine queue with multithreads

Untested, and without trying to understand the rest of your code, try the following minimal changes to your posted code:

#!/usr/bin/perl -w
BEGIN {
    our( $_pathname, $_filename ) = ( $0 =~ m[(.*)/([^/]+)$] )
        ? ( $1, $2 ) : ( ".", $0 );
    push @INC, $_pathname;
};

sub usage {
    print STDERR "\nERROR: $_[0]\nUsage:\n", <<"EndOfDescription";
   $_filename
        Required Parameters:
          --filesystem 'host:dir'  Filesystem with format host:fs ex. host:/SHARED
          --label 'label'          Label
          --dir 'directory'        Directory to search
        Optional Parameters:
          --recursive              Recursive search
          --maxCount 10000         Maximum allowed item count
          --threads 10             Maximum parallel jobs
          --exclude dir            Can be specified multiple times
EndOfDescription
    exit 2
}

# -------------------------
# Required libraries
# -------------------------
use strict;
use Data::Dumper;
use Getopt::Long;
use Term::ANSIColor;
use threads;
use Thread::Queue;

# -------------------------
# Global Variables
# -------------------------
my $omnidb = '/opt/omni/bin/omnidb';
my @data :shared;
my $maxNumberOfParallelJobs = 10;
my $maxNumberOfItems = 10000;
my $itemCount = 0;
my $worker = Thread::Queue->new();
my @IDLE_THREADS :shared;

# -------------------------
# Argument handling
# -------------------------
my( $filesystem, $label, $directory, $recursive, $debug, @exclude );
Getopt::Long::Configure("pass_through");
GetOptions(
    q{filesystem=s} => \$filesystem,
    q{label=s}      => \$label,
    q{dir=s}        => \$directory,
    q{recursive!}   => \$recursive,
    q{maxCount=i}   => \$maxNumberOfItems,
    q{threads=i}    => \$maxNumberOfParallelJobs,
    q{debug!}       => \$debug,
    q{exclude=s}    => \@exclude
);
usage "Invalid argument(s)." if (grep { /^-/o } @ARGV);
my( @args ) = @ARGV;
if ( !($filesystem || $label || $directory) ) {
    usage "Not enough arguments." if (! @args );
}

sub pullDataFromDbWithDirectory {
    my $_dir = $_[0];
    if ($itemCount <= $maxNumberOfItems) {
        my @retval = grep { /^Dir|^File/ }
            qx($omnidb -filesystem $filesystem '$label' -listdir '$_dir');
        foreach my $item (@retval) {
            $itemCount++;
            (my $filename = $item) =~ s/^File\s+|^Dir\s+|\n//g;
            my $file = "$_dir/$filename";
            if (!($file ~~ @exclude)) {
                push(@data, $file);
                if ($item =~ /^Dir/) {
                    $worker->enqueue( $file );
                    print "Add $file to queue\n" if $debug;
                }
            }
        }
    }
}

sub doOperation () {
    my $ithread = threads->tid();
    while( my $folder = $worker->dequeue() ) {
        print "Read $folder from queue with thread $ithread\n" if $debug;
        pullDataFromDbWithDirectory($folder);
    };
}

sub printData {
    foreach my $file (sort @data) {
        print "$file\n";
    }
    if ($itemCount > $maxNumberOfItems) {
        print colored ['red on_black'],
            "\nWARNING: Maximum item count of $itemCount / $maxNumberOfItems has been reached. Please adjust your filter\n";
    }
}

# -------------------------
# Main
# -------------------------
print "Exclude: " . Dumper( \@exclude ) if $debug;

my @threads = map {
    threads->create( \&doOperation )
} 1 .. $maxNumberOfParallelJobs;

pullDataFromDbWithDirectory( $directory );

$worker->enqueue( (undef) x $maxNumberOfParallelJobs );
$_->join for @threads;

printData();
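For reference, the undef-sentinel idiom used above can be reduced to a minimal, self-contained sketch (toy work items instead of omnidb calls; assumes a thread-enabled perl). Each worker loops on dequeue() and exits when it pulls an undef; the main thread enqueues one undef per worker. Note this only works when all the work is known before the sentinels are posted:

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $queue    = Thread::Queue->new();
my $results  = Thread::Queue->new();   # collect results thread-safely
my $nworkers = 3;

sub worker {
    # dequeue() blocks; an undef item is the signal to exit the loop
    while ( defined( my $item = $queue->dequeue() ) ) {
        $results->enqueue( $item * 2 );   # stand-in for real work
    }
}

my @threads = map { threads->create( \&worker ) } 1 .. $nworkers;

$queue->enqueue( 1 .. 5 );                 # all work is known up front
$queue->enqueue( (undef) x $nworkers );    # one sentinel per worker
$_->join for @threads;

my @out;
push @out, $results->dequeue_nb() while $results->pending();
print join( ',', sort { $a <=> $b } @out ), "\n";   # 2,4,6,8,10
```

The catch, as the rest of this thread shows, is that the workers here never add new items to the queue; when they do, posting the sentinels right after the initial population is too early.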

With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
"Science is about questioning the status quo. Questioning authority".
In the absence of evidence, opinion is indistinguishable from prejudice.

Re^6: Proper undefine queue with multithreads
by sanc (Initiate) on Jun 05, 2014 at 13:44 UTC
    Hi, thanks for your answer. This is exactly how I had it in the beginning. The problem is that it does not parse all the subdirectories. I think that as soon as the queue gets empty, the threads hit the undef entries; after that, new entries are still pushed into the queue but get ignored, because the threads were already joined by that time. Or maybe I misunderstood the logic behind adding undef to the queue. Here is an example:
    # ./dpfilesearch.pl --object server:/SHARED --label '/SHARED (test)' --dir /SHARED/tmp/test --recursive --threads 1
    /SHARED/tmp/test/dir7
    /SHARED/tmp/test/dir7/dir1
    /SHARED/tmp/test/dir7/dir2
    /SHARED/tmp/test/dir7/dir3
    /SHARED/tmp/test/dir7/dir4
    /SHARED/tmp/test/dir7/dir5
    /SHARED/tmp/test/dir7/dir6
    /SHARED/tmp/test/dir7/dir7
    /SHARED/tmp/test/dir7/dir8
    /SHARED/tmp/test/dir7/dir9
    Still pending in queue: 18
    Whereas if I search directly within dir7, more subfolders are revealed.
    # ./dpfilesearch.pl --object server:/SHARED --label '/SHARED (test)' --dir /SHARED/tmp/test/dir7 --recursive --threads 1
    /SHARED/tmp/test/dir7/dir1
    /SHARED/tmp/test/dir7/dir2
    /SHARED/tmp/test/dir7/dir3
    /SHARED/tmp/test/dir7/dir4
    /SHARED/tmp/test/dir7/dir5
    /SHARED/tmp/test/dir7/dir6
    /SHARED/tmp/test/dir7/dir7
    /SHARED/tmp/test/dir7/dir8
    /SHARED/tmp/test/dir7/dir9
    /SHARED/tmp/test/dir7/dir9/dir1
    /SHARED/tmp/test/dir7/dir9/dir2
    /SHARED/tmp/test/dir7/dir9/dir3
    /SHARED/tmp/test/dir7/dir9/dir4
    /SHARED/tmp/test/dir7/dir9/dir5
    /SHARED/tmp/test/dir7/dir9/dir6
    /SHARED/tmp/test/dir7/dir9/dir7
    /SHARED/tmp/test/dir7/dir9/dir8
    /SHARED/tmp/test/dir7/dir9/dir9
    Still pending in queue: 9
    It is the same if I specify more than one thread. I added a $worker->pending() call just after printing the output.

      Try this (again untested) code.

      I've yet to convince myself that there isn't a race condition here, but if there is, you'll probably find it by running it for real quicker than I will by running it mentally:

      #!/usr/bin/perl -w
      BEGIN {
          our( $_pathname, $_filename ) = ( $0 =~ m[(.*)/([^/]+)$] )
              ? ( $1, $2 ) : ( ".", $0 );
          push @INC, $_pathname;
      };

      sub usage {
          print STDERR "\nERROR: $_[0]\nUsage:\n", <<"EndOfDescription";
         $_filename
              Required Parameters:
                --filesystem 'host:dir'  Filesystem with format host:fs ex. host:/SHARED
                --label 'label'          Label
                --dir 'directory'        Directory to search
              Optional Parameters:
                --recursive              Recursive search
                --maxCount 10000         Maximum allowed item count
                --threads 10             Maximum parallel jobs
                --exclude dir            Can be specified multiple times
      EndOfDescription
          exit 2
      }

      # -------------------------
      # Required libraries
      # -------------------------
      use strict;
      use Data::Dumper;
      use Getopt::Long;
      use Term::ANSIColor;
      use threads;
      use Thread::Queue;

      # -------------------------
      # Global Variables
      # -------------------------
      my $omnidb = '/opt/omni/bin/omnidb';
      my @data :shared;
      my $maxNumberOfParallelJobs = 10;
      my $maxNumberOfItems = 10000;
      my $itemCount = 0;
      my $worker = Thread::Queue->new();
      my $idle :shared = $maxNumberOfParallelJobs;

      # -------------------------
      # Argument handling
      # -------------------------
      my( $filesystem, $label, $directory, $recursive, $debug, @exclude );
      Getopt::Long::Configure("pass_through");
      GetOptions(
          q{filesystem=s} => \$filesystem,
          q{label=s}      => \$label,
          q{dir=s}        => \$directory,
          q{recursive!}   => \$recursive,
          q{maxCount=i}   => \$maxNumberOfItems,
          q{threads=i}    => \$maxNumberOfParallelJobs,
          q{debug!}       => \$debug,
          q{exclude=s}    => \@exclude
      );
      usage "Invalid argument(s)." if (grep { /^-/o } @ARGV);
      my( @args ) = @ARGV;
      if ( !($filesystem || $label || $directory) ) {
          usage "Not enough arguments." if (! @args );
      }

      sub pullDataFromDbWithDirectory {
          my $_dir = $_[0];
          if ($itemCount <= $maxNumberOfItems) {
              my @retval = grep { /^Dir|^File/ }
                  qx($omnidb -filesystem $filesystem '$label' -listdir '$_dir');
              foreach my $item (@retval) {
                  $itemCount++;
                  (my $filename = $item) =~ s/^File\s+|^Dir\s+|\n//g;
                  my $file = "$_dir/$filename";
                  if (!($file ~~ @exclude)) {
                      push(@data, $file);
                      if ($item =~ /^Dir/) {
                          $worker->enqueue( $file );
                          print "Add $file to queue\n" if $debug;
                      }
                  }
              }
          }
      }

      sub doOperation () {
          my $ithread = threads->tid();
          while( my $folder = $worker->dequeue() ) {
              { lock $idle; --$idle }
              print "Read $folder from queue with thread $ithread\n" if $debug;
              pullDataFromDbWithDirectory($folder);
              { lock $idle; ++$idle }
          };
      }

      sub printData {
          foreach my $file (sort @data) {
              print "$file\n";
          }
          if ($itemCount > $maxNumberOfItems) {
              print colored ['red on_black'],
                  "\nWARNING: Maximum item count of $itemCount / $maxNumberOfItems has been reached. Please adjust your filter\n";
          }
      }

      # -------------------------
      # Main
      # -------------------------
      print "Exclude: " . Dumper( \@exclude ) if $debug;

      my @threads = map {
          threads->create( \&doOperation )
      } 1 .. $maxNumberOfParallelJobs;

      pullDataFromDbWithDirectory( $directory );

      sleep 1 until $idle < $maxNumberOfParallelJobs;   ## Wait for (some) threads to get started
      sleep 1 until $idle == $maxNumberOfParallelJobs;  ## Wait until they are all done

      $worker->enqueue( (undef) x $maxNumberOfParallelJobs );  ## Tell'em to stop
      $_->join for @threads;

      printData();


        Ok. The problem was that my $idle :shared = $maxNumberOfParallelJobs; was declared too early, since $maxNumberOfParallelJobs gets overwritten later by the command-line parameters.

        Your solution performs well and I get the idea of how this all should work together. Thank you for your patience.
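For anyone hitting the same ordering trap, a sketch of the fix described above: parse the options first, and only then initialize the shared counter from the final value (toy example, only the --threads option kept):

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;
use Getopt::Long;

my $maxNumberOfParallelJobs = 10;   # default, may be overridden below
GetOptions( q{threads=i} => \$maxNumberOfParallelJobs );

# Only now, after --threads has had its chance to change the value:
my $idle :shared = $maxNumberOfParallelJobs;

print "$idle\n";   # 10 without arguments, N with --threads N
```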

        It does have a race condition, explained and solved here

        Hi, thanks for your code.

        The code you provided works well with more than one thread. As soon as I run it with $maxNumberOfParallelJobs=1, it hangs before joining the threads and does not display the output of printData().

        This is the output when I run the script in debug mode:

        Add /SHARED/tmp/test/dir9 to queue
        Add /SHARED/tmp/test/dir8 to queue
        Add /SHARED/tmp/test/dir7 to queue
        Add /SHARED/tmp/test/dir6 to queue
        Add /SHARED/tmp/test/dir5 to queue
        Add /SHARED/tmp/test/dir4 to queue
        Add /SHARED/tmp/test/dir3 to queue
        Add /SHARED/tmp/test/dir2 to queue
        Read /SHARED/tmp/test/dir9 from queue with thread 1
        Add /SHARED/tmp/test/dir1 to queue
        Read /SHARED/tmp/test/dir8 from queue with thread 1
        Read /SHARED/tmp/test/dir7 from queue with thread 1
        Add /SHARED/tmp/test/dir7/dir9 to queue
        Add /SHARED/tmp/test/dir7/dir8 to queue
        Add /SHARED/tmp/test/dir7/dir7 to queue
        Add /SHARED/tmp/test/dir7/dir6 to queue
        Add /SHARED/tmp/test/dir7/dir5 to queue
        Add /SHARED/tmp/test/dir7/dir4 to queue
        Add /SHARED/tmp/test/dir7/dir3 to queue
        Add /SHARED/tmp/test/dir7/dir2 to queue
        Add /SHARED/tmp/test/dir7/dir1 to queue
        Read /SHARED/tmp/test/dir6 from queue with thread 1
        Read /SHARED/tmp/test/dir5 from queue with thread 1
        Read /SHARED/tmp/test/dir4 from queue with thread 1
        Read /SHARED/tmp/test/dir3 from queue with thread 1
        Read /SHARED/tmp/test/dir2 from queue with thread 1
        Add /SHARED/tmp/test/dir2/dir9 to queue
        Add /SHARED/tmp/test/dir2/dir8 to queue
        Add /SHARED/tmp/test/dir2/dir7 to queue
        Add /SHARED/tmp/test/dir2/dir6 to queue
        Add /SHARED/tmp/test/dir2/dir5 to queue
        Add /SHARED/tmp/test/dir2/dir4 to queue
        Add /SHARED/tmp/test/dir2/dir3 to queue
        Add /SHARED/tmp/test/dir2/dir2 to queue
        Add /SHARED/tmp/test/dir2/dir1 to queue
        Read /SHARED/tmp/test/dir1 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9 from queue with thread 1
        Add /SHARED/tmp/test/dir7/dir9/dir9 to queue
        Add /SHARED/tmp/test/dir7/dir9/dir8 to queue
        Add /SHARED/tmp/test/dir7/dir9/dir7 to queue
        Add /SHARED/tmp/test/dir7/dir9/dir6 to queue
        Add /SHARED/tmp/test/dir7/dir9/dir5 to queue
        Add /SHARED/tmp/test/dir7/dir9/dir4 to queue
        Add /SHARED/tmp/test/dir7/dir9/dir3 to queue
        Add /SHARED/tmp/test/dir7/dir9/dir2 to queue
        Add /SHARED/tmp/test/dir7/dir9/dir1 to queue
        Read /SHARED/tmp/test/dir7/dir8 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir7 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir6 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir5 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir4 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir3 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir2 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir1 from queue with thread 1
        Read /SHARED/tmp/test/dir2/dir9 from queue with thread 1
        Read /SHARED/tmp/test/dir2/dir8 from queue with thread 1
        Read /SHARED/tmp/test/dir2/dir7 from queue with thread 1
        Read /SHARED/tmp/test/dir2/dir6 from queue with thread 1
        Read /SHARED/tmp/test/dir2/dir5 from queue with thread 1
        Read /SHARED/tmp/test/dir2/dir4 from queue with thread 1
        Read /SHARED/tmp/test/dir2/dir3 from queue with thread 1
        Read /SHARED/tmp/test/dir2/dir2 from queue with thread 1
        Read /SHARED/tmp/test/dir2/dir1 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9/dir9 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9/dir8 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9/dir7 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9/dir6 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9/dir5 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9/dir4 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9/dir3 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9/dir2 from queue with thread 1
        Read /SHARED/tmp/test/dir7/dir9/dir1 from queue with thread 1
        waiting here.. aborted with Ctrl+C
      The problem here, I think, is that as soon as the queue gets empty, the threads hit the undef entries. But after that, new entries are pushed into the queue and get ignored, because the threads were already joined by that time.

      Okay. So now we get to the crux of the real question you should have asked: how to determine when there are no more directories to be processed? (And thus, when the threads can safely terminate.)

      You cannot have the main thread post the undefs once it has finished the initial population of the queue, because the worker threads will be adding to the queue as they process that initial population.

      And you cannot use the absence of queue entries (Q->pending == 0), because another thread may add another directory to the queue a millisecond later.

      This is the classic chicken & egg scenario. How can any of the worker threads know when they are finished, when at any given moment one of the other worker threads might queue a new directory to be processed?

      I don't have an answer for you yet. I'm not sure that there is an answer given the current methodology of your code.

      I'm sure that there is a better way, and I'll get back to you once I think of it.
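One technique that is sometimes used for this producer-is-also-consumer problem (a sketch under assumptions, not tested against omnidb; subdirs() is a hypothetical stand-in for the real directory listing) is a shared counter of directories that are queued or in flight: increment before every enqueue, decrement only after a directory is fully processed, and post the undef sentinels only when the counter reaches zero, since at that point no thread can produce more work:

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

my $queue    = Thread::Queue->new();
my $pending :shared = 0;        # dirs queued or being processed
my @seen    :shared;            # all dirs discovered so far
my $nworkers = 4;

# Hypothetical stand-in for the real listing call: a fixed toy tree,
# three subdirectories per directory, two levels deep.
sub subdirs {
    my $dir = shift;
    return () if ( $dir =~ tr{/}{} ) >= 2;      # depth limit
    return map { "$dir/dir$_" } 1 .. 3;
}

sub worker {
    while ( defined( my $dir = $queue->dequeue() ) ) {
        for my $sub ( subdirs($dir) ) {
            { lock @seen;    push @seen, $sub; }
            { lock $pending; $pending++; }      # count BEFORE enqueue
            $queue->enqueue($sub);
        }
        {   lock $pending;
            $pending--;                         # this dir is now done
            cond_signal($pending) if $pending == 0;
        }
    }
}

{ lock $pending; $pending = 1; }                # account for the root
$queue->enqueue('top');

my @threads = map { threads->create( \&worker ) } 1 .. $nworkers;

{   lock $pending;
    cond_wait($pending) while $pending > 0;     # wait for full drain
}
$queue->enqueue( (undef) x $nworkers );         # NOW safe to stop them
$_->join for @threads;

printf "found %d directories\n", scalar @seen;  # 3 + 9 = 12
```

The key point is that the counter never touches zero while any directory is still queued or being expanded, because children are counted before their parent is marked done; so zero really does mean "no more work can appear".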

