#!/usr/bin/perl -X
use strict;
use threads 1.39;
use threads::shared;
use Thread::Queue;
use LWP::UserAgent;
use HTTP::Request;
use Data::Dumper;
$Data::Dumper::Sortkeys = 1;
$Data::Dumper::Indent   = 1;
use File::Basename;
use Time::Local;
use Getopt::Long;
use POSIX qw(strftime);
use ProcDaemon;
use File::Path qw(make_path remove_tree);
no warnings 'utf8';

#-----------------------------------------------------------------------
# Global CLI Variables
#-----------------------------------------------------------------------
my $VERSION = '1.0';
my $HELP;
my $VERBOSE;
my $NAME  = basename($0, qw(.pl));
my $LABEL = basename($0, qw(.pl));
my $SECONDS;
my $MODE;
my $TIME;

#-------------------------------------------------------------------------
# Evaluate parameters that have been passed
#
# 'label' is the correctly spelled option; the historical misspelling
# 'lable' is kept as an alias so existing invocations keep working.
#-------------------------------------------------------------------------
GetOptions(
    'verbose|v'       => \$VERBOSE,
    'help|h|?'        => \$HELP,
    'name|n=s'        => \$NAME,
    'label|lable|l=s' => \$LABEL,
    'seconds|s=s'     => \$SECONDS,
    'mode|m=s'        => \$MODE,
    'time|t=s'        => \$TIME,
);

#-----------------------------------------------------------------------
# Global Variables
#
#   $MAX_THREADS  - Maximum working threads
#   $TERM         - Flag to inform all threads that application is
#                   terminating
#   $IDLE_QUEUE   - Threads add their ID to this queue when they are
#                   ready for work.  Also, when the app terminates a -1
#                   is added to this queue
#   $SIZE_PENDING - Number of SIZE jobs handed to workers that have not
#                   completed yet; the main thread waits on it before
#                   writing a target's file-size file
#-----------------------------------------------------------------------
my $MAX_THREADS           = 50;
my $TERM :shared          = 0;
my $SIZE_PENDING :shared  = 0;
my $IDLE_QUEUE            = Thread::Queue->new();

#-----------------------------------------------------------------------
### Signal Handling ###
#
# Gracefully terminate application on ^C or command line 'kill'
#-----------------------------------------------------------------------
$SIG{'INT'} = $SIG{'TERM'} = sub {
    print(">>> Terminating <<<\n");
    $TERM = 1;

    # Add -1 to head of idle queue to signal termination
    $IDLE_QUEUE->insert(0, -1);
};

$MODE = uc($MODE);    # Always make -m option uppercase

# Month index (as returned by localtime) => month name used in URLs and
# in local directory names.
# NOTE(review): 'Febuary' is misspelled, but it is part of the remote URL
# and of existing directory names -- confirm against the server layout
# before correcting it.
my %months = qw(
    0 January  1 Febuary  2 March      3 April
    4 May      5 June     6 July       7 August
    8 September 9 October 10 November 11 December
);

