I come seeking understanding of an issue that has proven confusing at my present level of knowledge.
I have a large amount of information to process from multiple sources. To speed this up (some of the processing time is spent waiting on the sources), I decided to use Parallel::ForkManager to process what I could in parallel. However, I need to make sure that no other instance of the script is running at the same time, so I tried using a PID file with an exclusive write-lock to prevent this. Because the version of the database I have to deal with uses table-level locking, I figured that semaphores might be a way to let the children know when one of them is using the tables.
When I checked a test run earlier, however, I found that the PID file I had written was several hundred kilobytes in size, apparently consisting of many lines that each contained the same PID.
I think the code below (stripped of some of the processing) contains the relevant backbone of the script (there are some die() possibilities in places, but most of those are wrapped in eval()s). Any feedback would be greatly appreciated.
    #!/usr/local/bin/perl -w

    BEGIN {
        use strict;
        use vars qw($dbh $sem $base_dir $PID $pid_file $pid_value);
        use DBI;
        use Devel::Carp qw(verbose);
        use Fcntl qw(:flock);
        use IPC::SysV qw(IPC_PRIVATE S_IRWXU IPC_CREAT SEM_UNDO ftok);
        use IPC::Semaphore;
        use Parallel::ForkManager;
        use Socket;

        if ( defined($pid_value) ) {
            return if ( $$ != $pid_value );
        }
        $base_dir  = './';
        $pid_file  = $base_dir . $0 . '.pid';
        $pid_value = $$;
        open( PPID, "+> " . $pid_file )
          or die ("$0 ($$): Can't open $pid_file for write: $! \n ");
        {
            $PID = *PPID;
            unless ( flock( $PID, LOCK_EX | LOCK_NB ) ) {
                die ( "$0 ($$): Can't get exclusive write-lock on $pid_file: $! \n " );
            }
            print( $PID $$, "\n" );
        }
    }

    END {
        # Suggested in CB to handle children calling END{}
        return if ( $$ != $pid_value );
        unless ( flock( $PID, LOCK_UN ) ) {
            die ( "$0 ($$): Can't release exclusive write-lock on $pid_file: $! \n " );
        }
        close($PID);
    }

    # --------------------------------------------------
    # Main
    # --------------------------------------------------
    $| = 1;
    my (@targetlist);
    my $MAX_PROCESSES = 7;
    my $pm = new Parallel::ForkManager($MAX_PROCESSES);

    {
        #
        # Load list of processing targets from database
        #
    }

    #
    # Initialize blocking code (from recent posting response)
    #
    my $flags = S_IRWXU;
    my $id    = ftok( $0, 0 );
    my $sem   = new IPC::Semaphore( $id, 1, $flags );
    unless ($sem) {
        # we appear to be the first one
        $sem = new IPC::Semaphore( $id, 1, $flags | IPC_CREAT );
        $sem->setval( 0, 1 );
    }

    foreach my $target (@targetlist) {
        $pm->start and next;
        my ($drh);

        #
        # Create each child its own handle to the database,
        # as they do not play well with each other's
        #

        # Perform level one processing
        foreach my $item (@level1results) {

            # Perform level two processing
            next if ($should_not_process);

            #
            # Blocking code
            #
            {
                # Lock the semaphore
                $sem->op( 0, -1, SEM_UNDO );

                #
                # Do work that needs blocking...
                #
                {
                    foreach my $entry (@level2results) {
                        &insert_or_update( \$dbh,
                            ( 'data' => 'more', 'where' => 'here' ) );
                    }
                }

                #
                # Unlock the semaphore
                #
                $sem->op( 0, 1, SEM_UNDO );
            }
            #
            # End blocking code
            #
        }
        $dbh->disconnect;
        $pm->finish;
    }

    # Wait for any remaining children (should be none).
    $pm->wait_all_children;

    # --------------------------------------------------
    # Subroutines
    # --------------------------------------------------
    sub insert_or_update {
        my ( $dbh, %entry_list ) = @_;

        #
        # Performs SELECT to determine if UPDATE is appropriate,
        # or INSERT, then performs them
        #
    }
Update: I forgot to include the following info: perl = 5.005_03, Parallel::ForkManager = 0.7.4.
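For comparison, here is a minimal sketch of the single-instance guard described above, reduced to the PID file and a plain fork() loop standing in for Parallel::ForkManager. It is not the script in question: it assumes Perl 5.6 or later (lexical filehandles, three-argument open), and the helper name acquire_pid_lock and the /tmp path are hypothetical. It also flushes the PID line before any fork, on the assumption that output left buffered on the handle could be inherited and written out again by each child; that is a guess at one possible cause of the repeated lines, not a confirmed diagnosis.

```perl
#!/usr/bin/perl
# Sketch only: assumes Perl 5.6+ (lexical filehandles, three-argument open).
use strict;
use warnings;
use Fcntl qw(:flock);
use IO::Handle;

# Hypothetical helper: take a non-blocking exclusive lock on $path and
# record our PID in it. Returns the open handle; the lock is held for as
# long as the handle stays open.
sub acquire_pid_lock {
    my ($path) = @_;

    # '+>>' creates the file if needed but does not truncate it on open,
    # so an instance that loses the race cannot wipe the winner's PID.
    open( my $fh, '+>>', $path )
      or die "$0 ($$): can't open $path: $!\n";
    flock( $fh, LOCK_EX | LOCK_NB )
      or die "$0 ($$): another instance holds $path: $!\n";

    truncate( $fh, 0 ) or die "$0 ($$): can't truncate $path: $!\n";
    $fh->autoflush(1);    # leave nothing buffered for children to inherit
    print {$fh} "$$\n";
    return $fh;
}

my $pid_file = "/tmp/pidlock_sketch.pid";    # hypothetical path
my $owner    = $$;                           # PID of the locking process
my $lock_fh  = acquire_pid_lock($pid_file);

# Stand-in for the Parallel::ForkManager loop: fork a few children.
for my $n ( 1 .. 3 ) {
    my $pid = fork();
    die "fork failed: $!\n" unless defined $pid;
    if ( $pid == 0 ) {
        # Child work would go here.
        exit 0;
    }
}
1 while wait() != -1;    # reap all children

END {
    # Only the process that took the lock cleans up; children skip this.
    if ( defined $owner && $$ == $owner && $lock_fh ) {
        close($lock_fh);
        unlink $pid_file;
    }
}
```

Because the lock is taken at run time by one named process and the END block compares $$ against the recorded owner, children that exit do not release the lock or remove the file. Whether the same arrangement works unchanged under 5.005_03 is untested.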
In reply to Parallel::ForkManager and BEGIN/END oddity...? by atcroft