Think of the queues as a pipeline. Each unit of work passes through the pipeline: each thread takes one unit of work from its input queue and puts its output into its output queue. Once there is no more input, each thread puts undef into its output queue to signal to its successor thread that no more input is coming. A specific program could look something like this:
#!perl -w
use strict;
use threads;
use Thread::Queue;

# One queue per hand-off: q1 feeds step 1, q2 feeds step 2, q3 feeds step 3
my $q1 = Thread::Queue->new();
my $q2 = Thread::Queue->new();
my $q3 = Thread::Queue->new();

my @running;

sub step_1 {
    print "Step 1 thread launched\n";
    return async {
        while (defined( my $payload = $q1->dequeue )) {
            sleep rand 10; # do work 1
            my @results = ("step1: $payload => " . ($payload+1), "step +1: $payload => " . ($payload+2));
            # store results for the next thread
            $q2->enqueue( $_ ) for @results;
        };
        # tell step 2 that no more input is coming
        $q2->enqueue(undef);
        print "Step 1 finished\n";
    };
};

sub step_2 {
    print "Step 2 thread launched\n";
    return async {
        while (defined( my $payload = $q2->dequeue )) {
            sleep rand 10; # do work 2
            my @results = ("step2: $payload/x", "step2: $payload/x");
            # store results for the next thread
            $q3->enqueue( $_ ) for @results;
        };
        # tell step 3 that no more input is coming
        $q3->enqueue(undef);
        print "Step 2 finished\n";
    };
};

sub step_3 {
    # outputs the results
    print "Step 3 thread launched\n";
    return async {
        while (defined( my $payload = $q3->dequeue )) {
            print "Final result: $payload\n";
        };
        print "Step 3 finished\n";
    };
};

print "Launching processors\n";
push @running, step_1;
push @running, step_2;
push @running, step_3;

print "Submitting a first batch of work\n";
for (1..20) {
    $q1->enqueue( $_ );
};

print "Submitting a second batch\n";
for (1..20) {
    $q1->enqueue( $_ );
};

# Signal end of jobs:
$q1->enqueue(undef);

print "Waiting for all threads to finish\n";
for (@running) {
    $_->join;
};
print "done\n";
Personally, I would abstract this into a driver that launches any subroutine in a loop while reading from a queue and writing the subroutine results into another queue, but I haven't had a real need for that.
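For illustration, here is a minimal, untested-in-production sketch of what such a driver could look like. The names launch_stage and run_pipeline are made up for this example and do not come from any module; the sketch assumes one thread per stage (so results keep their input order) and reuses the undef end-of-input convention from the program above, which means a worker's undef results are dropped rather than passed on.

```perl
#!perl -w
use strict;
use warnings;
use threads;
use Thread::Queue;

# Hypothetical helper: run $worker in its own thread. It reads payloads
# from $in until it sees undef, and enqueues every result on $out.
sub launch_stage {
    my ($worker, $in, $out) = @_;
    return async {
        while (defined( my $payload = $in->dequeue )) {
            # a worker may return zero, one or many results;
            # undef is reserved as the end marker, so filter it out
            my @results = grep { defined } $worker->($payload);
            $out->enqueue( $_ ) for @results;
        }
        # pass the end-of-input marker on to the successor stage
        $out->enqueue(undef);
        return;
    };
}

# Hypothetical helper: chain the workers with one queue between each pair,
# feed @$input into the first queue and collect the last queue's output.
sub run_pipeline {
    my ($input, @workers) = @_;
    my @queues  = map { Thread::Queue->new() } 0 .. scalar(@workers);
    my @running = map {
        launch_stage($workers[$_], $queues[$_], $queues[$_ + 1])
    } 0 .. $#workers;

    $queues[0]->enqueue( $_ ) for @$input;
    $queues[0]->enqueue(undef);    # signal end of jobs

    my @final;
    while (defined( my $result = $queues[-1]->dequeue )) {
        push @final, $result;
    }
    $_->join for @running;
    return @final;
}

# Usage: two stages, the second one fans each item out into two results
my @final = run_pipeline(
    [1 .. 5],
    sub { $_[0] + 1 },
    sub { ("step2: $_[0]/x", "step2: $_[0]/y") },
);
print "Final result: $_\n" for @final;
```

With this shape, adding a step is just one more subroutine in the argument list, and the end-of-input handling lives in exactly one place instead of being repeated in every step.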
In reply to Re^5: adding a hash to a shared object by Corion, in thread adding a hash to a shared object by daverave