#!perl -w use strict; use threads; use Thread::Queue; my $q1 = Thread::Queue->new(); my $q2 = Thread::Queue->new(); my $q3 = Thread::Queue->new(); my @running; sub step_1 { print "Step 1 thread launched\n"; return async { while (defined( my $payload = $q1->dequeue )) { sleep rand 10; # do work 1 my @results = ("step1: $payload => " . ($payload+1), "step1: $payload => " . ($payload+2)); # store results for the next thread $q2->enqueue( $_ ) for @results; }; print "Step 1 finished\n"; }; }; sub step_2 { print "Step 2 thread launched\n"; return async { while (defined( my $payload = $q2->dequeue )) { sleep rand 10; # do work 2 my @results = ("step2: $payload/x", "step2: $payload/x"); # store results for the next thread $q3->enqueue( $_ ) for @results; }; print "Step 2 finished\n"; }; }; sub step_3 { # outputs the results print "Step 3 thread launched\n"; return async { while (defined( my $payload = $q3->dequeue )) { print "Final result: $payload\n"; }; print "Step 3 finished\n"; }; }; print "Launching processors\n"; push @running, step_1; push @running, step_2; push @running, step_3; print "Submitting a first batch of work\n"; for (1..20) { $q1->enqueue( $_ ); }; print "Submitting a second batch\n"; for (1..20) { $q1->enqueue( $_ ); }; # Signal end of jobs: $q1->enqueue(undef); print "Waiting for all threads to finish\n"; for (@running) { $_->join; }; print "done\n";