use strict;
use warnings;
use Engine qw( );
use constant NUM_WORKERS => 4;
sub fibonacci_task {
my ($engine, $on_complete, $n) = @_;
return $on_complete->(0) if $n == 0;
return $on_complete->(1) if $n == 1;
my ($x,$y);
$engine->process_group(
sub { $on_complete->($x+$y) },
[ \&fibonacci_task => (sub { $x = $_[0] }, $n-2) ],
[ \&fibonacci_task => (sub { $y = $_[0] }, $n-1) ],
);
}
sub fibonacci {
my ($engine, $n) = @_;
my $result;
$engine->process_and_wait(
\&fibonacci_task => ( sub { $result = $_[0] }, $n )
);
return $result;
}
{
my $engine = Engine->new(NUM_WORKERS);
printf("%s! = %s\n", $_, fibonacci($engine, $_)) for 1..10;
}
####
1! = 1
2! = 1
3! = 2
4! = 3
5! = 5
6! = 8
7! = 13
8! = 21
9! = 34
10! = 55
####
BEGIN {
package Semaphore;
use strict;
use warnings;
use Coro::Semaphore qw( );
sub new {
my ($class, $count) = @_;
my $s = Coro::Semaphore->new($count);
return bless(\$s, $class);
}
sub down { my ($self) = @_; my $s = $$self; return $s->guard(); }
sub up { my ($self) = @_; my $s = $$self; return $s->up(); }
sub wait { my ($self) = @_; my $s = $$self; return $s->wait(); }
$INC{'Semaphore.pm'} = __FILE__;
}
BEGIN {
package Queue;
use strict;
use warnings;
use Coro::Channel qw( );
sub new {
my ($class) = @_;
my $q = Coro::Channel->new();
return bless(\$q, $class);
}
sub enqueue { my ($self, $item) = @_; my $q = $$self; return $q->put($item); }
sub dequeue { my ($self) = @_; my $q = $$self; return $q->get(); }
$INC{'Queue.pm'} = __FILE__;
}
BEGIN {
package Engine;
use strict;
use warnings;
use Coro qw( async );
use Queue qw( );
use Semaphore qw( );
sub new {
my ($class, $num_workers) = @_;
my $q = Queue->new();
for (1..$num_workers) {
async {
for (;;) {
$q->dequeue()->();
}
};
}
return bless({ queue => $q }, $class);
}
sub process_group {
my ($self, $on_complete, @tasks) = @_;
my $mutex = Semaphore->new(1);
my $remaining = @tasks;
for my $task (@tasks) {
my ($task_sub, $task_on_complete, @task_args) = @$task;
$self->{queue}->enqueue(sub {
$task_sub->(
$self,
sub {
$task_on_complete->(@_);
if (do { my $lock = $mutex->down(); --$remaining == 0 }) {
$on_complete->();
}
},
@task_args,
);
});
}
}
# Must not be called from within a worker or task.
sub process_and_wait {
my $self = shift;
my $done = Semaphore->new(0);
$self->process_group(sub { $done->up() }, [ @_ ]);
$done->wait();
}
$INC{'Engine.pm'} = __FILE__;
}