The manager package itself.
=pod

=head1 Synopsis ProcessMgr

A Perl object that manages multiple child processes running in parallel.

ASSUMPTIONS: The output from each child process will be line buffered. That is,
each write from a child contains at least one line terminator.

=cut
use strict ;
package ProcessMgr ;
use IO::Select ;
use FileHandle ;
use POSIX ":sys_wait_h" ;
=pod

=item new(@options)

=head3 Options

=item -max_processes => num (default 100)

Maximum number of processes to run at any given time.

=item -input_timeout => seconds (default 60)

Maximum time to wait for output from children; forces a return.

=cut
##
## new - construct a manager object.
##
## The first argument is the class to bless into; the remaining arguments
## are option key/value pairs laid over the built-in defaults, so a key
## supplied by the caller replaces the default of the same name.
##
sub new {
    my ( $class, @overrides ) = @_;

    ## Defaults come first so that caller-supplied pairs win.
    my %defaults = (
        -max_processes => 100,
        -input_timeout => 60,    # 60 seconds
    );

    return bless { %defaults, @overrides }, $class;
} # new
=pod

=item nextProcess

Starts the next process and returns ($pid, $outputHandle, $userData).

Returns (undef, undef, undef) if there are no more processes to start.

A return of (undef, $outputHandle, undef | $userData) is possible if
we're connecting to a remote computer instead.

=cut
##
## nextProcess - base-class stub.
##
## Always answers with three undefined values (pid, output handle, user
## data); run() treats a missing pid and handle as "nothing more to start".
##
sub nextProcess {
    my $self = shift ;
    return ( undef, undef, undef ) ;
}
=pod

=item run

Starts managing processes. Will run until all children have either
exited or timed out.

=cut
##
## run - drive the child processes until there is nothing left to do.
##
## Each pass of the loop:
##   1. asks nextProcess() for another child while fewer than
##      -max_processes output handles are registered,
##   2. select()s on every registered output handle (the same set is used
##      for read, write and exception readiness), and
##   3. dispatches to the OnChildException, OnChildWrite (only if the
##      object implements it), OnChildOutput and OnChildDone hooks.
##
## Registered handles live in $self->{processRecords}, keyed by fileno;
## each record holds -pid, -out, -user and, once finished, -status.
##
## Returns (no value) when nextProcess() has nothing more to start and no
## handle is registered, or when -input_timeout seconds pass without any
## handle becoming ready.
##
sub run {
    my ($self) = @_ ;

    ##
    ## Main Process Loop
    ##
    while ( 1 ) {
        my $records = $self->{processRecords} ||= {} ;

        ## 0 means "poll": look at pending output without blocking, then
        ## come straight back to ask for the next process handle.
        my $timeout = 0 ;

        if ( scalar( keys %$records ) < $self->{-max_processes} ) {
            my ( $pid, $outputHandle, $userData ) = $self->nextProcess() ;
            if ( $outputHandle ) {
                $records->{ $outputHandle->fileno } = {
                    -pid  => $pid,
                    -out  => $outputHandle,
                    -user => $userData,
                } ;
            }
            elsif ( !$pid ) {
                ##
                ## No more processes to start and no more children running
                ##
                return unless keys %$records ;

                ## Nothing new to launch but children are still running:
                ## block in select() instead of spinning with a zero timeout.
                $timeout = $self->{-input_timeout} ;
            }
        }
        else {
            ## At capacity: wait for a child to produce output or finish.
            ## The constructor stores this option under '-input_timeout'.
            $timeout = $self->{-input_timeout} ;
        }

        my $select = IO::Select->new( map { $_->{-out} } values %$records ) ;
        my ( $rdHandles, $wrHandles, $exHandles ) =
            IO::Select::select( $select, $select, $select, $timeout ) ;

        ## select() hands back an empty list when nothing became ready;
        ## normalise so the loops below can dereference unconditionally.
        $rdHandles ||= [] ;
        $wrHandles ||= [] ;
        $exHandles ||= [] ;

        ##
        ## process any handles that may have an exception pending
        ##
        for my $fh ( @$exHandles ) {
            my $rec = delete $records->{ $fh->fileno } or next ;
            $self->OnChildException( @{$rec}{ qw(-pid -out -user) } ) ;
        }
        next if @$exHandles ;    # process next group

        if ( $timeout && !@$rdHandles && !@$wrHandles ) {
            # timed out, and no one's ready
            return ;
        }

        if ( $self->can("OnChildWrite") ) {
            for my $fh ( @$wrHandles ) {
                my $rec = $records->{ $fh->fileno } or next ;
                $self->OnChildWrite( @{$rec}{ qw(-pid -out -user) } ) ;
            }
        }

        ##
        ## Always done since a process will indicate that it's finished
        ## by closing its output handle, which shows up as end-of-file on
        ## the read, so we always check this.
        ##
        ## NOTE(review): readline is buffered, so when a child writes several
        ## lines at once the later lines are only delivered on subsequent
        ## passes (select() reports the handle again at the latest on EOF).
        ##
        for my $fh ( @$rdHandles ) {
            my $fileno = $fh->fileno ;
            my $rec    = $records->{$fileno} or next ;
            my $data   = <$fh> ;

            ## Test definedness, not truth: a final unterminated line "0"
            ## is data, not end-of-file.
            if ( !defined $data ) {
                delete $records->{$fileno} ;
                ##
                ## handle closed, process exited
                ##
                if ( $rec->{-pid} ) {
                    waitpid( $rec->{-pid}, WNOHANG ) ;
                    $rec->{-status} = $? ;    # capture wait status
                }
                else {
                    ## no pid: connected to an external socket
                    $rec->{-status} = 0 ;
                }
                $self->OnChildDone( @{$rec}{ qw(-pid -status -out -user) } ) ;
                next ;
            }
            $self->OnChildOutput( $rec->{-pid}, $data, $rec->{-user} ) ;
        } # for
    } # while
}
=pod

