package Foo::Inbox2;
use strict;
use warnings;
our $VERSION = '0.003';
use MCE::Shared;
# $inbox = Foo::Inbox2->new( @names );
#
# Constructor. Returns a blessed hash that maps every name in @names to its
# own queue created by MCE::Shared->queue( fast => 1 ). Once the queues
# exist, MCE::Shared->start() is called unless %INC shows that IO::FDPass
# has already been loaded.
sub new {
    my $class = shift;
    my @names = @_;

    # Collect name/queue pairs first; the hash is built from them in one go.
    my @queue_pairs;
    for my $name ( @names ) {
        push @queue_pairs, $name, MCE::Shared->queue( fast => 1 );
    }
    my %queue_for = @queue_pairs;

    if ( !exists $INC{'IO/FDPass.pm'} ) {
        MCE::Shared->start();
    }

    return bless \%queue_for, $class;
}
# $count = $inbox->size( $key );   # pending count of one inbox
# %pairs = $inbox->size();         # list context: name => pending count
# $total = $inbox->size();         # scalar context: sum over all inboxes
#
# Reports how many messages are waiting, using each queue's pending().
#   - With $key: the count for that inbox, or 0 when no such inbox exists.
#   - Without $key, list context: one name/count pair per inbox.
#   - Without $key, scalar context: the sum of all counts.
#
# pending() is always called in scalar context. Queue classes may return
# nothing for a queue that has been ended and drained (Thread::Queue is
# documented to do so); in list context that would drop the value and
# misalign the name/count pairs. Such a queue is reported as undef in the
# per-inbox forms and contributes 0 to the total.
sub size {
    my ( $self, $key ) = @_;

    if ( defined $key ) {
        return 0 if !exists $self->{$key};
        return scalar( $self->{$key}->pending() );
    }

    if ( wantarray ) {
        return map { $_ => scalar( $self->{$_}->pending() ) } keys %{ $self };
    }

    my $total = 0;
    for my $queue ( values %{ $self } ) {
        # "// 0" keeps an undef count from raising an uninitialized warning.
        $total += $queue->pending() // 0;
    }
    return $total;
}
# $inbox->send( $from, $to, $arg1, ... );
# $inbox->send( $from, \@list, $arg1, ... );
#
# Enqueues the message [ $from, [ $arg1, ... ] ] on the inbox named $to, or
# on every inbox named in \@list. Always returns nothing.
#
# Recipients that are undefined or have no inbox are skipped, matching how
# recv() and size() treat unknown names. Without this check, an unknown
# name would die on an undefined queue, possibly after part of a broadcast
# had already been delivered.
sub send {
    my ( $self, $from, $to, @args ) = @_;

    my $mesg       = [ $from, \@args ];
    my @recipients = ref $to eq 'ARRAY' ? @{ $to } : ( $to );

    for my $name ( @recipients ) {
        next if !defined $name;
        next if !exists $self->{$name};
        $self->{$name}->enqueue($mesg);
    }

    return;
}
# ( $sender, $args ) = $inbox->recv( $from );
#
# Takes the next message off the inbox named $from (via the queue's
# dequeue()) and returns its elements as a list; send() stores messages as
# [ $sender, \@args ]. Returns an empty list when no inbox is named $from
# or when dequeue() yields undef.
sub recv {
    my ( $self, $from ) = @_;

    if ( !exists $self->{$from} ) {
        return;
    }

    my $queue = $self->{$from};

    # An undef from dequeue() is replaced by an empty message.
    my $mesg = $queue->dequeue() // [];

    return @{ $mesg };
}
# $inbox->end();
#
# Calls end() on the queue of every inbox held by this object. Always
# returns nothing.
sub end {
    my $self = shift;

    for my $name ( keys %{ $self } ) {
        $self->{$name}->end();
    }

    return;
}
1;
package Foo::Inbox4;
use strict;
use warnings;
our $VERSION = '0.003';
use Thread::Queue;
# Serializer pair used by send() and recv(): $freeze turns a message
# structure into a string and $thaw reverses it. Both are chosen once, at
# compile time, by the BEGIN block below.
my ( $freeze, $thaw );

