$ time perl maketables_threads_broken_minimal.pl 2> /dev/null
Spawning threads... 1 statistics threads started
[snip]
> Thread 1 starting ch ch_fm, job *, dist district_2, table sum, size 63223
< Thread 1 finished ch ch_fm, job *, dist district_2, table sum in 11sec
> Thread 1 starting ch ch_fm, job *, dist district_12, table sum, size 77721
< Thread 1 finished ch ch_fm, job *, dist district_12, table sum in 17sec
> Thread 1 starting ch ch_fm, job *, dist district_22, table sum, size 54784
< Thread 1 finished ch ch_fm, job *, dist district_22, table sum in 8sec
> Thread 1 starting ch ch_fm, job *, dist district_22, table sum, size 54784
< Thread 1 finished ch ch_fm, job *, dist district_22, table sum in 8sec
> Thread 1 starting ch ch_fm, job *, dist *, table sum, size 195728
< Thread 1 finished ch ch_fm, job *, dist *, table sum in 112sec
[snip]
Thread 1 done 16 jobs
!!! 16 packages pushed, 16 result received
real 3m39.706s
user 0m5.948s
sys 0m0.964s
$ time perl maketables_threads_broken_minimal.pl 2> /dev/null
Spawning threads... 2 statistics threads started
[snip]
> Thread 2 starting ch ch_fm, job *, dist district_2, table sum, size 63223
< Thread 1 finished ch ch_fm, job onkorm, dist *, table sum in 48sec
> Thread 1 starting ch ch_fm, job *, dist district_12, table sum, size 77721
< Thread 2 finished ch ch_fm, job *, dist district_2, table sum in 19sec
> Thread 2 starting ch ch_fm, job *, dist district_22, table sum, size 54784
< Thread 2 finished ch ch_fm, job *, dist district_22, table sum in 15sec
> Thread 2 starting ch ch_fm, job *, dist *, table sum, size 195728
< Thread 1 finished ch ch_fm, job *, dist district_12, table sum in 34sec
Thread 1 done 10 jobs
< Thread 2 finished ch ch_fm, job *, dist *, table sum in 113sec
Thread 2 done 6 jobs
!!! 16 packages pushed, 16 result received
real 3m35.557s
user 0m6.100s
sys 0m1.008s
####
# Excerpt: write two commands to the $Write handle (a cenros() fit stored in
# "pyros", then a request for its mean) and read back a single line of output
# from the $Read handle.
print $Write "pyros=cenros(outcome,cenoutcome)\n";
print $Write "mean(pyros)\n";
$res = <$Read>;
####
#!/usr/bin/perl
use Storable qw(retrieve nfreeze thaw );
use Data::Dumper;
use IPC::Open2;
use threads;
#use threads::shared;
use Thread::Queue;
use strict;
use warnings;
use utf8;
# ---------------------------------------------------------------------------
# Main program.
#
# Starts the worker threads, loads the record database, builds one work
# package per table cell (per job/district, per job, per district, overall)
# and pushes the frozen packages onto the input queue.  Finally it waits for
# the workers and counts the results they produced.
# ---------------------------------------------------------------------------

# File-scoped state.  These lexicals are also visible to the subs defined
# further down in the file (and to the code string that open_db_simple evals).
my %settings;       # run-time configuration
my %db;             # record database, read by collect_data (keyed by id)
#my @data;
my %frozen_data;    # per-id cache of retrieved data files (see collect_data)
#my $dref;
my @canned_texts;
$|++;               # unbuffered STDOUT so progress lines appear immediately

$settings{db_fn} = '/home/kikuchiyo/meresek/t-mobil_exp/t-mobil.dbt';
$settings{data_dir} = '/home/kikuchiyo/meresek/t-mobil_exp/data';
$settings{data_ext} = ".dat";
$settings{hungry_hungry_hippo} = 1;
$settings{tables_fn} = 'tables.tex';
$settings{power_density} = 1;
$settings{max_threads} = 2;

my $input_queue = Thread::Queue->new();
my $result_queue = Thread::Queue->new();
my @channels = qw(ch_fm);

# The category values the tables are broken down by.  They used to be spelled
# inline as "foreach my $x qw(...)", which is a syntax error on perl >= 5.18.
my @jobs      = qw(ovoda iskola onkorm);
my @districts = qw(district_2 district_12 district_22);

# Start the workers before any data is loaded, so that the (large) data
# structures built below are not cloned into every thread.
print "Spawning threads... ";
my @workers;
for my $tid (1..$settings{max_threads}) {
    # Scalar assignment fixes the thread's return context to scalar.
    my $thr = threads->create(\&work_thread, $tid);
    push @workers, $thr;
}
print "$settings{max_threads} statistics threads started\n";

