package CBStats::MultiChannel; use 5.12.2; use Coro::Channel; use Scalar::Util qw(weaken); sub new { my $class = shift; my $self = bless [], $class; $self; } # clear out any channels that have gone away sub clean { my $self = shift; @$self = grep { defined } @$self; # when we pull out the refs this way, they're # no longer weakened, so re-weaking everything. # (easier than using splice to pull undef items out - # if we get too many readers, we'll re-evaluate if this # is slow.) weaken($_) for @$self; } sub count { my $self = shift; $self->clean(); scalar @$self; } sub status { my $self = shift; $self->clean(); 'Channel=size :: ' . join ":", map { $_ . "=" . $self->[$_]->size() } 0..$#$self; } # create new channel, add it to $self, ensure it's weakened, and return # the non-weak version. sub add { my $self = shift; my $channel = Coro::Channel->new(); push @$self, $channel; $self->clean(); $channel; } # pass a message to all listeners. sub put { my $self = shift; $self->clean(); for my $msg (@_) { # if we were really multi-threaded, we'd still # have to check if $_ was defined, but Coro eliminates # that possibility since nothing else really runs between # the clean() above and this (we don't cede) $_->put($msg) for (@$self); } } 1;