tnttinxses has asked for the wisdom of the Perl Monks concerning the following question:

I have a daemon that runs at the top of every minute, and I need some (hopefully quick) guidance from a threading SME. I created a shared thread pool that is used several times throughout the program. It is loaded with $MAX_THREADS worker threads that wait for work and keep running until they are signaled to terminate. My problem is in the first use of the pool. For each target location, I make about 20 HEAD requests to get the size of each file on that target. HOWEVER, I need the program to wait for those requests and process the shared @Data array for that location before moving on to the next array element. Currently the threads trample all over the step that writes out the file for each target: data goes missing or gets combined across targets. The goal is to use the one shared pool and just feed the waiting threads as needed.

Below is the majority of the main script, with the daemon and extended processing removed. I have also added comments showing exactly where I'm having the problem.


#!/usr/bin/perl -X
use strict;
use threads 1.39;
use threads::shared;
use Thread::Queue;
use LWP::UserAgent;
use HTTP::Request;
use Data::Dumper;
$Data::Dumper::Sortkeys=1;
$Data::Dumper::Indent=1;
use File::Basename;
use Time::Local;
use Getopt::Long;
use POSIX qw(strftime);
use ProcDaemon;
use File::Path qw(make_path remove_tree);
no warnings 'utf8';

#-----------------------------------------------------------------------
# Global CLI Variables
#-----------------------------------------------------------------------
my $VERSION = '1.0';
my $HELP;
my $VERBOSE;
my $NAME = basename($0,qw(.pl));
my $LABEL = basename($0,qw(.pl));
my $SECONDS;
my $MODE;
my $TIME;

#-------------------------------------------------------------------------
# Evaluate parameters that have been passed
#-------------------------------------------------------------------------
GetOptions(
    'verbose|v'   => \$VERBOSE,
    'help|h|?'    => \$HELP,
    'name|n=s'    => \$NAME,
    'lable|l=s'   => \$LABEL,
    'seconds|s=s' => \$SECONDS,
    'mode|m=s'    => \$MODE,
    'time|t=s'    => \$TIME,
);

#-----------------------------------------------------------------------
# Global Variables
#
# Maximum working threads
# Flag to inform all threads that application is terminating
# Threads add their ID to this queue when they are ready for work
# Also, when app terminates a -1 is added to this queue
#-----------------------------------------------------------------------
my $MAX_THREADS = 50;
my $TERM :shared = 0;
my $IDLE_QUEUE = Thread::Queue->new();

#-----------------------------------------------------------------------
### Signal Handling ###
#
# Gracefully terminate application on ^C or command line 'kill'
#-----------------------------------------------------------------------
$SIG{'INT'} = $SIG{'TERM'} = sub {
    print(">>> Terminating <<<\n");
    $TERM = 1;
    # Add -1 to head of idle queue to signal termination
    $IDLE_QUEUE->insert(0, -1);
};

$MODE = uc($MODE); # Always make -m option uppercase
my %months = qw (0 January 1 February 2 March 3 April 4 May 5 June 6 July 7 August 8 September 9 October 10 November 11 December );

#-----------------------------------------------------------------------------
# Create a ref to contain all sorts of data throughout the program
#-----------------------------------------------------------------------------
my @targets;
my @Data :shared;
my $ref;
$ref->{cmd} = (split(/\//,$0))[-1];
$ref->{loaddir} = (split("$ref->{cmd}",$0))[0];
$ref->{dirname} = dirname($0);
$ref->{seconds} = $SECONDS || 60;
my $in = read_conf("$ref->{loaddir}","$ref->{cmd}");

usage() if ($HELP);
usage("ERROR: Must pass $ref->{cmd} a -m option indicating which MODE to run Daemon in!") if (!$MODE);

chdir($ref->{dirname});

GETCOMPLETEPOOLLIST: # Target services from conf file
{
    @{$ref->{services}} = split(/,/, $in->{services});
}

GETDATACENTERLIST: # target Data Centers
{
    @targets = split(/,/, $in->{datacenters});
}

=comment
by default run 'status' because that is most safe
=cut

push @ARGV, "status" unless @ARGV;
exit control(@ARGV) ? 0 : 1;

=comment
work {} Subroutine is the main for this script. Nested inside is the while(){}
=cut

sub work {

=comment
### INITIALIZE ###
Thread work queues referenced by thread ID
Create the thread pool
Create a work queue for a thread
Create the thread, and give it the work queue
Remember the thread's work queue
=cut

    my %work_queues;
    for (1..$MAX_THREADS) {
        my $work_q = Thread::Queue->new();
        my $thr = threads->create('worker', $work_q);
        $work_queues{$thr->tid()} = $work_q;
    }

    SETMODETIME: # Set the initial starting point based on the mode of the script
    {
        $ref->{time} = ($in->{$MODE} * 60);
        $ref->{time} = ($ref->{time} + (60 * 60)) if (strftime("%Z", localtime()) eq "MST");
    }

    sleep_topmin(); # Start TOM regardless of when agent was started

    # Manage the thread pool until signalled to terminate
    while (! $TERM) {
        my $t;
        undef @Data;
        logit("Starting loop"); # Logging Routine.
        my $time = time(); # do not touch, needed for the 60 second loop
        print "".localtime()." $LABEL \n" if $VERBOSE;
        $ref->{epoch} = timelocal('',(localtime(time() - $ref->{time}))[1,2,3,4,5]);
        $ref->{d_epoch} = timelocal('',(localtime(time() - $ref->{time} - (86400 * $in->{remove})))[1,2,3,4,5]);
        $ref->{time_url}= form_url($ref->{epoch}, "/../../custom_log/../");

=sub comment
Log file format name YYYY.MM.DD.HH.mm.txt
name.b.2013.10.30.12.15.txt
=cut

        $ref->{suffix} = strftime "%F.%H.%M.txt", localtime($ref->{epoch}); # gmtime()
        $ref->{suffix} =~s/-/\./g;

=comment
The delete reference is used for deleting files and directories
older than the days listed in the conf file.
=cut

        $ref->{delete} = strftime "%F.%H.%M", localtime($ref->{d_epoch});
        $ref->{delete} =~s/-/\./g;

=comment
%/name/2013/October/30/12
This section simply creates directories that may not exist.
This also prunes hour directories that are older than X (conf value)
=cut

        MAKEDIRECTORIES:
        {
            $ref->{dr} = (split(/\./,$ref->{suffix}))[0]."/".
                         $months{(localtime($ref->{epoch}))[4]}."/".
                         (split(/\./,$ref->{suffix}))[2]."/".
                         (split(/\./,$ref->{suffix}))[3]."";
            $ref->{dd} = (split(/\./,$ref->{delete}))[0]."/".
                         $months{(localtime($ref->{d_epoch}))[4]}."/".
                         (split(/\./,$ref->{delete}))[2]."/".
                         (split(/\./,$ref->{delete}))[3]."";
            foreach my $s (@{$ref->{services}}) {
                my $dn = "$in->{localbase}/$s/$ref->{dr}";
                make_path("$dn") unless -d "$dn"; # Directory for files
                $ref->{dr2} = "$ref->{dr}/".(split(/\./,$ref->{suffix}))[4]."";
                # Directory for size validations
                make_path("$in->{localbase}/FILESIZES/$ref->{dr2}") unless -d "$in->{localbase}/FILESIZES/$ref->{dr2}";
                my $dd = "$in->{localbase}/$s/$ref->{dd}";
                remove_tree("$dd");
            }
        }

=comment
This following section will download and parse out the landing page for the
respective minute being queried. If it exists in the conf file it will be added.
Additionally this section will get the current listed size of the file and use
that with MODEs other than base to check on any changes in the file.
Note* The threads->create adds the request to the thread queue
=cut

        GETURLDATAFILELIST:
        {
            for(my $x='0'; $x<=$#targets; $x++){
                my @dc = $targets[$x] =~ /[-\.]([abp])/;
                my $ua = LWP::UserAgent->new(timeout => 10);
                my $req = HTTP::Request->new('GET', 'http://root'.$targets[$x].'.company.com'.$ref->{time_url});
                my $response = $ua->request($req);
                foreach (grep {/a href="[\w\.-]+/} split /$/m, $response->as_string()){
                    my ($gz,$sz) = ($_ =~/a href="([\w\.-]+)".*\s([\w\.]+)/) ? ($1,$2) : undef;
                    if ((split /\./,$gz)[0] ~~ @{$ref->{services}}) {
                        my $file = "$in->{localbase}/".
                                   (split /\./,$gz)[0]."/$ref->{dr}/".
                                   (split /\./,$gz)[0].".$dc[0].$ref->{suffix}";
                        $t->{$file} = "http://root$targets[$x].company.com$ref->{time_url}/$gz";
                        my $tid = $IDLE_QUEUE->dequeue();
                        last if ($tid < 0);
                        $work_queues{$tid}->enqueue({ url => "$t->{$file}", file => "$gz", action => 'SIZE', dc => "$targets[$x]"});
                    }
                }

=comment
PROBLEM FOR LOOP
This is the PROBLEM area.. For each of the requests sent to the shared
thread pool I need them to wait until all active/used threads (may only
use 10 of the 50 created) complete so I can write the file out for that
specific target.
When that target is done, process the next target in the @targets array.
Currently, they are colliding with each other and I'm not sure what I'm doing wrong.
@Data is a shared array and cleared at the end of each pass of the @targets for statement
=cut

                open(WR, ">$in->{localbase}/FILESIZES/$ref->{dr2}/$in->{filesizes}.$MODE.$targets[$x]"); # Open for Write
                print WR @Data;
                close WR;
                logit("Finished creating ".localtime($ref->{epoch})." File Size File for $targets[$x]");
                undef @Data; # Clear the shared array so that it can be used again for the next target
            }
        } # End GETURLDATAFILELIST Block

=comment
This section of the while loop works fine as there are no dependencies on
the sequence order of the requests and all threads are not shared.
=cut

=comment
Need to put in the Sync validation piece here. Compare the $MODE files created.
If they match the byte counts, consider the file downloaded and remove it
from the list of files to download.
=cut

        my $s;
        ($MODE eq "SYNC") ? eval{
            for(my $x='0'; $x<=$#targets; $x++){
                foreach my $mode ('SYNC','BASE') {
                    my @data = readfile("$in->{localbase}/FILESIZES/$ref->{dr2}/$in->{filesizes}.$mode.$targets[$x]");
                    map{ $s->{$targets[$x]}->{(split('=',$_))[0]}->{$mode} = (split('=',$_))[1]; } @data;
                }
            }
        }:undef;

=comment
This is the second use of the thread pool.. In this section, everything is
unique with no dependencies so they can finish independently..
=cut

        foreach my $files (keys %{$t}) {
            my $tid = $IDLE_QUEUE->dequeue();
            last if ($tid < 0);
            ($MODE eq "SYNC") ? eval{
                ($t->{$files} =~/http:\/\/root([\w\-\.]+).company.com.*\/(.*.txt.gz)/) ? eval{
                    ($s->{$1}->{$2}->{SYNC} == $s->{$1}->{$2}->{BASE}) ? logit("$2 is in equal byte count") : eval{
                        logit("$2 is not in sync, downloading");
                        $work_queues{$tid}->enqueue({ url => "$t->{$files}", file => "$files", action => 'FILE'});
                    };
                }: undef;
            } : eval {
                $work_queues{$tid}->enqueue({ url => "$t->{$files}", file => "$files", action => 'FILE'});
            }
        }
        print "Finished ".localtime()."\n" if $VERBOSE;
        logit("Finished loop");
        undef $t;
        sleep_topmin();
    }

    ## CLEANING UP ###
    #Signal all threads that there is no more work
    $work_queues{$_}->enqueue(-1) foreach keys(%work_queues);
    #Wait for all the threads to finish
    $_->join() foreach threads->list();
}
exit(0);

### Thread Entry Point Subroutines ###

# A worker thread
sub worker {
    my ($work_q) = @_;
    # This thread's ID
    my $tid = threads->tid();
    # Work loop
    do {
        # Indicate that we are ready to do work
        printf("Idle -> %2d\n", $tid);
        $IDLE_QUEUE->enqueue($tid);
        # Wait for work from the queue
        my $work = $work_q->dequeue();
        # If no more work, exit
        last unless ($work);
        # Do some work while monitoring $TERM
        printf(" %2d <- Working\n", $tid);
        while ($work && ! $TERM) {
            use feature qw/switch/;
            given($work->{action}) {
                when (/SIZE/) { size("$work->{url}","$work->{file}", "$work->{dc}"); }
                when (/FILE/) { download("$work->{url}","$work->{file}"); }
                default { undef }
            }
            undef $work;
        }
    } while (! $TERM);
    # All done
    printf("Finished -> %2d\n", $tid);
}

sub writetimestamp {
    open(OUT, ">$ref->{loaddir}$in->{timestampfile}.$MODE");
    print OUT "$_[0]";
    close OUT;
}

=comment
Simple generic read file into array and return it
=cut

sub readfile {
    logit("Reading $_[0]");
    open my $fh, $_[0] or logit("$_[0]: $!");
    @_ = <$fh>;
    close $fh;
    return @_;
}

sub logit {
    open(LOG, ">>$ref->{loaddir}$LABEL.log") or die $!;
    print LOG "$MODE: ".localtime()." $_[0]\n";
    close LOG;
}

=comment
The control routine runs the daemon
Make no changes to this
=cut

sub read_conf {
    my $_ = $_[0];
    my @array;
    (open(IN, "$_".(split(/\./, $_[1]))[0].".conf")) ? eval {
        my @data = <IN>;
        close IN;
        @array = grep {!/^#|^\s+$/} @data;
        chomp @array;
        map {s/\n|\r\s+//} @array;
    }:usage("Could not load ".(split(/\./, $_[1]))[0].".conf! Please ensure that it exists in the same directory");
    my %hash = map { (split(/=/,$_))[0] => (split(/=/,$_))[1] } @array;
    return \%hash;
}

sub download{
    my ($url, $file) = @_;
    my $request = HTTP::Request->new (GET => $url);
    $request->accept_decodable;
    my $ua = LWP::UserAgent->new ();
    my $response = $ua->request ($request);
    if ($response->is_success ()) {
        open(FILE, ">$file");
        print FILE $response->decoded_content(charset => 'none');
        close FILE;

=comment
Need to put in the -e validation that the file exists. If not, spawn a new
thread to retry the file download.
=cut

        (-e $file) ? eval {
            logit("Finished download, decode, and creation of ".(split('/', $file))[-1]."");
            runFilter($file);
        } : eval{
            logit("FAILED download of ".(split('/', $file))[-1]." Trying again.");
            #Will add back to thread pool to try again
        };
    } else {
        print STDERR "get '$url' failed: ", $response->status_line, "\n";
        #Will add back to thread pool to try again
    }
}

sub size {
    my ($url, $file, $dc) = @_;
    my $request = HTTP::Request->new (HEAD => $url);
    my $ua = LWP::UserAgent->new ();
    my $response = $ua->request ($request);
    if ($response->is_success ()) {
        push @Data, "$file=".$response->content_length()."";
    } else {
        logit("get '$url' failed: ", $response->status_line, "\n");
    }
}

#-----------------------------------------------------------------------------
# Format the Time URL /2013/05-May/28/12:00:00/31/
# Data comes in as the modified epoch timestamp
#-----------------------------------------------------------------------------
sub form_url {
    my ($epoch, $base) = ($_[0], $_[1]);
    my $time = localtime($epoch);
    my $url = "$base"
        .(split(/\s+/, $time))[4]."/"
        .sprintf("%02i",((localtime($epoch))[4] + 1))."-"
        .$months{(localtime($epoch))[4]}."/"
        .sprintf("%02i",(split(/\s+/, $time))[2])."/"
        .(split(/:/, (split(/\s+/, $time))[3]))[0].":00:00/"
        .(split(/:/, (split(/\s+/, $time))[3]))[1]."";
    return $url;
}

#-----------------------------------------------------------------------------
# Sleep until the top of the next minute
#-----------------------------------------------------------------------------
sub sleep_topmin { sleep(60 - (localtime)[0]) }

#-----------------------------------------------------------------------------
# Simple help and command line options
#-----------------------------------------------------------------------------
sub usage {
    my $message = shift;
    print <<EOF; exit($_[0] || 0);
--------------------------------------------------------------------
$ref->{cmd} v $VERSION : By J....
--------------------------------------------------------------------
$message
Usage output
$message
EOF
;
}

Re: Shared Thread Pool (used more then once,)
by zentara (Cardinal) on Nov 15, 2013 at 21:40 UTC
    I'm not able to fully understand your code, but at a glance I can see that you use undef on your shared array @Data in several places.
    #for instance you have
    # Manage the thread pool until signalled to terminate
    while (! $TERM) {
        my $t;
        undef @Data;
    #################################
    # and
    undef @Data; # Clear the shared array so that it can be used again for the next target
    Are you sure that undef'ing a shared array is the correct way to clear it out? I would think you should lock @Data and then do @Data = ();

    That brings up locking shared variables: your symptoms are similar to those seen when unlocked shared variables get clobbered by other threads accessing @Data.

    See Threads: why locking is required when using shared variables
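    A minimal sketch of the locking suggested above, assuming worker threads that push "file=size"-style strings into a shared @Data the way size() does. The thread count, the five pushes per thread, and the values are made up for illustration. Note that lock() only serializes access to @Data; in this sketch it is join() that makes the main thread wait for the workers before it reads and clears the array.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

my @Data :shared;

# Each push happens while holding the lock on @Data. The lock is
# released automatically when the enclosing block (one loop pass) ends.
sub worker {
    my ($id) = @_;
    for my $n (1 .. 5) {
        lock(@Data);
        push @Data, "t$id-$n=0";
    }
    return;
}

my @threads;
push @threads, threads->create(\&worker, $_) for 1 .. 3;
$_->join() for @threads;    # wait until every worker has finished

my ($collected, $left);
{
    lock(@Data);            # hold the lock while reading and clearing
    $collected = scalar @Data;
    @Data = ();             # empty list: leaves zero elements
    $left = scalar @Data;
}
print "collected $collected, left $left\n";
```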

      Are you sure that undef'ing a shared array is a correct way of clearing out the array? I might think you would use lock on @Data, and then do @Data = '';

      undef works perfectly on shared arrays.

      Your method is completely broken, as it leaves the array with a single element containing a zero length string.
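      A quick check of the point above, using a shared array like the question's @Data (the sample values are made up): undef @Data and @Data = () both leave it empty, while @Data = '' leaves one element holding an empty string.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

my @Data :shared = qw(a=1 b=2 c=3);

undef @Data;                      # clears the shared array
my $after_undef = scalar @Data;

@Data = qw(a=1 b=2);
@Data = '';                       # assigns a one-element list: ('')
my $after_empty_string = scalar @Data;

@Data = qw(a=1 b=2);
@Data = ();                       # assigns the empty list
my $after_empty_list = scalar @Data;

print "undef: $after_undef, '': $after_empty_string, (): $after_empty_list\n";
```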

