Sorry, I didn't have time to pare it down to a minimal example, so here it is in (almost) all its horribleness.
It won't run for you, of course, without the proper data files, so I'll paste some excerpts from the output:
$ time perl maketables_threads_broken_minimal.pl 2> /dev/null
Spawning threads... 1 statistics threads started
[snip]
> Thread 1 starting ch ch_fm, job *, dist district_2, table sum, size 63223
< Thread 1 finished ch ch_fm, job *, dist district_2, table sum in 11sec
> Thread 1 starting ch ch_fm, job *, dist district_12, table sum, size 77721
< Thread 1 finished ch ch_fm, job *, dist district_12, table sum in 17sec
> Thread 1 starting ch ch_fm, job *, dist district_22, table sum, size 54784
< Thread 1 finished ch ch_fm, job *, dist district_22, table sum in 8sec
> Thread 1 starting ch ch_fm, job *, dist district_22, table sum, size 54784
< Thread 1 finished ch ch_fm, job *, dist district_22, table sum in 8sec
> Thread 1 starting ch ch_fm, job *, dist *, table sum, size 195728
< Thread 1 finished ch ch_fm, job *, dist *, table sum in 112sec
[snip]
Thread 1 done 16 jobs
!!! 16 packages pushed, 16 result received
real 3m39.706s
user 0m5.948s
sys 0m0.964s
$ time perl maketables_threads_broken_minimal.pl 2> /dev/null
Spawning threads... 2 statistics threads started
[snip]
> Thread 2 starting ch ch_fm, job *, dist district_2, table sum, size 63223
< Thread 1 finished ch ch_fm, job onkorm, dist *, table sum in 48sec
> Thread 1 starting ch ch_fm, job *, dist district_12, table sum, size 77721
< Thread 2 finished ch ch_fm, job *, dist district_2, table sum in 19sec
> Thread 2 starting ch ch_fm, job *, dist district_22, table sum, size 54784
< Thread 2 finished ch ch_fm, job *, dist district_22, table sum in 15sec
> Thread 2 starting ch ch_fm, job *, dist *, table sum, size 195728
< Thread 1 finished ch ch_fm, job *, dist district_12, table sum in 34sec
Thread 1 done 10 jobs
< Thread 2 finished ch ch_fm, job *, dist *, table sum in 113sec
Thread 2 done 6 jobs
!!! 16 packages pushed, 16 result received
real 3m35.557s
user 0m6.100s
sys 0m1.008s
It seems that the R process needs twice the time to do the exact same thing when there are two instances of it.
Update: Most of the processing time is spent in these commands:
print $Write "pyros=cenros(outcome,cenoutcome)\n";
print $Write "mean(pyros)\n";
$res = <$Read>;
Data subsetting and enqueuing takes about 7 seconds out of the 3 min 36 s runtime.
The code:
#!/usr/bin/perl
use Storable qw(retrieve nfreeze thaw );
use Data::Dumper;
use IPC::Open2;
use threads;
#use threads::shared;
use Thread::Queue;
use strict;
use warnings;
use utf8;
# ---------------------------------------------------------------------------
# Main script: spawn the statistics workers, load the database, cut the data
# into work packages, queue them, then wait for the workers and count the
# results.
#
# The file-level lexicals below stay declared even where this section does not
# use them: the subroutines further down read %settings, %db and %frozen_data,
# and the code eval'd by open_db_simple() may refer to others.
# ---------------------------------------------------------------------------
my %settings;        # run configuration, read by the subroutines below
my %db;              # record database, read by collect_data()
                     # NOTE(review): presumably filled by the db file that
                     # open_db_simple() evals -- confirm
my %frozen_data;     # per-id cache of retrieved data files (collect_data)
my @canned_texts;
$|++;                # autoflush STDOUT so progress lines appear immediately

