$ time perl maketables_threads_broken_minimal.pl 2> dev/null Spawning threads... 1 statistics threads started [snip] > Thread 1 starting ch ch_fm, job *, dist district_2, table sum, size 63223 < Thread 1 finished ch ch_fm, job *, dist district_2, table sum in 11sec > Thread 1 starting ch ch_fm, job *, dist district_12, table sum, size 77721 < Thread 1 finished ch ch_fm, job *, dist district_12, table sum in 17sec > Thread 1 starting ch ch_fm, job *, dist district_22, table sum, size 54784 < Thread 1 finished ch ch_fm, job *, dist district_22, table sum in 8sec > Thread 1 starting ch ch_fm, job *, dist district_22, table sum, size 54784 < Thread 1 finished ch ch_fm, job *, dist district_22, table sum in 8sec > Thread 1 starting ch ch_fm, job *, dist *, table sum, size 195728 < Thread 1 finished ch ch_fm, job *, dist *, table sum in 112sec [snip] Thread 1 done 16 jobs !!! 16 packages pushed, 16 result received real 3m39.706s user 0m5.948s sys 0m0.964s $ time perl maketables_threads_broken_minimal.pl 2> dev/null Spawning threads... 2 statistics threads started [snip] > Thread 2 starting ch ch_fm, job *, dist district_2, table sum, size 63223 < Thread 1 finished ch ch_fm, job onkorm, dist *, table sum in 48sec > Thread 1 starting ch ch_fm, job *, dist district_12, table sum, size 77721 < Thread 2 finished ch ch_fm, job *, dist district_2, table sum in 19sec > Thread 2 starting ch ch_fm, job *, dist district_22, table sum, size 54784 < Thread 2 finished ch ch_fm, job *, dist district_22, table sum in 15sec > Thread 2 starting ch ch_fm, job *, dist *, table sum, size 195728 < Thread 1 finished ch ch_fm, job *, dist district_12, table sum in 34sec Thread 1 done 10 jobs < Thread 2 finished ch ch_fm, job *, dist *, table sum in 113sec Thread 2 done 6 jobs !!! 
16 packages pushed, 16 result received real 3m35.557s user 0m6.100s sys 0m1.008s #### print $Write "pyros=cenros(outcome,cenoutcome)\n"; print $Write "mean(pyros)\n"; $res = <$Read>; ####
#!/usr/bin/perl
#
# Builds per-channel / per-job / per-district data packages from the
# measurement database and hands them to a pool of worker threads. Every
# worker drives its own R child process (via IPC::Open2) to compute summary
# statistics and sends the frozen result back through a second queue.

use Storable qw(retrieve nfreeze thaw );
use Data::Dumper;
use IPC::Open2;
use threads;
#use threads::shared;
use Thread::Queue;
use strict;
use warnings;
use utf8;

my %settings;
my %db;             # filled by open_db_simple() (string eval of the db file)
#my @data;
my %frozen_data;    # cache of already retrieved data files, keyed by id
#my $dref;
my @canned_texts;

$|++;

$settings{db_fn}               = '/home/kikuchiyo/meresek/t-mobil_exp/t-mobil.dbt';
$settings{data_dir}            = '/home/kikuchiyo/meresek/t-mobil_exp/data';
$settings{data_ext}            = ".dat";
$settings{hungry_hungry_hippo} = 1;     # keep retrieved data files cached in memory
$settings{tables_fn}           = 'tables.tex';
$settings{power_density}       = 1;
$settings{max_threads}         = 2;

my $input_queue  = Thread::Queue->new();
my $result_queue = Thread::Queue->new();

my @channels = qw(ch_fm);

# The workers are started before any data is loaded (presumably so that the
# threads are created while the interpreter is still small). They are kept
# joinable so the main thread can wait for them without polling.
print "Spawning threads... ";
my @workers = map { threads->create(\&work_thread, $_) } 1 .. $settings{max_threads};
print "$settings{max_threads} statistics threads started\n";

my $type = 'all';    # O, M, U, E, A, all
my %filter;
my $result;
my @data;
my $ar;
my %arrays;
my %desc;
my $manifest;
my $package;
my @tables;
my @conditions;
my $number_packages = 0;
my $number_results  = 0;

