You may find this ForkList package handy; I use it to process large lists in parallel. Assuming you have one or more files you want to process in parallel, it provides the infrastructure to do so and to let the kids safely append data to output files.
use ForkList;

# Shared state read by the callback: the lines of the list currently
# being processed. Must be a package global so the forked kids inherit it.
our $URLS;

for my $list (@LISTS) {
    $URLS = get_lines($list);
    # one forked kid per slot, up to $KIDS kids; each kid handles every
    # $KIDS-th index of @$URLS via callback( CHILD_NUM, INDEX )
    run( \&callback, scalar(@$URLS), { kids => $KIDS, verbose => $VERBOSE } );
}
# Per-line worker invoked by ForkList::run as ( CHILD_NUM, INDEX ).
# Fetches one URL from the shared $URLS array and records the result code.
sub callback {
my ( $child, $i ) = @_;
# this child's assigned line out of the shared list filled in by the caller
my $url = $URLS->[$i];
# NOTE(review): $FORCE and $TIMEOUT are globals not declared in this
# snippet — confirm they are set before run() is called.
my( $code ) = cache_url( $url, $FORCE, $TIMEOUT );
# Appends under flock, so concurrent kids cannot interleave output.
# NOTE(review): $LIST and $VERBOSE are also undeclared here (the loop
# above uses lowercase $list) — presumably globals; verify.
append_file( "$LIST.$code", "$url\n", $child, $i, $VERBOSE );
# return value is discarded: run() wraps this call in an eval and
# ignores the result
return 0;
}
Here is the package.
package ForkList;
use strict;
$|++;
use POSIX qw[ WNOHANG ];
use Fcntl ':flock';
use Time::HiRes 'usleep';
use vars qw ( @ISA @EXPORT $VERSION );
$VERSION = "0.01";
require Exporter;
@ISA = qw(Exporter);
@EXPORT = qw( run get_lines append_file );
# this function allows you to process N lines of data stored in an
# array in parallel, using N kids (default 25). It makes a callback to
# the caller with args ( CHILD_NUM, INDEX, DBH ).
# The DBH is optional and needs to be passed as an option
# { dbh => \&connect_db }
# when passed the DBH option, each kid will have its own DBH to play with
# Fork up to $OPTS->{kids} worker processes (default 25) and invoke
# $CALLBACK as ( CHILD_NUM, INDEX, DBH ) for every index 0 .. $LINES-1.
# Child C handles indices C, C+KIDS, C+2*KIDS, ... so work is striped
# evenly across the kids. Blocks until every child has exited.
#
# Options (all optional):
#   kids    - max number of children to fork (default 25)
#   verbose - 0 silent, 1 lifecycle messages, >1 per-index messages
#   sleep   - microseconds to usleep between callbacks in each kid
#   dbh     - coderef; each kid calls it once to get its own DB handle,
#             which is disconnected on normal exit and on TERM/INT
sub run {
    my ( $CALLBACK, $LINES, $OPTS ) = @_;
    my $KIDS    = $OPTS->{kids}    || 25;
    my $VERBOSE = $OPTS->{verbose} || 0;
    my $DBH     = $OPTS->{dbh}     || 0;
    my $SLEEP   = $OPTS->{sleep}   || 0;
    my $start   = time();

    # nothing to do - don't fork kids that would exit immediately
    return if $LINES <= 0;

    # we only want as many kids as there are lines as a max
    $KIDS = $LINES if $KIDS > $LINES;

    for my $child ( 0 .. $KIDS - 1 ) {
        my $pid = fork();
        # NOTE: on fork failure this child's slice of indices is skipped
        # entirely; the warning now says which child (and why) was lost
        defined $pid
            or do { warn "Fork failed for child $child: $!\n"; sleep 1; next };
        next if $pid;    # parent: go spawn the next child

        # ---- child process from here down ----
        warn "Starting child $child of $KIDS\n" if $VERBOSE;

        # give each kid its own database handle if required
        # (avoid the `my $x = ... if ...` conditional-declaration footgun)
        my $dbh = $DBH ? $DBH->() : undef;

        # make sure we disconnect from the db no matter how we exit
        local $SIG{TERM} = sub {
            $dbh->disconnect() if $dbh;
            warn "Kid $child Killed!\n";
            exit 1;
        };
        local $SIG{INT} = $SIG{TERM};

        # process every KIDS-th index starting at this child's number
        for ( my $i = $child; $i < $LINES; $i += $KIDS ) {
            warn "Child $child is processing [$i]\n" if $VERBOSE > 1;
            # catch any errors that occur during the callback - and
            # report them instead of silently discarding $@
            eval { $CALLBACK->( $child, $i, $dbh ) };
            warn "Child $child callback died at [$i]: $@" if $@;
            usleep($SLEEP) if $SLEEP;
        }

        warn "Child $child has finished and is exiting\n" if $VERBOSE;
        $dbh->disconnect() if $dbh;
        exit 0;    # prevent child from spawning more
    }

    # parent: poll-reap until waitpid reports no children remain (-1)
    warn 'Waiting for children...' if $VERBOSE;
    usleep(500) until waitpid( -1, WNOHANG ) == -1;
    warn sprintf "Done in %d seconds!\n", time() - $start if $VERBOSE;
}
# Slurp a file and return an arrayref of its chomped lines.
# Dies with the filename and $! when the file cannot be opened.
sub get_lines {
    my $file = shift;
    # 3-arg open with a lexical handle: a filename beginning with
    # '>' or '|' can no longer change the open mode, and the handle
    # cannot collide with other code using a bareword FH
    open my $fh, '<', $file or die "Can't open $file $!\n";
    chomp( my @lines = <$fh> );
    close $fh;
    return \@lines;
}
# this function allows kids to safely append to a file
# we are using locking to avoid race conditions:
# open append -> flock exclusive -> seek to EOF -> write -> close
sub append_file {
    my ( $file, $data, $child, $i, $VERBOSE ) = @_;
    # 3-arg open with a lexical handle: no mode injection via $file,
    # and no shared package-global bareword handle
    open my $out, '>>', $file or die "Can't write $file $!\n";
    warn "Child $child waiting for lock at $i\n" if $VERBOSE && $VERBOSE > 2;
    flock( $out, LOCK_EX ) or die "Child $child could not lock at $i $!\n";
    seek $out, 0, 2;    # make sure we are at EOF before appending
    print $out $data;
    # check close on a write handle: buffered write errors (e.g. disk
    # full) only surface here; closing also releases the lock
    close $out or die "Can't close $file $!\n";
}

1;