$settings{db_fn} = '/home/kikuchiyo/meresek/t-mobil_exp/t-mobil.dbt';
$settings{data_dir} = '/home/kikuchiyo/meresek/t-mobil_exp/data';
$settings{data_ext} = ".dat";
$settings{hungry_hungry_hippo} = 1;
$settings{tables_fn} = 'tables.tex';
$settings{power_density} = 1;
$settings{max_threads} = 2;

my $input_queue  = Thread::Queue->new();    # frozen work packages -> workers
my $result_queue = Thread::Queue->new();    # frozen results -> main thread

my @channels  = qw(ch_fm);
my @jobs      = qw(ovoda iskola onkorm);
my @districts = qw(district_2 district_12 district_22);

# Workers are started before the database is loaded.
# NOTE(review): presumably so each new thread does not receive a copy of the
# loaded data -- confirm before reordering.
# The thread objects are kept (not detached) so they can be joined at the end.
print "Spawning threads... ";
my @workers = map { threads->create(\&work_thread, $_) } 1 .. $settings{max_threads};
print "$settings{max_threads} statistics threads started\n";

my $type = 'all'; # O, M, U, E, A, all
my %filter;
my $result;
my @data;
my $ar;
my %arrays;
my %desc;
my $manifest;
my $package;
my @tables;
my @conditions;
my $number_packages = 0;
my $number_results = 0;

open_db_simple();

foreach my $ch (@channels) {

    # One "cat" package per (job, district) cell; the cell data is remembered
    # in %arrays so the aggregated packages below can reuse it.
    foreach my $job (@jobs) {
        foreach my $district (@districts) {
            %filter = (district => $district, job => $job);
            $ar = collect_data($ch, \%filter, $type);
            $arrays{$job}{$district} = $ar;
            print "\n***Ch: $ch, Job: $job, District: $district\n";
            $manifest = { ch => $ch, job => $job, district => $district, type => $type, table => "cat" };
            $package = { manifest => $manifest, ar => $ar };
            $input_queue->enqueue(nfreeze($package));
            $number_packages++;
        }
    }
    print "Appending table $ch\_all_cat\n";

    # One "sum" package per job, pooled over all districts.
    foreach my $job (@jobs) {
        $ar = [];
        foreach my $district (@districts) {
            push @$ar, @{ $arrays{$job}{$district} };
        }
        print "\n***Ch: $ch, Job: $job\n";
        $manifest = { ch => $ch, job => $job, district => "*", type => $type, table => "sum" };
        $package = { manifest => $manifest, ar => $ar };
        $input_queue->enqueue(nfreeze($package));
        $number_packages++;
    }

    # One "sum" package per district, pooled over all jobs.
    foreach my $district (@districts) {
        $ar = [];
        foreach my $job (@jobs) {
            push @$ar, @{ $arrays{$job}{$district} };
        }
        print "\n***Ch: $ch, District: $district\n";
        $manifest = { ch => $ch, job => "*", district => $district, type => $type, table => "sum" };
        $package = { manifest => $manifest, ar => $ar };
        $input_queue->enqueue(nfreeze($package));
        $number_packages++;
    }

    # One "sum" package over everything in this channel.
    $ar = [];
    foreach my $job (@jobs) {
        foreach my $district (@districts) {
            push @$ar, @{ $arrays{$job}{$district} };
        }
    }
    print "\n***\nCh: $ch, All\n";
    $manifest = { ch => $ch, job => "*", district => "*", type => $type, table => "sum" };
    $package = { manifest => $manifest, ar => $ar };
    $input_queue->enqueue(nfreeze($package));
    $number_packages++;
    print "Appending table $ch\_all_sum\n";
}

# One end-of-queue marker per worker; a worker returns when it dequeues one.
$input_queue->enqueue("DONE") for 1 .. $settings{max_threads};
print "Done assigning jobs, waiting...\n";

# Joining replaces polling the queue and sleeping: it returns as soon as each
# worker has returned, and also returns if a worker died.
$_->join() for @workers;