my @jobs      = qw(ovoda iskola onkorm);
my @districts = qw(district_2 district_12 district_22);

open_db_simple();

foreach my $ch (@channels) {
    @data = ();

    # One package per (job, district) pair.
    foreach my $job (@jobs) {
        foreach my $district (@districts) {
            %filter = (district => $district, job => $job);
            $ar = collect_data($ch, \%filter, $type);
            $arrays{$job}{$district} = $ar;
            print "\n***Ch: $ch, Job: $job, District: $district\n";
            enqueue_package(
                { ch => $ch, job => $job, district => $district, type => $type, table => "cat" },
                $ar,
            );
        }
    }
    print "Appending table $ch\_all_cat\n";

    @data = ();

    # One package per job, all districts merged.
    foreach my $job (@jobs) {
        $ar = [];
        foreach my $district (@districts) {
            push @$ar, @{ $arrays{$job}{$district} };
        }
        print "\n***Ch: $ch, Job: $job\n";
        enqueue_package(
            { ch => $ch, job => $job, district => "*", type => $type, table => "sum" },
            $ar,
        );
    }

    # One package per district, all jobs merged.
    foreach my $district (@districts) {
        $ar = [];
        foreach my $job (@jobs) {
            push @$ar, @{ $arrays{$job}{$district} };
        }
        print "\n***Ch: $ch, District: $district\n";
        enqueue_package(
            { ch => $ch, job => "*", district => $district, type => $type, table => "sum" },
            $ar,
        );
    }

    # One package with everything for this channel.
    $ar = [];
    foreach my $job (@jobs) {
        foreach my $district (@districts) {
            push @$ar, @{ $arrays{$job}{$district} };
        }
    }
    print "\n***\nCh: $ch, All\n";
    enqueue_package(
        { ch => $ch, job => "*", district => "*", type => $type, table => "sum" },
        $ar,
    );
    print "Appending table $ch\_all_sum\n";
}

# One end-of-queue marker per worker.
for (1 .. $settings{max_threads}) {
    $input_queue->enqueue("DONE");
}
print "Done assigning jobs, waiting...\n";

# Every worker enqueues its result before it fetches the next item, so once
# all workers have returned, every result is already in the result queue.
# Joining also terminates cleanly when a worker died early.
$_->join() for @workers;

while (defined(my $frozen = $result_queue->dequeue_nb())) {
    $package = thaw($frozen);
    $number_results++;
}
print "!!! $number_packages packages pushed, $number_results result received\n";

exit 0;

# Freezes one work package (manifest + data array ref) and puts it on the
# input queue; counts the packages handed out.
sub enqueue_package {
    my ($manifest, $ar) = @_;
    $input_queue->enqueue(nfreeze({ manifest => $manifest, ar => $ar }));
    $number_packages++;
    return;
}

# Worker thread body.
#   $tid - number of this worker, used only in log messages.
# Starts one R child process, then processes frozen packages from
# $input_queue until the "DONE" marker arrives. Each result is pushed onto
# $result_queue. Returns 0.
sub work_thread {
    my ($tid) = @_;
    my $number = 0;

    my ($ReadFH, $WriteFH);
    my $pid = open2($ReadFH, $WriteFH, "R", "--slave");
    print {$WriteFH} "library(splines)\nlibrary(survival)\nlibrary(NADA)\n";

    while (1) {
        my $work = $input_queue->dequeue();

        # If end-of-queue marker reached then cleanup and exit.
        if ($work eq "DONE") {
            close $WriteFH;
            print "Thread $tid done $number jobs\n";
            kill 9, $pid;
            waitpid($pid, 0);    # reap the R child so it does not linger as a zombie
            return 0;
        }
        elsif ($work) {
            $number++;
            my $result = do_R_stat($work, $ReadFH, $WriteFH, $tid);
            $result_queue->enqueue($result);
        }
    }
}