#------------------------------------------------------------------------------
# Create a ref to contain all sorts of data throughout the program
#------------------------------------------------------------------------------
my @targets;
my @Data :shared;
my $ref;
$ref->{cmd} = (split(/\//, $0))[-1];

# Directory prefix of the script path; the command name is quoted so that
# regex metacharacters in it (e.g. '.') are matched literally.
$ref->{loaddir} = (split(/\Q$ref->{cmd}\E/, $0))[0];
$ref->{dirname} = dirname($0);
$ref->{seconds} = $SECONDS || 60;

my $in = read_conf("$ref->{loaddir}", "$ref->{cmd}");

# NOTE(review): usage() only prints; execution continues afterwards.
# Left as-is because invocations without -m may rely on it -- confirm.
usage() if ($HELP);
usage("ERROR: Must pass $ref->{cmd} a -m option indicating which MODE to run Daemon in!") if (!$MODE);

chdir($ref->{dirname});

GETCOMPLETEPOOLLIST:    # Target services from conf file
{
    @{ $ref->{services} } = split(/,/, $in->{services});
}

GETDATACENTERLIST:      # target Data Centers
{
    @targets = split(/,/, $in->{datacenters});
}

# By default run 'status' because that is most safe.
# control() is not defined in this file; presumably it is supplied by
# ProcDaemon -- verify.
push @ARGV, "status" unless @ARGV;
exit control(@ARGV) ? 0 : 1;

#------------------------------------------------------------------------------
# work()
#
# Main routine of the script.  Creates the worker thread pool, then once a
# minute (aligned to the top of the minute):
#   1. builds the time-based URL / directory names for the minute to fetch,
#   2. creates and prunes local directories,
#   3. for every target, lists the remote files, has the pool fetch their
#      sizes, waits for those jobs, and writes the target's file-size file,
#   4. in SYNC mode compares SYNC and BASE sizes,
#   5. hands the actual downloads to the pool.
# Runs until $TERM is set, then shuts the pool down.
#------------------------------------------------------------------------------
sub work {

    ### INITIALIZE ###
    # Thread work queues referenced by thread ID.
    # For each pool slot: create a work queue, create the thread and give
    # it the queue, and remember the thread's queue.
    my %work_queues;
    for (1 .. $MAX_THREADS) {
        my $work_q = Thread::Queue->new();
        my $thr    = threads->create('worker', $work_q);
        $work_queues{ $thr->tid() } = $work_q;
    }

    SETMODETIME:    # Set the initial starting point based on the mode of the script
    {
        $ref->{time} = ($in->{$MODE} * 60);
        $ref->{time} = ($ref->{time} + (60 * 60)) if (strftime("%Z", localtime()) eq "MST");
    }

    sleep_topmin();    # Start at top of minute regardless of when agent was started

    # Services as a lookup table (replaces the smartmatch membership test)
    my %is_service = map { $_ => 1 } @{ $ref->{services} };

    # Manage the thread pool until signalled to terminate
    MINUTE:
    while (!$TERM) {
        my $t = {};    # local file path => download URL
        @Data = ();
        logit("Starting loop");    # Logging Routine.
        my $time = time();         # do not touch, needed for the 60 second loop
        print "" . localtime() . " $LABEL \n" if $VERBOSE;

        # Epoch of the minute being processed (seconds forced to 0), and
        # the epoch of the hour directory that has aged out.
        $ref->{epoch}    = timelocal(0, (localtime(time() - $ref->{time}))[1, 2, 3, 4, 5]);
        $ref->{d_epoch}  = timelocal(0, (localtime(time() - $ref->{time} - (86400 * $in->{remove})))[1, 2, 3, 4, 5]);
        $ref->{time_url} = form_url($ref->{epoch}, "/../../custom_log/../");

        # Log file format:  name YYYY.MM.DD.HH.mm.txt
        #                   name.b.2013.10.30.12.15.txt
        $ref->{suffix} = strftime "%F.%H.%M.txt", localtime($ref->{epoch});    # gmtime()
        $ref->{suffix} =~ s/-/\./g;

        # The delete reference is used for deleting files and directories
        # older than the number of days listed in the conf file.
        $ref->{delete} = strftime "%F.%H.%M", localtime($ref->{d_epoch});
        $ref->{delete} =~ s/-/\./g;

        # %/name/2013/October/30/12
        # This section creates directories that may not exist and prunes
        # the hour directory that is older than X (conf value).
        MAKEDIRECTORIES:
        {
            my @sfx = split(/\./, $ref->{suffix});    # YYYY MM DD HH mm txt
            my @del = split(/\./, $ref->{delete});    # YYYY MM DD HH mm

            $ref->{dr} = "$sfx[0]/" . $months{ (localtime($ref->{epoch}))[4] } . "/$sfx[2]/$sfx[3]";
            $ref->{dd} = "$del[0]/" . $months{ (localtime($ref->{d_epoch}))[4] } . "/$del[2]/$del[3]";

            # Directory for size validations.  Computed once per loop
            # rather than per service, so it is valid even when the
            # service list is empty.
            $ref->{dr2} = "$ref->{dr}/$sfx[4]";
            my $sizes_dir = "$in->{localbase}/FILESIZES/$ref->{dr2}";
            make_path($sizes_dir) unless -d $sizes_dir;

            foreach my $service (@{ $ref->{services} }) {
                my $keep = "$in->{localbase}/$service/$ref->{dr}";
                make_path($keep) unless -d $keep;    # Directory for files

                my $stale = "$in->{localbase}/$service/$ref->{dd}";
                remove_tree($stale);
            }
        }

        # Download and parse the landing page for the minute being
        # queried.  Every listed file whose service is in the conf file is
        # recorded in $t, and a SIZE job is queued so that the file's
        # current size can be compared between MODEs.
        GETURLDATAFILELIST:
        {
            TARGET:
            foreach my $target (@targets) {
                my @dc       = $target =~ /[-\.]([abp])/;
                my $ua       = LWP::UserAgent->new(timeout => 10);
                my $req      = HTTP::Request->new('GET', 'http://root' . $target . '.company.com' . $ref->{time_url});
                my $response = $ua->request($req);

                foreach my $line (grep { /a href="[\w\.-]+/ } split /$/m, $response->as_string()) {
                    my ($gz, $sz) = ($line =~ /a href="([\w\.-]+)".*\s([\w\.]+)/) ? ($1, $2) : undef;
                    next unless defined $gz;

                    my $service = (split /\./, $gz)[0];
                    next unless defined $service && $is_service{$service};

                    my $file = "$in->{localbase}/$service/$ref->{dr}/$service.$dc[0].$ref->{suffix}";
                    $t->{$file} = "http://root$target.company.com$ref->{time_url}/$gz";

                    my $tid = $IDLE_QUEUE->dequeue();
                    last if ($tid < 0);

                    # Count the job before handing it over so the wait
                    # below cannot miss it.
                    {
                        lock($SIZE_PENDING);
                        $SIZE_PENDING++;
                    }
                    $work_queues{$tid}->enqueue({
                        url    => "$t->{$file}",
                        file   => "$gz",
                        action => 'SIZE',
                        dc     => "$target",
                    });
                }

                # Block until every SIZE job queued for THIS target has
                # finished, so @Data holds exactly this target's sizes and
                # targets no longer collide with each other.
                wait_for_size_jobs();
                last TARGET if $TERM;    # do not write a partial size file

                my $sizes_file = "$in->{localbase}/FILESIZES/$ref->{dr2}/$in->{filesizes}.$MODE.$target";
                if (open(my $wr, '>', $sizes_file)) {
                    # One "file=bytes" entry per line so readfile() can
                    # split them again.
                    print {$wr} map { "$_\n" } @Data;
                    close $wr;
                    logit("Finished creating " . localtime($ref->{epoch}) . " File Size File for $target");
                }
                else {
                    logit("$sizes_file: $!");
                }

                @Data = ();    # Clear the shared array for the next target
            }
        }    # End GETURLDATAFILELIST Block

        last MINUTE if $TERM;

        # Sync validation: load the SYNC and BASE size files of every
        # target into $s->{target}->{file}->{MODE} = bytes.
        my $s;
        if ($MODE eq "SYNC") {
            foreach my $target (@targets) {
                foreach my $mode ('SYNC', 'BASE') {
                    my @entries = readfile("$in->{localbase}/FILESIZES/$ref->{dr2}/$in->{filesizes}.$mode.$target");
                    foreach my $entry (@entries) {
                        chomp $entry;
                        my ($name, $bytes) = split('=', $entry);
                        next unless defined $name && length $name;
                        $s->{$target}->{$name}->{$mode} = $bytes;
                    }
                }
            }
        }

        # Second use of the thread pool.  Downloads are independent of
        # each other, so they may finish in any order.
        foreach my $files (keys %{$t}) {
            if ($MODE eq "SYNC") {
                next unless $t->{$files} =~ /http:\/\/root([\w\-\.]+).company.com.*\/(.*.txt.gz)/;
                my ($dc, $name) = ($1, $2);

                my $sync = $s->{$dc}->{$name}->{SYNC};
                my $base = $s->{$dc}->{$name}->{BASE};

                # A missing size cannot be verified, so it counts as
                # "not in sync".
                if (defined $sync && defined $base && $sync == $base) {
                    logit("$name is in equal byte count");
                    next;
                }
                logit("$name is not in sync, downloading");
            }

            # Take a worker only when there really is a job for it;
            # otherwise the worker would never report idle again.
            my $tid = $IDLE_QUEUE->dequeue();
            last if ($tid < 0);

            $work_queues{$tid}->enqueue({
                url    => "$t->{$files}",
                file   => "$files",
                action => 'FILE',
            });
        }

        print "Finshed " . localtime() . "\n" if $VERBOSE;
        logit("Finished loop");
        undef $t;
        sleep_topmin();
    }

    ## CLEANING UP ###
    # Signal all threads that there is no more work
    $work_queues{$_}->enqueue(-1) foreach keys(%work_queues);

    # Wait for all the threads to finish
    $_->join() foreach threads->list();
}

exit(0);

#------------------------------------------------------------------------------
# wait_for_size_jobs()
#
# Blocks the calling (main) thread until $SIZE_PENDING drops to zero or
# $TERM is set.  A one second timed wait is used so that a termination
# request is noticed even if no worker ever signals.
#------------------------------------------------------------------------------
sub wait_for_size_jobs {
    lock($SIZE_PENDING);
    until ($SIZE_PENDING <= 0 || $TERM) {
        cond_timedwait($SIZE_PENDING, time() + 1);
    }
    return;
}

### Thread Entry Point Subroutines ###

#------------------------------------------------------------------------------
# worker($work_q)
#
# A worker thread.  Announces itself on $IDLE_QUEUE, waits for a job on its
# own queue and runs it.  A job is a hash with keys url, file, action
# ('SIZE' or 'FILE') and, for SIZE jobs, dc.  Anything that is not a hash
# (the -1 sentinel) ends the thread.
#------------------------------------------------------------------------------
sub worker {
    my ($work_q) = @_;

    # This thread's ID
    my $tid = threads->tid();

    # Work loop
    WORK:
    while (1) {

        # Indicate that we are ready to do work
        printf("Idle -> %2d\n", $tid);
        $IDLE_QUEUE->enqueue($tid);

        # Wait for work from the queue
        my $work = $work_q->dequeue();

        # If no more work, exit.  The shutdown sentinel is -1, which is a
        # true value, so test for a job hash instead of truthiness.
        last WORK unless ref($work) eq 'HASH';

        # Do some work while monitoring $TERM
        printf(" %2d <- Working\n", $tid);
        my $action = $work->{action} // '';

        if (!$TERM) {
            # Trap failures so that one bad job does not remove this
            # thread from the pool.
            my $ok = eval {
                if ($action =~ /SIZE/) {
                    size("$work->{url}", "$work->{file}", "$work->{dc}");
                }
                elsif ($action =~ /FILE/) {
                    download("$work->{url}", "$work->{file}");
                }
                1;
            };
            logit("Worker $tid: $action job for $work->{url} failed: $@") unless $ok;
        }

        # Always account for a SIZE job, even when skipped or failed,
        # so the main thread is never left waiting.
        if ($action =~ /SIZE/) {
            lock($SIZE_PENDING);
            $SIZE_PENDING--;
            cond_signal($SIZE_PENDING);
        }

        # Loop back to idle state if not told to terminate
        last WORK if $TERM;
    }

    # All done
    printf("Finished -> %2d\n", $tid);
}

#------------------------------------------------------------------------------
# writetimestamp($stamp)
#
# Writes $stamp to "<loaddir><timestampfile>.<MODE>", replacing its content.
#------------------------------------------------------------------------------
sub writetimestamp {
    my ($stamp) = @_;
    my $path = "$ref->{loaddir}$in->{timestampfile}.$MODE";

    open(my $out, '>', $path) or do { logit("$path: $!"); return; };
    print {$out} "$stamp";
    close $out;
    return;
}

#------------------------------------------------------------------------------
# readfile($path)
#
# Simple generic read of a file into a list of lines (line endings kept).
# Returns an empty list, after logging the error, when the file cannot be
# opened.
#------------------------------------------------------------------------------
sub readfile {
    my ($path) = @_;
    logit("Reading $path");

    open(my $fh, '<', $path) or do { logit("$path: $!"); return; };
    my @lines = <$fh>;
    close $fh;
    return @lines;
}

#------------------------------------------------------------------------------
# logit($message)
#
# Appends "<MODE>: <localtime> <message>" to "<loaddir><LABEL>.log".
# Dies when the log file cannot be opened.
#------------------------------------------------------------------------------
sub logit {
    open(my $log, '>>', "$ref->{loaddir}$LABEL.log") or die $!;
    print {$log} "$MODE: " . localtime() . " $_[0]\n";
    close $log;
}

#------------------------------------------------------------------------------
# read_conf($dir, $cmd)
#
# Reads "<dir><cmd without extension>.conf" and returns a hash reference of
# its key=value lines.  Lines starting with '#' and whitespace-only lines
# are ignored.  When the file cannot be opened, usage() is called with an
# error message and an empty hash reference is returned.
#------------------------------------------------------------------------------
sub read_conf {
    my ($dir, $cmd) = @_;
    $dir //= '';

    my $base = (split(/\./, $cmd))[0];
    my @lines;

    if (open(my $conf, '<', "$dir$base.conf")) {
        my @data = <$conf>;
        close $conf;

        @lines = grep { !/^#|^\s+$/ } @data;
        chomp @lines;
        s/[\r\n]+\z// for @lines;    # also strip the CR of CRLF files
    }
    else {
        usage("Could not load $base.conf! Please ensure that it exists in the same directory");
    }

    my %hash;
    foreach my $line (@lines) {
        # Only the first two '='-separated fields are used
        my ($key, $value) = split(/=/, $line);
        next unless defined $key;
        $hash{$key} = $value;
    }
    return \%hash;
}

#------------------------------------------------------------------------------
# download($url, $file)
#
# GETs $url (accepting compressed transfer encodings), writes the decoded
# body to $file and, when the file exists afterwards, passes it to
# runFilter().  Failures are logged / printed to STDERR.
# runFilter() is not defined in this file; it is expected to be provided
# elsewhere.
#------------------------------------------------------------------------------
sub download {
    my ($url, $file) = @_;

    my $request = HTTP::Request->new(GET => $url);
    $request->accept_decodable;
    my $ua       = LWP::UserAgent->new();
    my $response = $ua->request($request);

    if (!$response->is_success()) {
        print STDERR "get '$url' failed: ", $response->status_line, "\n";

        #Will add back to thread pool to try again
        return;
    }

    if (open(my $out, '>', $file)) {
        binmode $out;    # content is raw bytes (charset => 'none')
        print {$out} $response->decoded_content(charset => 'none');
        close $out;
    }
    else {
        logit("$file: $!");
    }

    my $short = (split('/', $file))[-1];
    if (-e $file) {
        logit("Finished download, decode, and creation of " . $short . "");
        runFilter($file);
    }
    else {
        logit("FAILED download of " . $short . " Trying again.");

        #Will add back to thread pool to try again
    }
    return;
}

#------------------------------------------------------------------------------
# size($url, $file, $dc)
#
# Issues a HEAD request for $url and appends "<file>=<content length>" to
# the shared @Data array.  $dc is accepted but not used.  Failures are
# logged together with the HTTP status line.
#------------------------------------------------------------------------------
sub size {
    my ($url, $file, $dc) = @_;

    my $request  = HTTP::Request->new(HEAD => $url);
    my $ua       = LWP::UserAgent->new();
    my $response = $ua->request($request);

    if ($response->is_success()) {
        push @Data, "$file=" . $response->content_length() . "";
    }
    else {
        # logit() only uses its first argument, so build one string
        logit("get '$url' failed: " . $response->status_line);
    }
    return;
}

#------------------------------------------------------------------------------
# Format the Time URL /2013/05-May/28/12:00:00/31/
# Data comes in as the modified epoch timestamp
#
# form_url($epoch, $base) returns
#   <base>YYYY/MM-<Month name>/DD/HH:00:00/mm
# using local time.
#------------------------------------------------------------------------------
sub form_url {
    my ($epoch, $base) = @_;

    my ($min, $hour, $mday, $mon, $year) = (localtime($epoch))[1 .. 5];

    return sprintf(
        "%s%d/%02i-%s/%02i/%02d:00:00/%02d",
        $base, $year + 1900, $mon + 1, $months{$mon}, $mday, $hour, $min
    );
}

#------------------------------------------------------------------------------
# Sleep until the top of the next minute
#------------------------------------------------------------------------------
sub sleep_topmin {
    sleep(60 - (localtime)[0]);
}

#------------------------------------------------------------------------------
# Simple for help and command line options
#
# usage($message) prints the banner and $message (if any).  It does not
# exit.
#------------------------------------------------------------------------------
sub usage {
    my $message = shift // '';
    print <<EOF;
--------------------------------------------------------------------
$ref->{cmd} v $VERSION : By J....
--------------------------------------------------------------------
$message
Usage output
$message
EOF
    ;
}