Category: Utility Code
Author/Contact Info aepage@users.sourceforge.net
Description: A work in progress. An OO-based process manager. It starts and manages processes, collects their output, detects when programs exit, and limits the number of concurrent processes.
The manager package itself.

=pod

=head1 Synopsis ProcessMgr

A perl object that manages multiple child processes running in parallel.

ASSUMPTIONS:  The output from each child process will be line buffered.
That is, each output contains at least one line terminator.

=cut

use strict ;

package ProcessMgr ;

use IO::Select ;
use FileHandle ;
use POSIX ":sys_wait_h" ;

=pod

=item new(@options)

=head3 Options

=item -max_processes => num (default 100)

     Maximum number of processes to run at any given time

=item -input_timeout => seconds (default 60)

     Maximum time to wait for output from children; forces a return
=cut

##
## Constructor.  Builds the option hash from the built-in defaults
## followed by the caller's key/value pairs, so a caller-supplied key
## replaces the default of the same name, and blesses it into the
## class the constructor was invoked on.
##
sub new {
    my $class = shift ;

    my %defaults = (
                     -max_processes => 100,
                     -input_timeout => 60, # 60 seconds
                     ) ;

    ## later pairs win, so @_ must come after the defaults
    return bless { %defaults, @_ }, $class ;

} # new

=pod

=item nextProcess

   Starts the next process and returns ($pid, $outputHandle, $userData).
     Returns (undef, undef, undef) if there are no more processes to start.

   A return of (undef, $outputHandle, undef | $userData) is possible if
     we're connecting to a remote computer instead.
    

=cut

##
## Base-class placeholder: it launches nothing and always reports
## "no more processes" by returning three undefs.  Subclasses override
## this to start a child and hand back ($pid, $outputHandle, $userData).
##
sub nextProcess {
    my $self = shift ;

    return (undef, undef, undef) ;

}


=pod

=item run

    Starts managing processes.  Will run until all children have either
exited or timed out.

=cut

##
## Main loop.  Repeatedly:
##   1. while fewer than -max_processes children are registered, asks
##      nextProcess() for another one and registers its output handle;
##   2. select()s on the output handles of all registered children;
##   3. dispatches OnChildException / OnChildWrite / OnChildOutput /
##      OnChildDone for the handles select() reported.
##
## Returns (nothing useful) when either
##   - nextProcess() has nothing to start and no children are registered, or
##   - a wait bounded by -input_timeout ended with no handle ready to read.
##
## Children are tracked in $self->{processRecords}, keyed by the fileno of
## their output handle; each record holds -pid, -out and -user.
##
sub run {
    my($self) = @_ ;

    ##
    ## Main Process Loop
    ##
    while( 1 ) {
        my $records = ( $self->{processRecords} ||= {} ) ;

        ##
        ## A timeout of 0 turns select() into a poll: we process any
        ## pending output, then come straight back for the next process.
        ##
        my $timeout = 0 ;

        if( (scalar keys %$records) < $self->{-max_processes} ) {
            my($pid, $outputHandle, $userData) = $self->nextProcess() ;

            if( $outputHandle ) {
                ## A handle that was never opened has no fileno; there is
                ## nothing to watch, so it is not registered.
                my $fd = $outputHandle->fileno ;
                if( defined $fd ) {
                    $records->{$fd} = { -pid => $pid, -out => $outputHandle, -user => $userData } ;
                }
            }
            elsif( !$pid ) {
                ##
                ## No more processes to start and no more children running
                ##
                return unless keys %$records ;

                ## Nothing to start but children are still running: wait
                ## for their output instead of spinning on a zero timeout.
                $timeout = $self->{-input_timeout} ;
            }
        }
        else {
            ## At the concurrency limit: wait for output from a child.
            $timeout = $self->{-input_timeout} ;
        }

        my $select = IO::Select->new(map { $_->{-out} } values %$records) ;

        ## Only ask about writability when a subclass can act on it;
        ## otherwise an always-writable handle would make every select()
        ## return immediately.
        my $writers = $self->can("OnChildWrite") ? $select : undef ;

        ## IO::Select::select returns an empty list on timeout or error,
        ## leaving all three of these undef.
        my($rdHandles, $wrHandles, $exHandles) =
            IO::Select::select($select, $writers, $select, $timeout) ;

        ##
        ## process any handles that may have an exception pending
        ##
        for my $handle ( @{ $exHandles || [] } ) {
            my $rec = delete $records->{$handle->fileno} ;
            next unless $rec ;
            $self->OnChildException(@$rec{-pid, -out, -user}) ;
        } # for
        next if $exHandles && @$exHandles ; # process next group

        if( $timeout && !( $rdHandles && @$rdHandles ) ) {
            # timed out, and no one's ready to read
            return ;
        }

        if( $writers ) {
            for my $handle ( @{ $wrHandles || [] } ) {
                my $rec = $records->{$handle->fileno} ;
                next unless $rec ;
                $self->OnChildWrite(@$rec{-pid, -out, -user}) ;
            }
        }

        ##
        ## Always done since a process will indicate that it's finished
        ## by closing its output handle, which shows up as a read that
        ## returns nothing, so we always check this.
        ##
        for my $handle ( @{ $rdHandles || [] } ) {
            my $rec = $records->{$handle->fileno} ;
            next unless $rec ;

            my $data = <$handle> ;

            ## undef (not merely false) marks end of output, so a final
            ## "0" without a line terminator is still delivered.
            if( !defined $data ) {
                delete $records->{$handle->fileno} ;
                ##
                ## handle closed, process exited
                ##
                if( $rec->{-pid} ) {
                    ## NOTE(review): with WNOHANG, $? is only fresh if the
                    ## child had already exited when we got here - confirm
                    ## whether a blocking wait is wanted instead.
                    waitpid($rec->{-pid}, WNOHANG) ;
                    $rec->{-status} = $? ; # capture output status
                }
                else {
                    # case where we're connected to an external socket
                    $rec->{-status} = 0 ;
                }
                $self->OnChildDone(@$rec{-pid, -status, -out, -user}) ;
                next ;
            }

            $self->OnChildOutput($rec->{-pid}, $data, $rec->{-user}) ;
        } # for

    } # while

}

