bliako has asked for the wisdom of the Perl Monks concerning the following question:

Esteemed Minks

I have several modules which fetch data from various websockets (WS), processes it and returns back results. Fetching and processing for each websocket is different so I guess I need many Modules.

In order to fetch data from WS I use IO::Async::Loop and Net::Async::WebSocket::Client with something like this:

async sub fetch { my $loop = IO::Async::Loop->new; my @data; my $client = Net::Async::WebSocket::Client->new( on_text_frame => sub { my ( $clientself, $frame ) = @_; ... push @data, $frame; # when I had enough frames if( 10 < scalar @data ){ say "I am ".__PACKAGE__."::fetch() and we had enough data"; $clientself->close; $loop->stop; return; } } # on_text_frame ); $loop->add( $client ); await $client->connect(url => $myurl); # tell the WS that we need data $client->send_text_frame('send me data'); $loop->run; my $retdata = Future->done(\@data)->get; # save $retdata to file ... # and return return $retdata; } sub give_me_data { # collapse the future my $data = $self->fetch()->get(); # process data ... return $processed_data }

I thought that worked OK, until I needed more modules like the above for differrent websockets (which provide different data structures, and need different processing, hence the many modules which blackbox the fetch+process data into a give_me_data())

Now, these modules are loaded like:

# main use ModuleA; use ModuleB; sub give_me_data { # no Future here: my $dataA = ModuleA->new()->give_me_data(); my $dataB = ModuleB->new()->give_me_data(); return ... }

But there is a problem: $dataB is *empty* because the Loop in ModuleB thinks that we have received enough frames already and stops receiving (the check when enough frames are received is included above). But the debug message when this happens coming from ModuleB mentions ModuleA::fetch()! Note that the data returned from ModuleB is empty (whereas ModuleA returned lots of data).

1. Does the code-sketch for fetching data look OK to you? It will be difficult to provide running code but I can try if needed.

2. Can there be multiple IO::Async::Loop's each one on its own Module? (EDITAnswer: no, there is a singleton loop object, so don't use loop->run and loop->stop when you have many clients to this loop). All modules loaded but the loops activated *sequentially*. My intention is to dispense each loop when enough data is fetched and then continue to the next Module's loop (instantiate, loop and dispense) and so on. Am I stopping and dispensing the loops OK? Why does it remember ModuleA's fetch() when running ModuleB's loop?

3. If I have hit a wall, is there a simpler, possibly blocking, alternative for receiving data from websocket? (I have tried Mojo::UserAgent with some failures for some of the websockets). I do not really need non-blocking fetches because I need the total data from all modules before I proceed.

many thanks, bw bliako

Replies are listed 'Best First'.
Re: IO::Async::Loop in multiple modules all loaded
by Corion (Patriarch) on Feb 14, 2026 at 07:20 UTC

    In general, you only want a single event loop. Most event loop modules solve this by having a singleton event loop that gets initialized on first use.

    I can't really comment on your code and the sequence of your flow because you don't show it, but if the fetch() seems to come from ModuleA while you expect it to come from ModuleB, then that sounds to me as if you're mixing up things somewhere.

    If you really want the fetch() from ModuleA and ModuleB to be executed sequentially, maybe launch them as

    ... my $dataA = await ModuleA->new()->give_me_data(); my $dataB = await ModuleB->new()->give_me_data(); ...

      I have now created a repository with example code and tests in order to play with websockets and IO::Async::Loop. It is located here/github,

      TL;DR: I can have a loop in each module. There does not seem to be a clash when all modules are fetch()ing which means there are instantiated loop object in each fetch() running, not a singleton.. Edit: Totally wrong, there is only one loop, a singleton and this is enforced internally by IO::Async::Loop.

      The only tiny problem I found is when fetching (i.e. running each module's fetch()) asynchronously with wait_all of Future. It does not run the last fetch() and it only returns just one result. Of course, the main culprit can be me.

      So, if anyone has time and knowledge, to upload the repo and run the tests and offer advice on:

      • Is running IO::Async::Loop loops from different modules safe and sound? It looks to me it is.
      • Why wait_all of Future as used in t/50-A-B-C-D-E-parallel.t does not work as I expect it?

      All in all, creating this repository was a good exercise but I am left with the same problems I posted although they do not appear in the test exercise at all. So, it makes me think that they can be because of communication glitches, running out of memory or other bugs. For example, in a server with little memory (6GB) and low CPU capacity, In either weak or strong servers I sometimes get one of these:

      • Too many fragments at .../site_perl/5.38.2/Protocol/WebSocket/Frame.pm line 232. Increasing the hardcoded max value seems to work (filed a bug/feature-request on this).
      • Can't call method "sysread" on an undefined value at ../site_perl/5.38.2/IO/Async/Stream.pm line ~985 (method _sysread). Basically it tries to read from stream but handle is undef. Perhaps because I tend to close the client and maybe prematurely. (note: edited this to be more accurate)

      Anyway, these problems do not appear in the test exercise, so my initial worry that multiple IO::Async::Loop objects somehow clash must be unfounded. I am now concentrating in how I send data to the socket, receive it, close the socket when had enough.

      Here is how I do it (the file lib/MY/A.pm in the provided repository):

      package MY::A; use strict; use warnings; use IO::Async::Loop; use Future; use Future::AsyncAwait; use Net::Async::WebSocket::Client; our $VERSION = '0.01'; sub new { my ($class, $params) = @_; $params //= {}; my $self = { 'max-hellos' => (exists($params->{'max-hellos'})&&defined($par +ams->{'max-hellos'})) ? $params->{'max-hellos'} : 4, 'websocket-uri' => (exists($params->{'websocket-uri'})&&define +d($params->{'websocket-uri'})) ? $params->{'websocket-uri'} : 'wss://echo.websocket.org', }; bless $self => $class; return $self; } async sub fetch { my ($self, $params) = @_; my $URI = $self->{'websocket-uri'}; my $MAX_HELLOS = $self->{'max-hellos'}; my $request = Protocol::WebSocket::Request->new( host => $URI, resource_name => __PACKAGE__ ); my $loop = IO::Async::Loop->new; my @data_received; my $num_items_received = 0; my $client = Net::Async::WebSocket::Client->new( on_text_frame => sub { my ( $clientself, $frame ) = @_; return unless $frame =~ /hello/; $num_items_received++; push @data_received, $frame; print __PACKAGE__." : on_text_frame() : got # ${num_items_rece +ived}: ${frame}\n"; # the 1st item is sent from the websocket like 'Request served + by 4d896d95b55478' # so we will save this but don't count it in the below test if( $num_items_received >= $MAX_HELLOS ){ print __PACKAGE__." : on_text_frame() : CLOSING client $cl +ientself.\n"; # we have received enough items, # stop the loop, is this the right way to do it? $loop->stop; $clientself->close_when_empty; # even if we stopped the loop, # data is kept added unless we return() here #return; } }, ); print __PACKAGE__." : adding client $client to the loop ...\n"; $loop->add( $client ); await eval { $client->connect( url => $URI, ) }; if( $@ ){ print STDERR __PACKAGE__." : failed to connect to URI '$ +{URI}'.\n"; return undef } # NOTE: $client will CLOSE after 4 items received # here we send 8 items, $client does not seem to be closed, # neither does $loop! for my $i (1..$MAX_HELLOS){ my $ts = "hello from ".__PACKAGE__." #${i}"; print __PACKAGE__." : sending data to socket : $ts\n"; $client->send_text_frame($ts); sleep(1); } $loop->run; #$client->close_when_empty if $client; #my $retdata = Future->done(\@data_received); #return $retdata; return \@data_received; } 1;

      thank you, bw bliako

        which means there are instantiated loop object in each fetch() running, not a singleton.

        That's not right. IO::Async::Loop->new returns a singleton.

        sub new { return our $ONE_TRUE_LOOP ||= shift->really_new; }

        That shouldn't be a problem, though. If your code is correct, it doesn't matter if multiple modules use the same loop.

        TL;DR: I can have a loop in each module

        Can't comment on anything else in your question, but doesn't this line contradict the above quote?

      Thank you Corion. Despite what you said, I did not realise that the singleton loop is enforced by the module internally. So, the use of $loop->run and $loop->stop is not the right way when the singleton loop is used by many modules, hence my problems.

Re: IO::Async::Loop in multiple modules all loaded
by ikegami (Patriarch) on Feb 18, 2026 at 21:34 UTC

    I think this is what you want:

    async sub fetch { my ( $self, $params ) = @_; my $loop = IO::Async::Loop->new; my $received = Future->new(); my @received; my $client = Net::Async::WebSocket::Client->new( on_text_frame => sub { my ( $clientself, $frame ) = @_; push @data_received, $frame; if ( $num_items_received >= $MAX_HELLOS ) { $received->done(); } } ); $loop->add( $client ); defer { $client->closenow; } ...; # $client->connect. ...; # $client->send_text_frame loop. $loop->await( $received ); return \@data_received; }

    Equivalent:

    async sub fetch { my ( $self, $params ) = @_; my $loop = IO::Async::Loop->new; my $received = $loop->new_future(); # <--- my @received; my $client = Net::Async::WebSocket::Client->new( on_text_frame => sub { my ( $clientself, $frame ) = @_; push @data_received, $frame; if ( $num_items_received >= $MAX_HELLOS ) { $received->done(); } } ); $loop->add( $client ); defer { $client->closenow; } ...; # $client->connect. ...; # $client->send_text_frame loop. await $received; # <--- return \@data_received; }

    Completely untested.

      Thank you ikegami for the code. I have used your 2nd version and it works fine (except s/closenow/close_now/ and use Syntax::Keyword::Defer;). Although Corion did mention the singleton I did not realise that this was enforced by IO::Async::Loop, internally.

      So, when one realises that the $loop is a singleton using $loop->stop and $loop->run is totally wrong (when adding more than one tasks/websockets [edit, after comment]: AND running in parallel). And thus the use of a blocking future until the websocket has run out.

      With your method, I now have (repository updated) the following which so far works OK except for glitches with just one of my client websockets where something goes wrong and tries to read from the stream when I already closed it and dieing. So I had to modify IO::Async::Stream's _sysread() and IO::Async::SSL's sslread() so as to return unless the handle is defined. I guess this solution is very unorthodox.

      I have also added a $loop->remove($client) in the defer{} block. And a timer to watch the socket for timeout. The code is now:

      package MY::A; use strict; use warnings; use IO::Async::Loop; use Syntax::Keyword::Defer; use Future; use Future::AsyncAwait; use Net::Async::WebSocket::Client; our $VERSION = '0.01'; sub new { my ($class, $params) = @_; $params //= {}; my $self = { 'max-hellos' => (exists($params->{'max-hellos'})&&defined($par +ams->{'max-hellos'})) ? $params->{'max-hellos'} : 4, 'websocket-uri' => (exists($params->{'websocket-uri'})&&define +d($params->{'websocket-uri'})) ? $params->{'websocket-uri'} : 'wss://echo.websocket.org', }; bless $self => $class; return $self; } # Modified as per the suggestions of ikegami # at https://perlmonks.org/?node_id=11167325 async sub fetch { my ($self, $params) = @_; my $URI = $self->{'websocket-uri'}; my $MAX_HELLOS = $self->{'max-hellos'}; my $request = Protocol::WebSocket::Request->new( host => $URI, resource_name => __PACKAGE__ ); my $timer_id; my $loop = IO::Async::Loop->new; my $received = $loop->new_future(); my @data_received; my $num_items_received = 0; my $client = Net::Async::WebSocket::Client->new( on_text_frame => sub { my ( $clientself, $frame ) = @_; return unless $frame =~ /hello/; $num_items_received++; push @data_received, $frame; print __PACKAGE__." : on_text_frame() : got # ${num_items_rece +ived}: ${frame}\n"; # the 1st item is sent from the websocket like 'Request served + by 4d896d95b55478' # so we will save this but don't count it in the below test if( $num_items_received >= $MAX_HELLOS ){ print __PACKAGE__." : on_text_frame() : CLOSING client $cl +ientself.\n"; # we have received enough items, # stop the loop, is this the right way to do it? $received->done(__PACKAGE__.' : enough items received, loo +p is done.'); # even if we stopped the loop, # data is kept added unless we return() here #return; } }, ); print __PACKAGE__." : adding client $client to the loop ...\n"; $loop->add( $client ); defer { $loop->unwatch_time($timer_id); # if not in the loop it croaks! $loop->remove( $client ); print __PACKAGE__." : closing the websocket ...\n"; $client->close_now; print __PACKAGE__." : websocket is now closed.\n"; print __PACKAGE__." : end of defer code.\n"; } print __PACKAGE__." : waiting to connect to websocket ...\n"; await eval { $client->connect( url => $URI, ) }; if( $@ ){ print STDERR __PACKAGE__." : failed to connect to URI '$ +{URI}'.\n"; return undef } print __PACKAGE__." : now connected to the websocket.\n"; # NOTE: $client will CLOSE after 4 items received # here we send 8 items, $client does not seem to be closed, # neither does $loop! for my $i (1..$MAX_HELLOS){ my $ts = "hello from ".__PACKAGE__." #${i}"; print __PACKAGE__." : sending data to socket : $ts\n"; $client->send_text_frame($ts); sleep(1); } # add this timer which will close the client and # in case the communication takes too long ... $timer_id = $loop->watch_time( after => 20, code => sub { $received->done({c=>0}); print __PACKAGE__." : timer was activated to end the loop +...\n"; } ); my $status = await $received; print __PACKAGE__." : loop is done with this message: ${status}\n" +; #my $retdata = Future->done(\@data_received); #return $retdata; return \@data_received; } 1;

      Many thanks, bw, bliako

        using $loop->stop and $loop->run is totally wrong

        You said the modules were run sequentially, so not definitively wrong. But I prefer a solution that doesn't use that approach if possible.

        use Syntax::Keyword::Defer;

        If you have Perl 5.36+, you can use use experimental qw( defer ); for the builtin version. (Syntax::Keyword::Defer was a proving ground for the builtin feature, I believe.)

        Updated to reflect that it's still experimental.