package Thread::Queue; use threads::shared; use strict; our $VERSION = '2.00'; sub new { my $class = shift; my @q : shared = @_; return bless \@q, $class; } sub dequeue { my $q = shift; lock(@$q); cond_wait @$q until @$q; cond_signal @$q if @$q > 1; return shift @$q; } sub dequeue_nb { my $q = shift; lock(@$q); return shift @$q; } sub enqueue { my $q = shift; lock(@$q); push @$q, @_ and cond_signal @$q; } sub pending { my $q = shift; lock(@$q); return scalar(@$q); } 1; #### #! perl -slw use strict; use Time::HiRes qw[ time ]; use threads; use Thread::Queue; our $T //= 10; our $N //= 100; our $I //= 1e4; my $Q = new Thread::Queue; my @threads = map async{ while( my $item = $Q->dequeue ) { my @a = split $;, $item; } }, 1 .. $T; my $start = time(); my @a = 1.. $N; for( 1 .. $I ) { $Q->enqueue( join $;, @a ); } $Q->enqueue( (undef) x @threads ); $_->join for @threads; printf "Took %.6f seconds\n", time() - $start; print join ' ', times; #### #! perl -slw use strict; use Time::HiRes qw[ time ]; use threads; use Thread::Queue; our $T //= 10; our $N //= 100; our $I //= 1e4; my $Q = new Thread::Queue; my @threads = map async{ while( my $item = $Q->dequeue ) { my @a = @{ $item }; } }, 1 .. $T; my $start = time(); my @a = 1.. $N; for( 1 .. $I ) { $Q->enqueue( \@a ); } $Q->enqueue( (undef) x @threads ); $_->join for @threads; printf "Took %.6f seconds\n", time() - $start; print join ' ', times; #### c:\test>junk51 -T=10 -N=100 -I=1e4 Took 2.544000 seconds 4.742 2.137 0 0 c:\test>junk52 -T=10 -N=100 -I=1e4 Took 22.028000 seconds 16.832 29.296 0 0 c:\test>junk51 -T=10 -N=1000 -I=1e3 Took 2.148000 seconds 3.541 1.809 0 0 c:\test>junk52 -T=10 -N=1000 -I=1e3 Took 21.426000 seconds 15.303 29.156 0 0