package MY::A; use strict; use warnings; use IO::Async::Loop; use Syntax::Keyword::Defer; use Future; use Future::AsyncAwait; use Net::Async::WebSocket::Client; our $VERSION = '0.01'; sub new { my ($class, $params) = @_; $params //= {}; my $self = { 'max-hellos' => (exists($params->{'max-hellos'})&&defined($params->{'max-hellos'})) ? $params->{'max-hellos'} : 4, 'websocket-uri' => (exists($params->{'websocket-uri'})&&defined($params->{'websocket-uri'})) ? $params->{'websocket-uri'} : 'wss://echo.websocket.org', }; bless $self => $class; return $self; } # Modified as per the suggestions of ikegami # at https://perlmonks.org/?node_id=11167325 async sub fetch { my ($self, $params) = @_; my $URI = $self->{'websocket-uri'}; my $MAX_HELLOS = $self->{'max-hellos'}; my $request = Protocol::WebSocket::Request->new( host => $URI, resource_name => __PACKAGE__ ); my $timer_id; my $loop = IO::Async::Loop->new; my $received = $loop->new_future(); my @data_received; my $num_items_received = 0; my $client = Net::Async::WebSocket::Client->new( on_text_frame => sub { my ( $clientself, $frame ) = @_; return unless $frame =~ /hello/; $num_items_received++; push @data_received, $frame; print __PACKAGE__." : on_text_frame() : got # ${num_items_received}: ${frame}\n"; # the 1st item is sent from the websocket like 'Request served by 4d896d95b55478' # so we will save this but don't count it in the below test if( $num_items_received >= $MAX_HELLOS ){ print __PACKAGE__." : on_text_frame() : CLOSING client $clientself.\n"; # we have received enough items, # stop the loop, is this the right way to do it? $received->done(__PACKAGE__.' : enough items received, loop is done.'); # even if we stopped the loop, # data is kept added unless we return() here #return; } }, ); print __PACKAGE__." : adding client $client to the loop ...\n"; $loop->add( $client ); defer { $loop->unwatch_time($timer_id); # if not in the loop it croaks! $loop->remove( $client ); print __PACKAGE__." : closing the websocket ...\n"; $client->close_now; print __PACKAGE__." : websocket is now closed.\n"; print __PACKAGE__." : end of defer code.\n"; } print __PACKAGE__." : waiting to connect to websocket ...\n"; await eval { $client->connect( url => $URI, ) }; if( $@ ){ print STDERR __PACKAGE__." : failed to connect to URI '${URI}'.\n"; return undef } print __PACKAGE__." : now connected to the websocket.\n"; # NOTE: $client will CLOSE after 4 items received # here we send 8 items, $client does not seem to be closed, # neither does $loop! for my $i (1..$MAX_HELLOS){ my $ts = "hello from ".__PACKAGE__." #${i}"; print __PACKAGE__." : sending data to socket : $ts\n"; $client->send_text_frame($ts); sleep(1); } # add this timer which will close the client and # in case the communication takes too long ... $timer_id = $loop->watch_time( after => 20, code => sub { $received->done({c=>0}); print __PACKAGE__." : timer was activated to end the loop ...\n"; } ); my $status = await $received; print __PACKAGE__." : loop is done with this message: ${status}\n"; #my $retdata = Future->done(\@data_received); #return $retdata; return \@data_received; } 1;