#!/usr/bin/perl use strict; use warnings; use threads; use Thread::Queue; sub task_a { my ($Qb, $Qc) = @_; for (1 .. 1e5) { $Qb->enqueue(1e3 + int(rand 1000)); $Qc->enqueue(2e3 + int(rand 1000)); } $Qb->enqueue(undef) for 1 .. 2; } sub task_b { my ($Qb, $Qc) = @_; while ( my $arg1 = $Qb->dequeue() ) { $Qc->enqueue($arg1 * 10); } $Qc->enqueue(undef) for 1 .. 3; } sub task_c { my ($Qc) = @_; while ( my $arg1 = $Qc->dequeue() ) { print "$arg1\n"; } } my $Qb = new Thread::Queue; my $Qc = new Thread::Queue; my @thrs; push @thrs, threads->new( \&task_a, $Qb, $Qc ) for 1 .. 1; push @thrs, threads->new( \&task_b, $Qb, $Qc ) for 1 .. 2; push @thrs, threads->new( \&task_c, $Qc ) for 1 .. 3; $_->join for @thrs; print "Done\n"; #### #!/usr/bin/perl use strict; use warnings; use MCE::Step; sub task_a { for (1 .. 1e5) { MCE->await('B', 200) if $_ % 200 == 0; MCE->enq('B', 1e3 + int(rand 1000)); MCE->await('C', 200) if $_ % 200 == 0; MCE->enq('C', 2e3 + int(rand 1000)); } } sub task_b { my ($mce, $arg1, $arg2, $argN) = @_; MCE->enq('C', $arg1 * 10); } sub task_c { my ($mce, $arg1, $arg2, $argN) = @_; print "$arg1\n"; } MCE::Step->init( task_name => [ 'A', 'B', 'C' ], max_workers => [ 1, 2, 3 ], ); mce_step \&task_a, \&task_b, \&task_c; print "Done\n";