Sorry, I didn't have time to pare it down to a minimal example, so here it is in its (almost) full horribleness.

It won't run for you, of course, not without the proper data files. I'll paste some excerpts from the output:

```
$ time perl maketables_threads_broken_minimal.pl 2>/dev/null
Spawning threads... 1 statistics threads started
[snip]
> Thread 1 starting ch ch_fm, job *, dist district_2, table sum, size 63223
< Thread 1 finished ch ch_fm, job *, dist district_2, table sum in 11sec
> Thread 1 starting ch ch_fm, job *, dist district_12, table sum, size 77721
< Thread 1 finished ch ch_fm, job *, dist district_12, table sum in 17sec
> Thread 1 starting ch ch_fm, job *, dist district_22, table sum, size 54784
< Thread 1 finished ch ch_fm, job *, dist district_22, table sum in 8sec
> Thread 1 starting ch ch_fm, job *, dist *, table sum, size 195728
< Thread 1 finished ch ch_fm, job *, dist *, table sum in 112sec
[snip]
Thread 1 done 16 jobs
!!! 16 packages pushed, 16 result received

real    3m39.706s
user    0m5.948s
sys     0m0.964s

$ time perl maketables_threads_broken_minimal.pl 2>/dev/null
Spawning threads... 2 statistics threads started
[snip]
> Thread 2 starting ch ch_fm, job *, dist district_2, table sum, size 63223
< Thread 1 finished ch ch_fm, job onkorm, dist *, table sum in 48sec
> Thread 1 starting ch ch_fm, job *, dist district_12, table sum, size 77721
< Thread 2 finished ch ch_fm, job *, dist district_2, table sum in 19sec
> Thread 2 starting ch ch_fm, job *, dist district_22, table sum, size 54784
< Thread 2 finished ch ch_fm, job *, dist district_22, table sum in 15sec
> Thread 2 starting ch ch_fm, job *, dist *, table sum, size 195728
< Thread 1 finished ch ch_fm, job *, dist district_12, table sum in 34sec
Thread 1 done 10 jobs
< Thread 2 finished ch ch_fm, job *, dist *, table sum in 113sec
Thread 2 done 6 jobs
!!! 16 packages pushed, 16 result received

real    3m35.557s
user    0m6.100s
sys     0m1.008s
```

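It seems that the R process needs about twice as long to do exactly the same work when two instances of it are running. The per-district sum tables took 11, 17 and 8 seconds with one thread, but 19, 34 and 15 seconds with two, so the total run time hardly changes (3m39s vs. 3m35s).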
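To tell whether the slowdown comes from the threads/R setup or from the machine itself, one could time the same CPU-bound loop in one and in two forked processes, with no threads and no R involved. This is a minimal sketch: `burn`, `wall_time_for` and the iteration count are hypothetical, chosen only for illustration.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use Time::HiRes qw(time);

# Hypothetical CPU-bound workload standing in for one R job.
sub burn {
    my ($iterations) = @_;
    my $x = 0;
    $x += sqrt($_) for 1 .. $iterations;
    return $x;
}

# Run $n copies of the workload in separate processes and return the
# wall-clock time until all of them have finished.
sub wall_time_for {
    my ($n, $iterations) = @_;
    my $start = time();
    my @pids;
    for (1 .. $n) {
        my $pid = fork();
        die "fork failed: $!" unless defined $pid;
        if ($pid == 0) {    # child: do the work, then leave
            burn($iterations);
            exit 0;
        }
        push @pids, $pid;
    }
    waitpid($_, 0) for @pids;    # wait for (and reap) every child
    return time() - $start;
}

unless (caller) {
    my $iterations = 3_000_000;
    for my $n (1, 2) {
        printf "%d process(es): %.2f s wall time\n", $n, wall_time_for($n, $iterations);
    }
}
```

If two processes finish in about the same wall time as one, the machine does run them in parallel and the cause is probably elsewhere; if they take about twice as long, the two processes are competing for the same CPU resources.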

Update: Most of the processing time is spent in these commands:

```perl
print $Write "pyros=cenros(outcome,cenoutcome)\n";
print $Write "mean(pyros)\n";
$res = <$Read>;
```
Data subsetting and enqueuing takes about 7 seconds out of the 3 min 36 s runtime.
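These lines follow the request/response pattern used throughout do_R_stat: write one R expression, read one line back, strip R's `[1] ` index prefix. A hypothetical helper (`r_query` is not part of the original code) could wrap that pattern. It is shown here with in-memory filehandles and made-up replies standing in for the pipes from open2, so it runs without R.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical helper: send one R expression and return the first line
# of R's reply with the "[1] " index prefix removed.
# Assumes the expression prints exactly one line (true for a scalar result).
sub r_query {
    my ($read_fh, $write_fh, $expr) = @_;
    print {$write_fh} "$expr\n";
    my $line = <$read_fh>;
    die "R closed its output\n" unless defined $line;
    chomp $line;
    $line =~ s/^\[1\] //;
    return $line;
}

unless (caller) {
    # In-memory handles stand in for the pipes returned by open2();
    # the replies are canned, made-up values.
    my $sent  = '';
    my $reply = "[1] 0.4213\n[1] 12\n";
    open(my $to_r,   '>', \$sent)  or die $!;
    open(my $from_r, '<', \$reply) or die $!;
    my $mean  = r_query($from_r, $to_r, 'mean(pyros)');
    my $n_cen = r_query($from_r, $to_r, 'length(outcome)-sum(cenoutcome)');
    print "mean=$mean n_cen=$n_cen\n";
}
```

With the real pipes this request/response cycle works because IPC::Open2 turns on autoflush for the write handle; otherwise the read would wait for a command still sitting in Perl's buffer.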

The code:
```perl
#!/usr/bin/perl
use strict;
use warnings;
use utf8;
use Storable qw(retrieve nfreeze thaw);
use Data::Dumper;
use IPC::Open2;
use threads;
#use threads::shared;
use Thread::Queue;

my %settings;
my %db;
#my @data;
my %frozen_data;
#my $dref;
my @canned_texts;

$|++;

$settings{db_fn}    = '/home/kikuchiyo/meresek/t-mobil_exp/t-mobil.dbt';
$settings{data_dir} = '/home/kikuchiyo/meresek/t-mobil_exp/data';
$settings{data_ext} = ".dat";
$settings{hungry_hungry_hippo} = 1;    # cache retrieved data files in memory
$settings{tables_fn}     = 'tables.tex';
$settings{power_density} = 1;
$settings{max_threads}   = 2;

my $input_queue  = Thread::Queue->new();
my $result_queue = Thread::Queue->new();

my @channels = qw(ch_fm);

print "Spawning threads... ";
for (1 .. $settings{max_threads}) {
    threads->create(\&work_thread, $_)->detach();
}
print "$settings{max_threads} statistics threads started\n";

my $type = 'all';    # O, M, U, E, A, all
my %filter;
my $result;
my @data;
my $ar;
my %arrays;
my %desc;
my $manifest;
my $package;
my @tables;
my @conditions;
my $number_packages = 0;
my $number_results  = 0;

open_db_simple();

foreach my $ch (@channels) {
    @data = ();
    # One "cat" table per (job, district) pair
    foreach my $job (qw(ovoda iskola onkorm)) {
        foreach my $district (qw(district_2 district_12 district_22)) {
            %filter = (district => $district, job => $job);
            $ar = collect_data($ch, \%filter, $type);
            $arrays{$job}{$district} = $ar;
            print "\n***Ch: $ch, Job: $job, District: $district\n";
            $manifest = { ch => $ch, job => $job, district => $district,
                          type => $type, table => "cat" };
            $package = { manifest => $manifest, ar => $ar };
            $input_queue->enqueue(nfreeze($package));
            $number_packages++;
        }
    }
    print "Appending table $ch\_all_cat\n";

    @data = ();
    # "sum" tables: each job, all districts combined
    foreach my $job (qw(ovoda iskola onkorm)) {
        $ar = [];
        foreach my $district (qw(district_2 district_12 district_22)) {
            push @$ar, @{ $arrays{$job}{$district} };
        }
        print "\n***Ch: $ch, Job: $job\n";
        $manifest = { ch => $ch, job => $job, district => "*",
                      type => $type, table => "sum" };
        $package = { manifest => $manifest, ar => $ar };
        $input_queue->enqueue(nfreeze($package));
        $number_packages++;
    }
    # "sum" tables: each district, all jobs combined
    foreach my $district (qw(district_2 district_12 district_22)) {
        $ar = [];
        foreach my $job (qw(ovoda iskola onkorm)) {
            push @$ar, @{ $arrays{$job}{$district} };
        }
        print "\n***Ch: $ch, District: $district\n";
        $manifest = { ch => $ch, job => "*", district => $district,
                      type => $type, table => "sum" };
        $package = { manifest => $manifest, ar => $ar };
        $input_queue->enqueue(nfreeze($package));
        $number_packages++;
    }
    # "sum" table: everything combined
    $ar = [];
    foreach my $job (qw(ovoda iskola onkorm)) {
        foreach my $district (qw(district_2 district_12 district_22)) {
            push @$ar, @{ $arrays{$job}{$district} };
        }
    }
    print "\n***\nCh: $ch, All\n";
    $manifest = { ch => $ch, job => "*", district => "*",
                  type => $type, table => "sum" };
    $package = { manifest => $manifest, ar => $ar };
    $input_queue->enqueue(nfreeze($package));
    $number_packages++;
    print "Appending table $ch\_all_sum\n";
}

# One end-of-work marker per worker thread
for (1 .. $settings{max_threads}) {
    $input_queue->enqueue("DONE");
}
print "Done assigning jobs, waiting...\n";

while ($input_queue->pending() > 0) {
    sleep(1);
}
while ($result_queue->pending() > 0) {
    $package = thaw($result_queue->dequeue());
    $number_results++;
}
print "!!! $number_packages packages pushed, $number_results result received\n";
sleep 1;
exit 0;

sub work_thread {
    my ($tid) = shift;
    my $number = 0;
    my ($ReadFH, $WriteFH);
    # Each worker drives its own R process through a pair of pipes
    my $pid = open2($ReadFH, $WriteFH, "R", "--slave");
    print $WriteFH "library(splines)\nlibrary(survival)\nlibrary(NADA)\n";
    while (1) {
        my $work = $input_queue->dequeue();
        # If end-of-queue marker reached then cleanup and exit.
        if ($work eq "DONE") {
            close $WriteFH;
            print "Thread $tid done $number jobs\n";
            kill 9, $pid;
            waitpid($pid, 0);    # reap the R child process
            return 0;
        }
        elsif ($work) {
            $number++;
            my $result = do_R_stat($work, $ReadFH, $WriteFH, $tid);
            $result_queue->enqueue($result);
        }
    }
}

sub collect_data {
    my ($ch, $filter, $type) = @_;
    my @ids;
    my $dref;
    # Select the record ids whose fields match every key of the filter
    foreach my $id (sort keys %db) {
        my $condition = 1;
        foreach my $key (keys %$filter) {
            $condition &&= $db{$id}{record}{$key} eq $filter->{$key};
        }
        push @ids, $id if $condition;
    }
    my @arr;
    foreach my $id (@ids) {
        if ($settings{hungry_hungry_hippo} and exists($frozen_data{$id})) {
            $dref = $frozen_data{$id};
            print "MEM $id\t";
        }
        else {
            $dref = retrieve($settings{data_dir} . '/' . $id . $settings{data_ext});
            $frozen_data{$id} = $dref;
            print "DISK $id\t";
        }
        foreach my $i (0 .. $#$dref) {
            if ($dref->[$i]{$ch} and ($type eq $dref->[$i]{type} or $type eq 'all')) {
                push @arr, $dref->[$i]{$ch};
            }
        }
    }
    return \@arr;
}

sub do_R_stat {
    my $mirelit = shift;
    my $Read    = shift;
    my $Write   = shift;
    my $tid     = shift;
    my $package = thaw($mirelit);
    my $arr     = $package->{ar};
    my $digits     = 3;
    my $dl_nomeas  = 0.005;
    my $dl_fewmeas = 0.01;
    my $z0         = 3.769911;

    unless (scalar @$arr) {
        my %result = (
            'n'        => 0,
            'max'      => 0,
            'ros_sd'   => 0,
            'quantile' => { '0.95' => 0 },
            'n_cen'    => 0,
            'sd'       => 0,
            'mean'     => 0,
            'ros_mean' => 0,
            #'gsd'     => 0,
            'gmean'    => 0,
        );
        # print "FAIL $tid\n";
        return nfreeze( { manifest => $package->{manifest}, result => \%result } );
    }
    print "> Thread $tid starting ch $package->{manifest}{ch}, job $package->{manifest}{job}, "
        . "dist $package->{manifest}{district}, table $package->{manifest}{table}, size "
        . (scalar @$arr) . "\n";
    my $time = time();
    my %result;
    my $res;
    {
        # Send the data to R in lines of at most 20 comma-separated values
        local $, = ",";
        #print 1;
        print $Write "outcome <- c()\n";
        for (my $i = 0; $i <= $#$arr; $i += 20) {
            my $last = $i + 19 < $#$arr ? $i + 19 : $#$arr;
            print $Write "outcome <- c(outcome, ";
            print $Write @$arr[$i .. $last];
            print $Write ")\n";
        }
    }
    print $Write "outcome=outcome**2/$z0\n" if $settings{power_density};
    print $Write "cenoutcome=rep(FALSE, length(outcome))\n"
               . "cenoutcome[outcome==min(outcome)]=TRUE\n";
    print $Write "sum(cenoutcome)==length(outcome)\n";
    my $nomeas = <$Read>;
    print $Write "sum(cenoutcome)>=length(outcome)-minmeas\n";
    my $fewmeas = <$Read>;
    if ($nomeas =~ /TRUE/) {
        $result{ros_mean} = $dl_nomeas;
        $result{ros_sd}   = "NA";
        $result{n_cen}    = 0;
        $result{quantile}{0.95} = "NA";
    }
    elsif ($fewmeas =~ /TRUE/) {
        $result{ros_mean} = $dl_fewmeas;
        $result{ros_sd}   = "NA";
        print $Write "length(outcome)-sum(cenoutcome)\n";
        $res = <$Read>;
        chomp $res;
        $res =~ s/^\[1\] //;
        $result{n_cen} = $res;
        $result{quantile}{0.95} = "NA";
    }
    else {
        print $Write "pyros=cenros(outcome,cenoutcome)\n";
        print $Write "mean(pyros)\n";
        $res = <$Read>;
        chomp $res;
        $res =~ s/^\[1\] //;
        $res = sqrt($z0 * $res) if $settings{power_density};
        $result{ros_mean} = sprintf("%.0" . $digits . "f", $res);
        print $Write "sd(pyros)\n";
        $res = <$Read>;
        chomp $res;
        $res =~ s/^\[1\] //;
        $res = sqrt($z0 * $res) if $settings{power_density};
        $result{ros_sd} = sprintf("%.0" . $digits . "f", $res);
        print $Write "quantile(pyros, probs=c(0.95))\n";
        $res = <$Read>;
        $res = <$Read>;
        chomp $res;
        chop $res;    # also strips the trailing space
        $res = sqrt($z0 * $res) if $settings{power_density};
        $result{quantile}{0.95} = sprintf("%.0" . $digits . "f", $res);
        print $Write "pyros\n";
        $res = <$Read>;
        $res = <$Read>;
        print $Write "length(outcome)-sum(cenoutcome)\n";
        $res = <$Read>;
        chomp $res;
        $res =~ s/^\[1\] //;
        $result{n_cen} = sprintf("%.01d", $res);
    }
    print $Write "outcome=sqrt($z0*outcome)\n" if $settings{power_density};
    print $Write "mean(outcome, na.rm=TRUE)\n";
    $res = <$Read>;
    chomp $res;
    $res =~ s/^\[1\] //;
    $result{mean} = sprintf("%.0" . $digits . "f", $res);
    #print 3;
    print $Write "sd(outcome, na.rm=TRUE)\n";
    $res = <$Read>;
    chomp $res;
    $res =~ s/^\[1\] //;
    $result{sd} = sprintf("%.0" . $digits . "f", $res);
    #print 4;
    print $Write "exp(mean(log(outcome), na.rm=TRUE))\n";
    $res = <$Read>;
    chomp $res;
    $res =~ s/^\[1\] //;
    $result{gmean} = sprintf("%.0" . $digits . "f", $res);
    print $Write "max(outcome)\n";
    $res = <$Read>;
    chomp $res;
    $res =~ s/^\[1\] //;
    $result{max} = sprintf("%.0" . $digits . "f", $res);
    #print 7;
    $result{n} = scalar @$arr;
    print "< Thread $tid finished ch $package->{manifest}{ch}, job $package->{manifest}{job}, "
        . "dist $package->{manifest}{district}, table $package->{manifest}{table} in "
        . (time() - $time) . "sec\n";
    return nfreeze( { manifest => $package->{manifest}, result => \%result } );
}

sub open_db_simple {
    open(my $dbfh, '<:encoding(UTF-8)', $settings{db_fn})
        or die "Cannot open $settings{db_fn}: $!";
    die "Syntax error in $settings{db_fn}" if check_db_syntax($settings{db_fn});
    my $dbcode = do { local $/; <$dbfh> };
    close $dbfh;
    eval $dbcode;    # the database file is Perl source that is expected to fill %db
    die $@ if $@;
}

sub check_db_syntax {
    my $file = shift;
    system("perl", "-c", $file);
    return $? >> 8;
}
```

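do_R_stat uploads the data vector to R by printing `outcome <- c(outcome, ...)` lines of 20 values each. A hypothetical pure helper (`r_vector_lines` is not in the original code) that only generates those lines makes the chunking easy to check in isolation, for example that every value is sent exactly once, including arrays shorter than one chunk.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical helper: build R commands that assign the values in
# @$values to the R variable $name, at most $chunk values per line.
sub r_vector_lines {
    my ($name, $values, $chunk) = @_;
    my @lines = ("$name <- c()\n");    # c() is NULL; c(NULL, ...) just appends
    for (my $i = 0; $i <= $#$values; $i += $chunk) {
        my $last = $i + $chunk - 1;
        $last = $#$values if $last > $#$values;
        push @lines, "$name <- c($name, " . join(',', @$values[$i .. $last]) . ")\n";
    }
    return @lines;
}

unless (caller) {
    my @lines = r_vector_lines('outcome', [1 .. 45], 20);
    print @lines;    # 1 initialising line + 3 chunk lines (20, 20 and 5 values)
}
```

Keeping the command generation separate from the pipe I/O also means the same lines could be written to a file and fed to R by hand when debugging.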

In reply to Re^4: Using threads to run multiple external processes at the same time by kikuchiyo
in thread Using threads to run multiple external processes at the same time by Anonymous Monk
