Interesting. Will have to ponder that (later). Some factors, ranked in estimated descending order of effect:
- Because the worker threads do basically no work, the output thread is the bottleneck, and there's definitely less work being done there in testa.
- By using a hash instead of thread->join, we're moving work from the OS into Perl, which is going to be slower.
- Later tests are hurt by the variables created by the earlier tests. This should be tiny, though.
- You have next to no variables in your test program. That's going to help testa.
Here's another solution, but I doubt it'll catch up to testa.
# Queues shared between testd, its worker threads, inputd and outputd.
# testd assigns both of them before spawning any thread.
my ($request_q, $response_q);
# Runs the "d" variant of the benchmark: inputd() feeds chunks into a
# Thread::Queue, $threads worker threads process them with worka(), and a
# single writer thread (outputd) consumes the results from a
# Thread::OrderedQueue so they come out in the order they were submitted.
# Reports the elapsed time on STDERR.
sub testd {
    my $started = time;

    # The queues must exist before any thread is spawned so that every
    # thread's copy of these file-scoped variables refers to them.
    $request_q  = Thread::Queue->new();
    $response_q = Thread::OrderedQueue->new();

    my $writer = threads->create(\&outputd);

    # Each worker drains [ token, chunk ] requests until the request queue
    # is ended, tagging every result with its request's token.
    my $drain_requests = sub {
        while (my $request = $request_q->dequeue()) {
            my ($token, $chunk) = @$request;
            $response_q->enqueue($token, worka($chunk));
        }
    };
    my @workers;
    push @workers, threads->create($drain_requests) for 1 .. $threads;

    inputd();

    # Shut down in pipeline order: stop accepting requests, let the workers
    # finish what is queued, then tell the writer no more results will come.
    $request_q->end();
    foreach my $worker (@workers) {
        $worker->join();
    }
    $response_q->end();
    $writer->join();

    printf STDERR "testd done in %0.02f seconds\n", time - $started;
}
# Producer: builds $iterations chunks of $chunksize records each and queues
# every chunk on $request_q together with a token from $response_q, so the
# results can later be reassembled in submission order. Each record is a
# shallow copy of %data whose '.' entry is set to iteration * position.
sub inputd {
    for my $iteration (1 .. $iterations) {
        # Named loop variables instead of $a/$b, which are reserved for sort.
        my @chunk = map {
            my %record = %data;
            $record{'.'} = $iteration * $_;
            \%record;
        } 1 .. $chunksize;
        $request_q->enqueue([ $response_q->get_token(), \@chunk ]);
    }
}
# Writer thread: takes completed jobs off $response_q in token order and
# writes them to $aoutput. Each job is iterated as an array of hash refs
# (presumably the chunk returned by worka -- confirm). Every record becomes
# one line: its values concatenated in sorted-key order. Returns once the
# response queue has been ended and dequeue() yields undef.
sub outputd {
    # Fail loudly. An unchecked failed open would otherwise just emit a
    # "print() on closed filehandle" warning per record and produce no output.
    open(my $fh, '>', $aoutput)
        or die("Can't create \"$aoutput\": $!\n");
    while (my $job = $response_q->dequeue()) {
        foreach my $data (@$job) {
            foreach my $key (sort keys %$data) {
                print {$fh} $data->{$key};
            }
            print {$fh} "\n";
        }
    }
    # Buffered write errors (e.g. disk full) only surface at close.
    close($fh)
        or die("Can't close \"$aoutput\": $!\n");
}
Thread/OrderedQueue.pm:
package Thread::OrderedQueue;

# A thread-safe queue whose items are dequeued in token order rather than
# arrival order. A producer reserves a slot with get_token(). The item for
# that slot is later added with enqueue($token, $item), possibly from
# another thread and out of order. dequeue() hands items back strictly in
# token sequence 0, 1, 2, ..., blocking until the next one is available.

use strict;
use warnings;

use Carp qw( croak );
use threads::shared;

# Constructor. The object and its slot table are anonymous shared hashes so
# every thread sees the same state. The token counter lives in its own
# shared scalar so get_token() can lock it without contending with
# enqueue()/dequeue(), which lock the object hash.
sub new {
    my $class = shift;

    # &share({}) is the threads::shared idiom for an anonymous shared hash.
    # Note that \( my %h ) is not a hash reference: per perlref, \(%h) yields
    # references to the hash's contents instead.
    my $self = &share({});
    $self->{q} = &share({});

    my $last_given :shared;
    $last_given = -1;
    $self->{last_given_ref} = \$last_given;

    $self->{next_read} = 0;    # the first token handed out is 0
    $self->{ended}     = 0;
    return bless($self, $class);
}

# Reserves and returns the next token (0, 1, 2, ...).
sub get_token {
    my $self = shift;
    lock(${ $self->{last_given_ref} });
    # We can't check if ended because we didn't lock %$self.
    return ++${ $self->{last_given_ref} };
}

# Marks the queue as ended. No further enqueue() is allowed, and dequeue()
# returns undef once the item for the next token is not present.
sub end {
    my $self = shift;
    lock(%$self);
    $self->{ended} = 1;
    cond_signal(%$self);    # wake a consumer blocked in dequeue()
    return;
}

# Stores $item under $token, a value previously returned by get_token().
# The item is copied into shared memory with shared_clone.
# Croaks if the queue has already been ended.
sub enqueue {
    my ($self, $token, $item) = @_;
    lock(%$self);
    croak("Can't enqueue onto an ended Thread::OrderedQueue")
        if $self->{ended};
    $self->{q}{$token} = shared_clone($item);
    # Only the item the consumer is waiting for can unblock it.
    cond_signal(%$self) if $token == $self->{next_read};
    return;
}

# Returns the item for the next token in sequence, blocking until it has
# been enqueued. Returns undef once the queue has been ended and the next
# token's item is not present.
sub dequeue {
    my $self = shift;
    lock(%$self);
    while (1) {
        my $next = $self->{next_read};
        if (exists $self->{q}{$next}) {
            $self->{next_read} = $next + 1;
            my $item = delete $self->{q}{$next};
            # Pass the wakeup on in case another consumer is also waiting.
            cond_signal(%$self);
            return $item;
        }
        if ($self->{ended}) {
            cond_signal(%$self);
            # Explicit undef (not a bare return) so callers get undef in
            # list context too, as before.
            return undef;
        }
        cond_wait(%$self);
    }
}

1;
|