Have a daemon that runs at the top of every minute. I need some, hopefully quick guidance from a threading SME. I created a shared thread pool that is used multiple times throughout the program. It is loaded with MAXTHREADS that are invoked and used until signaled to terminate. My problem is in the first use of the pool.. Against each target location, I'm making about 20 calls to HEAD a file for size respective to the target. HOWEVER, I need the program to want and process the shared @Data array for that location before going back into the next array element. Currently it's trampling all over the activity to write out the file respective to the target.. Missing data, combined.. The goal is to use the one shared pool to just feed the waiting threads as needed.
Below is the majority of the main script with the daemon and extended processing removed. I have also added comments showing exactly where I'm having the problem
#!/usr/bin/perl -X use strict; use threads 1.39; use threads::shared; use Thread::Queue; use LWP::UserAgent; use HTTP::Request; use Data::Dumper; $Data::Dumper::Sortkeys=1; $Data::Dumper::Indent=1; use File::Basename; use Time::Local; use Getopt::Long; use POSIX qw(strftime); use ProcDaemon; use File::Path qw(make_path remove_tree); no warnings 'utf8'; #--------------------------------------------------------------------- +-- # Global CLI Variables #--------------------------------------------------------------------- +-- my $VERSION = '1.0'; my $HELP; my $VERBOSE; my $NAME = basename($0,qw(.pl)); my $LABEL = basename($0,qw(.pl)); my $SECONDS; my $MODE; my $TIME; #--------------------------------------------------------------------- +---- # Evaluate parameters that have been passed #--------------------------------------------------------------------- +---- GetOptions( 'verbose|v' => \$VERBOSE, 'help|h|?' => \$HELP, 'name|n=s' => \$NAME, 'lable|l=s' => \$LABEL, 'seconds|s=s' => \$SECONDS, 'mode|m=s' => \$MODE, 'time|t=s' => \$TIME, ); #--------------------------------------------------------------------- +-- # Global Variables # # Maximum working threads # Flag to inform all threads that application is terminating # Threads add their ID to this queue when they are ready for work # Also, when app terminates a -1 is added to this queue #--------------------------------------------------------------------- +-- my $MAX_THREADS = 50; my $TERM :shared = 0; my $IDLE_QUEUE = Thread::Queue->new(); #--------------------------------------------------------------------- +-- ### Signal Handling ### # # Gracefully terminate application on ^C or command line 'kill' #--------------------------------------------------------------------- +-- $SIG{'INT'} = $SIG{'TERM'} = sub { print(">>> Terminating <<<\n"); $TERM = 1; # Add -1 to head of idle queue to signal termination $IDLE_QUEUE->insert(0, -1); }; $MODE = uc($MODE); # Always make -m option uppercase my %months = qw (0 January 1 Febuary 2 March 3 April 4 May 5 June 6 July 7 August 8 September 9 October 10 November 11 December ); #--------------------------------------------------------------------- +--------- # Create a ref to contain all sorts of data throughout the program #--------------------------------------------------------------------- +--------- my @targets; my @Data :shared; my $ref; $ref->{cmd} = (split(/\//,$0))[-1]; $ref->{loaddir} = (split("$ref->{cmd}",$0))[0]; $ref->{dirname} = dirname($0); $ref->{seconds} = $SECONDS || 60; my $in = read_conf("$ref->{loaddir}","$ref->{cmd}"); usage() if ($HELP); usage("ERROR: Must pass $ref->{cmd} a -m option indicating which MODE +to run Daemon in!") if (!$MODE); chdir($ref->{dirname}); GETCOMPLETEPOOLLIST: # Target services from conf file { @{$ref->{services}} = split(/,/, $in->{services}); } GETDATACENTERLIST: # target Data Centers { @targets = split(/,/, $in->{datacenters}); } =comment by default run 'status' because that is most safe =cut push @ARGV, "status" unless @ARGV; exit control(@ARGV) ? 0 : 1; =comment work {} Subroutine is the main for this script. Nested inside is t +he while(){} =cut sub work { =comment ### INITIALIZE ### Thread work queues referenced by thread ID Create the thread pool Create a work queue for a thread Create the thread, and give it the work queue Remember the thread's work queue =cut my %work_queues; for (1..$MAX_THREADS) { my $work_q = Thread::Queue->new(); my $thr = threads->create('worker', $work_q); $work_queues{$thr->tid()} = $work_q; } SETMODETIME: # Set the inital starting point based on the mode of the +script { $ref->{time} = ($in->{$MODE} * 60); $ref->{time} = ($ref->{time} + (60 * 60)) if (strftime("%Z", local +time()) eq "MST"); } sleep_topmin(); # Start TOM regardless of when agent was started # Manage the thread pool until signalled to terminate while (! $TERM) { my $t; undef @Data; logit("Starting loop"); # Logging Routine. my $time = time(); # do not touch, needed for the 60 second lo +op print "".localtime()." $LABEL \n" if $VERBOSE; $ref->{epoch} = timelocal('',(localtime(time() - $ref->{time}) +)[1,2,3,4,5]); $ref->{d_epoch} = timelocal('',(localtime(time() - $ref->{time} + - (86400 * $in->{remove})))[1,2,3,4,5]); $ref->{time_url}= form_url($ref->{epoch}, "/../../custom_log/../"); =sub comment Log file format name YYYY.MM.DD.HH.mm.txt name.b.2013.10.30.12.15.txt =cut $ref->{suffix} = strftime "%F.%H.%M.txt", localtime($ref->{epo +ch}); # gmtime() $ref->{suffix} =~s/-/\./g; =comment The delete reference is used for deleting files and directories older then the days listed in the conf file. =cut $ref->{delete} = strftime "%F.%H.%M", localtime($ref->{d_epoch}); $ref->{delete} =~s/-/\./g; =comment %/name/2013/October/30/12 This secion simply creates directories that may not exist. This also prunes hour directories that are older then X (conf valu +e) =cut MAKEDIRECTORIES: { $ref->{dr} = (split(/\./,$ref->{suffix}))[0]."/". $months{(localtime($ref->{epoch}))[4]}."/". (split(/\./,$ref->{suffix}))[2]."/". (split(/\./,$ref->{suffix}))[3].""; $ref->{dd} = (split(/\./,$ref->{delete}))[0]."/". $months{(localtime($ref->{d_epoch}))[4]}."/". (split(/\./,$ref->{delete}))[2]."/". (split(/\./,$ref->{delete}))[3].""; foreach my $s (@{$ref->{services}}) { my $dn = "$in->{localbase}/$s/$ref->{dr}"; make_path("$dn") unless -d "$dn"; #Directory for files $ref->{dr2} = "$ref->{dr}/".(split(/\./,$ref->{suffix}))[4]."" +; # Directory for size validations make_path("$in->{localbase}/FILESIZES/$ref->{dr2}") unless -d +"$in->{localbase}/FILESIZES/$ref->{dr2}"; my $dd = "$in->{localbase}/$s/$ref->{dd}"; remove_tree("$dd"); } } =comment This follow section will download and parse out the landing page f +or the respective minute being queried. If exists in conf file it will b +e added. Additionally this section will get the current listed size of the +file and use that with MODE's other then base to check on any changes in th +e file. Note* The threads->create adds the request to the thread queue =cut GETURLDATAFILELIST: { for(my $x='0'; $x<=$#targets; $x++){ my @dc = $targets[$x] =~ /[-\.]([abp])/; my $ua = LWP::UserAgent->new(timeout => 10); my $req = HTTP::Request->new('GET', 'http://root'.$targets[$x] +.'.company.com'.$ref->{time_url}); my $response = $ua->request($req); foreach (grep {/a href="[\w\.-]+/} split /$/m, $response->as_s +tring()){ my ($gz,$sz) = ($_ =~/a href="([\w\.-]+)".*\s([\w\.]+)/) ? ($1 +,$2) : undef; if ((split /\./,$gz)[0] ~~ @{$ref->{services}}) { my $file = "$in->{localbase}/". (split /\./,$gz)[0]."/$ref->{dr}/". (split /\./,$gz)[0].".$dc[0].$ref->{suffix}"; $t->{$file} = "http://root$targets[$x].company.com$ref->{t +ime_url}/$gz"; my $tid = $IDLE_QUEUE->dequeue(); last if ($tid < 0); $work_queues{$tid}->enqueue({ url => "$t->{$file}", file => "$gz", action => 'SIZE', dc => "$targets[$ +x]"}); } } =comment PROBLEM FOR LOOP This is the PROBLEM area.. For each of the requests sent to the shared thread pool I need them to wait until all active/used threads (may onl +y use 10 of the 50 created) complete so I can write the file out for tha +t specific target. When that target is done, process the next target in the @targets arra +y. Currently, they are coliding with each other and not sure what I'm doi +ng wrong. @Data is a shared array and cleared at the end of each of the @targets + for statment =cut open(WR, ">$in->{localbase}/FILESIZES/$ref->{dr2}/$in->{filesi +zes}.$MODE.$targets[$x]"); # Open for Write print WR @Data; close WR; logit("Finished creating ".localtime($ref->{epoch})." File Siz +e File for $targets[$x]"); undef @Data; # Clear the shared array so that it can be used a +gain for the next for target } } # End GETURLDATAFILELIST Block =comment This section of the while loop works fine as there is no dependecies o +n the sequence order of the request and all threads are not shared. =cut =comment Need to put in the Sync validation piece here. Compare the $MODE +files created. if they match the byte counts, consider the file downloaded and re +move it from the list of files to download. =cut my $s; ($MODE eq "SYNC") ? eval{ for(my $x='0'; $x<=$#targets; $x++){ foreach my $mode ('SYNC','BASE') { my @data = readfile("$in->{localbase}/FILESIZES/$ref->{dr2 +}/$in->{filesizes}.$mode.$targets[$x]"); map{ $s->{$targets[$x]}->{(split('=',$_))[0]}->{$mode} = (split +('=',$_))[1]; } @data; } } }:undef; =comment This is the second use of the thread pool.. In this section, e +verything is unique with no dependencies so they can finish independently.. + =cut foreach my $files (keys %{$t}) { my $tid = $IDLE_QUEUE->dequeue(); last if ($tid < 0); ($MODE eq "SYNC") ? eval{ ($t->{$files} =~/http:\/\/root([\w\-\.]+).company.com.*\/(.*.t +xt.gz)/) ? eval{ ($s->{$1}->{$2}->{SYNC} == $s->{$1}->{$2}->{BASE}) ? logit +("$2 is in equal byte count") : eval{ logit("$2 is not in sync, downloading"); $work_queues{$tid}->enqueue({ url => "$t->{$files}" +, file => "$files", action => 'FILE'}); }; }: undef; } : eval { $work_queues{$tid}->enqueue({ url => "$t->{$files}" +, file => "$files", action => 'FILE'}); } } print "Finshed ".localtime()."\n" if $VERBOSE; logit("Finished loop"); undef $t; sleep_topmin(); } ## CLEANING UP ### #Signal all threads that there is no more work $work_queues{$_}->enqueue(-1) foreach keys(%work_queues); #Wait for all the threads to finish $_->join() foreach threads->list(); } exit(0); ### Thread Entry Point Subroutines ### # A worker thread sub worker { my ($work_q) = @_; # This thread's ID my $tid = threads->tid(); # Work loop do { # Indicate that were are ready to do work printf("Idle -> %2d\n", $tid); $IDLE_QUEUE->enqueue($tid); # Wait for work from the queue my $work = $work_q->dequeue(); # If no more work, exit last unless ($work); # Do some work while monitoring $TERM printf(" %2d <- Working\n", $tid); while ($work && ! $TERM) { use feature qw/switch/; given($work->{action}) { when (/SIZE/) { size("$work->{url}","$work->{file}", "$work->{dc}" +); } when (/FILE/) { download("$work->{url}","$work->{file}"); } default { undef } } undef $work; } } while (! $TERM); # All done printf("Finished -> %2d\n", $tid); } sub writetimestamp { open(OUT, ">$ref->{loaddir}$in->{timestampfile}.$MODE"); print OUT "$_[0]"; close OUT; } =comment Simple generic read file into array and return it =cut sub readfile { logit("Reading $_[0]"); open my $fh, $_[0] or logit("$_[0]: $!"); @_ = <$fh>; close $fh; return @_; } sub logit { open(LOG, ">>$ref->{loaddir}$LABEL.log") or die $!; print LOG "$MODE: ".localtime()." $_[0]\n"; close LOG; } =comment The control routine runs the daemon Make no changes to this =cut sub read_conf { my $_ = $_[0]; my @array; (open(IN, "$_".(split(/\./, $_[1]))[0].".conf")) ? eval { my @data = <IN>; close IN; @array = grep {!/^#|^\s+$/} @data; chomp @array; map {s/\n|\r\s+//} @array; }:usage("Could not load ".(split(/\./, $_[1]))[0].".conf! Ple +ase ensure that it exists in the same directory"); my %hash = map { (split(/=/,$_))[0] => (split(/=/,$_))[1] } @a +rray; return \%hash; } sub download{ my ($url, $file) = @_; my $request = HTTP::Request->new (GET => $url); $request->accept_decodable; my $ua = LWP::UserAgent->new (); my $response = $ua->request ($request); if ($response->is_success ()) { open(FILE, ">$file"); print FILE $response->decoded_content(charset => 'none'); close FILE; =comment Need to put in the -e validation that the file exists. If not, sp +awn a new thread to retry the file download. =cut (-e $file) ? eval { logit("Finished download, decode, and creation of ".(split('/' +, $file))[-1].""); runFilter($file); } : eval{ logit("FAILED download of ".(split('/', $file))[-1]." Trying a +gain."); #Will add back to thread pool to try again }; } else { print STDERR "get '$url' failed: ", $response->status_line, "\ +n"; #Will add back to thread pool to try again } } sub size { my ($url, $file, $dc) = @_; my $request = HTTP::Request->new (HEAD => $url); my $ua = LWP::UserAgent->new (); my $response = $ua->request ($request); if ($response->is_success ()) { push @Data, "$file=".$response->content_length().""; } else { logit("get '$url' failed: ", $response->status_line, "\n"); } } #--------------------------------------------------------------------- +--------- # Format the Time URL /2013/05-May/28/12:00:00/31/ # Data comes in as the modified epoch timestamp #--------------------------------------------------------------------- +--------- sub form_url { my ($epoch, $base) = ($_[0], $_[1]); my $time = localtime($epoch); my $url = "$base" .(split(/\s+/, $time))[4]."/" .sprintf("%02i",((localtime($epoch))[4] + 1))."-" .$months{(localtime($epoch))[4]}."/" .sprintf("%02i",(split(/\s+/, $time))[2])."/" .(split(/:/, (split(/\s+/, $time))[3]))[0].":00:00/" .(split(/:/, (split(/\s+/, $time))[3]))[1].""; return $url; } #--------------------------------------------------------------------- +--------- # Simple routine to return unique elements from withing a passed arra +y #--------------------------------------------------------------------- +--------- sub sleep_topmin { sleep(60 - (localtime)[0]) } #--------------------------------------------------------------------- +--------- # Simple for help and command line options #--------------------------------------------------------------------- +--------- sub usage { my $message = shift; print <<EOF; exit($_[0] || 0); -------------------------------------------------------------------- $ref->{cmd} v $VERSION : By J.... -------------------------------------------------------------------- $message Usage output $message EOF ; }
In reply to Shared Thread Pool (used more then once,) by tnttinxses
| For: | Use: | ||
| & | & | ||
| < | < | ||
| > | > | ||
| [ | [ | ||
| ] | ] |