Your Mother has asked for the wisdom of the Perl Monks concerning the following question:
Long story short: I want to write client code that consumes a webservice, asynchronously receiving data, but need to be able to synchronously make program flow decisions by sending messages to the server.
I have an approach that works but I am certain it’s hacky and The Wrong Way®. It’s achieved by making the websocket transaction variable, $TX, available from outside the transaction. Any guidance on the right way to mix async with sync code here would be much appreciated. Note, the client is all I care about. I provide the server to match just to help test. Modules: Net::WebSocket::Server, Mojo::IOLoop, and Mojo::UserAgent. Thank you!
Sample server and client…
Save and run the server–
#!/usr/bin/env perl
use utf8;
use 5.10.2;
use Net::WebSocket::Server;
my $Running = 0;
Net::WebSocket::Server->new(
    listen      => 8080,
    tick_period => 1,
    on_tick     => sub {
        my $server = shift;
        return unless $Running;
        $_->send_utf8("Running")
            for $server->connections;
    },
    on_connect => sub {
        my ( $server, $connection ) = @_;
        $connection->on(
            ready => sub {
                my $connection = shift;
                $connection->send_utf8("Connected… RUN|STOP");
            },
            utf8 => sub {
                my ( $connection, $message ) = @_;
                if ( uc $message eq "RUN" )
                {
                    $connection->send_utf8( $Running ? "Already running" : "Starting to run…");
                    $Running = 1;
                }
                elsif ( uc $message eq "STOP" )
                {
                    $connection->send_utf8($Running ? "Stopping…" : "Not running");
                    $Running = 0;
                }
                else
                {
                    $connection->send_utf8("Unknown command: $message");
                }
            });
    })
    ->start;
Then the client–
#!/usr/bin/env perl
use 5.10.2;
use utf8;
use strictures;
use Mojo::UserAgent;
my $ua = Mojo::UserAgent->new;
my $TX; # This is what has me thinking this is hacky/kludgy.
my $conn = $ua->websocket("ws://localhost:8080/" => sub {
    my $ua = shift;
    $TX = shift;
    say "WebSocket handshake failed!"
        and return unless $TX->is_websocket;
    $TX->on(finish => sub {
        my ( $tx, $code, $reason ) = @_;
        say "WebSocket closed with status $code.";
        exit;
    });
    $TX->on(text => sub {
        my $tx = shift;
        say "Server > ", +shift;
    });
});
my @command = qw/ RUN STOP /;
Mojo::IOLoop->recurring(5 => sub {
    my $command = $command[rand@command];
    say "Client > $command";
    $TX->send($command);
});
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
Re: Mixing asynchronous data feed with synchronous program flow control
by haukex (Archbishop) on Jan 02, 2020 at 08:29 UTC
If the client is as simple as shown, I think the code is ok - there are a couple of chances for race conditions in regards to $TX not being in the correct state when the timer fires, but for a command-line script that may be acceptable (I'd just add some $SIG{INT} handlers to server and client). If this is part of a larger app, here's how I would have coded it:
- Tie the lifetime of the timer to the lifetime of the WebSocket client
- Don't exit from inside the handler, allow for graceful shutdown
my $ua = Mojo::UserAgent->new;
my $conn = $ua->websocket("ws://localhost:8080/" => sub {
    my ($ua, $tx) = @_;
    say "WebSocket handshake failed!"
        and return unless $tx->is_websocket;
    my $id = Mojo::IOLoop->recurring(5 => sub {
        my @command = qw/ RUN STOP /;
        my $command = $command[rand @command];
        say "Client > $command";
        $tx->send($command);
    });
    $tx->on(finish => sub {
        my ($tx, $code, $reason) = @_;
        say "WebSocket closed with status $code.";
        Mojo::IOLoop->remove($id);
        Mojo::IOLoop->stop_gracefully;
    });
    $tx->on(text => sub {
        my ($tx, $bytes) = @_;
        say "Server > ", $bytes;
    });
});
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
race conditions in regards to $TX not being in the correct state when the timer fires
In a program using a single event loop, that cannot happen — timers only produce a delay until some time, but do not interrupt other event handlers and timer events are not processed until other callbacks return. There are no races provided that all callbacks return all resources to "safe states" before returning.
In a program using a single event loop, that cannot happen — timers only produce a delay until some time, but do not interrupt other event handlers and timer events are not processed until other callbacks return. There are no races provided that all callbacks return all resources to "safe states" before returning.
Ummm what? Sorry, but no. Take the code exactly as posted in the root node, but in the server code, insert a "sleep 5;" as the first line in on_connect to simulate bad network delay, and then the client will, on many runs, throw the error "Mojo::Reactor::EV: Timer failed: Can't call method "send" on an undefined value", because $TX is still undef at that point. That demonstrates exactly the issue that I was talking about. The code I posted doesn't suffer from that particular issue, because it doesn't start the timer until the connection object is actually available.
Update: I admit my wording "there are a couple of chances for race conditions" may have been a stretch. I initially thought I saw more than the one I described above, but I'm not so sure anymore. However, your claim "that cannot happen" is still incorrect.
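The ordering problem can be reproduced without a network. What follows is only a minimal sketch, assuming a toy single-threaded scheduler: schedule() and run_loop() are hypothetical helpers written for this illustration, not part of Mojo::IOLoop, and the delays are made-up numbers. Callbacks never interrupt each other here, yet the unguarded timer still runs before the connect callback has assigned $TX.

```perl
#!/usr/bin/env perl
use strict;
use warnings;

# Toy single-threaded event loop: a list of [time, callback] pairs.
# Hypothetical helpers for illustration only, not Mojo::IOLoop.
my @events;
sub schedule { my ($at, $cb) = @_; push @events, [$at, $cb] }
sub run_loop {
    while (@events) {
        # Always run the earliest pending event to completion.
        @events = sort { $a->[0] <=> $b->[0] } @events;
        my $next = shift @events;
        $next->[1]->();
    }
}

my $TX;     # shared handle, as in the root node's client
my @log;

# The handshake completes late (simulated 6 seconds of delay).
schedule(6 => sub { $TX = 'connected'; push @log, 'connect callback ran' });

# Unguarded timer firing at 5 seconds, before the handshake is done.
schedule(5 => sub {
    push @log, defined $TX ? 'timer: sent command' : 'timer: $TX is undef';
});

# The same timer firing again at 10 seconds, after the handshake.
schedule(10 => sub {
    push @log, defined $TX ? 'timer: sent command' : 'timer: $TX is undef';
});

run_loop();
print "$_\n" for @log;
```

The first timer event finds $TX undefined even though no callback was ever interrupted, which is why starting the timer from inside the connect callback avoids the problem.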
Re: Mixing asynchronous data feed with synchronous program flow control
by bliako (Monsignor) on Jan 02, 2020 at 11:12 UTC
I like to encapsulate the mess and use a class.
The class first init()s the websocket, then add()s handlers and callbacks for data received from the server. Finally, it offers a send($message) sub to send whatever. Having a send() also deals with what haukex says about $TX being in the correct state.
Depending on what you want to do, you can make this class store a lot of data internally for the callbacks to use, i.e. avoid having a lot of $TX and @command hanging around outside your class. One way would be to create one class for each situation in which you want your client to be used. Another is to write the basic class and functionality and then extend it with per-situation classes.
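A rough sketch of such a class, under stated assumptions: the names My::WSClient, connect_ws(), attach() and Fake::TX are made up for this illustration, and queueing messages until the transaction exists is just one possible way for send() to cope with $TX not being ready. Mojo::UserAgent is loaded lazily so that the queueing logic can be tried with a stand-in transaction and no server.

```perl
#!/usr/bin/env perl
use strict;
use warnings;

package My::WSClient;    # hypothetical class name

sub new {
    my ($class, %args) = @_;
    return bless {
        url     => $args{url},
        tx      => undef,                     # set once the handshake succeeds
        queue   => [],                        # messages sent before that
        on_text => $args{on_text} || sub {},
    }, $class;
}

# Open the websocket with Mojo::UserAgent (loaded only when needed).
sub connect_ws {
    my $self = shift;
    require Mojo::UserAgent;
    $self->{ua} = Mojo::UserAgent->new;
    $self->{ua}->websocket($self->{url} => sub {
        my ($ua, $tx) = @_;
        return warn "WebSocket handshake failed!\n" unless $tx->is_websocket;
        $tx->on(text   => sub { $self->{on_text}->($self, $_[1]) });
        $tx->on(finish => sub { $self->{tx} = undef });
        $self->attach($tx);
    });
    return $self;
}

# Store the transaction and flush anything queued while it was missing.
sub attach {
    my ($self, $tx) = @_;
    $self->{tx} = $tx;
    $tx->send($_) for splice @{ $self->{queue} };
    return $self;
}

# Send now if connected, otherwise keep the message for later.
sub send {
    my ($self, $message) = @_;
    if ($self->{tx}) { $self->{tx}->send($message) }
    else             { push @{ $self->{queue} }, $message }
    return $self;
}

package Fake::TX;        # stand-in transaction that records what is sent
sub new  { return bless { sent => [] }, shift }
sub send { my ($self, $msg) = @_; push @{ $self->{sent} }, $msg; return $self }

package main;

my $client = My::WSClient->new(url => 'ws://localhost:8080/');
$client->send('RUN');            # no connection yet, so this is queued
my $tx = Fake::TX->new;
$client->attach($tx);            # flushes the queued "RUN"
$client->send('STOP');           # goes out immediately
print "Sent: @{ $tx->{sent} }\n";
```

In a real client you would call connect_ws() and then start Mojo::IOLoop; the callers of send() no longer need to know whether the handshake has finished.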
bw, bliako
Re: Mixing asynchronous data feed with synchronous program flow control
by jcb (Parson) on Jan 02, 2020 at 04:35 UTC
It looks reasonable to me. Process the arriving data in an asynchronous callback and arrange timer events for any synchronous commands that need to be issued, sending the commands in the timer callback. Another option would be to have one process handling both data and commands asynchronously and a synchronous process that talks to that multiplexer and makes the decisions.
Event-driven programming is just messy like this.
Event-driven programming is just messy like this.
It doesn't have to be, though. It usually just requires more rethinking of the structure of the code and the scoping of variables, and thinking of everything in terms of events. IMHO Your Mother's instinct of "there must be a more elegant solution" was right :-)
Re: Mixing asynchronous data feed with synchronous program flow control (in Raku)
by holli (Abbot) on Jan 03, 2020 at 22:21 UTC
Only FWIW and good measure.
Server:
use Cro::HTTP::Router;
use Cro::HTTP::Server;
use Cro::HTTP::Router::WebSocket;
my $application = route
{
    get ->
    {
        web-socket -> $incoming
        {
            supply
            {
                my Bool $Running;
                multi sub process-command( "RUN" )
                {
                    True, ( $Running ?? "Already running" !! "Starting to run…" );
                }
                multi sub process-command( "STOP" )
                {
                    False, ( $Running ?? "Stopping…" !! "Not running" );
                }
                multi sub process-command( $command )
                {
                    $Running, "Unknown command: $command";
                }
                emit "Connected… RUN|STOP\n" ~
                    "Service is { $Running ?? "already" !! "not" } running";
                whenever $incoming -> $message
                {
                    my $command = await $message.body-text;
                    say "Command: $command";
                    ($Running, my $answer) = process-command( $command );
                    emit $answer;
                }
            }
        }
    }
}
my Cro::Service $service = Cro::HTTP::Server.new(
    :host<localhost>, :port<8080>, :$application
);
$service.start;
say "Started";
react whenever signal(SIGINT) {
    say "Killed";
    $service.stop;
    exit;
}
Client:
use Cro::WebSocket::Client;
say "Connecting...";
my $conn = try await Cro::WebSocket::Client.connect( 'ws://localhost:8080' );
say "WebSocket handshake failed!" and exit
    unless $conn;
say "Connected";
react
{
    # It is guaranteed that only one whenever block in this react block
    # can be running at any given time, so there are no race conditions for $conn
    whenever $conn.messages -> $message
    {
        say "Incoming: ", await $message.body;
    }
    # Currently this part only works for me since I patched in the closing supply.
    # But I am confident my pull request will be accepted in some form,
    # maybe not with that exact name
    whenever $conn.closing -> $reason
    {
        say "Closing: $reason";
        done;
    }
    # Without the above, one would have to check here for
    # $conn.closed != True before calling `send`
    whenever Supply.interval(5) -> $tick
    {
        my $command = ("RUN", "STOP").pick;
        say "Sending: $command";
        $conn.send( $command );
        CATCH { say "Error sending <$command>: {.message}" }
    }
};
holli
You can lead your users to water, but alas, you cannot drown them.
Thank you very much. I’m interested in all (clean) approaches to this. What I’ve got, and haukex corrected, is working well but it’s a prototype still that is going to grow into quite a lot of code so I’m going to look into anything I have time to learn.
Have you considered POE? (Learning POE is on my to-do list.)
Or if other languages are an option and this needs to scale out to a large number of connections, have you considered Erlang/OTP? (Which, as I currently understand, effectively hides a multi-threaded event loop in the language runtime, but more-or-less forces you into a message-passing functional programming paradigm as a result. I have heard good things about it but have not yet used it for a non-trivial program myself.)