PerlMonks  

Queuing in multithread context

by Hardin (Novice)
on Jan 20, 2015 at 10:32 UTC ( [id://1113850]=perlquestion )

Hardin has asked for the wisdom of the Perl Monks concerning the following question:

Hi Monks,

I'm still fairly new to Perl, but I have been assigned a pretty challenging (to me) task, which consists in building a script to copy several source directories to several destinations on several servers, all under a Windows environment.

My script reads a configuration file that contains the delivery instructions, so these can vary between executions. As the amount of data is huge and it needs to be deployed to several servers (I call them targets), we need the process to be multithreaded so that the copy tasks are dispatched to each server.

My problem now is that I can't send every copy task to the same server at once (lower I/O throughput, increased copy time, possibly a high impact on overall server performance, and so on), so I try to implement queuing in thread.

I try to implement the Thread::Queue module, but the provided example and the other resources I have been reading only consider a fixed number of queued tasks (whereas in my case it may vary).

Below is my code for thread loop creation:

foreach my $targetServer (@targets) {
    # Ping host to check it is available
    my $p = Net::Ping->new();
    if ($p->ping($targetServer)) {
        $logger->info("Target [$targetServer] is alive, starting deployment");
        foreach my $deploySection (@deployments) {
            if ($flagDeploy) {
                $logger->info("Starting deployment for [$deploySection] on target [$targetServer]");
                my $deploySource = "$config{$deploySection}{'Source'}";
                my $deployDest   = "$config{$deploySection}{'Destination'}";
                if ($flagNoMove) {
                    my $thread = threads->new(sub { deployRoutine($targetServer, $deploySource, $deployDest, "true") });
                    push(@Threads, $thread);
                }
                else {
                    my $thread = threads->new(sub { deployRoutine($targetServer, $deploySource, $deployDest) });
                    push(@Threads, $thread);
                }
            }
        }
    }
    else {
        $logger->warn("Couldn't ping target [$targetServer], host will be ignored");
    }
    $p->close();
}

The heart of the problem is my deployRoutine, which calls a batch file that runs Robocopy with the provided parameters:

# Main subroutine, here's where we will deploy content
sub deployRoutine($$$;$) {
    my ($remoteHost, $sourceDir, $destDir, $flagNoMove) = @_;
    $flagNoMove = "" unless ($flagNoMove);
    my $cmd = "psexec \\\\$remoteHost /accepteula -u $user -p $pass -h -e -n 3 -f -c $copyScript $sourceDir \"$destDir\" $flagNoMove";
    my $returnCode = `$cmd`;
    return $returnCode;
}

When proceeding like this I bump into the error below:

Process can't access file because it is used by another process
Connecting to MYSERVER...Starting PsExec service on MYSERVER...Connecting with PsExec service on MYSERVER...Copying C:\temp\deploy_files.cmd to MYSERVER...Error copying C:\temp\deploy_files.cmd to remote system:
Free to wrong pool 348d3a0 not 298de8 during global destruction.

So I wish to enqueue the "deployRoutine" tasks for each server, but I don't see how I could achieve this without impacting my current multithreading implementation.

Any words of wisdom will be greatly appreciated! Thanks

Replies are listed 'Best First'.
Re: Queuing in multithread context
by BrowserUk (Patriarch) on Jan 20, 2015 at 11:10 UTC
    My problem now is that I can't send every copy task to the same server at once (lower I/O throughput, increased copy time, possibly a high impact on overall server performance, and so on), so I try to implement queuing in thread.

    That sentence doesn't make any sense.

    You don't explain what problem "implement queuing in thread" is meant to address, nor how it will help; nor, for that matter, what "implement queuing in thread" even means.

    When proceeding like this I bump into the error below:

    Two of those error messages have nothing to do with your perl script:

    1. "Process can't access file because it is used by another process"

      That comes from the OS, not Perl.

    2. "Error copying C:\temp\deploy_files.cmd to remote system"

      And that is not an OS error, so it must be coming from PsExec.exe. Nothing to do with Perl.

    Finally, "Free to wrong pool 348d3a0 not 298de8 during global destruction." simply means that you aren't cleaning up your threads properly before ending the script.

    We might've been able to help you with that, had you posted a short but complete, working, runnable script; but as it is, there is nothing we can do.


    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority". I'm with torvalds on this
    In the absence of evidence, opinion is indistinguishable from prejudice. Agile (and TDD) debunked

      Hi,

      Sorry if I wasn't clear; as explained a bit earlier, my goal in using a queued thread is to ensure that only one copy task is processed on each remote server at a time.

      I agree, the error is not Perl related, but related to my `psexec` instruction, which copies the batch script prior to executing it. But it's a side effect of having all my copy threads sent to the remote server at the same time, which is what I'm trying to avoid by using a queued thread. Does it make sense to you?

      Here is the full "working" script:

      And here is a sample of the ini file that is passed to the script as a parameter and parsed by it:

      I hope this sheds some light? Please be patient, as I'm still figuring some stuff out, but I'll try to explain my needs better in the future.
      Thanks !

        Okay. Now that you've shown us the sample config file, it's clear that you are running each of the multiple DEPLOY sections on each of the servers, each in a separate thread; hence your conflicts.

        The simple solution to your problem is to only start one thread for each server; and call the deployRoutine() multiple times within that thread; serially.

        Ie. Something like this:

        foreach my $targetServer (@targets) {
            # Ping host to check it is available
            my $p = Net::Ping->new();
            if ($p->ping($targetServer)) {
                my @subs;    ### Accumulate the deployRoutines here
                foreach my $deploySection (@deployments) {
                    if ($flagDeploy) {
                        my $deploySource = "$config{$deploySection}{'Source'}";
                        my $deployDest   = "$config{$deploySection}{'Destination'}";
                        if ($flagNoMove) {
                            ## Add this sub to the array to be executed for this server
                            push @subs, sub { deployRoutine($targetServer, $deploySource, $deployDest, "true") };
                            $threadDetails{$thread}{"Section"} = "$deploySection";
                            $threadDetails{$thread}{"Target"}  = "$targetServer";
                        }
                        else {
                            ## Add this sub to the array to be executed for this server
                            push @subs, sub { deployRoutine($targetServer, $deploySource, $deployDest) };
                            $threadDetails{$thread}{"Section"} = "$deploySection";
                            $threadDetails{$thread}{"Target"}  = "$targetServer";
                        }
                    }
                }
                ## Now start one thread to execute them all; serially
                push @threads, threads->new( sub { $_->() for @subs; } );
            }
            $p->close();
        }

        Note: That is obviously untested code in the context of your application, but in isolation, the principle works:

        { use threads; my @subs = map{ eval "sub{ print $_; }" } 1 .. 10; threads->new( sub{ $_->() for @subs; } ); sleep 10; };;
        1 2 3 4 5 6 7 8 9 10

Re: Queuing in multithread context
by SimonPratt (Friar) on Jan 20, 2015 at 11:53 UTC

    From what I understand / can see, your current multi-threading implementation is flawed. It appears you want to multi-thread sending data to servers; however, for each server you want to send the data serially. At the moment, it looks like your implementation works through the servers serially and multi-threads the data being copied on each server.

    This is Not Good, and using Thread::Queue won't solve your problem. You need to re-address your threading model. In fact, I would suggest you rebuild your application without using threading. Once it's working, add Thread::Queue to queue your list of servers, then add threads (with a static pool of threads) to do the work for each server.

    Finally, you get the Free to wrong pool error because something you are doing (probably a library you are loading) is not thread safe. This can cause all sorts of problems, though there are potential workarounds (such as require'ing affected libraries after threads have been created, or only within the threads that need them).
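    As a rough illustration of that second workaround (a minimal sketch only; File::Copy below is just a stand-in for whichever module turns out not to be thread-safe in your setup, and the file names are made up):

    #!/usr/bin/perl
    use strict;
    use warnings;
    use threads;

    # The suspect module is NOT use'd at compile time; each thread
    # require's it only after the threads have been created.
    my @threads = map {
        my $n = $_;
        threads->create(sub {
            require File::Copy;    # loaded per-thread, after spawning
            File::Copy::copy("source$n.txt", "dest$n.txt")
                or warn "copy failed in thread " . threads->tid . ": $!";
        });
    } 1 .. 4;

    $_->join for @threads;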

      Thanks !!

      I suspected my approach wasn't the proper or best one. You understood perfectly what I am trying to achieve, and you made an important point.

      I had already been able to make this work sequentially, without threads, and ended up implementing threads to speed things up and to avoid dependencies between tasks on separate servers.

      I'm still figuring out how I could use the queue as you pointed out, but at least it sounds like a more relevant direction...

        This is a very basic representation of how your threading model might look:

        use 5.16.2;
        use threads;
        use Thread::Queue;

        my @serverList  = qw(one two three four five six);
        my $serverQueue = Thread::Queue->new( @serverList );
        $serverQueue->end;

        my @threads = map { threads->new( \&worker ) } 1..4;
        $_->join for @threads;

        say 'Back to main - Finished!';

        sub worker {
            my $tid = threads->tid;
            while ( my $server = $serverQueue->dequeue_nb ) {
                say "I'm thread ID $tid and I'm processing server $server to copy a whole bunch of files";
                sleep 1;
            }
            say "Thread ID $tid finished";
        }
Re: Queuing in multithread context
by Anonymous Monk on Jan 20, 2015 at 10:58 UTC

    I try to implement the Thread::Queue module, but the provided example and the other resources I have been reading only consider a fixed number of queued tasks (whereas in my case it may vary).

    What does that mean?

    It's unclear what your diagram looks like, but I think Thread::Queue is workable like this:

    #!/usr/bin/perl --
    use strict;
    use warnings;
    use threads stack_size => 4096;
    use Thread::Queue;

    my %serverQ;
    my $resultQ = Thread::Queue->new;
    my @workers = map {
        my $workQ = Thread::Queue->new;
        threads->create( \&Worker, $workQ, $resultQ );
    } 1..10;
    ...
    while( @targets ){
        ...
        if( my $q = $serverQ{ $host } ){
            $q->enqueue( [
                ## 'Foo::bar::deployRoutine', $onearg, $twoarg, $threearg,
                'deployRoutine' => $targetServer, $deploySource, $deployDest, "true",
            ] );
        ...
    FreeFreeWorkers( \%serverQ ); ## check empty Qs, unassociate from a host
    ...
    $_->join for threads->list;
    exit( 0 );

    sub Worker {
        my( $workQ, $resultQ ) = @_;
        while( defined( my $jobDesc = $workQ->dequeue ) ) {
            my( $package, $function, @args ) = @$jobDesc;
            require Module::Load;
            Module::Load::load( $package );
            $resultQ->enqueue( [ $function->( @args ) ] );
        }
        return;
    }
      Bah, typo; the idea is:
      ### workQs , workerQs
      my @workers = map {
          my $workQ = Thread::Queue->new;
          threads->create( \&Worker, $workQ, $resultQ );
          $workQ;
      } 1..10;

      Hi!

      Thanks for taking the time to answer, and sorry if I didn't make myself clear: my aim is to achieve queueing of my sub "deployRoutine" so that each copy task is executed one after another and not sent all together.

      I have two main loops: the first loops through the "targets" (i.e. the servers where I need to copy files/directories) and the second, embedded one loops through the "deployments", which are several separate copy tasks.

      Let's say I have 3 servers: srv1, srv2 & srv3. On each of those I need to copy some remote content to local destinations: copy1, copy2 & copy3, with

      • copy1: source=\\some\remote\content1 dest=C:/some/local/content1
      • copy2: source=\\some\different\remoteconte dest=C:/another/local/destination
      • copy3: ... (you get the idea)

      I want the copy tasks (I said 3, but it could be 4, 5 or more; this can vary) sent to each server one after another, while currently they are all sent at the same time, since as soon as I create a thread it is executed directly against the server.

      Right now I'm trying to understand your code, as it's a bit unclear to me.

      Cheers :)

        my aim is to achieve queueing of my sub "deployRoutine" so that each copy task is executed one after another and not sent all together.

        If you want to execute the subroutines sequentially; don't use threads!

        Just call the subroutine from within your loop; and they'll execute sequentially with no conflicts. Simples!
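        For what it's worth, a minimal sketch of that sequential version, reusing the variable names from your code above (flags and logging omitted):

        foreach my $targetServer (@targets) {
            foreach my $deploySection (@deployments) {
                my $deploySource = $config{$deploySection}{'Source'};
                my $deployDest   = $config{$deploySection}{'Destination'};
                deployRoutine( $targetServer, $deploySource, $deployDest );
            }
        }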



      Please ignore this. I had a brain fart! Please don't advocate Thread::Queue. It's broken.



        Please don't advocate Thread::Queue. It's broken.

        Why? I learned to advocate it from your postings :)

Re: Queuing in multithread context
by jellisii2 (Hermit) on Jan 20, 2015 at 15:03 UTC
    Might I suggest using a tool that is specifically designed for server cluster management, like Salt, which does have Windows support?
Re: Queuing in multithread context
by sundialsvc4 (Abbot) on Jan 20, 2015 at 14:31 UTC

    Reading this thread (no pun intended) through several times now, I still don't understand how threading should enter into the picture ... at all. Threading is the sub-division of a single process, on a single computer. You are using a cluster of computers to work on a massive copy-job so that you do not overtax the I/O resources of any one machine. (Is my understanding correct, so far?)

    If so, it appears to me that this work should be performed by instances of a single-threaded worker process (one or more per machine, depending on how “beefy” that machine is). They could simply be run from a Unix command-line, e.g. as background-jobs. And so, now, “the problem to be solved,” in a flexible and dynamic way, is: “what am I supposed to do next?”

    If the right answer can be computed in advance, you could literally construct a shell-script consisting of one or more executions of this copy script ... dare I say, even an rsync command? ... and send it to each server so that it may execute it. Each server just runs through its script without considering anyone else, and when all of them are done, you’re done. The configuration file allows this process to be varied, but all scheduling decisions are made in advance. “Here are your instructions for today ... now, go do them.”

    Another approach would be to build these processes so that they open a socket connection to some “grand marshal” process on a single machine, i.e. the one which has read the configuration file. They ask the grand-marshal what they are to do next, and tell him what they have done, just by sending and receiving single text lines through a socket. The grand-marshal uses nothing more than a select() loop to manage all of the connections. (Other, higher-level network piping options are also available, but the difference is irrelevant to my suggestion.) The grand-marshal is single-threaded, processing a stream of messages sequentially and sending back new orders to each worker until, by closing the connection, it indicates that the job is done. This would only be called for if the nature of the job (the hardware configuration, etc.) warrants dynamic, on-the-fly marshaling decisions.
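    A very rough sketch of such a select()-based “grand marshal” (the port number, job list, and one-line protocol are all made up for illustration; workers would connect, send a request line, and read back either a job or a closed connection):

    #!/usr/bin/perl
    use strict;
    use warnings;
    use IO::Socket::INET;
    use IO::Select;

    my @jobs = map { "copy-task-$_" } 1 .. 10;     # stand-in for parsed config entries

    my $listener = IO::Socket::INET->new(
        LocalPort => 5000,
        Listen    => 5,
        Reuse     => 1,
    ) or die "listen: $!";

    my $select = IO::Select->new($listener);

    while ( @jobs or $select->count() > 1 ) {
        for my $fh ( $select->can_read() ) {
            if ( $fh == $listener ) {              # a new worker is connecting
                $select->add( scalar $listener->accept() );
                next;
            }
            my $request = <$fh>;                   # worker asks for its next job
            if ( !defined $request or !@jobs ) {   # nothing left: hang up on this worker
                $select->remove($fh);
                close $fh;
                next;
            }
            print {$fh} shift(@jobs), "\n";        # hand this worker one job
        }
    }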

      Hi and thank you for your answer,

      I think you misinterpreted my description; I think I lost a lot of people on that, haha.

      There is no cluster or anything to dispatch a heavy copy load; I basically need to execute several copy tasks sequentially on each of several servers, with the servers handled in parallel.

      I think that is a more straightforward description of what I'm trying to achieve. I think I introduced confusion by talking about I/O and load; basically, all I want is for the copy jobs to be executed one after the other on each server, and not all at the same time, as my first pattern was doing.

      I hope this is clearer? :)