my $type = 'all'; # O, M, U, E, A, all
my %filter;
my $result;
my @data;
my $ar;
my %arrays;
my %desc;
my $manifest;
my $package;
my @tables;
my @conditions;
my $number_packages = 0;
my $number_results = 0;

open_db_simple();

foreach my $ch (@channels) {
    # One "cat" package for every (job, district) combination.  The value
    # lists are remembered in %arrays so the sums below can reuse them.
    foreach my $job (@jobs) {
        foreach my $district (@districts) {
            %filter = (district => $district, job => $job);
            $ar = collect_data($ch, \%filter, $type);
            $arrays{$job}{$district} = $ar;
            print "\n***Ch: $ch, Job: $job, District: $district\n";
            $manifest = { ch => $ch, job => $job, district => $district, type => $type, table => "cat" };
            $package = { manifest => $manifest, ar => $ar };
            $input_queue->enqueue(nfreeze($package));
            $number_packages++;
        }
    }
    print "Appending table $ch\_all_cat\n";

    # One "sum" package per job, pooling all districts.
    foreach my $job (@jobs) {
        $ar = [ map { @{ $arrays{$job}{$_} } } @districts ];
        print "\n***Ch: $ch, Job: $job\n";
        $manifest = { ch => $ch, job => $job, district => "*", type => $type, table => "sum" };
        $package = { manifest => $manifest, ar => $ar };
        $input_queue->enqueue(nfreeze($package));
        $number_packages++;
    }

    # One "sum" package per district, pooling all jobs.
    foreach my $district (@districts) {
        $ar = [ map { @{ $arrays{$_}{$district} } } @jobs ];
        print "\n***Ch: $ch, District: $district\n";
        $manifest = { ch => $ch, job => "*", district => $district, type => $type, table => "sum" };
        $package = { manifest => $manifest, ar => $ar };
        $input_queue->enqueue(nfreeze($package));
        $number_packages++;
    }

    # One overall "sum" package with every value of the channel.
    $ar = [];
    foreach my $job (@jobs) {
        foreach my $district (@districts) {
            push @$ar, @{ $arrays{$job}{$district} };
        }
    }
    print "\n***\nCh: $ch, All\n";
    $manifest = { ch => $ch, job => "*", district => "*", type => $type, table => "sum" };
    $package = { manifest => $manifest, ar => $ar };
    $input_queue->enqueue(nfreeze($package));
    $number_packages++;
    print "Appending table $ch\_all_sum\n";
}

# One end-of-queue marker per worker; each worker stops at the first it sees.
for (1..$settings{max_threads}) {
    $input_queue->enqueue("DONE");
}
print "Done assigning jobs, waiting...\n";

# Joining (instead of detaching the workers and polling the input queue)
# guarantees that every worker has delivered its last result and finished its
# cleanup before the results are counted and the program exits.
$_->join() for @workers;

while ($result_queue->pending() > 0) {
    $package = thaw($result_queue->dequeue());
    $number_results++;
}
print "!!! $number_packages packages pushed, $number_results result received\n";
exit 0;
# Worker thread body.
#
# Starts a private R process (via open2), then repeatedly takes frozen work
# packages from $input_queue, has do_R_stat() evaluate them in that R process
# and puts the frozen result onto $result_queue.
#
# Arguments:
#   $tid - number identifying this worker in progress messages
# Returns 0 once the "DONE" end-of-queue marker has been dequeued.
sub work_thread {
    my ($tid) = @_;
    my $number = 0;    # packages processed by this worker

    my ($ReadFH, $WriteFH);
    my $pid = open2($ReadFH, $WriteFH, "R", "--slave");
    print $WriteFH "library(splines)\nlibrary(survival)\nlibrary(NADA)\n";

    while (1) {
        my $work = $input_queue->dequeue();

        # End-of-queue marker: shut the R process down and leave.
        if ($work eq "DONE") {
            close $WriteFH;
            print "Thread $tid done $number jobs\n";
            kill 9, $pid;
            # open2 does not reap its child; without waitpid the killed R
            # process stays around as a zombie until the script exits.
            waitpid($pid, 0);
            close $ReadFH;
            return 0;
        }

        # Ignore empty/false queue items.
        next unless $work;

        $number++;
        $result_queue->enqueue(do_R_stat($work, $ReadFH, $WriteFH, $tid));
    }
}
# Gather the values of one channel from every record that matches a filter.
#
# Arguments:
#   $ch     - name of the channel key to extract from each data row
#   $filter - hash ref; a record id is selected when, for every key in it,
#             $db{$id}{record}{$key} is string-equal to the filter value
#   $type   - row type to keep, or 'all' to keep rows of every type
# Returns a reference to the array of collected channel values.
#
# Data files are loaded with Storable's retrieve() and remembered in
# %frozen_data; when $settings{hungry_hungry_hippo} is true a remembered copy
# is reused instead of reading the file again.
sub collect_data {
    my ($ch, $filter, $type) = @_;

    # Select the ids first (in sorted order); every filter key is compared,
    # there is deliberately no short-circuit.
    my @wanted_ids = grep {
        my $id = $_;
        my $mismatches = grep { !($db{$id}{record}{$_} eq $filter->{$_}) } keys %$filter;
        !$mismatches;
    } sort keys %db;

    my @values;
    for my $id (@wanted_ids) {
        my $from_cache = $settings{hungry_hungry_hippo} && exists $frozen_data{$id};
        $frozen_data{$id} = retrieve("$settings{data_dir}/$id$settings{data_ext}")
            unless $from_cache;
        my $rows = $frozen_data{$id};

        my $origin = $from_cache ? "MEM $id\t" : "DISK $id\t";
        print $origin;

        # Keep the channel value of each row that has a true value for the
        # channel and whose type matches (or any type when $type is 'all').
        push @values,
            map  { $rows->[$_]{$ch} }
            grep { $rows->[$_]{$ch} and ($type eq $rows->[$_]{type} or $type eq 'all') }
            0 .. $#$rows;
    }

    return \@values;
}
# Send one expression to R and return its one-line answer, with the line
# ending and a leading "[1] " element marker removed.
sub _r_ask {
    my ($Read, $Write, $expr) = @_;
    print $Write "$expr\n";
    my $res = <$Read>;
    chomp $res;
    $res =~ s/^\[1\] //;
    return $res;
}

