in reply to Re: Using threads to run multiple external processes at the same time
in thread Using threads to run multiple external processes at the same time

I've noticed something strange when doing tests.
The processing time was roughly the same no matter how many worker threads (and thus R instances) I started, even though both CPU cores are at full load and the threads appear to be sharing the workload among themselves.

How can this be?

Re^3: Using threads to run multiple external processes at the same time
by BrowserUk (Patriarch) on Sep 03, 2009 at 20:10 UTC

    Post your code and let us see what you are doing wrong.

    How many threads are you starting? With 2 CPUs, you are unlikely to see additional benefit once you have 2 or 3 CPU-intensive R instances running.


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
      Sorry, I didn't have time to pare it down to a minimal example, so here it is in its (almost) full horribleness.

      It won't run for you, of course, not without the proper data files. I'll paste some excerpts from the output:

      $ time perl maketables_threads_broken_minimal.pl 2> /dev/null
      Spawning threads... 1 statistics threads started
      [snip]
      > Thread 1 starting ch ch_fm, job *, dist district_2, table sum, size 63223
      < Thread 1 finished ch ch_fm, job *, dist district_2, table sum in 11sec
      > Thread 1 starting ch ch_fm, job *, dist district_12, table sum, size 77721
      < Thread 1 finished ch ch_fm, job *, dist district_12, table sum in 17sec
      > Thread 1 starting ch ch_fm, job *, dist district_22, table sum, size 54784
      < Thread 1 finished ch ch_fm, job *, dist district_22, table sum in 8sec
      > Thread 1 starting ch ch_fm, job *, dist district_22, table sum, size 54784
      < Thread 1 finished ch ch_fm, job *, dist district_22, table sum in 8sec
      > Thread 1 starting ch ch_fm, job *, dist *, table sum, size 195728
      < Thread 1 finished ch ch_fm, job *, dist *, table sum in 112sec
      [snip]
      Thread 1 done 16 jobs
      !!! 16 packages pushed, 16 result received

      real    3m39.706s
      user    0m5.948s
      sys     0m0.964s

      $ time perl maketables_threads_broken_minimal.pl 2> /dev/null
      Spawning threads... 2 statistics threads started
      [snip]
      > Thread 2 starting ch ch_fm, job *, dist district_2, table sum, size 63223
      < Thread 1 finished ch ch_fm, job onkorm, dist *, table sum in 48sec
      > Thread 1 starting ch ch_fm, job *, dist district_12, table sum, size 77721
      < Thread 2 finished ch ch_fm, job *, dist district_2, table sum in 19sec
      > Thread 2 starting ch ch_fm, job *, dist district_22, table sum, size 54784
      < Thread 2 finished ch ch_fm, job *, dist district_22, table sum in 15sec
      > Thread 2 starting ch ch_fm, job *, dist *, table sum, size 195728
      < Thread 1 finished ch ch_fm, job *, dist district_12, table sum in 34sec
      Thread 1 done 10 jobs
      < Thread 2 finished ch ch_fm, job *, dist *, table sum in 113sec
      Thread 2 done 6 jobs
      !!! 16 packages pushed, 16 result received

      real    3m35.557s
      user    0m6.100s
      sys     0m1.008s

      It seems that each R process needs twice the time to do the exact same thing when two instances of it are running.

      Update: Most of the processing time is spent by these commands:

        print $Write "pyros=cenros(outcome,cenoutcome)\n";
        print $Write "mean(pyros)\n";
        $res = <$Read>;

      Data subsetting and enqueuing take about 7 seconds of the 3 min 36 s runtime.

      The code:
      #!/usr/bin/perl
      use Storable qw(retrieve nfreeze thaw);
      use Data::Dumper;
      use IPC::Open2;
      use threads;
      #use threads::shared;
      use Thread::Queue;
      use strict;
      use warnings;
      use utf8;

      my %settings;
      my %db;
      #my @data;
      my %frozen_data;
      #my $dref;
      my @canned_texts;

      $|++;

      $settings{db_fn} = '/home/kikuchiyo/meresek/t-mobil_exp/t-mobil.dbt';
      $settings{data_dir} = '/home/kikuchiyo/meresek/t-mobil_exp/data';
      $settings{data_ext} = ".dat";
      $settings{hungry_hungry_hippo} = 1;
      $settings{tables_fn} = 'tables.tex';
      $settings{power_density} = 1;
      $settings{max_threads} = 2;

      my $input_queue = Thread::Queue->new();
      my $result_queue = Thread::Queue->new();

      my @channels = qw(ch_fm);

      print "Spawning threads... ";
      for (1..$settings{max_threads}) {
          threads->create(\&work_thread, $_)->detach();
      }
      print "$settings{max_threads} statistics threads started\n";

      my $type = 'all'; # O, M, U, E, A, all
      my %filter;
      my $result;
      my @data;
      my $ar;
      my %arrays;
      my %desc;
      my $manifest;
      my $package;
      my @tables;
      my @conditions;
      my $number_packages = 0;
      my $number_results = 0;

      open_db_simple();

      foreach my $ch (@channels) {
          @data = ();
          foreach my $job (qw(ovoda iskola onkorm)) {
              foreach my $district (qw(district_2 district_12 district_22)) {
                  %filter = (district => $district, job => $job);
                  $ar = collect_data($ch, \%filter, $type);
                  $arrays{$job}{$district} = $ar;
                  print "\n***Ch: $ch, Job: $job, District: $district\n";
                  $manifest = { ch => $ch, job => $job, district => $district, type => $type, table => "cat" };
                  $package = { manifest => $manifest, ar => $ar };
                  $input_queue->enqueue(nfreeze($package));
                  $number_packages++;
              }
          }
          print "Appending table $ch\_all_cat\n";
          @data = ();
          foreach my $job (qw(ovoda iskola onkorm)) {
              $ar = [];
              foreach my $district (qw(district_2 district_12 district_22)) {
                  push @$ar, @{ $arrays{$job}{$district} };
              }
              print "\n***Ch: $ch, Job: $job\n";
              $manifest = { ch => $ch, job => $job, district => "*", type => $type, table => "sum" };
              $package = { manifest => $manifest, ar => $ar };
              $input_queue->enqueue(nfreeze($package));
              $number_packages++;
          }
          foreach my $district (qw(district_2 district_12 district_22)) {
              $ar = [];
              foreach my $job (qw(ovoda iskola onkorm)) {
                  push @$ar, @{ $arrays{$job}{$district} };
              }
              print "\n***Ch: $ch, District: $district\n";
              $manifest = { ch => $ch, job => "*", district => $district, type => $type, table => "sum" };
              $package = { manifest => $manifest, ar => $ar };
              $input_queue->enqueue(nfreeze($package));
              $number_packages++;
          }
          $ar = [];
          foreach my $job (qw(ovoda iskola onkorm)) {
              foreach my $district (qw(district_2 district_12 district_22)) {
                  push @$ar, @{ $arrays{$job}{$district} };
              }
          }
          print "\n***\nCh: $ch, All\n";
          $manifest = { ch => $ch, job => "*", district => "*", type => $type, table => "sum" };
          $package = { manifest => $manifest, ar => $ar };
          $input_queue->enqueue(nfreeze($package));
          $number_packages++;
          print "Appending table $ch\_all_sum\n";
      }

      for (1..$settings{max_threads}) {
          $input_queue->enqueue("DONE");
      }
      print "Done assigning jobs, waiting...\n";

      while (my $to_be_processed = $input_queue->pending() > 0) {
          sleep(1);
      }
      while (my $remaining_results = $result_queue->pending() > 0) {
          $package = thaw($result_queue->dequeue());
          $number_results++;
      }
      print "!!! $number_packages packages pushed, $number_results result received\n";
      sleep 1;
      exit 0;

      sub work_thread {
          my ($tid) = shift;
          my ($work, $result);
          my $number = 0;
          my ($ReadFH, $WriteFH);
          my $pid = open2($ReadFH, $WriteFH, "R", "--slave");
          print $WriteFH "library(splines)\nlibrary(survival)\nlibrary(NADA)\n";
          while (1) {
              my $work = $input_queue->dequeue();
              # If end-of-queue marker reached then clean up and exit.
              if ($work eq "DONE") {
                  close $WriteFH;
                  print "Thread $tid done $number jobs\n";
                  kill 9, $pid;
                  return 0;
              }
              elsif ($work) {
                  $number++;
                  $result = do_R_stat($work, $ReadFH, $WriteFH, $tid);
                  $result_queue->enqueue($result);
              }
          }
      }

      sub collect_data {
          my ($ch, $filter, $type) = @_;
          my @ids;
          my $dref;
          foreach my $id (sort keys %db) {
              my $condition = 1;
              foreach my $key (keys %$filter) {
                  $condition &= $db{$id}{record}{$key} eq $filter->{$key};
              }
              push @ids, $id if $condition;
          }
          my @arr;
          foreach my $id (@ids) {
              if ($settings{hungry_hungry_hippo} and exists($frozen_data{$id})) {
                  $dref = $frozen_data{$id};
                  print "MEM $id\t";
              }
              else {
                  $dref = retrieve($settings{data_dir}.'/'.$id.$settings{data_ext});
                  $frozen_data{$id} = $dref;
                  print "DISK $id\t";
              }
              foreach my $i (0..$#$dref) {
                  if ($dref->[$i]{$ch} and ($type eq $dref->[$i]{type} or $type eq 'all')) {
                      push @arr, $dref->[$i]{$ch};
                  }
              }
          }
          return \@arr;
      }

      sub do_R_stat {
          my $mirelit = shift;
          my $Read = shift;
          my $Write = shift;
          my $tid = shift;
          my $package = thaw($mirelit);
          my $arr = $package->{ar};
          my $digits = 3;
          my $dl_nomeas = 0.005;
          my $dl_fewmeas = 0.01;
          my $z0 = 3.769911;
          unless (scalar @$arr) {
              my %result = (
                  'n' => 0,
                  'max' => 0,
                  'ros_sd' => 0,
                  'quantile' => { '0.95' => 0 },
                  'n_cen' => 0,
                  'sd' => 0,
                  'mean' => 0,
                  'ros_mean' => 0,
                  #'gsd' => 0,
                  'gmean' => 0,
              );
              # print "FAIL $tid\n";
              return nfreeze( { manifest => $package->{manifest}, result => \%result } );
          }
          print "> Thread $tid starting ch $package->{manifest}{ch}, job $package->{manifest}{job}, dist $package->{manifest}{district}, table $package->{manifest}{table}, size ".(scalar @$arr)."\n";
          my $time = time();
          my %result;
          my $res;
          {
              local $, = ",";
              #print 1;
              print $Write "outcome <- c(";
              print $Write @$arr[0..19];
              print $Write ")\n";
              for my $i (1..int($#$arr/20 - 1)) {
                  print $Write "outcome <- c(outcome, ";
                  print $Write @$arr[($i*20)..($i*20+19)];
                  print $Write ")\n";
              }
              print $Write "outcome <- c(outcome, ";
              print $Write @$arr[(20*int($#$arr/20))..$#$arr];
              print $Write ")\n";
          }
          print $Write "outcome=outcome**2/$z0\n" if $settings{power_density};
          print $Write "cenoutcome=rep(FALSE, length(outcome))\n".
                       "cenoutcome[outcome==min(outcome)]=TRUE\n";
          print $Write "sum(cenoutcome)==length(outcome)\n";
          my $nomeas = <$Read>;
          print $Write "sum(cenoutcome)>=length(outcome)-minmeas\n";
          my $fewmeas = <$Read>;
          if ($nomeas =~ /TRUE/) {
              $result{ros_mean} = $dl_nomeas;
              $result{ros_sd} = "NA";
              $result{n_cen} = 0;
              $result{quantile}{0.95} = "NA";
          }
          elsif ($fewmeas =~ /TRUE/) {
              $result{ros_mean} = $dl_fewmeas;
              $result{ros_sd} = "NA";
              print $Write "length(outcome)-sum(cenoutcome)\n";
              $res = <$Read>;
              chomp $res;
              $res =~ s/^\[1\] //;
              $result{n_cen} = $res;
              $result{quantile}{0.95} = "NA";
          }
          else {
              print $Write "pyros=cenros(outcome,cenoutcome)\n";
              print $Write "mean(pyros)\n";
              $res = <$Read>;
              chomp $res;
              $res =~ s/^\[1\] //;
              $res = sqrt($z0 * $res) if $settings{power_density};
              $result{ros_mean} = sprintf("%.0".$digits."f", $res);
              print $Write "sd(pyros)\n";
              $res = <$Read>;
              chomp $res;
              $res =~ s/^\[1\] //;
              $res = sqrt($z0 * $res) if $settings{power_density};
              $result{ros_sd} = sprintf("%.0".$digits."f", $res);
              print $Write "quantile(pyros, probs=c(0.95))\n";
              $res = <$Read>;
              $res = <$Read>;
              chomp $res;
              chop $res; # also strips the trailing space
              $res = sqrt($z0 * $res) if $settings{power_density};
              $result{quantile}{0.95} = sprintf("%.0".$digits."f", $res);
              print $Write "pyros\n";
              $res = <$Read>;
              $res = <$Read>;
              print $Write "length(outcome)-sum(cenoutcome)\n";
              $res = <$Read>;
              chomp $res;
              $res =~ s/^\[1\] //;
              $result{n_cen} = sprintf("%.01d", $res);
          }
          print $Write "outcome=sqrt($z0*outcome)\n" if $settings{power_density};
          print $Write "mean(outcome, na.rm=TRUE)\n";
          $res = <$Read>;
          chomp $res;
          $res =~ s/^\[1\] //;
          $result{mean} = sprintf("%.0".$digits."f", $res);
          #print 3;
          print $Write "sd(outcome, na.rm=TRUE)\n";
          $res = <$Read>;
          chomp $res;
          $res =~ s/^\[1\] //;
          $result{sd} = sprintf("%.0".$digits."f", $res);
          #print 4;
          print $Write "exp(mean(log(outcome), na.rm=TRUE))\n";
          $res = <$Read>;
          chomp $res;
          $res =~ s/^\[1\] //;
          $result{gmean} = sprintf("%.0".$digits."f", $res);
          print $Write "max(outcome)\n";
          $res = <$Read>;
          chomp $res;
          $res =~ s/^\[1\] //;
          $result{max} = sprintf("%.0".$digits."f", $res);
          #print 7;
          $result{n} = scalar @$arr;
          print "< Thread $tid finished ch $package->{manifest}{ch}, job $package->{manifest}{job}, dist $package->{manifest}{district}, table $package->{manifest}{table} in ".(time()-$time)."sec\n";
          return nfreeze( { manifest => $package->{manifest}, result => \%result } );
      }

      sub open_db_simple {
          local(*DBFN);
          unless (open(DBFN, '<:encoding(UTF-8)', $settings{db_fn})) {
              die;
          }
          die if check_db_syntax($settings{db_fn});
          my $dbcode = do { local $/; <DBFN> };
          close DBFN;
          eval $dbcode;
      }

      sub check_db_syntax {
          my $file = shift;
          system("perl", "-c", $file);
          return $? >> 8;
      }

        I too am confused by your results. Right now I have 3 speculative possibilities:

        1. Your CPU isn't a real dual-core, but rather one of those pseudo-dual-core hyperthreaded things, and your OS is presenting it as if it were a true dual-core.
        2. One or more of the R DLLs is not thread-safe, and they've dodged the issue by using a semaphore to serialise access.

          This seems to be the most likely explanation.

        3. There is something inherent in your implementation that is causing the threads to serialise.

          I haven't been able to spot anything from a fairly extended inspection, but there is rather too much code to comprehensively 'run it in my head'.

          Unfortunately, even if I had the data files, I would still not be able to run it here, as neither IPC::Open2 nor any of the alternatives works worth a damn on my platform.

        My best suggestion for isolating which (if any) of the above is the problem: log the R commands generated during your demo run, split the log in two, then start two manual R sessions, pipe half to each, set them both going at the same time, and see how long they take.
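
        For what it's worth, here is a minimal, untested sketch of that experiment in Perl. The names half1.R and half2.R are invented stand-ins for the two halves of your logged command stream. It forks two children, pipes one half into each child's own R session, and reports the combined wall time:

          #!/usr/bin/perl
          # Untested sketch: run two R batch sessions concurrently and time them.
          # half1.R / half2.R are hypothetical names for the split command log.
          use strict;
          use warnings;
          use Time::HiRes qw(time);

          my $start = time();
          my @pids;
          for my $half (qw(half1.R half2.R)) {
              my $pid = fork();
              die "fork failed: $!" unless defined $pid;
              if ($pid == 0) {
                  # Child: feed one half of the command log to its own R instance.
                  exec 'sh', '-c', "R --slave < $half > /dev/null 2>&1"
                      or die "exec failed: $!";
              }
              push @pids, $pid;
          }
          waitpid $_, 0 for @pids;    # wait for both sessions to finish
          printf "Both halves finished in %.1f sec\n", time() - $start;

        If the combined wall time is close to that of a single full-log session, the serialisation is happening inside R (or below it); if it roughly halves, the problem is somewhere on the Perl side.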


        Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
        "Science is about questioning the status quo. Questioning authority".
        In the absence of evidence, opinion is indistinguishable from prejudice.
Re^3: Using threads to run multiple external processes at the same time
by bot403 (Beadle) on Sep 03, 2009 at 23:16 UTC

    Imagine you have 10 units of work each needing 1,000 seconds to complete. You need to spend a total of 10,000 seconds of "work" (CPU seconds).

    You can only divide the work efficiently across as many CPUs as you have. With 2 CPUs you can finish in 5,000 seconds. With 10 CPUs you can finish in 1,000 seconds. With 100 CPUs you still need 1,000 seconds unless you can further subdivide your unit of work; 90 CPUs will be idle while 10 do the processing.

    Also, the OS takes care of the sharing. Whether you have 2 or 1,000 threads, the OS will make sure each of them gets its fair share of time to run. When you have more threads or processes than you have CPUs, they just fight (in a sense) over who can currently execute. You can only have as many running programs as you have CPUs: even a modern CPU runs one program at a time per core. It just seems like everything is running at once because the OS switches between programs extremely fast. :)

      I'm aware of this; that's why I was surprised when I compared the runtimes of the one-worker-thread and two-worker-thread runs. The processing time was approximately 43 minutes in both cases.

      I looked at the output of ps -u while my program was running: in the two-threaded case there were two R processes running, both using 99% CPU, and from the output it seemed that they were dividing the jobs among themselves. Yet the processing took as much time as in the single-threaded case.

      As for the question about messages between threads: The manager thread uses Storable to create work units from the dataset arrays; the frozen arrays are placed on the input queue. These are fairly large.
      The work threads use a second queue to get the results back to the manager thread; however, the results are just hashes with about a dozen keys.

      I'll construct a minimal example and get back to you.
        The manager thread uses Storable to create work units from the dataset arrays; the frozen arrays are placed on the input queue. These are fairly large.

        You shouldn't create the subsets in the main thread and queue them to the workers. This is far too costly.

        Better:

        1. Share the main array so the workers have access;
        2. Queue the subset criteria to the workers;
        3. They dequeue a criteria set and use it to create the subsets locally from the shared raw data array.

          As they will only be reading that array, no locking is required.

        4. They generate the subset, pass it to their R instance and wait for the response.

          Unless there is a real need to pass the results back to the main thread, have them finish dealing with them locally before going back for a new criteria set.

        This way, your workers won't be sitting around idle while your main thread is performing the subsetting for all of them. And you won't be churning over costly shared resources by enqueuing and dequeuing large serialised (storable) subsets.

        Let the workers do the work; let the manager sit back and manage :)
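
        A rough, untested sketch of that shape, in the spirit of the code above; the flat "district|job|value" record format, the hard-coded worker count, and the sample criteria are invented for illustration. The point is the shared read-only array plus a queue that carries only small criteria hashes:

          #!/usr/bin/perl
          # Untested sketch of the layout described above. Workers dequeue small
          # criteria hashes and build their subsets locally from shared raw data.
          use strict;
          use warnings;
          use threads;
          use threads::shared;
          use Thread::Queue;
          use Storable qw(nfreeze thaw);

          my @raw : shared;                   # full raw data; workers only read it
          my $work_q = Thread::Queue->new();  # carries only small frozen criteria hashes

          sub worker {
              my $tid = shift;
              # Each worker would open2() its own R instance here, as in the original.
              while (1) {
                  my $item = $work_q->dequeue();
                  last if $item eq 'DONE';    # end-of-queue marker, as in the original
                  my $crit = thaw($item);
                  # Build the subset locally; pure reads of @raw need no locking.
                  my @subset;
                  for (@raw) {
                      my ($district, $job, $value) = split /\|/;
                      push @subset, $value
                          if $district eq $crit->{district} and $job eq $crit->{job};
                  }
                  # ... pipe @subset into this worker's R instance, read the stats
                  # back, and finish handling the result right here instead of
                  # queueing it back to the manager.
              }
          }

          # Manager: populate @raw once, queue criteria, then one DONE per worker.
          push @raw, 'district_2|ovoda|0.42';   # ... load the real data here
          my @workers = map { threads->create(\&worker, $_) } 1 .. 2;
          $work_q->enqueue(nfreeze({ district => 'district_2', job => 'ovoda' }));
          $work_q->enqueue('DONE') for @workers;
          $_->join for @workers;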


        Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
        "Science is about questioning the status quo. Questioning authority".
        In the absence of evidence, opinion is indistinguishable from prejudice.
        Sorry, I misunderstood you. Certainly 1 vs. 2 worker threads on a 2-CPU machine should not take the same amount of time. Something is certainly not right in how the work is being divided...
Re^3: Using threads to run multiple external processes at the same time
by jdrago_999 (Hermit) on Sep 03, 2009 at 17:35 UTC

    It could be one of several factors (or a combination of them):

    • Are you passing small messages from your worker threads to your "manager" thread, or are you passing large chunks of data?
     • How often are you passing messages from your worker threads to your manager thread? Can you reduce how often this occurs?
    • Can you add more threads?