# Collects the measurement values of one channel.
#   $ch     - channel key to pick from every data record
#   $filter - hash ref; a db entry is used only when all of its record fields
#             named here are equal (string comparison) to the given values
#   $type   - record type to keep, or 'all'
# Returns an array ref with the (true) channel values of all matching records.
sub collect_data {
    my ($ch, $filter, $type) = @_;

    my @ids;
    ID:
    foreach my $id (sort keys %db) {
        my $record = $db{$id}{record};
        foreach my $key (keys %$filter) {
            next ID
                unless defined $record->{$key}
                and $record->{$key} eq $filter->{$key};
        }
        push @ids, $id;
    }

    my @arr;
    foreach my $id (@ids) {
        my $dref;
        if ($settings{hungry_hungry_hippo} and exists($frozen_data{$id})) {
            $dref = $frozen_data{$id};
            print "MEM $id\t";
        }
        else {
            my $path = $settings{data_dir} . '/' . $id . $settings{data_ext};
            $dref = retrieve($path);
            defined $dref or die "Cannot retrieve data file '$path'";
            $frozen_data{$id} = $dref;
            print "DISK $id\t";
        }
        foreach my $rec (@$dref) {
            next unless $rec->{$ch};
            next unless $type eq 'all' or $type eq $rec->{type};
            push @arr, $rec->{$ch};
        }
    }
    return \@arr;
}

# Reads one line from the R process; dies when R has closed its output
# instead of silently continuing with undefined values.
sub _r_read_line {
    my ($Read) = @_;
    my $line = <$Read>;
    defined $line or die "R process closed its output unexpectedly\n";
    chomp $line;
    return $line;
}

# Sends one expression to R and returns the single answer line with the
# leading "[1] " removed.
sub _r_ask {
    my ($Read, $Write, $expr) = @_;
    print {$Write} "$expr\n";
    my $line = _r_read_line($Read);
    $line =~ s/^\[1\] //;
    return $line;
}