BEGIN {
    # Sereal is only considered when PDL has not been loaded.
    # NOTE(review): presumably PDL data needs Storable here - confirm.
    if ( !exists $INC{'PDL.pm'} ) {

        # Same steps as "use Module 3.015 qw(...)", but in a block eval so a
        # missing or too-old Sereal just leaves $have_sereal false.
        my $have_sereal = eval {
            require Sereal::Encoder;
            Sereal::Encoder->VERSION(3.015);
            Sereal::Encoder->import('encode_sereal');

            require Sereal::Decoder;
            Sereal::Decoder->VERSION(3.015);
            Sereal::Decoder->import('decode_sereal');

            1;
        };

        if ( $have_sereal ) {
            my $encoder_ver = int( Sereal::Encoder->VERSION() );
            my $decoder_ver = int( Sereal::Decoder->VERSION() );

            # Ensure the base versions match, e.g. both are 3.
            if ( $encoder_ver == $decoder_ver ) {
                $freeze = sub {
                    encode_sereal( @_, { freeze_callbacks => 1 } );
                };
                $thaw = \&decode_sereal;
            }
        }
    }

    # Fall back to Storable whenever Sereal was not selected above.
    if ( !defined $freeze ) {
        require Storable;
        $freeze = \&Storable::freeze;
        $thaw   = \&Storable::thaw;
    }
}
# $inbox = Foo::Inbox4->new( @names );
#
# Constructor. Returns a blessed hash that maps every name in @names to its
# own, freshly created Thread::Queue.
sub new {
    my $class = shift;
    my @names = @_;

    my %queue_for;
    for my $name ( @names ) {
        $queue_for{$name} = Thread::Queue->new();
    }

    return bless \%queue_for, $class;
}
# $count = $inbox->size( $key );   # pending count of one inbox
# %pairs = $inbox->size();         # list context: name => pending count
# $total = $inbox->size();         # scalar context: sum over all inboxes
#
# Reports how many messages are waiting, using each queue's pending().
#   - With $key: the count for that inbox, or 0 when no such inbox exists.
#   - Without $key, list context: one name/count pair per inbox.
#   - Without $key, scalar context: the sum of all counts.
#
# pending() is always called in scalar context. Thread::Queue is documented
# to return nothing for a queue that has been ended and drained; in list
# context that would drop the value and misalign the name/count pairs. Such
# a queue is reported as undef in the per-inbox forms and contributes 0 to
# the total.
sub size {
    my ( $self, $key ) = @_;

    if ( defined $key ) {
        return 0 if !exists $self->{$key};
        return scalar( $self->{$key}->pending() );
    }

    if ( wantarray ) {
        return map { $_ => scalar( $self->{$_}->pending() ) } keys %{ $self };
    }

    my $total = 0;
    for my $queue ( values %{ $self } ) {
        # "// 0" keeps an undef count from raising an uninitialized warning.
        $total += $queue->pending() // 0;
    }
    return $total;
}
# $inbox->send( $from, $to, $arg1, ... );
# $inbox->send( $from, \@list, $arg1, ... );
#
# Serializes the message [ $from, [ $arg1, ... ] ] with $freeze and enqueues
# the frozen string on the inbox named $to, or on every inbox named in
# \@list. Always returns nothing.
#
# Recipients that are undefined or have no inbox are skipped, matching how
# recv() and size() treat unknown names. Without this check, an unknown
# name would die on an undefined queue, possibly after part of a broadcast
# had already been delivered.
sub send {
    my ( $self, $from, $to, @args ) = @_;

    my @recipients = ref $to eq 'ARRAY' ? @{ $to } : ( $to );
    my @targets    = grep { defined($_) && exists $self->{$_} } @recipients;
    return if !@targets;

    # Freeze once; every recipient gets the same serialized payload.
    my $mesg = $freeze->( [ $from, \@args ] );

    for my $name ( @targets ) {
        $self->{$name}->enqueue($mesg);
    }

    return;
}
# ( $sender, $args ) = $inbox->recv( $from );
#
# Takes the next frozen message off the inbox named $from (via the queue's
# dequeue()), restores it with $thaw and returns its elements as a list;
# send() stores messages as [ $sender, \@args ]. Returns an empty list when
# no inbox is named $from or when dequeue() yields a false value.
sub recv {
    my ( $self, $from ) = @_;

    if ( !exists $self->{$from} ) {
        return;
    }

    my $frozen = $self->{$from}->dequeue();
    if ( !$frozen ) {
        return;
    }

    return @{ $thaw->($frozen) };
}
# $inbox->end();
#
# Calls end() on the Thread::Queue of every inbox held by this object.
# Always returns nothing.
sub end {
    my $self = shift;

    for my $name ( keys %{ $self } ) {
        $self->{$name}->end();
    }

    return;
}
1;
# A demo and benchmark will follow in the immediate post(s).