#!/usr/local/bin/perl -w BEGIN { use strict; use vars qw($dbh $sem $base_dir $PID $pid_file $pid_value); use DBI; use Devel::Carp qw(verbose); use Fcntl qw(:flock); use IPC::SysV qw(IPC_PRIVATE S_IRWXU IPC_CREAT SEM_UNDO ftok); use IPC::Semaphore; use Parallel::ForkManager; use Socket; if ( defined($pid_value) ) { return if ( $$ != $pid_value ); } $base_dir = './'; $pid_file = $base_dir . $0 . '.pid'; $pid_value = $$; open( PPID, "+> " . $pid_file ) or die ("$0 ($$): Can't open $pid_file for write: $! \n "); { $PID = *PPID; unless ( flock( $PID, LOCK_EX | LOCK_NB ) ) { die ( "$0 ($$): Can't get exclusive write-lock on $pid_file: $! \n " ); } print( $PID $$, "\n" ); } } END { # Suggested in CB to handle children calling END{} return if ( $$ != $pid_value ); unless ( flock( $PID, LOCK_UN ) ) { die ( "$0 ($$): Can't release exclusive write-lock on $pid_file: $! \n " ); } close($PID); } # -------------------------------------------------- # Main # -------------------------------------------------- $| = 1; my (@targetlist); my $MAX_PROCESSES = 7; my $pm = new Parallel::ForkManager($MAX_PROCESSES); { # # Load list of processing targets from database # } # # Initialize blocking code (from recent posting response) # my $flags = S_IRWXU; my $id = ftok( $0, 0 ); my $sem = new IPC::Semaphore( $id, 1, $flags ); unless ($sem) { # we appear to be the first one $sem = new IPC::Semaphore( $id, 1, $flags | IPC_CREAT ); $sem->setval( 0, 1 ); } foreach my $target (@targetlist) { $pm->start and next; my ($drh); # # Create each child its own handle to the database, # as they do not play well with each other's # # Perform level one processing foreach my $item (@level1results) { # Perform level two processing next if ($should_not_process); # # Blocking code # { # Lock the semaphore $sem->op( 0, -1, SEM_UNDO ); # # Do work that needs blocking... # { foreach my $entry (@level2results) { &insert_or_update( \$dbh, ( 'data' => 'more', 'where' => 'here' ) ); } } # # Unlock the semaphore # $sem->op( 0, 1, SEM_UNDO ); } # # End blocking code # } $dbh->disconnect; $pm->finish; } # Wait for any remaining children (should be none). $pm->wait_all_children; # -------------------------------------------------- # Subroutines # -------------------------------------------------- sub insert_or_update { my ( $dbh, %entry_list ) = @_; # # Performs SELECT to determine if UPDATE is appropriate, # or INSERT, then performs them # }