in reply to IO::Async::Loop in multiple modules all loaded

I think this is what you want:

    async sub fetch {
        my ( $self, $params ) = @_;

        my $loop = IO::Async::Loop->new;

        my $received = Future->new();
        my @data_received;

        my $client = Net::Async::WebSocket::Client->new(
            on_text_frame => sub {
                my ( $clientself, $frame ) = @_;
                push @data_received, $frame;
                # Count the frames collected so far.
                if ( @data_received >= $MAX_HELLOS ) {
                    $received->done();
                }
            }
        );

        $loop->add( $client );
        defer { $client->closenow; }

        ...;  # $client->connect.
        ...;  # $client->send_text_frame loop.

        $loop->await( $received );

        return \@data_received;
    }

Equivalent:

    async sub fetch {
        my ( $self, $params ) = @_;

        my $loop = IO::Async::Loop->new;

        my $received = $loop->new_future();  # <---
        my @data_received;

        my $client = Net::Async::WebSocket::Client->new(
            on_text_frame => sub {
                my ( $clientself, $frame ) = @_;
                push @data_received, $frame;
                # Count the frames collected so far.
                if ( @data_received >= $MAX_HELLOS ) {
                    $received->done();
                }
            }
        );

        $loop->add( $client );
        defer { $client->closenow; }

        ...;  # $client->connect.
        ...;  # $client->send_text_frame loop.

        await $received;  # <---

        return \@data_received;
    }

Completely untested.

Re^2: IO::Async::Loop in multiple modules all loaded
by bliako (Abbot) on Feb 19, 2026 at 22:29 UTC

    Thank you, ikegami, for the code. I have used your 2nd version and it works fine (except s/closenow/close_now/ and adding use Syntax::Keyword::Defer;). Although Corion did mention the singleton, I did not realise that IO::Async::Loop enforces this internally.

    So, once one realises that the $loop is a singleton, using $loop->stop and $loop->run is totally wrong (when adding more than one task/websocket [edit, after comment]: AND running them in parallel). Hence the use of a future that blocks until the websocket has finished.
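A minimal sketch of the singleton behaviour described above, in case it helps others. It assumes only IO::Async::Loop, Future and Future::AsyncAwait; fake_fetch is a hypothetical stand-in for a module's fetch() and uses a timer instead of a websocket. Both calls to IO::Async::Loop->new should hand back the same cached object, so two tasks started from different modules can be awaited together without anyone calling $loop->run or $loop->stop.

```perl
use strict;
use warnings;
use IO::Async::Loop;
use Future;
use Future::AsyncAwait;

# Hypothetical stand-in for a module's fetch(): waits a while, then returns a label.
async sub fake_fetch {
    my ( $name, $delay ) = @_;
    my $loop = IO::Async::Loop->new;    # the same cached loop in every caller
    await $loop->delay_future( after => $delay );
    return "$name done";
}

my $loop_a = IO::Async::Loop->new;
my $loop_b = IO::Async::Loop->new;
my $same_loop = ( $loop_a == $loop_b ) ? 1 : 0;

# Start both tasks first, then wait for both of them on the shared loop.
my $all = Future->needs_all( fake_fetch( 'A', 0.2 ), fake_fetch( 'B', 0.1 ) );
$loop_a->await( $all );
my @results = $all->get;    # results come back in argument order

print "same loop: $same_loop\n";
print "$_\n" for @results;
```

The two tasks overlap in time, so the whole script should take roughly as long as the slower one rather than the sum of both.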

    With your method, I now have the following (repository updated), which so far works OK, except for glitches with just one of my client websockets: something goes wrong and it tries to read from the stream after I have already closed it, and then dies. So I had to modify IO::Async::Stream's _sysread() and IO::Async::SSL's sslread() so that they return unless the handle is defined. I guess this solution is very unorthodox.

    I have also added a $loop->remove($client) in the defer{} block, and a timer to watch the socket for a timeout. The code is now:

        package MY::A;

        use strict;
        use warnings;

        use IO::Async::Loop;
        use Syntax::Keyword::Defer;
        use Future;
        use Future::AsyncAwait;
        use Net::Async::WebSocket::Client;

        our $VERSION = '0.01';

        sub new {
            my ($class, $params) = @_;
            $params //= {};
            my $self = {
                'max-hellos' => (exists($params->{'max-hellos'})&&defined($params->{'max-hellos'}))
                    ? $params->{'max-hellos'} : 4,
                'websocket-uri' => (exists($params->{'websocket-uri'})&&defined($params->{'websocket-uri'}))
                    ? $params->{'websocket-uri'} : 'wss://echo.websocket.org',
            };
            bless $self => $class;
            return $self;
        }

        # Modified as per the suggestions of ikegami
        # at https://perlmonks.org/?node_id=11167325
        async sub fetch {
            my ($self, $params) = @_;

            my $URI = $self->{'websocket-uri'};
            my $MAX_HELLOS = $self->{'max-hellos'};

            my $request = Protocol::WebSocket::Request->new(
                host => $URI,
                resource_name => __PACKAGE__
            );

            my $timer_id;
            my $loop = IO::Async::Loop->new;
            my $received = $loop->new_future();
            my @data_received;
            my $num_items_received = 0;

            my $client = Net::Async::WebSocket::Client->new(
                on_text_frame => sub {
                    my ( $clientself, $frame ) = @_;
                    return unless $frame =~ /hello/;
                    $num_items_received++;
                    push @data_received, $frame;
                    print __PACKAGE__." : on_text_frame() : got # ${num_items_received}: ${frame}\n";
                    # the 1st item is sent from the websocket like 'Request served by 4d896d95b55478'
                    # so we will save this but don't count it in the below test
                    if( $num_items_received >= $MAX_HELLOS ){
                        print __PACKAGE__." : on_text_frame() : CLOSING client $clientself.\n";
                        # we have received enough items,
                        # stop the loop, is this the right way to do it?
                        $received->done(__PACKAGE__.' : enough items received, loop is done.');
                        # even if we stopped the loop,
                        # data is kept added unless we return() here
                        #return;
                    }
                },
            );

            print __PACKAGE__." : adding client $client to the loop ...\n";
            $loop->add( $client );

            defer {
                $loop->unwatch_time($timer_id);
                # if not in the loop it croaks!
                $loop->remove( $client );
                print __PACKAGE__." : closing the websocket ...\n";
                $client->close_now;
                print __PACKAGE__." : websocket is now closed.\n";
                print __PACKAGE__." : end of defer code.\n";
            }

            print __PACKAGE__." : waiting to connect to websocket ...\n";
            await eval {
                $client->connect(
                    url => $URI,
                )
            };
            if( $@ ){
                print STDERR __PACKAGE__." : failed to connect to URI '${URI}'.\n";
                return undef
            }
            print __PACKAGE__." : now connected to the websocket.\n";

            # NOTE: $client will CLOSE after 4 items received
            # here we send 8 items, $client does not seem to be closed,
            # neither does $loop!
            for my $i (1..$MAX_HELLOS){
                my $ts = "hello from ".__PACKAGE__." #${i}";
                print __PACKAGE__." : sending data to socket : $ts\n";
                $client->send_text_frame($ts);
                sleep(1);
            }

            # add this timer which will close the client and
            # in case the communication takes too long ...
            $timer_id = $loop->watch_time(
                after => 20,
                code => sub {
                    $received->done({c=>0});
                    print __PACKAGE__." : timer was activated to end the loop ...\n";
                }
            );

            my $status = await $received;
            print __PACKAGE__." : loop is done with this message: ${status}\n";

            #my $retdata = Future->done(\@data_received);
            #return $retdata;
            return \@data_received;
        }

        1;
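For comparison, here is a possible alternative to the hand-managed watch_time timer, offered as an untested-in-production sketch rather than a recommendation. It assumes only IO::Async::Loop, Future and Future::AsyncAwait; with_timeout is a hypothetical helper name, and the string 'timeout' is an arbitrary marker value. The idea is to race the "enough data received" future against a $loop->delay_future using Future->wait_any, which cancels whichever of the two is still pending once the other completes, so there is no timer id to unwatch.

```perl
use strict;
use warnings;
use IO::Async::Loop;
use Future;
use Future::AsyncAwait;

# Hypothetical helper: resolves with the result of $f, or with the string
# 'timeout' if $f is still pending after $seconds.
async sub with_timeout {
    my ( $f, $seconds ) = @_;
    my $loop  = IO::Async::Loop->new;
    my $timer = $loop->delay_future( after => $seconds )->then_done( 'timeout' );
    # wait_any cancels the future that is still pending once the other completes.
    return await Future->wait_any( $f, $timer );
}

my $loop = IO::Async::Loop->new;

my $never = $loop->new_future;                                         # never completes
my $quick = $loop->delay_future( after => 0.05 )->then_done( 'data' ); # completes early

my $f1 = with_timeout( $never, 0.2 );
my $f2 = with_timeout( $quick, 1 );
$loop->await_all( $f1, $f2 );

my $r1 = $f1->get;
my $r2 = $f2->get;
print "never: $r1\n";
print "quick: $r2\n";
```

Note that on timeout wait_any also cancels the future passed in, so the caller should not try to complete it afterwards.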

    Many thanks, bw, bliako

      using $loop->stop and $loop->run is totally wrong

      You said the modules were run sequentially, so it's not definitely wrong. But I prefer a solution that doesn't use that approach if possible.

      use Syntax::Keyword::Defer;

      If you have Perl 5.36+, you can use use experimental qw( defer ); for the builtin version. (Syntax::Keyword::Defer was a proving ground for the builtin feature, I believe.)

      Updated to reflect that it's still experimental.
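A small sketch of the builtin form, assuming Perl 5.36 or later (where the feature is still marked experimental). The demo sub and its log entries are purely illustrative; the point is that the defer block runs when its enclosing scope exits, after the code that follows it in that scope.

```perl
use v5.36;
use experimental qw( defer );

# Illustrative only: record the order in which the statements run.
sub demo {
    my @log;
    {
        defer { push @log, 'cleanup' }    # runs when this block is left
        push @log, 'work';
    }
    return @log;
}

my @order = demo();
say join ', ', @order;    # prints "work, cleanup"
```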