
Think of the queues as a pipeline. You pass each unit of work through the queues: each thread takes one unit of work from its input queue and puts its output into its output queue. Once there is no more input, each thread puts undef into its output queue to signal to its successor thread that no more input is coming. A concrete program could look something like this:

```perl
#!perl -w
use strict;
use threads;
use Thread::Queue;

my $q1 = Thread::Queue->new();
my $q2 = Thread::Queue->new();
my $q3 = Thread::Queue->new();

my @running;

sub step_1 {
    print "Step 1 thread launched\n";
    return async {
        while (defined( my $payload = $q1->dequeue )) {
            sleep rand 10;    # do work 1
            my @results = ("step1: $payload => " . ($payload+1),
                           "step1: $payload => " . ($payload+2));
            # store results for the next thread
            $q2->enqueue( $_ ) for @results;
        };
        print "Step 1 finished\n";
    };
};

sub step_2 {
    print "Step 2 thread launched\n";
    return async {
        while (defined( my $payload = $q2->dequeue )) {
            sleep rand 10;    # do work 2
            my @results = ("step2: $payload/x", "step2: $payload/x");
            # store results for the next thread
            $q3->enqueue( $_ ) for @results;
        };
        print "Step 2 finished\n";
    };
};

sub step_3 {
    # outputs the results
    print "Step 3 thread launched\n";
    return async {
        while (defined( my $payload = $q3->dequeue )) {
            print "Final result: $payload\n";
        };
        print "Step 3 finished\n";
    };
};

print "Launching processors\n";
push @running, step_1;
push @running, step_2;
push @running, step_3;

print "Submitting a first batch of work\n";
for (1..20) {
    $q1->enqueue( $_ );
};

print "Submitting a second batch\n";
for (1..20) {
    $q1->enqueue( $_ );
};

# Signal end of jobs:
$q1->enqueue(undef);

print "Waiting for all threads to finish\n";
for (@running) {
    $_->join;
};
print "done\n";
```

Personally, I would abstract this into a driver that launches any subroutine in a loop, reading its input from one queue and writing the subroutine's results into another queue, but I haven't had a real need for that.
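A minimal sketch of such a driver, assuming a perl built with thread support. The name `launch_stage` and the two example stages are hypothetical and not part of any module. Each stage is an ordinary subroutine that takes one payload and returns a list of results; the driver does the queue reading and writing and passes the `undef` end marker on to the next stage.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Hypothetical driver: runs $work on every item read from $in, enqueues each
# of its results onto $out, and forwards the undef end marker when done.
sub launch_stage {
    my ($in, $out, $work) = @_;
    return async {
        while (defined( my $payload = $in->dequeue )) {
            $out->enqueue($_) for $work->($payload);
        }
        $out->enqueue(undef);    # tell the next stage no more input is coming
    };
}

my $input  = Thread::Queue->new();
my $middle = Thread::Queue->new();
my $output = Thread::Queue->new();

# Two stages, each just an ordinary subroutine handed to the driver.
my @running = (
    launch_stage($input,  $middle, sub { my $n = shift; ($n + 1, $n + 2) }),
    launch_stage($middle, $output, sub { my $s = shift; "$s/x" }),
);

$input->enqueue($_) for 1 .. 5;
$input->enqueue(undef);    # end of jobs
$_->join for @running;

# Collect the final results in the main thread, up to the end marker.
my @final;
while (defined( my $item = $output->dequeue )) {
    push @final, $item;
}
print "Final result: $_\n" for @final;
```

Because `undef` doubles as the end marker, a stage subroutine must never return `undef` as a result, or the next stage would stop early.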

Re^6: adding a hash to a shared object
by daverave (Scribe) on Aug 11, 2010 at 12:53 UTC
    Thank you, Corion. I am trying to understand these pieces of code, but it seems the program does not return (it does all the work but never terminates).

      Ouch! Indeed, the first thread does terminate, but it never tells the next thread to terminate. So you need to add a $q2->enqueue(undef); to the end of the first thread, and likewise a $q3->enqueue(undef); to the end of the second thread, so that the third thread stops as well.
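Applied to the program above, the fix looks roughly like the following sketch. It is trimmed down to stay self-contained: the `sleep` calls and most progress messages are dropped, only one batch of 20 jobs is submitted, and step 3 counts what it prints so the main thread can report it (that counter is an addition for illustration). The essential change is that each stage enqueues `undef` into its output queue after its own input loop ends.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $q1 = Thread::Queue->new();
my $q2 = Thread::Queue->new();
my $q3 = Thread::Queue->new();

sub step_1 {
    return async {
        while (defined( my $payload = $q1->dequeue )) {
            $q2->enqueue("step1: $payload => " . ($payload + 1),
                         "step1: $payload => " . ($payload + 2));
        }
        $q2->enqueue(undef);    # the fix: tell step 2 there is no more input
        print "Step 1 finished\n";
    };
}

sub step_2 {
    return async {
        while (defined( my $payload = $q2->dequeue )) {
            $q3->enqueue("step2: $payload/x", "step2: $payload/x");
        }
        $q3->enqueue(undef);    # the fix: tell step 3 there is no more input
        print "Step 2 finished\n";
    };
}

sub step_3 {
    return async {
        my $count = 0;
        while (defined( my $payload = $q3->dequeue )) {
            print "Final result: $payload\n";
            $count++;
        }
        print "Step 3 finished\n";
        return $count;
    };
}

my @running = (step_1(), step_2());
my $printer = step_3();

$q1->enqueue($_) for 1 .. 20;
$q1->enqueue(undef);    # end of jobs

$_->join for @running;
my ($count) = $printer->join;
print "done: $count final results\n";
```

With 20 jobs, each doubled by step 1 and again by step 2, step 3 should see 80 results and then exit on the `undef`.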

        Thank you. I can see how this allows for the correct scheduling of the tasks, but what I'm still missing is how to deal with data that each subroutine generates and that I want to store somewhere outside, rather than just pass to the next function.

        For example, some subroutines create small hashes which I want to add up at the end of the program and write as a JSON file.
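One way to handle this, sketched under a few assumptions: each thread pushes its hash onto a separate results queue that is not part of the pipeline, and the main thread merges the hashes after joining all workers. "Add up" is taken to mean summing the values of matching keys; the worker subs, their keys, and the file name `results.json` are hypothetical. The sketch relies on a Thread::Queue version that accepts hash references (recent versions clone them into shared data) and on JSON::PP, which is core since Perl 5.14.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use JSON::PP;

# Side channel for per-thread hashes, separate from any pipeline queues.
my $results = Thread::Queue->new();

# Hypothetical stand-ins for subroutines that each build a small hash.
my @workers = (
    sub { return { apples => 2, pears => 1 } },
    sub { return { apples => 3, plums => 4 } },
);

my @running;
for my $work (@workers) {
    push @running, async { $results->enqueue( $work->() ) };
}
$_->join for @running;

# After every thread has been joined, all hashes are already in the queue,
# so a non-blocking dequeue can drain it.
my %total;
while (defined( my $h = $results->dequeue_nb )) {
    $total{$_} += $h->{$_} for keys %$h;
}

open my $fh, '>', 'results.json' or die "Cannot write results.json: $!";
print {$fh} JSON::PP->new->canonical->pretty->encode(\%total);
close $fh or die "Cannot close results.json: $!";
```

In the pipeline program above, the same `$results` queue could be filled from inside any step, alongside its normal `enqueue` to the next stage.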

        Also, I should note that my use of your scheme will probably be very degenerate, since each subroutine is only called once (i.e. with a single payload).
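For that degenerate case the queues can be skipped entirely. A minimal sketch, assuming each subroutine returns a plain (unblessed) hash reference: start one thread per subroutine and let `join` hand back its result. The subroutine names `count_a` and `count_b` and their keys are hypothetical.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;

# Hypothetical stand-ins for subroutines that run once and build a small hash.
sub count_a { return { a => 1, common => 10 } }
sub count_b { return { b => 2, common => 5 } }

# One thread per subroutine; no queues are needed for a single payload.
# Each thread is created in scalar context, so join() returns its one value.
my @threads;
for my $work (\&count_a, \&count_b) {
    my $thr = threads->create($work);
    push @threads, $thr;
}

# join() hands back a copy of each thread's hashref; add the values up by key.
my %total;
for my $thr (@threads) {
    my $h = $thr->join;
    $total{$_} += $h->{$_} for keys %$h;
}
print "$_ => $total{$_}\n" for sort keys %total;
```

The threads documentation lists returning objects from threads as a limitation, which is why this sketch sticks to plain hashes; `%total` can then be written out with JSON::PP as in any non-threaded program.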