Re^2: shared scalar freed early
by chris212 (Scribe) on Feb 22, 2017 at 18:52 UTC
|
Yeah, is that a problem? Did you miss the "using a semaphore to limit the number of concurrent threads" part of that sentence? Millions of threads will be started throughout the execution of the script, but not all at the same time! I can't pass too much data to the thread's sub, right?
UPDATE:
I just did the math, and the test data I am using to replicate the crash would create 766,747 worker threads, but only 32 at a time. I tried making the threads persistent and having them read data from a queue, but that was MUCH slower.
|
use strict;
use warnings;
use feature qw( say );
use threads;
use Thread::Queue qw( );
use Thread::Semaphore qw( );
use Time::HiRes qw( time );
use constant MAX_WORKERS => 32;
use constant NUM_JOBS => 1_000;
{
    my $sem = Thread::Semaphore->new(MAX_WORKERS);
    my $s = time;
    for my $job (1..NUM_JOBS) {
        $sem->down();
        $_->join() for threads->list(threads::joinable);
        async {
            # ...
            $sem->up();
        };
    }
    $_->join() for threads->list();
    my $e = time;
    say($e-$s); # 5.88567113876343
}
{
    my $q = Thread::Queue->new();
    for (1..MAX_WORKERS) {
        async {
            while (defined( my $job = $q->dequeue() )) {
                # ...
            }
        };
    }
    my $s = time;
    $q->enqueue($_) for 1..NUM_JOBS;
    $q->end();
    $_->join() for threads->list();
    my $e = time;
    say($e-$s); # 0.248196125030518
}
Things couldn't be any more ideal for creating threads (minimal number of variables to clone), yet creating all those threads was roughly 24x slower than reusing a few threads (about 5.89 s versus 0.25 s). (The factor will grow as the number of jobs increases.)
|
Can you make "testb" faster than "testa" while still preserving the order? If you help me do that, then I can rewrite my script using queues and hope that it resolves my intermittent crashes.
#!/opt/perl/bin/perl
use strict;
use threads;
use Thread::Queue;
use Thread::Semaphore;
use threads::shared;
use Time::HiRes qw( time );
my $iterations = 100;
my $chunksize = 50;
my $threads = 5;
my $aoutput = "a.txt";
my $boutput = "b.txt";
my $q;
my $s;
my $outq;
my %data = ();
foreach('a'..'z') {
    $data{$_} = $_ x 200;
}
testa();
testb();
system($^O eq 'MSWin32' ? 'fc' : 'diff',$aoutput,$boutput);
sub testa {
    my $t0 = time;
    $q = Thread::Queue->new;
    $s = Thread::Semaphore->new($threads);
    my ($outth) = threads->create(\&outputa);
    inputa();
    $q->end;
    $outth->join();
    printf STDERR "testa done in %0.02f seconds\n",time - $t0;
}
sub worka {
    my ($data) = @_;
    my @ret = ();
    foreach my $chunk(@$data) {
        my %output = ();
        foreach my $key(keys %$chunk) {
            if($key eq '.') {
                $output{$key} = $$chunk{$key};
                next;
            }
            my $val = $$chunk{$key};
            my $uc = uc($key);
            $val =~ s/$key/$uc/g;
            $output{$key} = $val;
        }
        push(@ret,\%output);
    }
    return(\@ret);
}
sub inputa {
    foreach my $a(1..$iterations) {
        my @chunk = ();
        foreach my $b(1..$chunksize) {
            my %adata = %data;
            $adata{'.'} = $a * $b;
            push(@chunk,\%adata);
        }
        $s->down();
        my ($th) = threads->create(\&worka,\@chunk);
        $q->enqueue($th);
    }
}
sub outputa {
    open(my $fh,'>',$aoutput) or die "Cannot open $aoutput: $!";
    while(1) {
        my $th = $q->dequeue();
        last unless(defined $th);
        my ($output) = $th->join();
        $s->up();
        foreach my $data(@$output) {
            foreach my $key(sort keys %$data) {
                print {$fh} $$data{$key};
            }
            print {$fh} "\n";
        }
    }
    close($fh);
}
sub testb {
    my $t0 = time;
    $q = Thread::Queue->new;
    $outq = Thread::Queue->new;
    $s = Thread::Semaphore->new($threads);
    my @threads = ();
    foreach(1..$threads) {
        my $th = threads->create(\&workb);
        push(@threads,$th);
    }
    my ($outth) = threads->create(\&outputb);
    inputb();
    $q->end;
    $outq->end;
    $_->join() foreach(@threads,$outth);
    printf STDERR "testb done in %0.02f seconds\n",time - $t0;
}
sub workb {
    while(1) {
        my $arr = $q->dequeue;
        last unless(defined $arr);
        $s->up();
        my ($input,$output,$sem) = @$arr;
        foreach my $data(@$input) {
            my %ret :shared = ();
            foreach my $key(keys %$data) {
                if($key eq '.') {
                    $ret{$key} = $$data{$key};
                    next;
                }
                my $val = $$data{$key};
                my $uc = uc($key);
                $val =~ s/$key/$uc/g;
                $ret{$key} = $val;
            }
            push(@$output,\%ret);
        }
        $sem->up();
    }
}
sub inputb {
    foreach my $a(1..$iterations) {
        my @input = ();
        foreach my $b(1..$chunksize) {
            my %bdata = %data;
            $bdata{'.'} = $a * $b;
            push(@input,\%bdata);
        }
        my @output :shared = ();
        my $sem = Thread::Semaphore->new(0);
        $s->down();
        $outq->enqueue([\@output,$sem]);
        $q->enqueue([\@input,\@output,$sem]);
    }
}
sub outputb {
    open(my $fh,'>',$boutput) or die "Cannot open $boutput: $!";
    while(1) {
        my $arr = $outq->dequeue;
        last unless(defined $arr);
        my ($output,$sem) = @$arr;
        $sem->down();
        foreach my $data(@$output) {
            foreach my $key(sort keys %$data) {
                print {$fh} $$data{$key};
            }
            print {$fh} "\n";
        }
    }
    close($fh);
}
|
I'll see if I can create a test script comparing the 2 methods in a way that better represents what I am trying to accomplish. The tricky part is keeping the output in the same order as input, and I don't remember how I accomplished that in my previous testing.
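One possible way to keep output in input order with persistent workers is to tag each job with a sequence number and let a single collector thread hold back results that arrive early. The following is only a minimal sketch under assumptions: the variable names, the job payloads, and the `uc` stand-in for real work are hypothetical and not taken from the scripts in this thread. It relies on `Thread::Queue`'s `end()` method, which the code earlier in this thread also uses.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Hypothetical sizes, chosen only for illustration.
my $num_workers = 4;
my $num_jobs    = 20;

my $jobs    = Thread::Queue->new();   # [ $seq, $payload ] pairs for the workers
my $results = Thread::Queue->new();   # [ $seq, $result ] pairs, in arrival order

# Persistent workers: each one takes jobs until the job queue is ended.
my @workers;
for (1 .. $num_workers) {
    push @workers, threads->create(sub {
        while (defined( my $job = $jobs->dequeue() )) {
            my ($seq, $payload) = @$job;
            $results->enqueue([ $seq, uc $payload ]);   # stand-in for real work
        }
    });
}

# Collector: buffers results that arrive early and releases them in sequence.
my $collector = threads->create(sub {
    my %pending;
    my $next = 1;
    my @ordered;
    while (defined( my $res = $results->dequeue() )) {
        my ($seq, $value) = @$res;
        $pending{$seq} = $value;
        while (exists $pending{$next}) {
            push @ordered, delete $pending{$next};
            $next++;
        }
    }
    return \@ordered;
});

$jobs->enqueue([ $_, "job$_" ]) for 1 .. $num_jobs;
$jobs->end();                 # workers leave their loops once the queue drains
$_->join() for @workers;
$results->end();              # safe now: no worker can enqueue any more
my $ordered = $collector->join();
print "$_\n" for @$ordered;
```

In this sketch the `%pending` buffer is unbounded, so a slow early job could let it grow; a real script would probably want to throttle the producer, for example with a semaphore as in the test script above.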
|
Please see my reply to ikegami. Those are not the results I get with my test script. Thanks.
|
Yeah, is that a problem?
Oh, yes it is. Spawning a very large number of threads will most certainly take a heavy toll on the machine's resources and at best slow everything down (or more probably bring your machine down), even if many of them are idle at any point in time.
Other monks have already explained while I was off-line that it is much better to have a relatively limited number of threads picking work from a job queue (or something similar), and I absolutely agree with ikegami and tye on that.
(And this is why I candidly asked the question in the first place, as well as to make sure I understood you correctly, because the idea seemed so extravagant to me.)
|
So even after a thread is joined, it still consumes resources? I don't see memory usage growing or the number of threads growing as new threads are created to replace the finished ones. The CPU is utilized heavily the entire time it runs, but that is the point. Are you saying my CPU cycles are being wasted on threads that have already joined/returned?
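One rough way to check whether joined threads linger is to count the thread objects that `threads` still tracks. The sketch below is only an illustration under assumptions: the sizes and variable names are hypothetical, and it says nothing about memory or CPU time, only about how many non-joined, non-detached threads exist at a given moment.

```perl
use strict;
use warnings;
use threads;
use Thread::Semaphore;

# Hypothetical sizes, chosen only for illustration.
my $max_workers = 4;
my $num_jobs    = 50;

my $sem  = Thread::Semaphore->new($max_workers);
my $peak = 0;

for my $job (1 .. $num_jobs) {
    $sem->down();
    # Reap every thread that has already finished.
    $_->join() for threads->list(threads::joinable);
    threads->create(sub { $sem->up(); });
    # In scalar context, list() returns the number of non-joined,
    # non-detached threads.
    my $live = threads->list();
    $peak = $live if $live > $peak;
}
$_->join() for threads->list();

my $remaining = threads->list();
print "peak live threads: $peak, remaining after final join: $remaining\n";
```

The peak can briefly exceed `$max_workers`, because a thread releases the semaphore slightly before it becomes joinable. Once everything has been joined, the count should drop back to zero.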
|
Please see my reply to ikegami. I'm still interested in how using so many threads could be causing my crash, but if I can't get queues to work anywhere near as well, I don't see a practical alternative in Perl. Thanks.