in reply to Re^3: adding a hash to a shared object
in thread adding a hash to a shared object

Could you please elaborate a bit?

I just read Thread::Queue but I'm not sure what you mean. Should I keep a single queue with a single element (e.g. a reference to a hash with all my data)? Then each method dequeues when it starts and enqueue the updated data structure when it finishes?

Replies are listed 'Best First'.
Re^5: adding a hash to a shared object
by Corion (Patriarch) on Aug 11, 2010 at 12:27 UTC

    See the queue as a pipeline. You pass each unit of work through the queue, and each thread takes one unit of work from its input queue and puts its output into the output queue. Once there is no more input, each thread puts undef into its output to signal to its successor thread that there will be no more input coming. A specific program could look something like this:

    #!perl -w use strict; use threads; use Thread::Queue; my $q1 = Thread::Queue->new(); my $q2 = Thread::Queue->new(); my $q3 = Thread::Queue->new(); my @running; sub step_1 { print "Step 1 thread launched\n"; return async { while (defined( my $payload = $q1->dequeue )) { sleep rand 10; # do work 1 my @results = ("step1: $payload => " . ($payload+1), "step +1: $payload => " . ($payload+2)); # store results for the next thread $q2->enqueue( $_ ) for @results; }; print "Step 1 finished\n"; }; }; sub step_2 { print "Step 2 thread launched\n"; return async { while (defined( my $payload = $q2->dequeue )) { sleep rand 10; # do work 2 my @results = ("step2: $payload/x", "step2: $payload/x"); # store results for the next thread $q3->enqueue( $_ ) for @results; }; print "Step 2 finished\n"; }; }; sub step_3 { # outputs the results print "Step 3 thread launched\n"; return async { while (defined( my $payload = $q3->dequeue )) { print "Final result: $payload\n"; }; print "Step 3 finished\n"; }; }; print "Launching processors\n"; push @running, step_1; push @running, step_2; push @running, step_3; print "Submitting a first batch of work\n"; for (1..20) { $q1->enqueue( $_ ); }; print "Submitting a second batch\n"; for (1..20) { $q1->enqueue( $_ ); }; # Signal end of jobs: $q1->enqueue(undef); print "Waiting for all threads to finish\n"; for (@running) { $_->join; }; print "done\n";

    Personally, I would abstract this into a driver that launches any subroutine in a loop while reading from a queue and writing the subroutine results into another queue, but I haven't had a real need for that.

      Thank you Corion. I am trying to understand thies pieces of code but it seems the program does not return (it does all the work but does not terminate).

        Ouch! Indeed, it should be that the first thread does terminate, but it never tells its next thread to terminate. So you need to add a $q2->enqueue(undef); to the end of the first thread, and do the same for the third part as well.