package Thread::Queue;
use threads::shared;
use strict;
our $VERSION = '2.00';
# new( CLASS, LIST )
#
# Construct a queue pre-loaded with LIST (which may be empty).
# The queue object is a blessed reference to a shared array.
#
# Each initial item is passed through shared_clone() first: a shared
# array cannot store a reference to unshared data (threads::shared dies
# with "Invalid value for shared scalar"), so unshared structures are
# deep-copied into shared ones.  Plain scalars and already-shared
# references pass through unchanged.
sub new {
    my $class = shift;
    my @q : shared = map { shared_clone($_) } @_;
    return bless \@q, $class;
}
# dequeue()
#
# Remove and return the item at the head of the queue, blocking while
# the queue is empty.
sub dequeue {
    my ($queue) = @_;
    lock(@$queue);

    # Sleep until some producer has put at least one item on the queue.
    cond_wait(@$queue) while !@$queue;

    # More items remain after ours: wake another waiting consumer so it
    # can take the next one.
    cond_signal(@$queue) if @$queue >= 2;

    my $item = shift @$queue;
    return $item;
}
# dequeue_nb()
#
# Non-blocking dequeue: remove and return the item at the head of the
# queue, or undef if the queue is currently empty.
sub dequeue_nb {
    my ($queue) = @_;
    lock(@$queue);
    my $head = shift @$queue;
    return $head;
}
# enqueue( LIST )
#
# Append every item in LIST to the tail of the queue and, if the queue
# is then non-empty, wake one consumer blocked in dequeue().
#
# Items are passed through shared_clone() before being stored: a shared
# array cannot hold a reference to unshared data (threads::shared dies
# with "Invalid value for shared scalar"), so callers handing in
# references to ordinary arrays/hashes get a shared deep copy queued
# instead.  Plain scalars and already-shared references are stored
# unchanged.
#
# The return value is simply that of the final push/cond_signal
# expression; it carries no documented meaning.
sub enqueue {
    my $q = shift;
    lock(@$q);
    push @$q, map { shared_clone($_) } @_ and cond_signal @$q;
}
# pending()
#
# Return the number of items currently waiting in the queue.
sub pending {
    my ($queue) = @_;
    lock(@$queue);
    my $count = @$queue;
    return $count;
}
1;
####
#! perl -slw
# Benchmark: pass each work item through a Thread::Queue as ONE string
# (the elements joined with $;) and split it apart again in the worker.
#
# Command-line switches (set via perl's -s):
#   -T=<n>  number of worker threads      (default 10)
#   -N=<n>  number of elements per item   (default 100)
#   -I=<n>  number of items to enqueue    (default 1e4)
use strict;
use Time::HiRes qw[ time ];
use threads;
use Thread::Queue;

our $T //= 10;
our $N //= 100;
our $I //= 1e4;

# Direct method call rather than the indirect-object form
# "new Thread::Queue", which perl can misparse.
my $Q = Thread::Queue->new;

# Each worker drains the queue until it receives an undef sentinel.
# The test is for definedness, not truth, so a legitimately false item
# (e.g. the empty string produced when -N=0) does not stop a worker early.
my @threads = map async{
    while( defined( my $item = $Q->dequeue ) ) {
        my @a = split $;, $item;
    }
}, 1 .. $T;

# Timing starts after the workers have been spawned.
my $start = time();

my @a = 1.. $N;
for( 1 .. $I ) {
    $Q->enqueue( join $;, @a );
}

# One undef per worker tells each of them to finish.
$Q->enqueue( (undef) x @threads );
$_->join for @threads;

printf "Took %.6f seconds\n", time() - $start;
print join ' ', times;
####
#! perl -slw
# Benchmark: pass each work item through a Thread::Queue as a reference
# to an array and copy the elements out of it in the worker.
#
# Command-line switches (set via perl's -s):
#   -T=<n>  number of worker threads      (default 10)
#   -N=<n>  number of elements per item   (default 100)
#   -I=<n>  number of items to enqueue    (default 1e4)
use strict;
use Time::HiRes qw[ time ];
use threads;
use Thread::Queue;

our $T //= 10;
our $N //= 100;
our $I //= 1e4;

# Direct method call rather than the indirect-object form
# "new Thread::Queue", which perl can misparse.
my $Q = Thread::Queue->new;

# Each worker drains the queue until it receives an undef sentinel;
# the loop condition states that explicitly by testing definedness.
my @threads = map async{
    while( defined( my $item = $Q->dequeue ) ) {
        my @a = @{ $item };
    }
}, 1 .. $T;

# Timing starts after the workers have been spawned.
my $start = time();

my @a = 1.. $N;
for( 1 .. $I ) {
    $Q->enqueue( \@a );
}

# One undef per worker tells each of them to finish.
$Q->enqueue( (undef) x @threads );
$_->join for @threads;

printf "Took %.6f seconds\n", time() - $start;
print join ' ', times;
####
c:\test>junk51 -T=10 -N=100 -I=1e4
Took 2.544000 seconds
4.742 2.137 0 0
c:\test>junk52 -T=10 -N=100 -I=1e4
Took 22.028000 seconds
16.832 29.296 0 0
c:\test>junk51 -T=10 -N=1000 -I=1e3
Took 2.148000 seconds
3.541 1.809 0 0
c:\test>junk52 -T=10 -N=1000 -I=1e3
Took 21.426000 seconds
15.303 29.156 0 0