# Every worker has finished, so whatever is in the result queue is final.
while (defined(my $frozen_result = $result_queue->dequeue_nb())) {
    $package = thaw($frozen_result);
    $number_results++;
}
print "!!! $number_packages packages pushed, $number_results result received\n";
exit 0;
# Worker thread body: owns one R coprocess and feeds it the queued packages.
#
# Parameters:
#   $tid - worker id, used in progress messages and passed to do_R_stat()
#
# Dequeues frozen work packages from $input_queue, passes each to do_R_stat()
# together with the R handles, and enqueues the frozen result on
# $result_queue.  The literal string "DONE" is the end-of-queue marker: on
# receiving it the worker shuts R down and returns 0.
sub work_thread {
    my ($tid) = @_;
    my $number = 0;    # packages processed by this worker

    my ($ReadFH, $WriteFH);
    my $pid = open2($ReadFH, $WriteFH, "R", "--slave");
    print {$WriteFH} "library(splines)\nlibrary(survival)\nlibrary(NADA)\n";

    while (1) {
        my $work = $input_queue->dequeue();

        # If end-of-queue marker reached then cleanup and exit.
        if ($work eq "DONE") {
            close $WriteFH;
            close $ReadFH;
            print "Thread $tid done $number jobs\n";
            kill 9, $pid;
            # Reap the child so it does not linger as a zombie process.
            waitpid($pid, 0);
            return 0;
        }

        next unless $work;    # ignore empty/false queue items

        $number++;
        my $result = do_R_stat($work, $ReadFH, $WriteFH, $tid);
        $result_queue->enqueue($result);
    }
}
# Collect the measured values of one channel from all matching records.
#
# Parameters:
#   $ch     - channel key to extract from each data entry (e.g. 'ch_fm')
#   $filter - hash ref of field => value; a record from %db is selected when
#             every listed field of its {record} hash is string-equal to the
#             given value (an empty filter selects every record)
#   $type   - entry type to keep, or 'all' to keep every type
#
# Returns an array ref with the true-valued $ch entries of the selected
# records, visiting record ids in sorted order.
#
# Reads %db, %settings and %frozen_data.  Data files are loaded with
# Storable::retrieve from "$settings{data_dir}/<id>$settings{data_ext}".
# They are cached in %frozen_data only when $settings{hungry_hungry_hippo} is
# set, so turning the setting off really does keep the files out of memory.
# Prints "MEM <id>" or "DISK <id>" to STDOUT for each record used.
sub collect_data {
    my ($ch, $filter, $type) = @_;

    # Select the ids whose record matches every filter field.
    my @ids;
    foreach my $id (sort keys %db) {
        my $mismatches = grep { !($db{$id}{record}{$_} eq $filter->{$_}) } keys %$filter;
        push @ids, $id unless $mismatches;
    }

    my @arr;
    foreach my $id (@ids) {
        my $dref;
        if ($settings{hungry_hungry_hippo} and exists $frozen_data{$id}) {
            $dref = $frozen_data{$id};
            print "MEM $id\t";
        }
        else {
            my $path = $settings{data_dir} . '/' . $id . $settings{data_ext};
            $dref = retrieve($path);

            # retrieve() can return undef; never cache or dereference that.
            unless (defined $dref) {
                warn "collect_data: could not retrieve $path: $!\n";
                next;
            }
            $frozen_data{$id} = $dref if $settings{hungry_hungry_hippo};
            print "DISK $id\t";
        }

        foreach my $entry (@$dref) {
            next unless $entry->{$ch};
            next unless $type eq $entry->{type} or $type eq 'all';
            push @arr, $entry->{$ch};
        }
    }
    return \@arr;
}
# Run the summary statistics for one work package in an R coprocess.
#
# Parameters:
#   $mirelit - nfreeze()d hash { manifest => {...}, ar => [values] }
#   $Read    - handle from which R's output is read
#   $Write   - handle on which R commands are written
#   $tid     - worker id, used only in the progress messages
#
# Returns an nfreeze()d hash { manifest => <as received>, result => {...} }.
# The result holds n, max, mean, sd, gmean, ros_mean, ros_sd, n_cen and
# quantile->{0.95}.  An empty value list yields an all-zero result without
# talking to R at all.
#
# The exchange with R is strictly line based: each query is expected to
# produce the exact number of output lines that is read back here, so the
# order of the statements below must not be changed.
#
# NOTE(review): the R expression "length(outcome)-minmeas" refers to an R
# variable `minmeas` that nothing in this function defines -- confirm where it
# is supposed to be set.
sub do_R_stat {
    my ($mirelit, $Read, $Write, $tid) = @_;

    my $package = thaw($mirelit);
    my $arr     = $package->{ar};

    my $digits     = 3;           # decimals in the formatted results
    my $dl_nomeas  = 0.005;       # ros_mean reported when R answers TRUE to the first check
    my $dl_fewmeas = 0.01;        # ros_mean reported when R answers TRUE to the second check
    my $z0         = 3.769911;    # constant of the power-density transform
    my $fmt        = "%.0" . $digits . "f";
    my $chunk_size = 20;          # values sent to R per input line

    unless (scalar @$arr) {
        my %result = (
            'n'        => 0,
            'max'      => 0,
            'ros_sd'   => 0,
            'quantile' => {
                '0.95' => 0
            },
            'n_cen'    => 0,
            'sd'       => 0,
            'mean'     => 0,
            'ros_mean' => 0,
            #'gsd' => 0,
            'gmean'    => 0,
        );
        return nfreeze( { manifest => $package->{manifest}, result => \%result } );
    }

    print "> Thread $tid starting ch $package->{manifest}{ch}, job $package->{manifest}{job}, dist $package->{manifest}{district}, table $package->{manifest}{table}, size ".(scalar @$arr)."\n";
    my $time = time();
    my %result;
    my $res;

    # Upload the values in fixed-size chunks, each element exactly once.
    # Starting from an empty vector keeps arrays shorter than one chunk valid
    # (no padding with undefined values).
    print {$Write} "outcome <- c()\n";
    for (my $first = 0; $first <= $#$arr; $first += $chunk_size) {
        my $last = $first + $chunk_size - 1;
        $last = $#$arr if $last > $#$arr;
        print {$Write} "outcome <- c(outcome, " . join(",", @$arr[$first .. $last]) . ")\n";
    }

    print {$Write} "outcome=outcome**2/$z0\n" if $settings{power_density};

    # Mark every occurrence of the smallest value as censored.
    print {$Write} "cenoutcome=rep(FALSE, length(outcome))\n".
        "cenoutcome[outcome==min(outcome)]=TRUE\n";

    my $nomeas  = _r_scalar($Write, $Read, "sum(cenoutcome)==length(outcome)");
    my $fewmeas = _r_scalar($Write, $Read, "sum(cenoutcome)>=length(outcome)-minmeas");

    if ($nomeas =~ /TRUE/) {
        $result{ros_mean} = $dl_nomeas;
        $result{ros_sd} = "NA";
        $result{n_cen} = 0;
        $result{quantile}{0.95} = "NA";
    } elsif ($fewmeas =~ /TRUE/) {
        $result{ros_mean} = $dl_fewmeas;
        $result{ros_sd} = "NA";
        $result{n_cen} = _r_scalar($Write, $Read, "length(outcome)-sum(cenoutcome)");
        $result{quantile}{0.95} = "NA";
    } else {
        print {$Write} "pyros=cenros(outcome,cenoutcome)\n";

        $res = _r_scalar($Write, $Read, "mean(pyros)");
        $res = sqrt($z0 * $res) if $settings{power_density};
        $result{ros_mean} = sprintf($fmt, $res);

        $res = _r_scalar($Write, $Read, "sd(pyros)");
        $res = sqrt($z0 * $res) if $settings{power_density};
        $result{ros_sd} = sprintf($fmt, $res);

        # quantile() answers with two lines; the first is discarded and the
        # value is taken from the second.
        print {$Write} "quantile(pyros, probs=c(0.95))\n";
        $res = <$Read>;
        $res = <$Read>; chomp $res; chop $res;    # chop also strips the trailing space
        $res = sqrt($z0 * $res) if $settings{power_density};
        $result{quantile}{0.95} = sprintf($fmt, $res);

        # Printing the fit is read back as two lines, both discarded.
        print {$Write} "pyros\n";
        $res = <$Read>;
        $res = <$Read>;

        $res = _r_scalar($Write, $Read, "length(outcome)-sum(cenoutcome)");
        $result{n_cen} = sprintf("%.01d", $res);
    }

    # Undo the power-density transform before the plain statistics.
    print {$Write} "outcome=sqrt($z0*outcome)\n" if $settings{power_density};

    $result{mean}  = sprintf($fmt, _r_scalar($Write, $Read, "mean(outcome, na.rm=TRUE)"));
    $result{sd}    = sprintf($fmt, _r_scalar($Write, $Read, "sd(outcome, na.rm=TRUE)"));
    $result{gmean} = sprintf($fmt, _r_scalar($Write, $Read, "exp(mean(log(outcome), na.rm=TRUE))"));
    $result{max}   = sprintf($fmt, _r_scalar($Write, $Read, "max(outcome)"));
    $result{n}     = scalar @$arr;

    print "< Thread $tid finished ch $package->{manifest}{ch}, job $package->{manifest}{job}, dist $package->{manifest}{district}, table $package->{manifest}{table} in ".(time()-$time)."sec\n";
    return nfreeze( { manifest => $package->{manifest}, result => \%result } );
}