=item OnChildOutput($pid, $data, $userData)

Called when a child process has output.

$pid is the process id of the child that produced the output.

$data is the text received from the child.

$userData is the user data returned from nextProcess when this child was created.

=cut
##
## OnChildOutput - default output hook.
##
## Writes "<pid>: <data>" followed by a newline to the currently selected
## output handle. $userData is accepted but not used here.
##
sub OnChildOutput {
    my ( $self, $child_pid, $text, $user_data ) = @_ ;
    my $report = $child_pid . ': ' . $text . "\n" ;
    print $report ;
}
##
## OnChildException - default exception hook.
##
## Reports the child's pid together with the current value of $! on STDERR.
## The handle and user data arguments are accepted but not used here.
##
sub OnChildException {
    my ( $self, $child_pid, $handle, $user_data ) = @_ ;
    my $report = $child_pid . ': reported an error: ' . $! . "\n" ;
    print STDERR $report ;
}
##
## OnChildDone - default completion hook; intentionally does nothing.
##
## run() invokes it with the pid, the recorded status, the output handle
## and the user data once a child's output handle reaches end-of-file.
## Subclasses override it to react to a finished child.
##
sub OnChildDone {
    my ( $manager, $child_pid, $wait_status, $handle, $user_data ) = @_ ;
}
=pod
=head1 AUTHOR
aepage@users.sourceforge.net
=head1 LICENSE
Copyright 2003, Andrew E. Page
All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of either:
a) the GNU General Public License as published by the Free
Software Foundation; either version 1, or (at your option) any
later version, or
b) the "Artistic License" which comes with this Kit.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See either
the GNU General Public License or the Artistic License for more details.
=cut
1 ;
A quick and dirty test script:
use FileHandle ;
package CommandList ;
use ProcessMgr ;
our(@ISA) = ProcessMgr ;
##
## new - build a CommandList on top of a ProcessMgr object.
##
## All arguments after the class name are handed to ProcessMgr's
## constructor unchanged; the resulting object is then re-blessed into
## the requested class.
##
sub new {
    my ( $class, @options ) = @_ ;

    ## Arrow call rather than "new ProcessMgr(...)": the indirect form is
    ## ambiguous to the parser when a sub called "new" is in scope.
    my $self = ProcessMgr->new(@options) ;

    return bless $self, $class ;
}
##
## addCommands - append the given command strings to the pending queue
## kept in $self->{'command_list'}.
##
sub addCommands {
    my ( $self, @commands ) = @_ ;
    return push @{ $self->{'command_list'} }, @commands ;
}
##
## nextProcess - start the next queued command with its output piped back.
##
## Returns ($pid, $handle) for the first queued command that starts
## successfully. A command that cannot be started is reported with warn()
## and skipped, so an unopened handle is never handed to the caller.
## Returns (undef, undef, undef) once the queue is exhausted.
##
sub nextProcess {
    my ($self) = @_ ;

    ## Test definedness so that a command spelled "0" is not mistaken for
    ## the end of the queue.
    while ( defined( my $cmd = shift @{ $self->{'command_list'} } ) ) {
        my $h = FileHandle->new ;

        ## Three-argument pipe open: the mode is fixed, so characters in
        ## $cmd cannot change how the handle is opened.
        my $pid = open( $h, '-|', $cmd ) ;
        return ( $pid, $h ) if $pid ;

        warn "CommandList: cannot start '$cmd': $!\n" ;
    }

    return ( undef, undef, undef ) ;
}
##
## OnChildDone - report a finished child: its pid (padded to four columns)
## and the status value it was recorded with.
##
sub OnChildDone {
    my ( undef, $child_pid, $wait_status ) = @_ ;
    printf( "%4d done status = %d\n", $child_pid, $wait_status ) ;
}
1 ;
package main ;
##
## Driver: queue five shell commands and run them with at most two
## children active at a time, then announce completion.
##
my $mgr = CommandList->new( -max_processes => 2 ) ;
$mgr->addCommands( "echo 1", "sleep 3", "echo 2", "sleep 10", "ls" ) ;
$mgr->run ;
print "done\n" ;
|