# Queues shared by the producer (inputd), the worker threads and the merger
# (outputd).  Both are (re)created on every call to testd().
my $request_q;
my $response_q;

# testd()
#
# Runs one complete pass of the pipeline and reports the elapsed time on
# STDERR:
#   1. starts one merger thread running outputd();
#   2. starts $threads worker threads, each of which pulls [token, chunk]
#      jobs from $request_q, runs worka() on the chunk and files the result
#      in $response_q under the job's token;
#   3. feeds the request queue via inputd() in the calling thread;
#   4. ends the request queue, joins the workers, ends the response queue
#      and joins the merger.
#
# Relies on $threads and worka(), which are defined outside this section.
sub testd {
    my $t0 = time;

    $request_q  = Thread::Queue->new();
    $response_q = Thread::OrderedQueue->new();

    my $merge_thread = threads->create( \&outputd );

    my @worker_threads;
    for ( 1 .. $threads ) {
        push @worker_threads, async {
            # dequeue() yields undef once the queue is ended and drained,
            # which terminates the loop.
            while ( my $job = $request_q->dequeue() ) {
                $response_q->enqueue( $job->[0], worka( $job->[1] ) );
            }
        };
    }

    inputd();

    # No more requests: let the workers drain the queue and exit.
    $request_q->end();
    $_->join() for @worker_threads;

    # Only after every worker has finished can the response queue be ended;
    # enqueue() dies on an ended queue.
    $response_q->end();
    $merge_thread->join();

    # NOTE(review): sub-second precision here requires Time::HiRes's time()
    # to have been imported elsewhere in the file -- confirm.
    printf STDERR "testd done in %0.02f seconds\n", time - $t0;
}

# inputd()
#
# Producer.  Builds $iterations chunks of $chunksize records each.  Every
# record is a copy of %data with its '.' entry set to the product of the
# (1-based) chunk number and the (1-based) position inside the chunk.  Each
# chunk is queued on $request_q as [ token, \@chunk ], where the token comes
# from $response_q->get_token() and fixes the chunk's place in the output
# order.
#
# Relies on $iterations, $chunksize and %data from outside this section.
sub inputd {
    foreach my $iteration ( 1 .. $iterations ) {
        my @chunk;
        foreach my $slot ( 1 .. $chunksize ) {
            my %adata = %data;
            $adata{'.'} = $iteration * $slot;
            push @chunk, \%adata;
        }
        $request_q->enqueue( [ $response_q->get_token(), \@chunk ] );
    }
}

# outputd()
#
# Merger.  Opens $aoutput for writing (truncating it) and, for every job
# taken from $response_q in token order, writes one line per record: the
# record's values concatenated in sorted-key order, followed by a newline.
# Returns once the response queue has been ended and drained.
#
# Dies if the output file cannot be opened or if closing it reports an
# error, so that lost output does not go unnoticed.
#
# Relies on $aoutput from outside this section.
sub outputd {
    open( my $fh, '>', $aoutput )
        or die "outputd: cannot open '$aoutput' for writing: $!";

    while ( my $job = $response_q->dequeue() ) {
        foreach my $record (@$job) {
            foreach my $key ( sort keys %$record ) {
                print {$fh} $record->{$key};
            }
            print {$fh} "\n";
        }
    }

    # Buffered write errors only surface at close time.
    close($fh)
        or die "outputd: error closing '$aoutput': $!";
}

####
# NOTE(review): the separator above and the trailing "1;" suggest that what
# follows is meant to live in its own file (Thread/OrderedQueue.pm) --
# confirm.

package Thread::OrderedQueue;

use strict;
use warnings;
use threads::shared;

# Thread::OrderedQueue
#
# A queue whose items are read back in the order in which their tokens were
# handed out by get_token(), regardless of the order in which they were
# enqueued.

# new()
#
# Returns a new, empty queue.  The object is a shared hash holding:
#   q              - shared hash of pending items, keyed by token
#   last_given_ref - reference to a shared counter holding the last token
#                    handed out (starts at -1, so the first token is 0)
#   next_read      - token of the item dequeue() will return next
#   ended          - true once end() has been called
sub new {
    my $class = shift;

    my $self = bless( \( my %self :shared ), $class );
    $self->{q}              = \( my %q :shared );
    $self->{last_given_ref} = \( my $last_given :shared = -1 );
    $self->{next_read}      = 0;
    $self->{ended}          = 0;

    return $self;
}

# get_token()
#
# Returns the next token: 0 on the first call, then 1, 2, ...  Only the
# counter is locked, not the queue itself.
sub get_token {
    my $self = shift;

    lock( ${ $self->{last_given_ref} } );

    # We can't check if ended because we didn't lock %$self.
    return ++${ $self->{last_given_ref} };
}

# end()
#
# Marks the queue as ended and wakes one waiting reader.  After this,
# enqueue() dies, and dequeue() returns undef as soon as the item it is
# waiting for is not present.
sub end {
    my $self = shift;

    lock(%$self);
    $self->{ended} = 1;
    cond_signal(%$self);
}

# enqueue($token, $item)
#
# Stores a shared clone of $item under $token.  A waiting reader is woken
# only when $token is the one dequeue() is currently waiting for.
#
# Dies if the queue has already been ended.
sub enqueue {
    my ( $self, $token, $item ) = @_;

    lock(%$self);
    die("Thread::OrderedQueue: enqueue() called after end()")
        if $self->{ended};

    $self->{q}{$token} = shared_clone($item);
    cond_signal(%$self) if $token == $self->{next_read};
}

# dequeue()
#
# Blocks until the item for the next token in sequence is available, then
# removes and returns it.  Returns undef once the queue has been ended and
# the next item in sequence is absent; items stored under later tokens are
# not returned in that case.
#
# Each exit path signals the condition again so that another waiting reader,
# if any, gets a chance to re-check the queue.
sub dequeue {
    my $self = shift;

    lock(%$self);
    while (1) {
        if ( exists( $self->{q}{ $self->{next_read} } ) ) {
            my $rv = delete( $self->{q}{ $self->{next_read}++ } );
            cond_signal(%$self);
            return $rv;
        }

        if ( $self->{ended} ) {
            cond_signal(%$self);
            # Explicit undef (not a bare return) to keep the original
            # return value in every calling context.
            return undef;
        }

        cond_wait(%$self);
    }
}

1;