use strict;
use warnings;

# Demo: computing Fibonacci numbers with a small pool of Coro worker
# coroutines.
#
# The helper packages (Semaphore, Queue, Engine) are inlined in BEGIN blocks
# and registered in %INC.  They have to appear BEFORE the first
# `use Engine` below: `use` is resolved at compile time, and it only skips
# the on-disk lookup once the corresponding %INC entry exists.

BEGIN {
    # Thin wrapper around Coro::Semaphore.  The object is a blessed reference
    # to a scalar that holds the underlying Coro::Semaphore.
    package Semaphore;

    use strict;
    use warnings;

    use Coro::Semaphore qw( );

    # Semaphore->new($count): creates a semaphore with initial count $count.
    sub new {
        my ($class, $count) = @_;
        my $s = Coro::Semaphore->new($count);
        return bless(\$s, $class);
    }

    # Acquires the semaphore via Coro::Semaphore's guard() and returns the
    # guard object.  Per the Coro::Semaphore documentation the semaphore is
    # released again when the guard is destroyed, so the caller must keep
    # the returned value alive for as long as the lock is needed.
    sub down {
        my ($self) = @_;
        my $s = $$self;
        return $s->guard();
    }

    # Increments the semaphore count.
    sub up {
        my ($self) = @_;
        my $s = $$self;
        return $s->up();
    }

    # Delegates to Coro::Semaphore's wait(), which (per its documentation)
    # blocks until the count is positive without decrementing it.
    sub wait {
        my ($self) = @_;
        my $s = $$self;
        return $s->wait();
    }

    $INC{'Semaphore.pm'} = __FILE__;
}

BEGIN {
    # Thin wrapper around Coro::Channel, used as the engine's work queue.
    # The object is a blessed reference to a scalar holding the channel.
    package Queue;

    use strict;
    use warnings;

    use Coro::Channel qw( );

    # Queue->new(): creates a channel with no explicit size limit.
    sub new {
        my ($class) = @_;
        my $q = Coro::Channel->new();
        return bless(\$q, $class);
    }

    # Appends $item to the queue.
    sub enqueue {
        my ($self, $item) = @_;
        my $q = $$self;
        return $q->put($item);
    }

    # Removes and returns the next item; Coro::Channel's get() blocks the
    # calling coroutine while the channel is empty.
    sub dequeue {
        my ($self) = @_;
        my $q = $$self;
        return $q->get();
    }

    $INC{'Queue.pm'} = __FILE__;
}

BEGIN {
    # A pool of worker coroutines that run queued, callback-style tasks.
    #
    # A task is a code ref invoked as
    #     $task_sub->($engine, $on_complete, @task_args)
    # and it must eventually call $on_complete->(@results) exactly once.
    package Engine;

    use strict;
    use warnings;

    use Carp      qw( croak );
    use Coro      qw( async );
    use Queue     qw( );
    use Semaphore qw( );

    # Engine->new($num_workers)
    #
    # Starts $num_workers worker coroutines, each of which loops forever
    # pulling code refs off the shared queue and calling them.
    #
    # Croaks unless $num_workers is a positive integer: with no workers the
    # queue would never be drained and every wait would block forever.
    sub new {
        my ($class, $num_workers) = @_;

        croak("Engine->new: number of workers must be a positive integer")
            unless defined($num_workers) && $num_workers =~ /\A[1-9][0-9]*\z/;

        my $q = Queue->new();

        for (1 .. $num_workers) {
            async {
                while (1) {
                    my $job = $q->dequeue();
                    $job->();
                }
            };
        }

        return bless({ queue => $q }, $class);
    }

    # $engine->process_group($on_complete, @tasks)
    #
    # Queues every task in @tasks and arranges for $on_complete->() to be
    # called (with no arguments) once all of them have reported completion.
    #
    # Each element of @tasks is an array ref of the form
    #     [ $task_sub, $task_on_complete, @task_args ]
    # $task_on_complete receives whatever arguments the task passes to its
    # completion callback.
    #
    # Returns immediately; it does not wait for the tasks to run.
    sub process_group {
        my ($self, $on_complete, @tasks) = @_;

        # An empty group has nothing to count down, so without this branch
        # its completion callback would never fire and a waiter would hang.
        # Queue the callback so it still runs on a worker, like every other
        # completion callback.
        if (!@tasks) {
            $self->{queue}->enqueue($on_complete);
            return;
        }

        my $mutex     = Semaphore->new(1);
        my $remaining = @tasks;

        for my $task (@tasks) {
            my ($task_sub, $task_on_complete, @task_args) = @$task;

            # Completion callback handed to the task: forward the results,
            # then notify the group when the last task has finished.
            my $on_task_done = sub {
                $task_on_complete->(@_);

                my $is_last = do {
                    # The guard releases the mutex when it goes out of scope
                    # at the end of this do-block.
                    my $lock = $mutex->down();
                    --$remaining == 0;
                };

                $on_complete->() if $is_last;
            };

            $self->{queue}->enqueue(sub {
                $task_sub->($self, $on_task_done, @task_args);
            });
        }

        return;
    }

    # $engine->process_and_wait($task_sub, $task_on_complete, @task_args)
    #
    # Runs a single task and blocks the calling coroutine until the task has
    # reported completion.
    #
    # Must not be called from within a worker or task.
    sub process_and_wait {
        my $self = shift;

        my $done = Semaphore->new(0);
        $self->process_group(sub { $done->up() }, [ @_ ]);
        $done->wait();

        return;
    }

    $INC{'Engine.pm'} = __FILE__;
}

use Engine qw( );

use constant NUM_WORKERS => 4;

# fibonacci_task($engine, $on_complete, $n)
#
# Engine task that computes the $n-th Fibonacci number and passes it to
# $on_complete.  For $n > 1 it schedules the two sub-problems as a task
# group instead of recursing directly, so it never blocks a worker.
sub fibonacci_task {
    my ($engine, $on_complete, $n) = @_;

    return $on_complete->(0) if $n == 0;
    return $on_complete->(1) if $n == 1;

    # Filled in by the sub-tasks' completion callbacks; both are set by the
    # time the group's completion callback runs.
    my ($x, $y);
    $engine->process_group(
        sub { $on_complete->($x + $y) },
        [ \&fibonacci_task => (sub { $x = $_[0] }, $n - 2) ],
        [ \&fibonacci_task => (sub { $y = $_[0] }, $n - 1) ],
    );
}

# fibonacci($engine, $n)
#
# Blocking front end: computes the $n-th Fibonacci number on $engine and
# returns it.  Uses process_and_wait, so it must not be called from within
# a worker or task.
sub fibonacci {
    my ($engine, $n) = @_;

    my $result;
    $engine->process_and_wait(
        \&fibonacci_task => (sub { $result = $_[0] }, $n),
    );

    return $result;
}

{
    my $engine = Engine->new(NUM_WORKERS);

    # These are Fibonacci numbers, so label them fib(N) rather than with
    # the factorial notation "N!".
    printf("fib(%s) = %s\n", $_, fibonacci($engine, $_)) for 1 .. 10;
}

# Expected output:
#
#   fib(1) = 1
#   fib(2) = 1
#   fib(3) = 2
#   fib(4) = 3
#   fib(5) = 5
#   fib(6) = 8
#   fib(7) = 13
#   fib(8) = 21
#   fib(9) = 34
#   fib(10) = 55