in reply to Re^3: Proper undefine queue with multithreads
in thread Proper undefine queue with multithreads

Hi

The output of omnidb looks as follows:

# omnidb -filesystem server:/SHARED '/SHARED (test)' -listdir '/SHARED/tmp/test'
Type     Filename
===============================================================================
File     file9
File     file8
File     file7
File     file6
File     file5
File     file4
File     file3
File     file2
File     file1
Dir      dir9
Dir      dir8
Dir      dir7
Dir      dir6
Dir      dir5
Dir      dir4
Dir      dir3
Dir      dir2
Dir      dir1
Explain what you are going to do with the list of files you are building in @data.

Right now I am just printing out the output. Nothing fancy.

Here is the full code of my script. I am not really a Perl programmer, so it might not be written in the most efficient way.

#!/usr/bin/perl -w
BEGIN {
    our($_pathname,$_filename)=($0=~m#(.*)/([^/]+)$#)?($1,$2):(".",$0);
    push @INC,$_pathname;
};

sub usage {
    ################################################################
    #
    # Title       : dpfilesearch.pl
    #
    # Author      : Christian Sandrini
    #
    # Description :
    print STDERR "\nERROR: $_[0]\nUsage:\n", <<"EndOfDescription";
    $_filename
        Required Parameters:
            --filesystem 'host:dir'    Filesystem with format host:fs ex. host:/SHARED
            --label 'label'            Label
            --dir 'directory'          Directory to search
        Optional Parameters:
            --recursive                Recursive search
            --maxCount 10000           Maximum allowed item count
            --threads 10               Maximum parallel jobs
            --exclude dir              Can be specified multiple times
EndOfDescription
    exit 2
}

# -------------------------
# Required libraries
# -------------------------
use strict;
use Data::Dumper;
use Getopt::Long;
use Term::ANSIColor;
use threads;
use Thread::Queue;

# -------------------------
# Global Variables
# -------------------------
my $omnidb = '/opt/omni/bin/omnidb';
my @data :shared;
my $maxNumberOfParallelJobs = 10;
my $maxNumberOfItems = 10000;
my $itemCount = 0;
my $worker = Thread::Queue->new();
my @IDLE_THREADS :shared;

# -------------------------
# Argument handling
# -------------------------
my( $filesystem, $label, $directory, $recursive, $debug, @exclude );
Getopt::Long::Configure("pass_through");
GetOptions(
    q{filesystem=s} => \$filesystem,
    q{label=s}      => \$label,
    q{dir=s}        => \$directory,
    q{recursive!}   => \$recursive,
    q{maxCount=i}   => \$maxNumberOfItems,
    q{threads=i}    => \$maxNumberOfParallelJobs,
    q{debug!}       => \$debug,
    q{exclude=s}    => \@exclude
);
usage "Invalid argument(s)." if (grep {/^-/o } @ARGV);
my( @args ) = @ARGV;
if ( !($filesystem || $label || $directory) ) {
    usage "Not enough arguments." if (! @args );
}

# -------------------------
# Methods
# -------------------------
sub pullDataFromDbWithDirectory {
    my $_dir = $_[0];
    if ($itemCount <= $maxNumberOfItems) {
        my @retval = grep { /^Dir|^File/ } qx($omnidb -filesystem $filesystem '$label' -listdir '$_dir');
        foreach my $item (@retval) {
            $itemCount++;
            (my $filename = $item) =~ s/^File\s+|^Dir\s+|\n//g;
            my $file = "$_dir/$filename";
            if (!($file ~~ @exclude)) {
                push(@data,$file);
                if ($item =~ /^Dir/) {
                    $worker->enqueue($file);
                    print "Add $file to queue\n" if $debug;
                }
            }
        }
    }
}

sub doOperation () {
    my $ithread = threads->tid();
    do {
        my $folder = $worker->dequeue();
        print "Read $folder from queue with thread $ithread\n" if $debug;
        pullDataFromDbWithDirectory($folder);
    } while ($worker->pending());
    push(@IDLE_THREADS,$ithread);
}

sub printData {
    foreach my $file (sort @data) {
        print "$file\n";
    }
    if ($itemCount > $maxNumberOfItems) {
        print colored ['red on_black'], "\nWARNING: Maximum item count of $itemCount / $maxNumberOfItems has been reached. Please adjust your filter\n";
    }
}

# -------------------------
# Main
# -------------------------
print "Exclude: " . Dumper(\@exclude) if $debug;
my @threads = map threads->create(\&doOperation), 1 .. $maxNumberOfParallelJobs;
pullDataFromDbWithDirectory($directory);
sleep 0.01 while (scalar @IDLE_THREADS < $maxNumberOfParallelJobs);
$worker->enqueue((undef) x $maxNumberOfParallelJobs);
$_->join for @threads;
printData();

Re^5: Proper undefine queue with multithreads
by BrowserUk (Patriarch) on Jun 05, 2014 at 12:55 UTC

    Untested, and without trying to understand the rest of your code, try the following minimal changes to your posted code:

#!/usr/bin/perl -w
BEGIN {
    our($_pathname,$_filename) = ( $0 =~ m[(.*)/([^/]+)$] ) ? ( $1, $2 ) : ( ".", $0 );
    push @INC,$_pathname;
};

sub usage {
    print STDERR "\nERROR: $_[0]\nUsage:\n", <<"EndOfDescription";
    $_filename
        Required Parameters:
            --filesystem 'host:dir'    Filesystem with format host:fs ex. host:/SHARED
            --label 'label'            Label
            --dir 'directory'          Directory to search
        Optional Parameters:
            --recursive                Recursive search
            --maxCount 10000           Maximum allowed item count
            --threads 10               Maximum parallel jobs
            --exclude dir              Can be specified multiple times
EndOfDescription
    exit 2
}

# -------------------------
# Required libraries
# -------------------------
use strict;
use Data::Dumper;
use Getopt::Long;
use Term::ANSIColor;
use threads;
use Thread::Queue;

# -------------------------
# Global Variables
# -------------------------
my $omnidb = '/opt/omni/bin/omnidb';
my @data :shared;
my $maxNumberOfParallelJobs = 10;
my $maxNumberOfItems = 10000;
my $itemCount = 0;
my $worker = Thread::Queue->new();
my @IDLE_THREADS :shared;

# -------------------------
# Argument handling
# -------------------------
my( $filesystem, $label, $directory, $recursive, $debug, @exclude );
Getopt::Long::Configure("pass_through");
GetOptions(
    q{filesystem=s} => \$filesystem,
    q{label=s}      => \$label,
    q{dir=s}        => \$directory,
    q{recursive!}   => \$recursive,
    q{maxCount=i}   => \$maxNumberOfItems,
    q{threads=i}    => \$maxNumberOfParallelJobs,
    q{debug!}       => \$debug,
    q{exclude=s}    => \@exclude
);
usage "Invalid argument(s)." if (grep {/^-/o } @ARGV);
my( @args ) = @ARGV;
if ( !($filesystem || $label || $directory) ) {
    usage "Not enough arguments." if (! @args );
}

sub pullDataFromDbWithDirectory {
    my $_dir = $_[0];
    if ($itemCount <= $maxNumberOfItems) {
        my @retval = grep { /^Dir|^File/ } qx($omnidb -filesystem $filesystem '$label' -listdir '$_dir');
        foreach my $item (@retval) {
            $itemCount++;
            (my $filename = $item) =~ s/^File\s+|^Dir\s+|\n//g;
            my $file = "$_dir/$filename";
            if (!($file ~~ @exclude)) {
                push(@data,$file);
                if ($item =~ /^Dir/) {
                    $worker->enqueue( $file );
                    print "Add $file to queue\n" if $debug;
                }
            }
        }
    }
}

sub doOperation () {
    my $ithread = threads->tid();
    while( my $folder = $worker->dequeue() ) {
        print "Read $folder from queue with thread $ithread\n" if $debug;
        pullDataFromDbWithDirectory($folder);
    };
}

sub printData {
    foreach my $file (sort @data) {
        print "$file\n";
    }
    if ($itemCount > $maxNumberOfItems) {
        print colored ['red on_black'], "\nWARNING: Maximum item count of $itemCount / $maxNumberOfItems has been reached. Please adjust your filter\n";
    }
}

# -------------------------
# Main
# -------------------------
print "Exclude: " . Dumper( \@exclude ) if $debug;
my @threads = map{ threads->create( \&doOperation ) } 1 .. $maxNumberOfParallelJobs;
pullDataFromDbWithDirectory( $directory );
$worker->enqueue( (undef) x $maxNumberOfParallelJobs );
$_->join for @threads;
printData();

    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
      Hi, thanks for your answer. This is exactly how I had it in the beginning. The problem is that it does not parse all the subdirectories. I think that as soon as the queue gets empty, the threads hit the undef entries; new entries pushed into the queue after that are ignored, because the threads have already been joined by then. Or maybe I misunderstood the logic behind adding undef into the queue. Here is an example:
# ./dpfilesearch.pl --object server:/SHARED --label '/SHARED (test)' --dir /SHARED/tmp/test --recursive --threads 1
/SHARED/tmp/test/dir7
/SHARED/tmp/test/dir7/dir1
/SHARED/tmp/test/dir7/dir2
/SHARED/tmp/test/dir7/dir3
/SHARED/tmp/test/dir7/dir4
/SHARED/tmp/test/dir7/dir5
/SHARED/tmp/test/dir7/dir6
/SHARED/tmp/test/dir7/dir7
/SHARED/tmp/test/dir7/dir8
/SHARED/tmp/test/dir7/dir9
Still pending in queue: 18
      Whereas if I search directly within dir7, more subfolders are revealed:
# ./dpfilesearch.pl --object server:/SHARED --label '/SHARED (test)' --dir /SHARED/tmp/test/dir7 --recursive --threads 1
/SHARED/tmp/test/dir7/dir1
/SHARED/tmp/test/dir7/dir2
/SHARED/tmp/test/dir7/dir3
/SHARED/tmp/test/dir7/dir4
/SHARED/tmp/test/dir7/dir5
/SHARED/tmp/test/dir7/dir6
/SHARED/tmp/test/dir7/dir7
/SHARED/tmp/test/dir7/dir8
/SHARED/tmp/test/dir7/dir9
/SHARED/tmp/test/dir7/dir9/dir1
/SHARED/tmp/test/dir7/dir9/dir2
/SHARED/tmp/test/dir7/dir9/dir3
/SHARED/tmp/test/dir7/dir9/dir4
/SHARED/tmp/test/dir7/dir9/dir5
/SHARED/tmp/test/dir7/dir9/dir6
/SHARED/tmp/test/dir7/dir9/dir7
/SHARED/tmp/test/dir7/dir9/dir8
/SHARED/tmp/test/dir7/dir9/dir9
Still pending in queue: 9
      It is the same if I specify more than one thread. I added a $worker->pending() just after printing the output.

        Try this (again untested) code.

        I've yet to convince myself that there isn't a race condition here, but if there is, you'll probably find it by running it for real quicker than I will be running it mentally:

#!/usr/bin/perl -w
BEGIN {
    our($_pathname,$_filename) = ( $0 =~ m[(.*)/([^/]+)$] ) ? ( $1, $2 ) : ( ".", $0 );
    push @INC,$_pathname;
};

sub usage {
    print STDERR "\nERROR: $_[0]\nUsage:\n", <<"EndOfDescription";
    $_filename
        Required Parameters:
            --filesystem 'host:dir'    Filesystem with format host:fs ex. host:/SHARED
            --label 'label'            Label
            --dir 'directory'          Directory to search
        Optional Parameters:
            --recursive                Recursive search
            --maxCount 10000           Maximum allowed item count
            --threads 10               Maximum parallel jobs
            --exclude dir              Can be specified multiple times
EndOfDescription
    exit 2
}

# -------------------------
# Required libraries
# -------------------------
use strict;
use Data::Dumper;
use Getopt::Long;
use Term::ANSIColor;
use threads;
use Thread::Queue;

# -------------------------
# Global Variables
# -------------------------
my $omnidb = '/opt/omni/bin/omnidb';
my @data :shared;
my $maxNumberOfParallelJobs = 10;
my $maxNumberOfItems = 10000;
my $itemCount = 0;
my $worker = Thread::Queue->new();
my $idle :shared = $maxNumberOfParallelJobs;

# -------------------------
# Argument handling
# -------------------------
my( $filesystem, $label, $directory, $recursive, $debug, @exclude );
Getopt::Long::Configure("pass_through");
GetOptions(
    q{filesystem=s} => \$filesystem,
    q{label=s}      => \$label,
    q{dir=s}        => \$directory,
    q{recursive!}   => \$recursive,
    q{maxCount=i}   => \$maxNumberOfItems,
    q{threads=i}    => \$maxNumberOfParallelJobs,
    q{debug!}       => \$debug,
    q{exclude=s}    => \@exclude
);
usage "Invalid argument(s)." if (grep {/^-/o } @ARGV);
my( @args ) = @ARGV;
if ( !($filesystem || $label || $directory) ) {
    usage "Not enough arguments." if (! @args );
}

sub pullDataFromDbWithDirectory {
    my $_dir = $_[0];
    if ($itemCount <= $maxNumberOfItems) {
        my @retval = grep { /^Dir|^File/ } qx($omnidb -filesystem $filesystem '$label' -listdir '$_dir');
        foreach my $item (@retval) {
            $itemCount++;
            (my $filename = $item) =~ s/^File\s+|^Dir\s+|\n//g;
            my $file = "$_dir/$filename";
            if (!($file ~~ @exclude)) {
                push(@data,$file);
                if ($item =~ /^Dir/) {
                    $worker->enqueue( $file );
                    print "Add $file to queue\n" if $debug;
                }
            }
        }
    }
}

sub doOperation () {
    my $ithread = threads->tid();
    while( my $folder = $worker->dequeue() ) {
        { lock $idle; --$idle }
        print "Read $folder from queue with thread $ithread\n" if $debug;
        pullDataFromDbWithDirectory($folder);
        { lock $idle; ++$idle }
    };
}

sub printData {
    foreach my $file (sort @data) {
        print "$file\n";
    }
    if ($itemCount > $maxNumberOfItems) {
        print colored ['red on_black'], "\nWARNING: Maximum item count of $itemCount / $maxNumberOfItems has been reached. Please adjust your filter\n";
    }
}

# -------------------------
# Main
# -------------------------
print "Exclude: " . Dumper( \@exclude ) if $debug;
my @threads = map{ threads->create( \&doOperation ) } 1 .. $maxNumberOfParallelJobs;
pullDataFromDbWithDirectory( $directory );
sleep 1 until $idle < $maxNumberOfParallelJobs;  ## Wait for (some) threads to get started
sleep 1 until $idle == $maxNumberOfParallelJobs; ## wait until they are all done
$worker->enqueue( (undef) x $maxNumberOfParallelJobs ); ## Tell'em to stop
$_->join for @threads;
printData();

        The problem, I think, is that as soon as the queue gets empty it hits the undef entries. But after that, new entries pushed into the queue get ignored, because the threads have already been joined by that time.

        Okay. So now we get to the crux of the real question you should have asked: How to determine when there are no more directories to be processed? (And thus, the threads can safely terminate.)

        You cannot have the main thread post the undefs once it has finished the initial population of the queue, because the worker threads will be adding to the queue as they process that initial population.

        And you cannot use the absence of queue entries (Q->pending == 0), because another thread may add another directory to the queue a millisecond later.

        This is the classic chicken & egg scenario. How can any of the worker threads know when they are finished, when at any given moment in time one of the other worker threads might queue a new directory to be processed?

        I don't have an answer for you yet. I'm not sure that there is an answer given the current methodology of your code.

        I'm sure that there is a better way, and I'll get back to you once I think of it.
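        For reference, one well-known pattern for exactly this termination problem is a shared count of outstanding work items: increment the counter *before* each enqueue, and decrement it only after an item has been fully processed, including enqueueing any children it produced. The counter can then only reach zero when no thread is able to create new work, and that is the one safe moment to post the undef sentinels. Below is a minimal, self-contained sketch of that idea; the fake directory tree, the depth limit, and the names (`process_dir`, `worker`) are invented for illustration and are not part of the posted omnidb script:

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;   # lock, cond_wait, cond_broadcast
use Thread::Queue;

my $Q = Thread::Queue->new();
my $outstanding :shared = 0;   # items queued but not yet fully processed
my @results     :shared;

# Hypothetical stand-in for the omnidb lookup: each directory shallower
# than three levels "contains" two subdirectories.
sub process_dir {
    my ($dir) = @_;
    push @results, $dir;
    if ( ($dir =~ tr{/}{}) < 3 ) {
        for my $sub (1 .. 2) {
            { lock $outstanding; ++$outstanding; }   # count BEFORE enqueue
            $Q->enqueue("$dir/sub$sub");
        }
    }
}

sub worker {
    while ( defined( my $dir = $Q->dequeue() ) ) {
        process_dir($dir);
        {
            lock $outstanding;
            --$outstanding;                          # count AFTER processing
            cond_broadcast($outstanding) if $outstanding == 0;
        }
    }
}

my $NTHREADS = 4;
{ lock $outstanding; ++$outstanding; }
$Q->enqueue('/root');                                # seed the queue
my @threads = map threads->create(\&worker), 1 .. $NTHREADS;

# Zero outstanding items means no thread can produce new work,
# so only now is it safe to post the stop sentinels.
{
    lock $outstanding;
    cond_wait($outstanding) while $outstanding > 0;
}
$Q->enqueue( (undef) x $NTHREADS );
$_->join for @threads;

print "$_\n" for sort @results;
```

        The key invariant is that $outstanding is always greater than zero while any unfinished item exists, because each item is counted before it enters the queue and uncounted only after its children have been counted. That closes the window in which the posted code could mistake a momentarily empty queue for completed work.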

