in reply to Re: Queuing in multithread context
in thread Queuing in multithread context

Hi,

Sorry if I wasn't clear. As explained a bit earlier, my goal in using queued threads is to ensure that only one copy task is processed on a given remote server at a time.

I agree, the error is not Perl-related; it comes from my `psexec` instruction, which copies the batch script before executing it. But it's a side effect of all my copy threads being sent to the remote server at the same time, which is what I'm trying to avoid by using queued threads. Does that make sense to you?

Here is the full "working" script:

#!/usr/bin/perl
# -----------------------
# Deploy Script with actions
# -> require a ini file as argument
# --------------------------------------
use strict;
use warnings;
use Config;
use lib "C:/CUSTOM/LIB";
use threads;
use Thread::Queue;
use Data::Dumper qw(Dumper);
use Log::Log4perl;
use Log::Log4perl::Level;
use Config::IniFiles;
use POSIX qw(strftime);
use Getopt::Long;
use File::Basename;
use Net::Ping;

#-------------------------
# Variables definition
#-------------------------
my $LOG4PERLINIT = "C:/CUSTOM/CONFIG/Log4Perl.conf";
my $LOGPATH = "C:/CUSTOM/LOGS/";
my $TMPEXT=".tmp" ;
my $ROLLBACKEXT=".rollback";
my $DEFAULTEXETOKILL="SomeProcess";
my $DEFAULTCOPYSCRIPT="C:/CUSTOM/cmd/deploy_files.cmd";
my $DEFAULTROLLBACKSCRIPT="C:/CUSTOM/cmd/rollback_deploy.cmd";
my ($displayHelp, $configFile, $flagDeploy, $flagNoMove, $flagRollback, $section, $srvRef, $processToKill, $copyScript, $rollbackScript, $user, $pass) = "";
my $logger = undef ;
my %config;
my %threadDetails ;
my @deployments ;
my @targets ;
my @Threads ;
my @ThreadsQueue ;
my @runningThreads ;

#------------------------------------------------------------------------
# Subroutines definition
#------------------------------------------------------------------------

# Help function
sub Usage {
  print STDERR << "EOF";
usage: $0 [-h] [-c /path/to/ini_file.ini] [--deploy] [--novemove] [--rollback]
 -h|--help     : this (help) message
 -c|--config   : path to configuration ini file
 -d|--deploy   : deploy the files to a temporary directory, kill process and move the temporary as destination
                 old destination will remain as .rollback for rollback purpose
 -n|--nomove   : only dispatch the file in the temporary directory without moving them as reference
 -r|--rollback : kill the process and move the .rollback as reference, the old content is removed

This script require a configuration file path as argument and at least one action [--deploy] or [--rollback]
Example: $0 --config D:\\CONFIG\\Livraison_Citri\\Livraison_Europe.ini --deploy
EOF
  if ( @_ == 1 ) {
    exit($_[0]);
  } else {
    exit(0);
  }
}

# Exit on unsupported argument
sub unsupported_arg {
  print "[".strftime('%Y/%m/%d %H:%M:%S',localtime(time()))."] | ERROR | Unexpected argument:". $_[0] ;
  Usage(1);
}

# Function used to generate a logfile name
sub getLogFileName {
  #if (defined $logFileName) { return getReplacedPattern($logFileName, time()); }
  return "$LOGPATH" . fileparse($0) . '.trace.' . &strftime("%Y%m%d", localtime(time())) . '.log';
}

# For each "DEPLOY-*" section: a Destination and a Source parameters are required
sub checkDeploySection {
  my $sectionToCheck = $_[0];
  if (exists($config{$sectionToCheck}{"Destination"})) {
    if ( $config{$sectionToCheck}{'Destination'} eq "" ) {
      $logger->warn("No value defined for parameter 'Destination' in section [$sectionToCheck], section will be ignored");
      return 1;
    }
  } else {
    $logger->warn("No parameter 'Destination' [val: ".$config{$sectionToCheck}{'Destination'}." defined in section [$sectionToCheck], section will be ignored");
    return 1;
  }
  if (exists($config{$sectionToCheck}{"Source"})) {
    if ( $config{$sectionToCheck}{'Source'} eq "" ) {
      $logger->warn("No value defined for parameter 'Source' in section [$sectionToCheck], section will be ignored");
      return 1;
    } else {
      $logger->warn("Source directory [".$config{$sectionToCheck}{'Source'}."] in [$sectionToCheck] not found or unreachable, section will be ignored") if ( ! -d "$config{$sectionToCheck}{'Source'}" );
      return 1;
    }
  } else {
    $logger->warn("No parameter 'Source' defined in section [$sectionToCheck], section will be ignored");
    return 1;
  }
  return 0;
}

# Routine to check configuration provided in the ini file
sub checkConfig {
  if (exists($config{'Config'}{'User'})) {
    if ( $config{'Config'}{'User'} ne "" ) {
      $user = $config{'Config'}{'User'} ;
      $logger->info("User that will execute the deploy cmd is [$user]") ;
    }
  } else {
    $logger->logdie("No 'User' parameter provided in [Config] for script execution, please fix");
  }
  if (exists($config{'Config'}{'Password'})) {
    if ( $config{'Config'}{'Password'} ne "" ) {
      $pass = $config{'Config'}{'Password'} ;
    }
  } else {
    $logger->logdie("No 'Password' parameter provided in [Config] for script execution, please fix");
  }
  if (exists($config{'Config'}{'Srv_Ref'})) {
    if ( $config{'Config'}{'Srv_Ref'} ne "" ) {
      $srvRef = $config{'Config'}{'Srv_Ref'} ;
      $logger->info("Reference server (for archiving) will be [$srvRef]") ;
    }
  } else {
    $logger->warn("No reference server provided [Config] parameter [Srv_Ref] provided, no archiving will be done");
  }
  if (exists($config{'Config'}{'ExeToKill'})) {
    if ( $config{'Config'}{'ExeToKill'} ne "" ) {
      $processToKill = $config{'Config'}{'ExeToKill'} ;
      $logger->info("Deploy script will kill process [$processToKill]") ;
    }
  } else {
    $logger->warn("No process name to kill provided, will used default value [$DEFAULTEXETOKILL] ");
    $processToKill = $DEFAULTEXETOKILL;
  }
  if (exists($config{'Config'}{'CopyScript'})) {
    if ( $config{'Config'}{'CopyScript'} ne "" ) {
      $copyScript = $config{'Config'}{'CopyScript'} ;
      if ( ! -e $copyScript ) {
        $logger->logdie("Deploy script [$copyScript] does not exist.") ;
      } else {
        $logger->info("Deploy script will call copy script [$copyScript]") ;
      }
    }
  } else {
    $copyScript = $DEFAULTCOPYSCRIPT;
    if ( ! -e $copyScript ) {
      $logger->logdie("Default deploy [$copyScript] does not exist .") ;
    } else {
      $logger->warn("No value for copy script provided, will use default [$DEFAULTCOPYSCRIPT] ");
    }
  }
  if (exists($config{'Config'}{'RollbackScript'})) {
    if ( $config{'Config'}{'RollbackScript'} ne "" ) {
      $rollbackScript = $config{'Config'}{'RollbackScript'} ;
      if ( ! -e $rollbackScript ) {
        $logger->logdie("Rollback script [$rollbackScript] does not exist. ") ;
      } else {
        $logger->info("Deploy script will call rollback script (if specified) [$rollbackScript]") ;
      }
    }
  } else {
    $rollbackScript = $DEFAULTROLLBACKSCRIPT;
    if ( ! -e $rollbackScript ) {
      $logger->logdie("Rollback script [$rollbackScript] does not exist. ") ;
    } else {
      $logger->info("Deploy script will call rollback script (if specified) [$rollbackScript]") ;
    }
  }
}

# Function called to ensure deploy section is properly formated
sub checkServeursSection {
  if (exists($config{"Serveurs"}) ) {
    if (exists($config{"Serveurs"}{"Targets"})) {
      # Ensure we do not have only one target since it would mess up the array...
      if (ref($config{"Serveurs"}{"Targets"}) eq "HASH") {
        foreach my $target (keys $config{"Serveurs"}{"Targets"}) {
          my $server = $config{"Serveurs"}{"Targets"}[$target] ;
          if( $server !~ /^[-|#]/i ) {
            push(@targets, $server);
          }
        }
      } else {
        my $server = scalar($config{"Serveurs"}{"Targets"}) ;
        push(@targets, $server);
      }
    } else {
      $logger->logdie("No parameter 'Targets' found in section [Serveurs], nowhere to deploy" );
    }
  } else {
    $logger->logdie("No section [Serveurs] found in ini file, nowhere to deploy" );
  }
  $logger->logdie("No value for 'Targets' parameter in section [Serveurs], nowhere to deploy" ) if (@targets == 0 );
}

# Main subroutine, here's where we will deploy content
sub deployRoutine($$$;$) {
  my ($remoteHost, $sourceDir, $destDir, $flagNoMove) = @_ ;
  $flagNoMove="" unless ($flagNoMove);
  my $cmd = "psexec \\\\$remoteHost /accepteula -u $user -p $pass -h -e -n 3 -f -c $copyScript $sourceDir \"$destDir\" $TMPEXT $ROLLBACKEXT $processToKill $flagNoMove" ;
  my $returnCode = `$cmd`;
  return $returnCode ;
}

#------------------------------------------------------------------------
# Main
#------------------------------------------------------------------------

# Firt check if we can run with threads, if not... then exit
$Config{useithreads} or
  die "[".strftime('%Y/%m/%d %H:%M:%S',localtime(time()))."] | ERROR | Perl wasn't built with threads enable, install it with threads first. ";

# Usual check for the call of the script
GetOptions (
  'help|h|?'   => \$displayHelp,
  'c|config=s' => \$configFile,
  'd|deploy'   => \$flagDeploy,
  'n|nomove'   => \$flagNoMove,
  'r|rollback' => \$flagRollback,
  '<>'         => \&unsupported_arg
);

if ( ! -e $configFile ) {
  print "[".strftime('%Y/%m/%d %H:%M:%S',localtime(time()))."] | ERROR | Configuration file provided do not exist [".$configFile."] \n" ;
  exit 1;
}

# We source and check the ini file
tie %config, 'Config::IniFiles', ( -file => "$configFile" ) or
  die "[".strftime('%Y/%m/%d %H:%M:%S',localtime(time()))."] | ERROR | Failed to load configuration file [".$configFile."] \n" ;

# Check that there is a Config section in the ini file
if (!exists($config{'Config'})) {
  print "[".strftime('%Y/%m/%d %H:%M:%S',localtime(time()))."] | ERROR | No [Config] section in provided ini file \n" ;
  exit(1);
}

# If ini file contains LOG_PATH instruction we use it (see default above)
if (exists($config{'Config'}{'REP_LOG'})) {
  if ( $config{'Config'}{'REP_LOG'} ne "" ) {
    $LOGPATH = $config{'Config'}{'REP_LOG'} ;
    print "[".strftime('%Y/%m/%d %H:%M:%S',localtime(time()))."] | INFO | Using configured log path [".$LOGPATH."] \n" ;
  }
}

# Then we initiate the logger
Log::Log4perl::init($LOG4PERLINIT);
$logger = Log::Log4perl->get_logger('APP-BFI-IEC_Log');
$logger->level(uc('TRACE'));

# Check if arguments make sens
if ($flagRollback && $flagDeploy || $flagRollback && $flagNoMove) {
  $logger->logdie("Action [--rollback] can't be called ");
}
Usage(1) unless($flagDeploy || $flagRollback) ;

# Check configuration in the ini file
checkConfig();

# Check defintion for each deploy section
checkServeursSection();

# We generate the of deployments based on DEPLOY- section(s) in the ini file
# deploy section can be commented while starting with - or # and thus will be skipped
foreach $section (keys %config) {
  if ($section =~ /deploy-/i and $section !~ /^[-|#]/i ) {
    push(@deployments, $section);
  }
}

# And if no DEPLOY- section exist, we quit
$logger->logdie("Nothing to deliver, no section [Deploy-*] found in ini file") unless (scalar(@deployments) >0 ) ;

# Check for each deploy- section if it is properly defined
foreach my $deploySection (@deployments) {
  # Error found, we'll skip the deploy section
  checkDeploySection($deploySection) ;
  if ( $? == 1 ) {
    my $index=0;
    $index++ until $deployments[$index] eq "$deploySection";
    splice(@deployments, $index, 1);
  } else {
    $logger->info("Validated section for [".$deploySection."] ");
  }
}

# Now we go for the deployment
foreach my $targetServer (@targets) {
  # Ping host to check its available
  my $p = Net::Ping->new();
  if ($p->ping($targetServer)) {
    $logger->info("Target [$targetServer] is alive, starting deployment ");
    foreach my $deploySection (@deployments) {
      if ($flagDeploy) {
        $logger->info("Starting deployment for [$deploySection] on target [$targetServer] ");
        my $deploySource = "$config{$deploySection}{'Source'}" ;
        my $deployDest = "$config{$deploySection}{'Destination'}" ;
        if ($flagNoMove) {
          my $thread = threads->new(sub {deployRoutine($targetServer, $deploySource, $deployDest, "true")});
          push (@Threads, $thread);
          $threadDetails{$thread}{"Section"} = "$deploySection";
          $threadDetails{$thread}{"Target"} = "$targetServer";
        } else {
          my $thread = threads->new(sub {deployRoutine($targetServer, $deploySource, $deployDest)});
          push (@Threads, $thread);
          $threadDetails{$thread}{"Section"} = "$deploySection";
          $threadDetails{$thread}{"Target"} = "$targetServer";
        }
      }
    }
  } else {
    $logger->warn("Couldn't ping target [$targetServer] host will be ignored ");
  }
  $p->close();
}

# We now handle open threads, waiting for each completion
@runningThreads = threads->list(threads::running);
while (scalar @runningThreads != 0) {
  foreach my $thr (@Threads) {
    if ($thr->is_joinable()) {
      $threadDetails{$thr}{"EndStatus"} = $thr->join();
    }
  }
  @runningThreads = threads->list(threads::running);
}
exit 0;

And here is a sample of the ini file that is passed as a parameter to the script and parsed by it:

[Config]
Srv_Ref=myserver1
User=someuser
Password=somepassword
CopyScript=C:\CUSTOM\cmd\deploy_files.cmd
RollbackScript=C:\CUSTOM\cmd\rollback_deploy.cmd
ExeToKill=someprocess
REP_LOG=C:\CUSTOM\LOGS\

[Serveurs]
Targets=<<EOT
myserver1
myserver2
myserver3
myserver4
myserver5
EOT

[DEPLOY-1]
Source=\\dom.loc\share\Soft\Common1
Destination=C:\Program Files\Soft\Common1
Rep_Exclus=
Fic_Exclus=

[DEPLOY-2]
Source=\\dom.loc\share\Soft\Common2
Destination=C:\Program Files\Net\dll2
Rep_Exclus=
Fic_Exclus=

[DEPLOY-3]
Source=\\dom.loc\share\Soft\Common3
Destination=C:\Program Files\Loc\Common3
Rep_Exclus=SomeDir
Fic_Exclus=Thumbs.db

I hope this sheds some light. Please be patient, as I'm still figuring some stuff out, but I'll try to explain my needs better in the future.
Thanks!

Re^3: Queuing in multithread context
by BrowserUk (Patriarch) on Jan 20, 2015 at 13:28 UTC

    Okay. Now you've shown us the sample config file, it shows that you are running each of multiple DEPLOY sections on each of the servers, each in a separate thread; hence your conflicts.

    The simple solution to your problem is to only start one thread for each server; and call the deployRoutine() multiple times within that thread; serially.

    I.e., something like this:

    foreach my $targetServer (@targets) {
        # Ping host to check its available
        my $p = Net::Ping->new();
        if ($p->ping($targetServer)) {
            my @subs; ### Accumulate the deployRoutines here
            foreach my $deploySection (@deployments) {
                if ($flagDeploy) {
                    my $deploySource = "$config{$deploySection}{'Source'}" ;
                    my $deployDest = "$config{$deploySection}{'Destination'}" ;
                    if ($flagNoMove) {
                        ## Add this sub to the array to be executed for this server
                        push @subs, sub { deployRoutine($targetServer, $deploySource, $deployDest, "true") };
                        $threadDetails{$thread}{"Section"} = "$deploySection";
                        $threadDetails{$thread}{"Target"} = "$targetServer";
                    } else {
                        ## Add this sub to the array to be executed for this server
                        push @subs, sub { deployRoutine( $targetServer, $deploySource, $deployDest ) };
                        $threadDetails{$thread}{"Section"} = "$deploySection";
                        $threadDetails{$thread}{"Target"} = "$targetServer";
                    }
                }
            }
            ## Now start one thread to execute them all; serially
            push @threads, threads->new( sub { $_->() for @subs; } );
            $p->close();
        }

    Note: That is obviously untested code in the context of your application, but in isolation, the principle works:

    { use threads; my @subs = map{ eval "sub{ print $_; }" } 1 .. 10; threads->new( sub{ $_->() for @subs; } ); sleep 10; };;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
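
The one-thread-per-server idea above leaves open how each deployment's status gets back to the main program. A minimal sketch of one way to do it follows, assuming a Perl built with ithreads. `fake_deploy`, the server names and the section names are hypothetical stand-ins (the real `deployRoutine()` shells out to `psexec`); the point is only that a thread created in list context hands every job's return value to `join()`, and that the joins happen after all threads have been started.

```perl
use strict;
use warnings;
use threads;

# Hypothetical stand-in for deployRoutine(): returns a status string
# instead of running psexec.
sub fake_deploy {
    my ($server, $section) = @_;
    return "$server/$section: ok";
}

my @servers  = qw(srvA srvB);            # hypothetical target names
my @sections = qw(DEPLOY-1 DEPLOY-2);    # hypothetical section names

my %thread_for;                          # server name => thread object
for my $server (@servers) {
    # Accumulate one closure per deploy section for this server.
    my @subs = map {
        my $section = $_;
        sub { fake_deploy($server, $section) };
    } @sections;

    # One thread per server; the closures run one after another inside it.
    # The parentheses around $thr create the thread in list context,
    # so join() later returns the full list of statuses.
    my ($thr) = threads->create(sub { map { $_->() } @subs });
    $thread_for{$server} = $thr;
}

# Join only after every thread has been started, so servers run in parallel.
my %status;
for my $server (@servers) {
    $status{$server} = [ $thread_for{$server}->join() ];
}

for my $server (@servers) {
    print "$_\n" for @{ $status{$server} };
}
```

Keying the thread objects by server name avoids using a thread object as a hash key, which only stores its stringified form.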

    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority". I'm with torvalds on this
    In the absence of evidence, opinion is indistinguishable from prejudice. Agile (and TDD) debunked

      That's it! Definitely what I want to achieve, thank you

      For now I'm unable to achieve a working implementation of that logic, if I proceed that way the threads aren't joined and the script exits before all of them have been joined, and I need to retrieve their status anyway.

      I added a join for @threads, but now I'm stuck on the first thread's execution; I'm not sure why it's not going any further...
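
One possible cause of getting stuck on the first thread (an assumption, since the modified code isn't shown) is a `join()` placed inside the loop that creates the threads: `join()` blocks until that thread finishes, so the next one is never started in the meantime. The sketch below keeps the Thread::Queue idea from the original question instead: one queue and one worker per server, with all joins done at the end. `fake_deploy` and the server and section names are hypothetical placeholders, and `end()` assumes Thread::Queue 3.01 or later.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Hypothetical stand-in for deployRoutine(): returns a status string.
sub fake_deploy {
    my ($server, $section) = @_;
    return "$server/$section: ok";
}

my @servers  = qw(srvA srvB);                      # hypothetical targets
my @sections = qw(DEPLOY-1 DEPLOY-2 DEPLOY-3);     # hypothetical sections

my (%queue_for, %worker_for);
for my $server (@servers) {
    my $q = Thread::Queue->new();
    $queue_for{$server} = $q;

    # One worker per server: it takes sections off that server's queue
    # one at a time, so a server never sees two copies at once.
    # The list assignment creates the thread in list context.
    ($worker_for{$server}) = threads->create(sub {
        my @done;
        while (defined(my $section = $q->dequeue())) {
            push @done, fake_deploy($server, $section);
        }
        return @done;
    });
}

# Feed every queue, then mark it finished so dequeue() returns undef
# once the queue is drained and the worker loop ends.
for my $server (@servers) {
    $queue_for{$server}->enqueue(@sections);
    $queue_for{$server}->end();
}

# Join after all workers are running and all work has been queued.
my %status;
for my $server (@servers) {
    $status{$server} = [ $worker_for{$server}->join() ];
}

for my $server (@servers) {
    print "$_\n" for @{ $status{$server} };
}
```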

      Still working on it and I appreciate your help!

        For now I'm unable to achieve a working implementation of that logic,

        Post what you've got and I'll try and help.

