in reply to Re^2: adding a hash to a shared object
in thread adding a hash to a shared object

Pass all the data between your subroutines via Thread::Queue. Each subroutine can be its own thread that reads from one queue and writes to the next queue. That way you have no shared data at all.

Re^4: adding a hash to a shared object
by daverave (Scribe) on Aug 11, 2010 at 11:48 UTC
    Could you please elaborate a bit?

    I just read the Thread::Queue documentation, but I'm not sure what you mean. Should I keep a single queue with a single element (e.g. a reference to a hash with all my data)? Would each method then dequeue it when it starts and enqueue the updated data structure when it finishes?

      See the queue as a pipeline. You pass each unit of work through the queue, and each thread takes one unit of work from its input queue and puts its output into the output queue. Once there is no more input, each thread puts undef into its output to signal to its successor thread that there will be no more input coming. A specific program could look something like this:

      #!perl -w
      use strict;
      use threads;
      use Thread::Queue;

      my $q1 = Thread::Queue->new();
      my $q2 = Thread::Queue->new();
      my $q3 = Thread::Queue->new();

      my @running;

      sub step_1 {
          print "Step 1 thread launched\n";
          return async {
              while (defined( my $payload = $q1->dequeue )) {
                  sleep rand 10; # do work 1
                  my @results = ("step1: $payload => " . ($payload+1), "step1: $payload => " . ($payload+2));
                  # store results for the next thread
                  $q2->enqueue( $_ ) for @results;
              };
              print "Step 1 finished\n";
          };
      };

      sub step_2 {
          print "Step 2 thread launched\n";
          return async {
              while (defined( my $payload = $q2->dequeue )) {
                  sleep rand 10; # do work 2
                  my @results = ("step2: $payload/x", "step2: $payload/x");
                  # store results for the next thread
                  $q3->enqueue( $_ ) for @results;
              };
              print "Step 2 finished\n";
          };
      };

      sub step_3 {
          # outputs the results
          print "Step 3 thread launched\n";
          return async {
              while (defined( my $payload = $q3->dequeue )) {
                  print "Final result: $payload\n";
              };
              print "Step 3 finished\n";
          };
      };

      print "Launching processors\n";
      push @running, step_1;
      push @running, step_2;
      push @running, step_3;

      print "Submitting a first batch of work\n";
      for (1..20) {
          $q1->enqueue( $_ );
      };

      print "Submitting a second batch\n";
      for (1..20) {
          $q1->enqueue( $_ );
      };

      # Signal end of jobs:
      $q1->enqueue(undef);

      print "Waiting for all threads to finish\n";
      for (@running) {
          $_->join;
      };
      print "done\n";

      Personally, I would abstract this into a driver that launches any subroutine in a loop while reading from a queue and writing the subroutine results into another queue, but I haven't had a real need for that.
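Such a driver might look roughly like the sketch below. It is only an illustration, not code from the post: `launch_stage` is a hypothetical helper name, the two worker subs are placeholders, and the sketch assumes that a worker never returns undef as a result (undef is reserved as the end marker).

```perl
#!perl -w
use strict;
use threads;
use Thread::Queue;

# Hypothetical helper (not from the post above): run $worker in its own
# thread, feeding it items from $in and writing whatever it returns to $out.
sub launch_stage {
    my ($worker, $in, $out) = @_;
    return async {
        while (defined( my $payload = $in->dequeue )) {
            my @results = $worker->($payload);
            $out->enqueue( $_ ) for @results;
        }
        # No more input: pass the end marker on to the next stage
        $out->enqueue(undef);
    };
}

my @queues = map { Thread::Queue->new() } 1..3;

# Placeholder workers: stage 1 adds one, stage 2 emits two results per item
my @running = (
    launch_stage( sub { $_[0] + 1 },           $queues[0], $queues[1] ),
    launch_stage( sub { ($_[0], $_[0] * 10) }, $queues[1], $queues[2] ),
);

$queues[0]->enqueue( $_ ) for 1..3;
$queues[0]->enqueue(undef);    # signal end of jobs

$_->join for @running;

# Drain the last queue in the main thread
my @final;
while (defined( my $item = $queues[2]->dequeue )) {
    push @final, $item;
}
print "Final result: $_\n" for @final;
```

Because the driver owns the loop and the end marker, each worker sub only has to turn one unit of work into zero or more results.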

        Thank you Corion. I am trying to understand these pieces of code, but it seems the program does not return (it does all the work but does not terminate).
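A likely cause, assuming the program is run as posted: step 1 and step 2 never forward the undef end marker to their output queues, so step 2 and step 3 block forever in dequeue and the join calls never return. Below is a condensed sketch of the same pipeline with the marker passed along. The sleeps are dropped and each step produces one result per item to keep it short, and the result counter in step 3 is an addition for illustration.

```perl
#!perl -w
use strict;
use threads;
use Thread::Queue;

my $q1 = Thread::Queue->new();
my $q2 = Thread::Queue->new();
my $q3 = Thread::Queue->new();

my $step_1 = async {
    while (defined( my $payload = $q1->dequeue )) {
        $q2->enqueue( "step1: $payload => " . ($payload+1) );
    }
    $q2->enqueue(undef);    # tell step 2 that no more input is coming
    print "Step 1 finished\n";
};

my $step_2 = async {
    while (defined( my $payload = $q2->dequeue )) {
        $q3->enqueue( "step2: $payload/x" );
    }
    $q3->enqueue(undef);    # tell step 3 that no more input is coming
    print "Step 2 finished\n";
};

my $step_3 = async {
    my $count = 0;          # illustration only: count the results seen
    while (defined( my $payload = $q3->dequeue )) {
        print "Final result: $payload\n";
        $count++;
    }
    print "Step 3 finished\n";
    return $count;
};

$q1->enqueue( $_ ) for 1..5;
$q1->enqueue(undef);    # signal end of jobs

$step_1->join;
$step_2->join;
my $seen = $step_3->join;    # join hands back the thread's return value
print "done, $seen results\n";
```

The same two `enqueue(undef)` lines, placed after the while loops in step_1 and step_2 of the original program, should let it terminate.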