=pod

=item OnChildOutput($pid, $data, $userData) 
        
        called when a child process has output

        $pid is the process id of the child that produced the output

        $data is the text received from the child

        $userData is the userdata returned from nextProcess when this child was created

=cut
##
## Default output hook: prints the child's pid, a colon and a space,
## the received text, and a newline to the currently selected output
## handle.  $userData is accepted but not used here.
##
sub OnChildOutput {
    my $self = shift ;
    my($pid, $data, $userData) = @_ ;

    print "$pid: $data\n" ;

}

##
## Default exception hook: writes one line to STDERR naming the pid and
## the current value of $!.  The handle and user data are accepted but
## not used here.
##
sub OnChildException {
    my $self = shift ;
    my($pid, $outh, $userData) = @_ ;

    print STDERR "$pid: reported an error:  $!\n" ;
}

##
## Completion hook, called by run() once a read on a child's output
## handle yields no more data.  The default only unpacks its arguments
## and does nothing with them; subclasses override it to react.
##
##   $pid      - pid recorded for the child (may be undef when there is
##               no local process behind the handle)
##   $status   - $? captured by run() after its waitpid call, or 0 when
##               the record has no pid
##   $outh     - the child's output handle
##   $userData - the user data stored with the child's record
##
sub OnChildDone {
    my($self, $pid, $status, $outh, $userData) = @_ ;
}

=pod

=head1 AUTHOR

aepage@users.sourceforge.net

=head1 LICENSE

          Copyright 2003, Andrew E. Page
         All rights reserved.

    This program is free software; you can redistribute it and/or modify
    it under the terms of either:

 a) the GNU General Public License as published by the Free
 Software Foundation; either version 1, or (at your option) any
 later version, or

 b) the "Artistic License" which comes with this Kit.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See either
    the GNU General Public License or the Artistic License for more details.


=cut

1 ;
A quick and dirty test script:

use FileHandle ;

package CommandList ;
use ProcessMgr ;

## Inherit from ProcessMgr.  The class name is quoted so the assignment
## also compiles when "use strict" is in effect.
our @ISA = ('ProcessMgr') ;

##
## Constructor.  Delegates to the parent class constructor with the
## same arguments; because the parent is invoked on $class, the object
## comes back already blessed into the class new() was called on, so no
## second bless is needed.
##
sub new {
    my $class = shift ;

    return $class->SUPER::new(@_) ;
}

##
## Appends the given command strings to this object's 'command_list'
## queue (created on first use).  Returns what push returns: the number
## of queued commands after the append.
##
sub addCommands {
    my($self, @commands) = @_ ;

    push @{ $self->{'command_list'} }, @commands ;

}

##
## Starts the next queued command with its standard output attached to
## a pipe and returns ($pid, $handle).  Commands that are undefined or
## empty are skipped; a command that cannot be started is reported with
## warn and skipped, so the caller never receives an unopened handle.
## Returns (undef, undef, undef) once the queue is exhausted.
##
sub nextProcess {
    my($self) = @_ ;

    while( @{ $self->{'command_list'} || [] } ) {
        my $cmd = shift @{ $self->{'command_list'} } ;
        next unless defined $cmd && length $cmd ;

        my $h = FileHandle->new ;

        ## Three-argument form: the command string is never parsed for
        ## an open mode, only run as a command.
        my $pid = open($h, '-|', $cmd) ;
        return ($pid, $h) if $pid ;

        warn "CommandList: cannot start '$cmd': $!\n" ;
    }

    return (undef, undef, undef) ;

}

##
## Completion hook: prints the pid (right-aligned in four columns) and
## the status value it was given, both formatted as integers, to the
## currently selected output handle.
##
sub OnChildDone {
    my $self = shift ;
    my($pid, $status) = @_ ;

    printf "%4d done status = %d\n", $pid, $status ;

}

1 ;

package main ;


## Demo: queue five shell commands, let the manager run at most two of
## them at a time, and print "done" once run() returns.
my $mgr = CommandList->new(-max_processes => 2) ;

$mgr->addCommands("echo 1", "sleep 3", "echo 2", "sleep 10", "ls") ;

$mgr->run ;

print "done\n" ;
Replies are listed 'Best First'.
Re: Process Manager
by Anonymous Monk on Oct 22, 2003 at 18:33 UTC
    I'm sorry. I meant Parallel::Jobs in the above post (so many modules, so little brain).
Re: Process Manager
by Anonymous Monk on Oct 22, 2003 at 18:12 UTC
    I'm not sure what capabilities this gives me that Parallel::ForkManager doesn't already provide. Can you elaborate?