in reply to Re^3: Using threads to run multiple external processes at the same time
in thread Using threads to run multiple external processes at the same time

Sorry, I didn't have time to clean it down to a minimal example, so here it is in its (almost) full horribleness.

It won't run for you, of course, not without the proper data files. I'll paste some excerpts from the output:

$ time perl maketables_threads_broken_minimal.pl 2> /dev/null
Spawning threads... 1 statistics threads started
[snip]
> Thread 1 starting ch ch_fm, job *, dist district_2, table sum, size 63223
< Thread 1 finished ch ch_fm, job *, dist district_2, table sum in 11sec
> Thread 1 starting ch ch_fm, job *, dist district_12, table sum, size 77721
< Thread 1 finished ch ch_fm, job *, dist district_12, table sum in 17sec
> Thread 1 starting ch ch_fm, job *, dist district_22, table sum, size 54784
< Thread 1 finished ch ch_fm, job *, dist district_22, table sum in 8sec
> Thread 1 starting ch ch_fm, job *, dist district_22, table sum, size 54784
< Thread 1 finished ch ch_fm, job *, dist district_22, table sum in 8sec
> Thread 1 starting ch ch_fm, job *, dist *, table sum, size 195728
< Thread 1 finished ch ch_fm, job *, dist *, table sum in 112sec
[snip]
Thread 1 done 16 jobs
!!! 16 packages pushed, 16 result received

real    3m39.706s
user    0m5.948s
sys     0m0.964s

$ time perl maketables_threads_broken_minimal.pl 2> /dev/null
Spawning threads... 2 statistics threads started
[snip]
> Thread 2 starting ch ch_fm, job *, dist district_2, table sum, size 63223
< Thread 1 finished ch ch_fm, job onkorm, dist *, table sum in 48sec
> Thread 1 starting ch ch_fm, job *, dist district_12, table sum, size 77721
< Thread 2 finished ch ch_fm, job *, dist district_2, table sum in 19sec
> Thread 2 starting ch ch_fm, job *, dist district_22, table sum, size 54784
< Thread 2 finished ch ch_fm, job *, dist district_22, table sum in 15sec
> Thread 2 starting ch ch_fm, job *, dist *, table sum, size 195728
< Thread 1 finished ch ch_fm, job *, dist district_12, table sum in 34sec
Thread 1 done 10 jobs
< Thread 2 finished ch ch_fm, job *, dist *, table sum in 113sec
Thread 2 done 6 jobs
!!! 16 packages pushed, 16 result received

real    3m35.557s
user    0m6.100s
sys     0m1.008s

It seems that the R process needs twice the time to do the exact same thing when there are two instances of it.

Update: Most of the processing time is spent by these commands:
print $Write "pyros=cenros(outcome,cenoutcome)\n";
print $Write "mean(pyros)\n";
$res = <$Read>;
Data subsetting and enqueuing takes about 7 seconds out of the 3 min 36 s runtime.

The code:
#!/usr/bin/perl
use Storable qw(retrieve nfreeze thaw);
use Data::Dumper;
use IPC::Open2;
use threads;
#use threads::shared;
use Thread::Queue;
use strict;
use warnings;
use utf8;

my %settings;
my %db;
#my @data;
my %frozen_data;
#my $dref;
my @canned_texts;

$|++;

$settings{db_fn}               = '/home/kikuchiyo/meresek/t-mobil_exp/t-mobil.dbt';
$settings{data_dir}            = '/home/kikuchiyo/meresek/t-mobil_exp/data';
$settings{data_ext}            = ".dat";
$settings{hungry_hungry_hippo} = 1;
$settings{tables_fn}           = 'tables.tex';
$settings{power_density}       = 1;
$settings{max_threads}         = 2;

my $input_queue  = Thread::Queue->new();
my $result_queue = Thread::Queue->new();

my @channels = qw(ch_fm);

print "Spawning threads... ";
for (1..$settings{max_threads}) {
    threads->create(\&work_thread, $_)->detach();
}
print "$settings{max_threads} statistics threads started\n";

my $type = 'all'; # O, M, U, E, A, all
my %filter;
my $result;
my @data;
my $ar;
my %arrays;
my %desc;
my $manifest;
my $package;
my @tables;
my @conditions;
my $number_packages = 0;
my $number_results  = 0;

open_db_simple();

foreach my $ch (@channels) {
    @data = ();
    foreach my $job (qw(ovoda iskola onkorm)) {
        foreach my $district (qw(district_2 district_12 district_22)) {
            %filter = (district => $district, job => $job);
            $ar = collect_data($ch, \%filter, $type);
            $arrays{$job}{$district} = $ar;
            print "\n***Ch: $ch, Job: $job, District: $district\n";
            $manifest = { ch => $ch, job => $job, district => $district,
                          type => $type, table => "cat" };
            $package = { manifest => $manifest, ar => $ar };
            $input_queue->enqueue(nfreeze($package));
            $number_packages++;
        }
    }
    print "Appending table $ch\_all_cat\n";
    @data = ();
    foreach my $job (qw(ovoda iskola onkorm)) {
        $ar = [];
        foreach my $district (qw(district_2 district_12 district_22)) {
            push @$ar, @{ $arrays{$job}{$district} };
        }
        print "\n***Ch: $ch, Job: $job\n";
        $manifest = { ch => $ch, job => $job, district => "*",
                      type => $type, table => "sum" };
        $package = { manifest => $manifest, ar => $ar };
        $input_queue->enqueue(nfreeze($package));
        $number_packages++;
    }
    foreach my $district (qw(district_2 district_12 district_22)) {
        $ar = [];
        foreach my $job (qw(ovoda iskola onkorm)) {
            push @$ar, @{ $arrays{$job}{$district} };
        }
        print "\n***Ch: $ch, District: $district\n";
        $manifest = { ch => $ch, job => "*", district => $district,
                      type => $type, table => "sum" };
        $package = { manifest => $manifest, ar => $ar };
        $input_queue->enqueue(nfreeze($package));
        $number_packages++;
    }
    $ar = [];
    foreach my $job (qw(ovoda iskola onkorm)) {
        foreach my $district (qw(district_2 district_12 district_22)) {
            push @$ar, @{ $arrays{$job}{$district} };
        }
    }
    print "\n***\nCh: $ch, All\n";
    $manifest = { ch => $ch, job => "*", district => "*",
                  type => $type, table => "sum" };
    $package = { manifest => $manifest, ar => $ar };
    $input_queue->enqueue(nfreeze($package));
    $number_packages++;
    print "Appending table $ch\_all_sum\n";
}

for (1..$settings{max_threads}) {
    $input_queue->enqueue("DONE");
}
print "Done assigning jobs, waiting...\n";

while (my $to_be_processed = $input_queue->pending() > 0) {
    sleep(1);
}
while (my $remaining_results = $result_queue->pending() > 0) {
    $package = thaw($result_queue->dequeue());
    $number_results++;
}
print "!!! $number_packages packages pushed, $number_results result received\n";
sleep 1;
exit 0;

sub work_thread {
    my ($tid) = shift;
    my ($work, $result);
    my $number = 0;
    my ($ReadFH, $WriteFH);
    my $pid = open2($ReadFH, $WriteFH, "R", "--slave");
    print $WriteFH "library(splines)\nlibrary(survival)\nlibrary(NADA)\n";
    while (1) {
        my $work = $input_queue->dequeue();
        # If end-of-queue marker reached then cleanup and exit.
        if ($work eq "DONE") {
            close $WriteFH;
            print "Thread $tid done $number jobs\n";
            kill 9, $pid;
            return 0;
        } elsif ($work) {
            $number++;
            $result = do_R_stat($work, $ReadFH, $WriteFH, $tid);
            $result_queue->enqueue($result);
        }
    }
}

sub collect_data {
    my ($ch, $filter, $type) = @_;
    my @ids;
    my $dref;
    foreach my $id (sort keys %db) {
        my $condition = 1;
        foreach my $key (keys %$filter) {
            $condition &= $db{$id}{record}{$key} eq $filter->{$key};
        }
        push @ids, $id if $condition;
    }
    my @arr;
    foreach my $id (@ids) {
        if ($settings{hungry_hungry_hippo} and exists($frozen_data{$id})) {
            $dref = $frozen_data{$id};
            print "MEM $id\t";
        } else {
            $dref = retrieve($settings{data_dir}.'/'.$id.$settings{data_ext});
            $frozen_data{$id} = $dref;
            print "DISK $id\t";
        }
        foreach my $i (0..$#$dref) {
            if ($dref->[$i]{$ch} and ($type eq $dref->[$i]{type} or $type eq 'all')) {
                push @arr, $dref->[$i]{$ch};
            }
        }
    }
    return \@arr;
}

sub do_R_stat {
    my $mirelit = shift;
    my $Read    = shift;
    my $Write   = shift;
    my $tid     = shift;
    my $package = thaw($mirelit);
    my $arr     = $package->{ar};
    my $digits     = 3;
    my $dl_nomeas  = 0.005;
    my $dl_fewmeas = 0.01;
    my $z0         = 3.769911;
    unless (scalar @$arr) {
        my %result = (
            'n'        => 0,
            'max'      => 0,
            'ros_sd'   => 0,
            'quantile' => { '0.95' => 0 },
            'n_cen'    => 0,
            'sd'       => 0,
            'mean'     => 0,
            'ros_mean' => 0,
            #'gsd'     => 0,
            'gmean'    => 0,
        );
        # print "FAIL $tid\n";
        return nfreeze( { manifest => $package->{manifest}, result => \%result } );
    }
    print "> Thread $tid starting ch $package->{manifest}{ch}, job $package->{manifest}{job}, dist $package->{manifest}{district}, table $package->{manifest}{table}, size ".(scalar @$arr)."\n";
    my $time = time();
    my %result;
    my $res;
    {
        local $, = ",";
        #print 1;
        print $Write "outcome <- c(";
        print $Write @$arr[0..19];
        print $Write ")\n";
        for my $i (0..int($#$arr/20 - 1)) {
            print $Write "outcome <- c(outcome, ";
            print $Write @$arr[($i*20)..($i*20+19)];
            print $Write ")\n";
        }
        print $Write "outcome <- c(outcome, ";
        print $Write @$arr[(20*int($#$arr/20))..$#$arr];
        print $Write ")\n";
    }
    print $Write "outcome=outcome**2/$z0\n" if $settings{power_density};
    print $Write "cenoutcome=rep(FALSE, length(outcome))\n".
                 "cenoutcome[outcome==min(outcome)]=TRUE\n";
    print $Write "sum(cenoutcome)==length(outcome)\n";
    my $nomeas = <$Read>;
    print $Write "sum(cenoutcome)>=length(outcome)-minmeas\n";
    my $fewmeas = <$Read>;
    if ($nomeas =~ /TRUE/) {
        $result{ros_mean}       = $dl_nomeas;
        $result{ros_sd}         = "NA";
        $result{n_cen}          = 0;
        $result{quantile}{0.95} = "NA";
    } elsif ($fewmeas =~ /TRUE/) {
        $result{ros_mean} = $dl_fewmeas;
        $result{ros_sd}   = "NA";
        print $Write "length(outcome)-sum(cenoutcome)\n";
        $res = <$Read>;
        chomp $res;
        $res =~ s/^\[1\] //;
        $result{n_cen}          = $res;
        $result{quantile}{0.95} = "NA";
    } else {
        print $Write "pyros=cenros(outcome,cenoutcome)\n";
        print $Write "mean(pyros)\n";
        $res = <$Read>;
        chomp $res;
        $res =~ s/^\[1\] //;
        $res = sqrt($z0 * $res) if $settings{power_density};
        $result{ros_mean} = sprintf("%.0".$digits."f", $res);
        print $Write "sd(pyros)\n";
        $res = <$Read>;
        chomp $res;
        $res =~ s/^\[1\] //;
        $res = sqrt($z0 * $res) if $settings{power_density};
        $result{ros_sd} = sprintf("%.0".$digits."f", $res);
        print $Write "quantile(pyros, probs=c(0.95))\n";
        $res = <$Read>;
        $res = <$Read>;
        chomp $res;
        chop $res; # strip the trailing space as well
        $res = sqrt($z0 * $res) if $settings{power_density};
        $result{quantile}{0.95} = sprintf("%.0".$digits."f", $res);
        print $Write "pyros\n";
        $res = <$Read>;
        $res = <$Read>;
        print $Write "length(outcome)-sum(cenoutcome)\n";
        $res = <$Read>;
        chomp $res;
        $res =~ s/^\[1\] //;
        $result{n_cen} = sprintf("%.01d", $res);
    }
    print $Write "outcome=sqrt($z0*outcome)\n" if $settings{power_density};
    print $Write "mean(outcome, na.rm=TRUE)\n";
    $res = <$Read>;
    chomp $res;
    $res =~ s/^\[1\] //;
    $result{mean} = sprintf("%.0".$digits."f", $res);
    #print 3;
    print $Write "sd(outcome, na.rm=TRUE)\n";
    $res = <$Read>;
    chomp $res;
    $res =~ s/^\[1\] //;
    $result{sd} = sprintf("%.0".$digits."f", $res);
    #print 4;
    print $Write "exp(mean(log(outcome), na.rm=TRUE))\n";
    $res = <$Read>;
    chomp $res;
    $res =~ s/^\[1\] //;
    $result{gmean} = sprintf("%.0".$digits."f", $res);
    print $Write "max(outcome)\n";
    $res = <$Read>;
    chomp $res;
    $res =~ s/^\[1\] //;
    $result{max} = sprintf("%.0".$digits."f", $res);
    #print 7;
    $result{n} = scalar @$arr;
    print "< Thread $tid finished ch $package->{manifest}{ch}, job $package->{manifest}{job}, dist $package->{manifest}{district}, table $package->{manifest}{table} in ".(time()-$time)."sec\n";
    return nfreeze( { manifest => $package->{manifest}, result => \%result } );
}

sub open_db_simple {
    local(*DBFN);
    unless (open(DBFN, '<:encoding(UTF-8)', $settings{db_fn})) {
        die;
    }
    die if check_db_syntax($settings{db_fn});
    my $dbcode = do { local( $/ ); <DBFN> };
    close DBFN;
    eval $dbcode;
}

sub check_db_syntax {
    my $file = shift;
    system("perl", "-c", $file);
    return $? >> 8;
}

Replies are listed 'Best First'.
Re^5: Using threads to run multiple external processes at the same time
by BrowserUk (Patriarch) on Sep 04, 2009 at 22:21 UTC

    I too am confused by your results. Right now I have 3 speculative possibilities:

    1. Your CPU isn't a real dual-core, but rather one of those pseudo-dual-core hyperthreaded things, and your OS is presenting it as if it were a true dual core.
    2. One or more of the R dlls is non-threadsafe, and they've dodged the issue by using a semaphore to serialise access.

      This seems to be the most likely explanation.

    3. There is something inherent in your implementation that is causing the threads to serialise.

      I haven't been able to spot anything from a fairly extended inspection, but there is rather too much code to comprehensively 'run it in my head'.

      Unfortunately, even if I had the data files, I would still not be able to run it here, as IPC::Open2 (like all the alternatives) doesn't work worth a damn on my platform.

    My best suggestion for isolating which (if any) of the above is the problem is to log the R commands from your demo run, split the log in two, start two manual R sessions, pipe half to each, set them both going at the same time, and see how long they take.
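    Concretely, that test could be scripted something like this (a sketch only; the cmds.R log file, the half_ prefix and GNU split's -n l/2 option are my assumptions):

```shell
# cmds.R holds the R commands logged from the single-threaded run.
# Split it into two halves without breaking lines (GNU coreutils split),
# producing half_aa and half_ab.
split -n l/2 cmds.R half_

# Start two R sessions at the same moment and feed half to each.
time R --slave < half_aa &
time R --slave < half_ab &
wait
```

    If R serialises access internally, the two halves will together take about as long as the whole log fed to one session; if not, the wall-clock time should roughly halve.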


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
      Thanks for looking at it.

      Possibility 1 is out of the question: it's a C2D E7300, or it's damn good at pretending to be one. :)

      Possibility 3 is also out, I think. I tried replacing the function that talks to R with one that simply calls one of my dumb C Mandelbrot renderers: the 2-threaded version finished in half the time, confirming that the CPU is capable of running two processes in a truly parallel way. This also suggests that it is not my implementation that's at fault.

      This leaves possibility 2, which suggests that something fishy is going on with R when it comes to multithreading. Maybe, as you say, there are two R interpreter instances but only one backend?

      I'll try to isolate the issue. Thanks again.
      It seems that you were right.

      I found something relevant on the R mailing lists:

      "> Specifically if it is possible to ask R to run a given R-program
      > from withing a posix thread (on linux) without providing a Mutex that
      > would serialise access to R process.

      No. You need to make sure that only one thread calls R, which means having
      some sort of handler to queue the commands."

      This means that the threaded approach is useless. Back to square one.
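      For reference, the "handler to queue the commands" that the answer describes would look something like this with Thread::Queue (an untested sketch; the queue name, the DONE marker and the one-result-line-per-command simplification are mine):

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use IPC::Open2;

my $cmd_queue = Thread::Queue->new();

# The only thread that ever talks to R: it owns the pipes exclusively,
# so no mutex around the R process is needed.
my $r_handler = threads->create(sub {
    my $pid = open2(my $Read, my $Write, "R", "--slave");
    while (defined(my $cmd = $cmd_queue->dequeue())) {
        last if $cmd eq "DONE";
        print $Write "$cmd\n";
        my $res = <$Read>;   # assumes one result line per command
        print "R says: $res";
    }
    close $Write;
    waitpid $pid, 0;
});

# Every other thread just enqueues commands instead of printing to R itself.
$cmd_queue->enqueue("mean(c(1,2,3))");
$cmd_queue->enqueue("DONE");
$r_handler->join();
```

      This leaves exactly one thread (and one R process) doing all the R work, with everything else reduced to posting commands on a queue, so any parallelism would have to come from outside R.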
        This means that the threaded approach is useless.

        Hm. I'm not sure that is true.

        It's unclear to me from the 3 posts in that thread whether they are talking about talking to multiple processes from different threads--as you are trying to do--or whether they are talking about talking to R.dll from multiple threads when embedding R in a C/C++ program.

        At one point the OP talks of "calling R", at another "the R process". And most of the "threads" discussion by the 2 experts seems to be talking about threading R internally--i.e. within a single R process--rather than having two process instances running concurrently.

        I remember many of the dlls in OS/2 v1.x were inherently thread-unsafe, mostly because they were written in C by ex-COBOL programmers who hadn't quite gotten over the 'static data section' way of thinking. But I didn't think anyone still coding for a living was still doing stuff like that.

        By far the simplest way of verifying this would be to run something in each of two concurrent interactive sessions that takes an appreciable amount of time--a minute or two--and see if the time is overlapped or serialised. I have two Rgui sessions running now, but I don't know enough about R to come up with something that doesn't complete instantaneously :(

