in reply to Re^6: Proper undefine queue with multithreads
in thread Proper undefine queue with multithreads
Try this (again untested) code.
I've yet to convince myself that there isn't a race condition here, but if there is, you'll probably find it by running it for real quicker than I will be running it mentally:
#!/usr/bin/perl -w BEGIN { our($_pathname,$_filename) = ( $0 =~ m[(.*)/([^/]+)$] ) ? ( $1, $2 ) : ( ".", $0 ); push @INC,$_pathname; }; sub usage { print STDERR "\nERROR: $_[0]\nUsage:\n", <<"EndOfDescription"; $_filename Required Parameters: --filesystem 'host:dir' Filesystem with format + host:fs ex. host:/SHARED --label 'label' Label --dir 'directory' Directory to search Optional Parameters: --recursive Recursive search --maxCount 10000 Maximum allowed item count --threads 10 Maximul parallel jobs --exclude dir Can be specified muliple times EndOfDescription exit 2 } # ------------------------- # Required libraries # ------------------------- use strict; use Data::Dumper; use Getopt::Long; use Term::ANSIColor; use threads; use Thread::Queue; # ------------------------- # Global Variables # ------------------------- my $omnidb = '/opt/omni/bin/omnidb'; my @data :shared; my $maxNumberOfParallelJobs = 10; my $maxNumberOfItems = 10000; my $itemCount = 0; my $worker = Thread::Queue->new(); my $idle :shared = $maxNumberOfParallelJobs; # ------------------------- # Argument handling # ------------------------- my( $filesystem, $label, $directory, $recursive, $debug, @exclude ); Getopt::Long::Configure("pass_through"); GetOptions( q{filesystem=s} => \$filesystem, q{label=s} => \$label, q{dir=s} => \$directory, q{recursive!} => \$recursive, q{maxCount=i} => \$maxNumberOfItems, q{threads=i} => \$maxNumberOfParallelJobs, q{debug!} => \$debug, q{exclude=s} => \@exclude ); usage "Invalid argument(s)." if (grep {/^-/o } @ARGV); my( @args ) = @ARGV; if ( !($filesystem || $label || $directory) ) { usage "Not enough arguments." if (! @args ); } sub pullDataFromDbWithDirectory { my $_dir = $_[0]; if ($itemCount <= $maxNumberOfItems) { my @retval = grep { /^Dir|^File/ } qx($omnidb -filesystem $fil +esystem '$label' -listdir '$_dir'); foreach my $item (@retval) { $itemCount++; (my $filename = $item) =~ s/^File\s+|^Dir\s+|\n//g; my $file = "$_dir/$filename"; if (!($file ~~ @exclude)) { push(@data,$file); if ($item =~ /^Dir/) { $worker->enqueue( $file ); print "Add $file to queue\n" if $debug; } } } } } sub doOperation () { my $ithread = threads->tid(); while( my $folder = $worker->dequeue() ) { { lock $idle; --$idle } print "Read $folder from queue with thread $ithread\n" if $deb +ug; pullDataFromDbWithDirectory($folder); { lock $idle; ++$idle } }; } sub printData { foreach my $file (sort @data) { print "$file\n"; } if ($itemCount > $maxNumberOfItems) { print colored ['red on_black'], "\nWARNING: Maximum item count + of $itemCount / $maxNumberOfItems has been reached. Please adjust yo +ur filter\n"; } } # ------------------------- # Main # ------------------------- print "Exclude: " . Dumper( \@exclude ) if $debug; my @threads = map{ threads->create( \&doOperation ) } 1 .. $maxNumberOfParallelJobs; pullDataFromDbWithDirectory( $directory ); sleep 1 until $idle < $maxNumberOfParallelJobs; ## Wait for (some) thr +eads to get started sleep 1 until $idle == $maxNumberOfParallelJobs; ## wait until they a +re all done $worker->enqueue( (undef) x $maxNumberOfParallelJobs ); ## Tell'em to +stop $_->join for @threads; printData();
|
---|
Replies are listed 'Best First'. | |
---|---|
Re^8: Proper undefine queue with multithreads
by sanc (Initiate) on Jun 06, 2014 at 07:22 UTC | |
by BrowserUk (Patriarch) on Jun 06, 2014 at 11:16 UTC | |
by ikegami (Patriarch) on Jun 06, 2014 at 17:05 UTC | |
Re^8: Proper undefine queue with multithreads
by ikegami (Patriarch) on Jun 05, 2014 at 18:54 UTC | |
by Anonymous Monk on Jun 06, 2014 at 11:20 UTC | |
by ikegami (Patriarch) on Jun 06, 2014 at 17:02 UTC | |
by Anonymous Monk on Jun 07, 2014 at 09:08 UTC | |
by ikegami (Patriarch) on Jun 07, 2014 at 13:27 UTC | |
Re^8: Proper undefine queue with multithreads
by sanc (Initiate) on Jun 06, 2014 at 06:57 UTC |