my $request_q; my $response_q; sub testc { my $t0 = time; $request_q = Thread::Queue->new(); $response_q = Thread::Queue->new(); my $merge_thread = threads->create(\&outputc); my @worker_threads; for (1..$threads) { push @worker_threads, async { while ( my $job = $request_q->dequeue() ) { $response_q->enqueue([ $job->[0], worka($job->[1]) ]); } }; } inputc(); $request_q->end(); $_->join() for @worker_threads; $response_q->end(); $merge_thread->join(); printf STDERR "testc done in %0.02f seconds\n",time - $t0; } sub inputc { my $id = -1; foreach my $a(1..$iterations) { my @chunk = (); foreach my $b(1..$chunksize) { my %adata = %data; $adata{'.'} = $a * $b; push(@chunk,\%adata); } $request_q->enqueue([ ++$id, \@chunk ]); } } sub outputc { my $next_id = 0; my %cache; open(my $fh,'>',$aoutput); while( my $job = $response_q->dequeue() ) { if ($job->[0] != $next_id) { $cache{$job->[0]} = $job; next; } do { foreach my $data(@{ $job->[1] }) { foreach my $key(sort keys %$data) { print {$fh} $$data{$key}; } print {$fh} "\n"; } $job = delete($cache{++$next_id}); } while $job; } close($fh); }