Update: after reading
"Re: What is the fastest way to download a bunch of web pages?", I've reworked this function to follow the same pattern. Note that Thread::Queue only takes scalars, so if you want to push structured data (for example an array reference) through the queue, one way is to freeze it before enqueueing and thaw it after dequeueing.
use strict;
use warnings;
use Data::Dumper;
use threads;
use Thread::Queue;
use Storable qw(freeze thaw);
# Demo predicate: sleeps for one second on every call, then reports whether
# its argument contains the letter "b". A false argument yields an empty
# return; a non-matching argument yields the (false) result of the match.
my $slow_matches_b = sub {
    my ($candidate) = @_;
    sleep 1;
    return unless $candidate;
    return 1 if $candidate =~ /b/;
};

# Sample inputs: three strings that contain "b" and two that do not.
my $test_strings = [ qw(blee blah bloo qoo fwee) ];

# Run the predicate over the sample inputs on two worker threads and dump
# the collected results.
my @results = threaded_map( 2, $slow_matches_b, @{$test_strings} );
print Dumper( \@results );
# threaded_map( $thread_count, $function, @inputs )
#
# Applies $function to each element of @inputs using $thread_count worker
# threads and returns the results as a list, in the same order as the inputs.
#
#   $thread_count - number of worker threads to create. With a count below 1
#                   no workers are started and an empty list is returned.
#   $function     - code reference; called in scalar context with one input
#                   element as its only argument.
#   @inputs       - the values to process.
#
# Every work item and every result travels through a Thread::Queue as a
# Storable-frozen [ id, value ] pair, so inputs and results must be data
# that freeze/thaw can serialize.
sub threaded_map
{
    my $thread_count = shift;
    my $function     = shift;

    #
    # setup work queues
    #
    my $input_q  = Thread::Queue->new;
    my $result_q = Thread::Queue->new;

    # Tag each input with its position so the results can be put back in
    # input order, whichever worker happens to process them.
    my $next_id = 0;
    for my $input (@_)
    {
        $input_q->enqueue( freeze( [ $next_id++, $input ] ) );
    }

    #
    # define worker function to run in each thread
    #
    my $worker_function = sub
    {
        # All work is enqueued before any worker starts, so an undef from
        # dequeue_nb means the queue is drained and this worker can finish.
        while ( defined( my $frozen_work = $input_q->dequeue_nb ) )
        {
            my ( $id, $input_data ) = @{ thaw($frozen_work) };
            my $result = $function->($input_data);
            $result_q->enqueue( freeze( [ $id, $result ] ) );
        }
    };

    #
    # create workers that will read from shared input queue and write to
    # shared output queue
    #
    my @threads = map { threads->create($worker_function) } 1 .. $thread_count;
    $_->join for @threads;

    #
    # join the results data together
    #
    my %results;
    while ( defined( my $frozen_result = $result_q->dequeue_nb ) )
    {
        my ( $id, $value ) = @{ thaw($frozen_result) };
        $results{$id} = $value;
    }

    #
    # return results in input order; the ids are integers, so they must be
    # sorted numerically (a plain string sort would put 10 before 2)
    #
    return map { $results{$_} } sort { $a <=> $b } keys %results;
}