my $request_q; my $response_q; sub testd { my $t0 = time; $request_q = Thread::Queue->new(); $response_q = Thread::OrderedQueue->new(); my $merge_thread = threads->create(\&outputd); my @worker_threads; for (1..$threads) { push @worker_threads, async { while ( my $job = $request_q->dequeue() ) { $response_q->enqueue( $job->[0], worka($job->[1]) ); } }; } inputd(); $request_q->end(); $_->join() for @worker_threads; $response_q->end(); $merge_thread->join(); printf STDERR "testd done in %0.02f seconds\n",time - $t0; } sub inputd { my $id = -1; foreach my $a(1..$iterations) { my @chunk = (); foreach my $b(1..$chunksize) { my %adata = %data; $adata{'.'} = $a * $b; push(@chunk,\%adata); } $request_q->enqueue([ $response_q->get_token(), \@chunk ]); } } sub outputd { open(my $fh,'>',$aoutput); while( my $job = $response_q->dequeue() ) { foreach my $data(@$job) { foreach my $key(sort keys %$data) { print {$fh} $$data{$key}; } print {$fh} "\n"; } } close($fh); }