# Send one R expression and return the single line R prints for it, without
# the trailing newline and without a leading "[1] " marker.  Returns undef
# when nothing could be read.
sub _r_scalar {
    my ($Write, $Read, $expr) = @_;
    print {$Write} "$expr\n";
    my $line = <$Read>;
    return $line unless defined $line;
    chomp $line;
    $line =~ s/^\[1\] //;
    return $line;
}
# Load the database file named by $settings{db_fn} by evaluating it as Perl.
#
# The file is opened as UTF-8, passed through check_db_syntax(), slurped and
# then run with a string eval in this scope.
# NOTE(review): presumably the file assigns to the file-level %db -- confirm.
#
# Dies with a descriptive message when the file cannot be opened, fails the
# syntax check, or raises an error while being evaluated.  Returns 1 on
# success.
#
# SECURITY: the file's contents are executed as code.  Only point
# $settings{db_fn} at a trusted file.
sub open_db_simple {
    open(my $db_fh, '<:encoding(UTF-8)', $settings{db_fn})
        or die "Cannot open database file $settings{db_fn}: $!\n";

    die "Database file $settings{db_fn} failed the syntax check\n"
        if check_db_syntax($settings{db_fn});

    my $dbcode = do { local $/; <$db_fh> };    # slurp the whole file
    close $db_fh;

    eval $dbcode;
    die "Error while evaluating database file $settings{db_fn}: $@" if $@;
    return 1;
}
# Syntax-check a Perl file by running "perl -c" on it.
#
# Parameters:
#   $file - path of the file to check
#
# Returns 0 when the check passes and a non-zero value otherwise: the
# checker's exit code, 128 + signal number when the checker was killed by a
# signal, or 255 when it could not be started at all.
#
# The check runs under the same interpreter as this script ($^X), not under
# whichever "perl" happens to be first in PATH.  Note that "perl -c" still
# executes the file's BEGIN blocks.
sub check_db_syntax {
    my $file = shift;

    system($^X, "-c", $file);

    return 0   if $? == 0;
    return 255 if $? == -1;    # the interpreter could not be launched

    # A signal death leaves the exit-code byte at 0; it must not be reported
    # as success.
    return ($? >> 8) || (128 + ($? & 127));
}
|