# Compute the statistics of one work package in an already running R session.
#
# Arguments (positional):
#   $mirelit - nfreeze()d package { manifest => {...}, ar => [values] }
#   $Read    - handle to read the R process' output from
#   $Write   - handle to write R commands to
#   $tid     - worker id, used only in the progress messages
# Returns an nfreeze()d hash { manifest => <as received>, result => {...} }
# whose result has the keys n, n_cen, mean, sd, gmean, max, ros_mean, ros_sd
# and quantile => { '0.95' => ... }.  An empty value list yields all zeros
# without talking to R.
sub do_R_stat {
    my ($mirelit, $Read, $Write, $tid) = @_;

    my $package  = thaw($mirelit);
    my $arr      = $package->{ar};
    my $manifest = $package->{manifest};

    my $digits     = 3;                 # decimals in the formatted results
    my $fmt        = "%.0${digits}f";
    my $dl_nomeas  = 0.005;             # ros_mean reported when every value is censored
    my $dl_fewmeas = 0.01;              # ros_mean reported when too few values are uncensored
    # NOTE(review): presumably the free-space impedance 120*pi scaled by 1/100,
    # used to convert field strength to power density - confirm.
    my $z0         = 3.769911;
    my $chunk_size = 20;                # values per R input line, keeps the lines short

    unless (scalar @$arr) {
        my %result = (
            'n' => 0,
            'max' => 0,
            'ros_sd' => 0,
            'quantile' => {
                '0.95' => 0
            },
            'n_cen' => 0,
            'sd' => 0,
            'mean' => 0,
            'ros_mean' => 0,
            #'gsd' => 0,
            'gmean' => 0,
        );
        return nfreeze( { manifest => $manifest, result => \%result } );
    }

    print "> Thread $tid starting ch $manifest->{ch}, job $manifest->{job}, dist $manifest->{district}, table $manifest->{table}, size ".(scalar @$arr)."\n";
    my $time = time();
    my %result;
    my $res;

    # Upload the values in chunks.  Every element is sent exactly once and
    # the last chunk may be short.  (The previous code sent elements 0..19
    # twice and padded arrays shorter than 20 with empty fields, which R
    # rejects as a syntax error.)
    print $Write "outcome <- c()\n";
    for (my $first = 0; $first <= $#$arr; $first += $chunk_size) {
        my $last = $first + $chunk_size - 1;
        $last = $#$arr if $last > $#$arr;
        print $Write "outcome <- c(outcome, " . join(",", @$arr[$first .. $last]) . ")\n";
    }

    print $Write "outcome=outcome**2/$z0\n" if $settings{power_density};

    # The minimum value is treated as the censoring level.
    print $Write "cenoutcome=rep(FALSE, length(outcome))\n".
                 "cenoutcome[outcome==min(outcome)]=TRUE\n";
    print $Write "sum(cenoutcome)==length(outcome)\n";
    my $nomeas = <$Read>;
    # NOTE(review): the R variable 'minmeas' is not assigned by any code
    # visible in this file - confirm it is defined in the R session,
    # otherwise this query produces an R error instead of an answer line.
    print $Write "sum(cenoutcome)>=length(outcome)-minmeas\n";
    my $fewmeas = <$Read>;

    if ($nomeas =~ /TRUE/) {
        # Everything is censored: report the fixed placeholder.
        $result{ros_mean} = $dl_nomeas;
        $result{ros_sd} = "NA";
        $result{n_cen} = 0;
        $result{quantile}{'0.95'} = "NA";
    }
    elsif ($fewmeas =~ /TRUE/) {
        # Too few uncensored values for a ROS fit.
        $result{ros_mean} = $dl_fewmeas;
        $result{ros_sd} = "NA";
        $result{n_cen} = _r_ask($Read, $Write, "length(outcome)-sum(cenoutcome)");
        $result{quantile}{'0.95'} = "NA";
    }
    else {
        print $Write "pyros=cenros(outcome,cenoutcome)\n";

        $res = _r_ask($Read, $Write, "mean(pyros)");
        $res = sqrt($z0 * $res) if $settings{power_density};
        $result{ros_mean} = sprintf($fmt, $res);

        $res = _r_ask($Read, $Write, "sd(pyros)");
        $res = sqrt($z0 * $res) if $settings{power_density};
        $result{ros_sd} = sprintf($fmt, $res);

        # quantile() answers with two lines; the first one is discarded and
        # the value is taken from the second.
        print $Write "quantile(pyros, probs=c(0.95))\n";
        $res = <$Read>;
        $res = <$Read>; chomp $res; chop $res;   # chop also removes the trailing space
        $res = sqrt($z0 * $res) if $settings{power_density};
        $result{quantile}{'0.95'} = sprintf($fmt, $res);

        # Printing the fit is answered with two lines, both discarded.
        print $Write "pyros\n";
        $res = <$Read>;
        $res = <$Read>;

        $res = _r_ask($Read, $Write, "length(outcome)-sum(cenoutcome)");
        $result{n_cen} = sprintf("%.01d", $res);
    }

    # Undo the power-density conversion before the plain statistics.
    print $Write "outcome=sqrt($z0*outcome)\n" if $settings{power_density};

    $result{mean}  = sprintf($fmt, _r_ask($Read, $Write, "mean(outcome, na.rm=TRUE)"));
    $result{sd}    = sprintf($fmt, _r_ask($Read, $Write, "sd(outcome, na.rm=TRUE)"));
    $result{gmean} = sprintf($fmt, _r_ask($Read, $Write, "exp(mean(log(outcome), na.rm=TRUE))"));
    $result{max}   = sprintf($fmt, _r_ask($Read, $Write, "max(outcome)"));
    $result{n}     = scalar @$arr;

    print "< Thread $tid finished ch $manifest->{ch}, job $manifest->{job}, dist $manifest->{district}, table $manifest->{table} in ".(time()-$time)."sec\n";
    return nfreeze( { manifest => $manifest, result => \%result } );
}
# Load the record database.
#
# Reads the file named by $settings{db_fn} (UTF-8), verifies with
# check_db_syntax() that it is syntactically valid Perl, and then evaluates
# its contents as Perl code in the scope of this file.
# Takes no arguments.  Dies when the file cannot be opened, fails the syntax
# check, or raises an error while being evaluated.
#
# NOTE(review): this executes whatever code the database file contains;
# only point db_fn at trusted files.
sub open_db_simple {
    my $db_fn = $settings{db_fn};

    open(my $dbfh, '<:encoding(UTF-8)', $db_fn)
        or die "Cannot open database file '$db_fn': $!\n";
    die "Database file '$db_fn' failed the Perl syntax check\n"
        if check_db_syntax($db_fn);

    # Slurp the whole file.  The previous code localised $/ but never read
    # from the handle, so nothing was ever loaded.
    my $dbcode = do { local $/; <$dbfh> };
    close $dbfh;

    eval $dbcode;
    die "Error while evaluating database file '$db_fn': $@" if $@;
}
# Syntax-check a Perl source file without executing it ("perl -c").
#
# Arguments:
#   $file - path of the file to check
# Returns 0 when the file compiles, a non-zero value otherwise: the checker's
# exit status, 128 + signal number when it was killed by a signal, or 255
# when it could not be started at all.
sub check_db_syntax {
    my $file = shift;

    # $^X is the interpreter running this script, so the file is checked by
    # the same perl that will later eval it (not by whichever "perl" happens
    # to be first in PATH).  The list form of system() bypasses the shell.
    system($^X, "-c", $file);

    return 255 if $? == -1;                 # could not launch the interpreter
    return 128 + ($? & 127) if $? & 127;    # killed by a signal: exit byte would be 0
    return $? >> 8;
}