Anonymous Monk has asked for the wisdom of the Perl Monks concerning the following question:

I'm doing a complex bit of data processing that goes like this:
* build a large data structure (~500 MB) by retrieve()-ing several data files,
* do some data mining to find all data points that match pre-defined filters and collect them in arrays,
* use IPC::Open2 to start an R process for statistics,
* feed the arrays (one at a time) and R commands to the R process and read back the results,
* dump the results into a LaTeX table.
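
For reference, the open2 plumbing in the middle steps looks roughly like this; the R invocation flags and the summary() call below are simplified placeholders, not my real commands:

    use strict;
    use warnings;
    use IPC::Open2;

    # Start a single R process and talk to it over pipes.
    my $pid = open2(my $r_out, my $r_in, 'R', '--vanilla', '--slave');

    my @data = (1.2, 3.4, 5.6);                # one of the collected arrays
    print $r_in 'x <- c(' . join(',', @data) . ")\n";
    print $r_in "cat(summary(x), '\\n')\n";    # ask R for some statistic
    close $r_in;                               # no more input for R

    my @results = <$r_out>;                    # read back what R printed
    waitpid($pid, 0);
    print @results;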

Now all of this takes quite a long time - especially the R commands themselves.

I was wondering how I could speed up this procedure when I realized that I have a dual-core machine, and one glance at the CPU usage graph told me that one of the CPUs just sits there uselessly while my beard is turning white. If I could have two R processes running at the same time and share the input between them, I could have the processing finished in half the time! The only problem is that I'm not exactly sure how to do it.

I guess the obvious answer is "threads", more specifically, a dispatcher/worker model where there is one central thread that assembles the input for the workers and puts it on a queue, and there are several (at least two) worker threads, each munching data off the queue and feeding it to its own R process.
However, I'm not comfortable with threads and I don't even know if the above scheme is viable at all. For example, how can I avoid a race condition when I'm filling up the result data structure (for the table) if the results themselves come in asynchronously? How do I tell which thread is busy and which one is available?

And there is another thing: it would be best if the solution could be extended to include clients that run on other computers. That is, there would be a server that decides which datasets should be processed; it would then look around on the local network and send data to clients that have available slots. Are there by any chance ready-made solutions for problems like this?

Re: Using threads to run multiple external processes at the same time
by bot403 (Beadle) on Aug 31, 2009 at 21:58 UTC

    Actually, a simple boss/worker model should be easy enough. This won't scale to multiple computers, but it's a starting point. Note that this code was torn from a larger, fully functional threaded program I have. The code below is not complete and not even tested, but it should demonstrate all the concepts you need.

    For example, how can I avoid a race condition when I'm filling up the result data structure (for the table) if the results themselves come in asynchronously?

    You can put your results back onto a queue of their own (drained by a single thread) to avoid a race condition. Alternatively, look into lock(). Just make a variable called $lock; then you can control access to your output structure by doing something like this:

    my $lock : shared;

    {
        lock($lock);
        # ...do something that only 1 thread at a time should do...
    }

    How do I tell which thread is busy and which one is available?

    You shouldn't need to. In the scheme below, which uses a queue, an idle thread will pick up a unit of work if it can; if there is no more work it will exit, and if no work exists yet it will wait.

    use threads;
    use threads::shared;
    use Thread::Queue;

    my $max_threads = 8;
    my $queue       = Thread::Queue->new();

    print "Spawning threads...";
    for (1 .. $max_threads) {
        threads->create(\&thread_main)->detach();
    }
    print "Done!\n";

    # ...Do what you need to do to get a work unit...
    $queue->enqueue("SOME_WORK_UNIT");

    # Signal to the threads that they're done.
    for (1 .. $max_threads) {
        $queue->enqueue("DONE");
    }

    wait_on_threads();
    exit 0;

    sub thread_main {
        while (1) {
            my ($work) = $queue->dequeue();

            # If the end-of-queue marker is reached, clean up and exit.
            if ($work eq "DONE") {
                return 0;
            }

            # ...do processing work...
        }
    }

    sub wait_on_threads {
        # Don't even start printing messages until our queue is nearly depleted.
        while ($queue->pending() > $max_threads) {
            sleep 1;
        }

        my $cnt  = 0;
        my $wait = 5;
        while ((my $items = $queue->pending()) > 0) {
            # Fortunately on AIX sleep doesn't seem to put ALL the threads to sleep.
            # whew! This is a non-portable loop though ;)
            if ($cnt++ > $wait) {
                print "Please wait. $items items still in progress.\n";
                $cnt = 0;
                $wait++;
            }
            sleep 3;
        }
    }
Re: Using threads to run multiple external processes at the same time
by Sewi (Friar) on Aug 31, 2009 at 21:36 UTC
    Let me answer your last question first: I don't know of any stable, easy way (and I don't think there is one) to extend a threaded program over multiple workstations. (Remember: I said "easy". :-) )

    Here is what I'd do to extend this over multiple computers with as little time as possible:

  • Create an NFS or other shared directory which can be used by all computers.
  • Split your job into smaller files and put them into the shared directory.
  • Start a worker script on each computer which does the following (a rough sketch is at the end of this reply):
    • Select a file from the spool, exit if there are no files left
    • Rename the file, including a unique computer name and the $$ PID in the new filename; if the rename fails, start over (another process grabbed the file between our selection and the rename command)
    • Wait a random short time (for example select undef,undef,undef,rand(1);)
    • Check if a file with the new (renamed) name exists
    • Now the duplicate-worker-detection is done, process your file and store the result either in another sub-dir or create an output file with a suffix which is ignored during the selection process.
  • If you want to use threads, have a look at perlthrtut and maybe perlothrtut. These are good manpages and I couldn't write it better here.
    When using a Queue (described there), you should be safe from race and other bad conditions, but be sure to "my" all variables in each sub and avoid slow thread-shared variables whenever possible. Also check whether all modules used by the threaded part are thread-safe!

    Personally, I'd prefer the first method. It's not as simple as a thread queue once you have learned how to use the queue, but it can be spread over a huge number of computers. A launcher which detects the number of CPUs (/proc/cpuinfo) and starts one process per CPU should be easy to write and could be started as a CGI if you secure the URL (.htaccess?).
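
    Something along these lines for the worker (untested; the spool path and file naming below are only placeholders):

    use strict;
    use warnings;
    use Sys::Hostname;

    my $spool = '/net/shared/spool';    # placeholder - your shared directory

    while (1) {
        opendir(my $dh, $spool) or die "Cannot open $spool: $!";
        my @files = grep { /\.job$/ } readdir($dh);    # only unclaimed work units
        closedir($dh);
        last unless @files;                            # nothing left -> exit

        my $file    = $files[0];
        my $claimed = "$file." . hostname() . ".$$";   # unique: host name + PID

        # Try to claim it; if the rename fails, another worker was faster.
        next unless rename("$spool/$file", "$spool/$claimed");

        select(undef, undef, undef, rand(1));          # short random wait
        next unless -e "$spool/$claimed";              # duplicate-worker check

        # ...process "$spool/$claimed" here (feed it to R, etc.) and write
        # the results to "$spool/$file.out", a suffix the grep above ignores...
    }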

Re: Using threads to run multiple external processes at the same time
by ig (Vicar) on Aug 31, 2009 at 21:43 UTC

    There are various modules that might help you. POE is often discussed and if you search CPAN for "processes" you will find many more that you might investigate.

Re: Using threads to run multiple external processes at the same time
by JavaFan (Canon) on Aug 31, 2009 at 21:15 UTC
    I guess the obvious answer is "threads",
    Actually, the obvious answer is "fork".
      I'm a fan of fork, I use it much more than threads, but this is a better job for threads if running on a single computer. Thread queues are much faster and very much easier than shared memory. The same is true for thread control vs. forked-child control (like waiting for children and not missing any).
        The OP wants to get to a situation where different computers are used. That's never going to work with threads. You'd be better off starting with forks and doing IPC with sockets than first implementing a thread-based solution and then redoing part of the work when you take it to the next level.

        Besides, I don't trust that every OS treats threads in such a way that it's actually running different threads of the same process on different CPUs. Or that it gives 2 threads the same number of run-slots as it does with 2 processes.
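
        A bare-bones fork-per-work-unit sketch (load_work_units() and process_with_r() are hypothetical stand-ins for however you split the data and talk to R):

        use strict;
        use warnings;

        my @work_units = load_work_units();    # hypothetical: split your data somehow
        my $max_procs  = 2;                    # one per CPU core
        my %children;

        while (@work_units or %children) {
            # Spawn children up to the limit while work remains.
            while (@work_units and keys(%children) < $max_procs) {
                my $unit = shift @work_units;
                my $pid  = fork();
                die "fork failed: $!" unless defined $pid;
                if ($pid == 0) {
                    process_with_r($unit);     # hypothetical: open2 to R, write results
                    exit 0;
                }
                $children{$pid} = 1;
            }
            # Block until one child finishes, then loop to spawn the next.
            my $done = waitpid(-1, 0);
            delete $children{$done} if $done > 0;
        }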

Re: Using threads to run multiple external processes at the same time
by jdrago_999 (Hermit) on Sep 01, 2009 at 17:35 UTC

    Take a look at forks.

    I typically would do what you are doing like this:

    #!/usr/bin/perl -w
    use strict;
    use warnings 'all';
    use POSIX 'ceil';
    use forks;
    use forks::shared;

    my @unprocessed = get_data();
    my @processed : shared = ( );

    my $total_records = scalar(@unprocessed);
    my $max_workers   = 5;
    my $chunk_size    = ceil($total_records / $max_workers);

    while( my @chunk = splice(@unprocessed, 0, $chunk_size) )
    {
        threads->create(\&process_chunk, \@chunk);
    }# end while()

    # Keep an eye on our progress:
    threads->create(sub {
        while( (my $finished = scalar(@processed)) < $total_records )
        {
            print STDERR "\r$finished/$total_records";
            sleep 1;
        }
    });

    # Now wait for everyone to finish...
    $_->join foreach threads->list;

    # Process a chunk of data:
    sub process_chunk
    {
        my ($chunk) = @_;

        while( my $item = shift(@$chunk) )
        {
            # Do something with $item, then...
            # Supposing $item has an id...
            lock(@processed);
            push @processed, $item->id;
        }# end while()
    }# end process_chunk()
Re: Using threads to run multiple external processes at the same time
by NiJo (Friar) on Sep 01, 2009 at 20:42 UTC
    'Fork'ing is probably the right architecture when using IPC, but there are efficient poor man's alternatives that exploit the separate R process and unix infrastructure:

    1) Dump your arrays into files (probably CSV) in a directory shared, e.g., via NFS or sftp.

    2) SSH / telnet to the computing boxes (via 'expect' or equivalent perl modules) to start work packages:

     "R commandfile <infile >outfile && rename outfile outfile.done

    3) If memory requirements are low, you could start all R processes at once. If it is just 'nice'd CPU load, I would not care.

    4) When memory requirements would cause swapping, you could use the unix 'batch' command to queue work packages.

    5) If the work packages include the LaTeX table typesetting, you could have that 'mail'ed to you.

    6) Alternatively a second script checks the NFS share and processes finished R output.
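
    Putting points 1) to 3) together, a rough dispatcher might look like this (untested; the host names, paths and the R command line are placeholders):

    use strict;
    use warnings;

    my @boxes = qw(box1 box2 box3);              # placeholder host names
    my @jobs  = glob('/shared/work/*.csv');      # placeholder shared directory

    my @pids;
    while (my $infile = shift @jobs) {
        my $host    = $boxes[ @pids % @boxes ];  # simple round-robin
        my $outfile = "$infile.out";

        my $pid = fork();
        die "fork failed: $!" unless defined $pid;
        if ($pid == 0) {
            # Each child just blocks on one remote R run (point 2 above).
            exec 'ssh', $host,
                "R commandfile <$infile >$outfile && mv $outfile $outfile.done";
            die "exec failed: $!";
        }
        push @pids, $pid;
    }
    waitpid($_, 0) for @pids;    # wait for all remote jobs to finish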

    Don't spend too much effort reinventing infrastructure!

Re: Using threads to run multiple external processes at the same time
by kikuchiyo (Hermit) on Sep 02, 2009 at 20:01 UTC
    OP here, sorry for not logging in for my original post.

    Thank you all for your helpful comments and suggestions.

    I wrote a threaded solution along the lines of bot403's example. One of the hurdles I had to overcome was that my original program was quite sequential: the order of data subset creation also determined the order in which the subsets were processed and dumped into tables. This was simple to code, and in this scheme the statistics part did not need to know which data subset it was working on.
    However, in the multithreaded program data processing is asynchronous, so I had to label each dataset to mark where it belongs.
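
    Concretely, each work unit on the queue now carries a label, and the result is stored under the same label - roughly like this (a simplified sketch, not my actual code; run_r_on() stands in for the open2/R part):

    use strict;
    use warnings;
    use threads;
    use threads::shared;
    use Thread::Queue;

    my $queue   = Thread::Queue->new();
    my %results :shared;

    # Dispatcher: tag every dataset with the table cell it belongs to.
    # (Thread::Queue 2.x can carry hashrefs; with older versions you would
    # enqueue explicitly shared references instead.)
    $queue->enqueue({ label => 'sampleA_filter1', data => [ 1.0, 2.5, 3.7 ] });
    # ...one undef per worker is enqueued at the end to stop them...

    # Worker: the result goes back under the same label, so the table can be
    # assembled in the right order no matter which thread finished first.
    sub worker {
        while (defined(my $unit = $queue->dequeue())) {
            my $stat = run_r_on($unit->{data});    # hypothetical R call
            lock(%results);
            $results{ $unit->{label} } = $stat;
        }
    }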

    BrowserUk's suggestion about using ssh to communicate with R processes on other machines was also helpful, as this is quite easy to implement. There is one practical problem with this: I can only use computers where I have shell access - so my colleagues' Windows boxes are not eligible.

    As for the other types of solutions outlined above - I might implement something like that if the present version does not prove to be efficient enough.
      I've noticed something strange when doing tests.
      The processing time was roughly the same no matter how many worker threads (and thus R instances) I started, even though both CPU cores are at full load and it seems that the threads are sharing the workload among themselves.

      How can this be?

        Post your code and let us see what you are doing wrong.

        How many threads are you starting? With 2 CPUs, you are unlikely to see additional benefits after you have 2 or 3 CPU-intensive R instances running.



        Imagine you have 10 units of work each needing 1,000 seconds to complete. You need to spend a total of 10,000 seconds of "work" (CPU seconds).

        You can only divide the work efficiently across as many CPUs as you have. With 2 CPUs you can finish in 5,000 seconds. With 10 CPUs you can finish in 1,000 seconds. With 100 CPUs you still need 1,000 seconds unless you can further subdivide your units of work; 90 CPUs will be idle while 10 do the processing.

        Also, the OS takes care of the sharing. No matter whether you have 2 or 1,000 threads, the OS will make sure each of them gets its fair share of time to run. When you have more threads or processes than you have CPUs in a system, they just fight (in a sense) over who can currently execute. You can only have as many running programs as you have CPUs: even a modern CPU runs one program at a time per core. It just seems like everything is running at once because the OS constantly switches between programs extremely fast. :)

        It could be one of several factors (or a combination of them):

        • Are you passing small messages from your worker threads to your "manager" thread, or are you passing large chunks of data?
        • How often are you passing messages from your worker threads to your manager thread? Can you reduce the frequency that this occurs?
        • Can you add more threads?