# Computes the statistics of one package with the worker's R process.
#   $mirelit - frozen package: { manifest => {...}, ar => [values] }
#   $Read    - handle reading R's output
#   $Write   - handle writing to R's input
#   $tid     - worker number, used only in log messages
# Returns a frozen hash { manifest => ..., result => { n, max, mean, sd,
# gmean, ros_mean, ros_sd, n_cen, quantile => { '0.95' => ... } } }.
sub do_R_stat {
    my ($mirelit, $Read, $Write, $tid) = @_;

    my $package  = thaw($mirelit);
    my $arr      = $package->{ar};
    my $manifest = $package->{manifest};

    my $digits     = 3;
    my $dl_nomeas  = 0.005;
    my $dl_fewmeas = 0.01;
    my $z0         = 3.769911;
    my $chunk_size = 20;                        # values sent to R per input line
    my $fmt        = "%.0" . $digits . "f";

    unless (scalar @$arr) {
        my %result = (
            'n'        => 0,
            'max'      => 0,
            'ros_sd'   => 0,
            'quantile' => { '0.95' => 0 },
            'n_cen'    => 0,
            'sd'       => 0,
            'mean'     => 0,
            'ros_mean' => 0,
            #'gsd' => 0,
            'gmean'    => 0,
        );
        # print "FAIL $tid\n";
        return nfreeze({ manifest => $manifest, result => \%result });
    }

    my $label = "ch $manifest->{ch}, job $manifest->{job}, "
              . "dist $manifest->{district}, table $manifest->{table}";
    print "> Thread $tid starting $label, size " . (scalar @$arr) . "\n";
    my $time = time();

    my %result;

    # Transfer the data in short lines. Every element is sent exactly once
    # and the last chunk is clipped to the end of the array.
    print {$Write} "outcome <- c()\n";
    for (my $start = 0; $start <= $#$arr; $start += $chunk_size) {
        my $end = $start + $chunk_size - 1;
        $end = $#$arr if $end > $#$arr;
        print {$Write} "outcome <- c(outcome, ", join(",", @{$arr}[$start .. $end]), ")\n";
    }

    print {$Write} "outcome=outcome**2/$z0\n" if $settings{power_density};
    print {$Write} "cenoutcome=rep(FALSE, length(outcome))\n"
                 . "cenoutcome[outcome==min(outcome)]=TRUE\n";

    # Converts a value computed on the squared/scaled data back to the
    # original scale when power_density is set.
    my $to_field = sub {
        my ($value) = @_;
        return $settings{power_density} ? sqrt($z0 * $value) : $value;
    };

    my $nomeas = _r_ask($Read, $Write, "sum(cenoutcome)==length(outcome)");
    # NOTE(review): 'minmeas' is never defined by this script; it has to
    # exist in the R session already (e.g. a restored workspace) - confirm.
    my $fewmeas = _r_ask($Read, $Write, "sum(cenoutcome)>=length(outcome)-minmeas");

    if ($nomeas =~ /TRUE/) {
        $result{ros_mean}         = $dl_nomeas;
        $result{ros_sd}           = "NA";
        $result{n_cen}            = 0;
        $result{quantile}{'0.95'} = "NA";
    }
    elsif ($fewmeas =~ /TRUE/) {
        $result{ros_mean}         = $dl_fewmeas;
        $result{ros_sd}           = "NA";
        $result{n_cen}            = _r_ask($Read, $Write, "length(outcome)-sum(cenoutcome)");
        $result{quantile}{'0.95'} = "NA";
    }
    else {
        print {$Write} "pyros=cenros(outcome,cenoutcome)\n";
        $result{ros_mean} = sprintf($fmt, $to_field->(_r_ask($Read, $Write, "mean(pyros)")));
        $result{ros_sd}   = sprintf($fmt, $to_field->(_r_ask($Read, $Write, "sd(pyros)")));

        # quantile() answers with a header line followed by the value line.
        print {$Write} "quantile(pyros, probs=c(0.95))\n";
        _r_read_line($Read);
        my $quantile = _r_read_line($Read);
        $quantile =~ s/\s+\z//;     # also removes the trailing space
        $result{quantile}{'0.95'} = sprintf($fmt, $to_field->($quantile));

        # Printing pyros: two output lines are read and discarded.
        print {$Write} "pyros\n";
        _r_read_line($Read) for 1 .. 2;

        $result{n_cen} = sprintf("%.01d", _r_ask($Read, $Write, "length(outcome)-sum(cenoutcome)"));
    }

    print {$Write} "outcome=sqrt($z0*outcome)\n" if $settings{power_density};

    $result{mean}  = sprintf($fmt, _r_ask($Read, $Write, "mean(outcome, na.rm=TRUE)"));
    $result{sd}    = sprintf($fmt, _r_ask($Read, $Write, "sd(outcome, na.rm=TRUE)"));
    $result{gmean} = sprintf($fmt, _r_ask($Read, $Write, "exp(mean(log(outcome), na.rm=TRUE))"));
    $result{max}   = sprintf($fmt, _r_ask($Read, $Write, "max(outcome)"));
    $result{n}     = scalar @$arr;

    print "< Thread $tid finished $label in " . (time() - $time) . "sec\n";
    return nfreeze({ manifest => $manifest, result => \%result });
}

# Loads the database: syntax-checks the file named by $settings{db_fn},
# slurps it and evaluates its contents as Perl code in this scope.
# Dies when the file cannot be opened, fails the syntax check or fails to
# evaluate.
sub open_db_simple {
    open(my $dbfh, '<:encoding(UTF-8)', $settings{db_fn})
        or die "Cannot open database file '$settings{db_fn}': $!";
    die "Database file '$settings{db_fn}' failed the syntax check\n"
        if check_db_syntax($settings{db_fn});

    my $dbcode = do { local $/; <$dbfh> };
    close $dbfh;

    # NOTE(review): string eval executes whatever the database file
    # contains; the file must come from a trusted source.
    eval $dbcode;
    die "Error while evaluating '$settings{db_fn}': $@" if $@;
    return;
}

# Runs "perl -c" on $file with the current interpreter.
# Returns 0 when the check passed and a non-zero value otherwise (the exit
# code of the check, or 1 when the child was killed by a signal).
sub check_db_syntax {
    my $file = shift;
    system($^X, "-c", $file);
    return 0 if $? == 0;
    return ($? >> 8) || 1;
}