Slappy GNU Year and all that! :) The holidays have given me time to
tackle this a bit more closely. One thing that was worrying me is that
you seem to be starting one thread per chunk of info from getData(),
and replenishing the pool with a new thread each time a worker finishes
and joins (have we mentioned that joining to threads->self is very, very bad?).
So I played around with a pool of 'fulltime' workers that perform the same
action ad infinitum, a 'foreman' thread, and the main thread.
Using my favorite breeding ground for URLs (squid's access.log), I grabbed a list of 250 jpg image URLs to use as
the data to feed the threads. The foreman takes a line at a time from
the urls file and pushes it onto a queue. It might be wise
to limit the length of the pending queue for memory's sake, but
I skipped that for this example.
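For what it's worth, here is one way that limit could look. This is a minimal sketch, separate from the code below: it assumes Thread::Queue (the name the stock queue module goes by in current Perls) and its pending() method, and the $MAX_PENDING cap and enqueue_throttled() helper are my own inventions for illustration.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;    # current name for the stock thread-safe queue

# Arbitrary cap on the backlog, just for this sketch
my $MAX_PENDING = 50;
my $urls = Thread::Queue->new;

# Throttled enqueue: nap while the backlog sits at the cap, so the
# foreman can never get more than $MAX_PENDING items ahead of the workers
sub enqueue_throttled {
    my ($queue, $item) = @_;
    sleep 1 while $queue->pending >= $MAX_PENDING;
    $queue->enqueue($item);
}

enqueue_throttled( $urls, "http://example.com/img$_.jpg" ) for 1 .. 3;
print $urls->pending, " pending\n";    # 3 pending
```

The foreman would call enqueue_throttled() in place of a bare enqueue(); with real workers draining the queue, the while loop only spins when the backlog genuinely hits the cap.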
Worker threads try to grab the next item from the URL queue. If the
queue is empty, the thread sleeps; otherwise it downloads the URL with LWP::Simple
and then, if $finished has not been set, repeats the loop.
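As an aside before the listing: the sleep-and-poll loop can be avoided entirely with a blocking dequeue plus one undef 'sentinel' per worker, which also does away with the shared $finished flag and the detach-then-poll shutdown. A minimal sketch, assuming Thread::Queue and a stand-in payload instead of the real getstore() call:

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $max_threads = 3;                  # small pool for the sketch
my $urls    = Thread::Queue->new;
my $results = Thread::Queue->new;

sub worker {
    # Blocking dequeue: the thread parks here until work (or the
    # undef sentinel) arrives -- no sleep/poll loop, no $finished flag
    while ( defined( my $url = $urls->dequeue ) ) {
        # the real code would getstore() here; this sketch just echoes
        $results->enqueue("done|$url");
    }
}

my @pool = map { threads->new( \&worker ) } 1 .. $max_threads;

$urls->enqueue("http://example.com/img$_.jpg") for 1 .. 5;
$urls->enqueue(undef) for 1 .. $max_threads;   # one sentinel per worker

$_->join for @pool;    # join the pool rather than detach-and-poll
print $results->pending, " results\n";         # 5 results
```

Because the queue is FIFO, every real URL is handed out before any sentinel, so each worker drains work until it hits an undef and exits cleanly.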
#!/usr/bin/perl -w
use strict;
use threads;
use threads::shared;
use threads::shared::queue;
use LWP::Simple;
use Data::Dumper;
$|=1;
my $results = new threads::shared::queue;
my $urls = new threads::shared::queue;
my $max_threads = 20;
my $finished : shared ;
my $in : shared;
my $out : shared;
my $total : shared;
$finished=0;
$in =0;
$out =0;
$total=0;
# Foreman arrives before workers?
threads->new( 'foremen' );
# Start all the workers
for (1..$max_threads) { threads->new( 'worker' ) };
# Main Loop;
do {
    my $result = $results->dequeue_nb;
    if ($result) {
        $out++;
        print $result, $/;
    }
    else {
        print "wait: total records $total , results returned $out\n";
        sleep 1;
    }
    if ( $out == ( $total - $max_threads ) ) { $finished = 1 }
} until ( $out == $total );
# Cleanup
print "Waiting for remaining threads to detach/exit\n";
my @threads;
do {
    @threads = threads->list;
    sleep 1;
} until ( scalar(@threads) == 1 );
print "Exiting\n";
### Send in the subroutines ###
sub foremen {
    # '|| die' binds to the filename and can never fire; 'or die' works
    open D, '<', 'urls' or die "screaming $!";
    while ( <D> ) {
        chomp;
        $urls->enqueue($_);
        $in++;
    }
    close D;
    $total = $in;
    (threads->self)->detach;
}
sub worker {
    do {
        my $url = $urls->dequeue_nb;
        if ($url) {
            # List-context match: $file is undef if the match fails,
            # rather than a stale $1 left over from a previous URL
            my ($file) = $url =~ /([^\/]+)$/;
            if ($file) {
                my $result = getstore( $url, $file );
                $results->enqueue( "$result|$url" );
            }
            else {
                print "Failed , $url \n";
                $results->enqueue( "fail|$url" );  # keep the count honest
            }
        }
        else {
            sleep 1;
        }
    } until ( $finished );
    print threads->tid, " - finished, detaching\n";
    (threads->self)->detach;
}
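One last nit on the worker's filename regex: if /([^\/]+)$/ ever fails to match (a URL ending in '/', say), $1 still holds the capture from some earlier match, so the "Failed" branch can be skipped with a bogus filename. A list-context match sidesteps that; url_basename() below is a hypothetical helper sketched for illustration, not part of the code above:

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Pull the last path segment off a URL; returns undef (never a stale
# capture) when there is no usable filename
sub url_basename {
    my ($url) = @_;
    $url =~ s/[?#].*//;                  # drop any query or fragment
    my ($file) = $url =~ m{([^/]+)$};    # list context: undef on failure
    return $file;
}

print url_basename("http://example.com/pics/cat.jpg"), "\n";   # cat.jpg
print defined url_basename("http://example.com/pics/")
    ? "matched\n" : "no filename\n";                           # no filename
```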