Beefy Boxes and Bandwidth Generously Provided by pair Networks
"be consistent"
 
PerlMonks  

Mixing asynchronous data feed with synchronous program flow control

by Your Mother (Archbishop)
on Jan 02, 2020 at 01:06 UTC ( [id://11110853]=perlquestion: print w/replies, xml ) Need Help??

Your Mother has asked for the wisdom of the Perl Monks concerning the following question:

Long story short: I want to write client code that consumes a webservice, asynchronously receiving data, but need to be able to synchronously make program flow decisions by sending messages to the server.

I have an approach that works but I am certain it’s hacky and The Wrong Way®. It’s achieved by making the websocket transaction variable, $TX, available from outside the transaction. Any guidance on the right way to mix async with sync code here would be much appreciated. Note, the client is all I care about. I provide the server to match just to help test. Modules: Net::WebSocket::Server, Mojo::IOLoop, and Mojo::UserAgent. Thank you!

Sample server and client…

Save and run the server–

#!/usr/bin/env perl use utf8; use 5.10.2; use Net::WebSocket::Server; my $Running = 0; Net::WebSocket::Server->new( listen => 8080, tick_period => 1, on_tick => sub { my $server = shift; return unless $Running; $_->send_utf8("Running") for $server->connections; }, on_connect => sub { my ( $server, $connection ) = @_; $connection->on( ready => sub { my $connnection = shift; $connection->send_utf8("Connected… RUN|STOP"); }, utf8 => sub { my ( $connection, $message ) = @_; if ( uc $message eq "RUN" ) { $connection->send_utf8( $Running ? "Already runnin +g" : "Starting to run…"); $Running = 1; } elsif ( uc $message eq "STOP" ) { $connection->send_utf8($Running ? "Stopping…" : "N +ot running"); $Running = 0; } else { $connection->send_utf8("Unknown command: $message" +); } }); }) ->start;

Then the client–

#!/usr/bin/env perl use 5.10.2; use utf8; use strictures; use Mojo::UserAgent; my $ua = Mojo::UserAgent->new; my $TX; # This is what has me thinking this is hacky/kludgy. my $conn = $ua->websocket("ws://localhost:8080/" => sub { my $ua = shift; $TX = shift; say "WebSocket handshake failed!" and return unless $TX->is_websocket; $TX->on(finish => sub { my ( $tx, $code, $reason ) = @_; say "WebSocket closed with status $code."; exit; }); $TX->on(text => sub { my $tx = shift; say "Server > ", +shift; }); }); my @command = qw/ RUN STOP /; Mojo::IOLoop->recurring(5 => sub { my $command = $command[rand@command]; say "Client > $command"; $TX->send($command); }); Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

Replies are listed 'Best First'.
Re: Mixing asynchronous data feed with synchronous program flow control
by haukex (Archbishop) on Jan 02, 2020 at 08:29 UTC

    If the client is as simple as shown, I think the code is ok - there are a couple of chances for race conditions in regards to $TX not being in the correct state when the timer fires, but for a command-line script that may be acceptable (I'd just add some $SIG{INT} handlers to server and client). If this is part of a larger app, here's how I would have coded it:

    • Tie the lifetime of the timer to the lifetime of the WebSocket client
    • Don't exit from inside the handler, allow for graceful shutdown
    my $ua = Mojo::UserAgent->new; my $conn = $ua->websocket("ws://localhost:8080/" => sub { my ($ua, $tx) = @_; say "WebSocket handshake failed!" and return unless $tx->is_websocket; my $id = Mojo::IOLoop->recurring(5 => sub { my @command = qw/ RUN STOP /; my $command = $command[rand @command]; say "Client > $command"; $tx->send($command); }); $tx->on(finish => sub { my ($tx, $code, $reason) = @_; say "WebSocket closed with status $code."; Mojo::IOLoop->remove($id); Mojo::IOLoop->stop_gracefully; }); $tx->on(text => sub { my ($tx, $bytes) = @_; say "Server > ", $bytes; }); }); Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
      race conditions in regards to $TX not being in the correct state when the timer fires

      In a program using a single event loop, that cannot happen — timers only produce a delay until some time, but do not interrupt other event handlers and timer events are not processed until other callbacks return. There are no races provided that all callbacks return all resources to "safe states" before returning.

        In a program using a single event loop, that cannot happen — timers only produce a delay until some time, but do not interrupt other event handlers and timer events are not processed until other callbacks return. There are no races provided that all callbacks return all resources to "safe states" before returning.

        Ummm what? Sorry, but no. Take the code exactly as posted in the root node, but in the server code, insert a "sleep 5;" as the first line in on_connect to simulate bad network delay, and then the client will, on many runs, throw the error "Mojo::Reactor::EV: Timer failed: Can't call method "send" on an undefined value", because $TX is still undef at that point. That demonstrates exactly the issue that I was talking about. The code I posted doesn't suffer from that particular issue, because it doesn't start the timer until the connection object is actually available.

        Update: I admit my wording "there are a couple of chances for race conditions" may have been a stretch, I initially thought I saw more than the one I described above, but I'm not so sure anymore. However, your claim "that cannot happen" is still incorrect.

Re: Mixing asynchronous data feed with synchronous program flow control
by bliako (Monsignor) on Jan 02, 2020 at 11:12 UTC

    I like to encapsulate the mess and use a class.

    The class first init() the websocket, then add() handlers and callbacks on receiving data from server. Finally, a send($message) sub to send whatever. Having a send() also deals with what haukex says about $TX being in the correct state.

    Depending on what you want to do, you can make this class store a lot of data internally for the callbacks to use it. i.e. avoid having a lot of $TX and @command hanging around outside your class. One way would be to create one class for each situation you want your client to be used. Another is to write the basic class and functionality and then extend it with per-situation classes.

    bw, bliako

Re: Mixing asynchronous data feed with synchronous program flow control
by jcb (Parson) on Jan 02, 2020 at 04:35 UTC

    It looks reasonable to me. Process the arriving data in an asynchronous callback and arrange timer events for any synchronous commands that need to be issued, sending the commands in the timer callback. Another option would be to have one process handling both data and commands asynchronously and a synchronous process that talks to that multiplexer and makes the decisions.

    Event-driven programming is just messy like this.

      Event-driven programming is just messy like this.

      It doesn't have to be, though, it's usually just that it requires more re-thinking of the structure of the code and scoping of variables, and thinking of everything in terms of events - IMHO Your Mother's instinct of "there must be a more elegant solution" was right :-)

Re: Mixing asynchronous data feed with synchronous program flow control (in Raku)
by holli (Abbot) on Jan 03, 2020 at 22:21 UTC
    Only FWIW and good measure.

    Server:
    use Cro::HTTP::Router; use Cro::HTTP::Server; use Cro::HTTP::Router::WebSocket; my $application = route { get -> { web-socket -> $incoming { supply { my Bool $Running; multi sub process-command( "RUN" ) { True, ( $Running ?? "Already running" !! "Starting + to run…" ); } multi sub process-command( "STOP" ) { False, ( $Running ?? "Stopping…" !! "Not running" +); } multi sub process-command( $command ) { $Running, "Unknown command: $command"; } emit "Connected… RUN|STOP\n" ~ "Service is { $Running ?? "already" !! "not" } ru +nning"; whenever $incoming -> $message { my $command = await $message.body-text; say "Command: $command"; ($Running, my $answer) = process-command( $command + ); emit $answer; } } } } } my Cro::Service $service = Cro::HTTP::Server.new( :host<localhost>, :port<8080>, :$application ); $service.start; say "Started"; react whenever signal(SIGINT) { say "Killed"; $service.stop; exit; }
    Client:
    use Cro::WebSocket::Client; say "Connecting..."; my $conn = try await Cro::WebSocket::Client.connect( 'ws://localhost:8 +080' ); say "WebSocket handshake failed!" and exit unless $conn; say "Connected"; react { # It is guarantueed, only one whenever block in this react block c +an be running at any given time # There are no race conditions, for $conn whenever $conn.messages -> $message { say "Incoming: ", await $message.body; } # Currently this part only works for me since I patched in the clo +sing - supply. # But I am confident my pull request will be accecpted in some for +m # Maybe not with that exact name whenever $conn.closing -> $reason { say "Closing: $reason"; done; } # Without the above, one would have to check here for # $conn.closed != True before calling `send` whenever Supply.interval(5) -> $tick { my $command = ("RUN", "STOP").pick; say "Sending: $command"; $conn.send( $command ); CATCH { say "Error sending <$command>: {.message}" } } };


    holli

    You can lead your users to water, but alas, you cannot drown them.

      Thank you very much. I’m interested in all (clean) approaches to this. What I’ve got, and haukex corrected, is working well but it’s a prototype still that is going to grow into quite a lot of code so I’m going to look into anything I have time to learn.

        Have you considered POE? (Learning POE is on my to-do list.)

        Or if other languages are an option and this needs to scale out to a large number of connections, have you considered Erlang/OTP? (Which, as I currently understand, effectively hides a multi-threaded event loop in the language runtime, but more-or-less forces you into a message-passing functional programming paradigm as a result. I have heard good things about it but have not yet used it for a non-trivial program myself.)

Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Node Status?
node history
Node Type: perlquestion [id://11110853]
Approved by LanX
help
Chatterbox?
and the web crawler heard nothing...

How do I use this?Last hourOther CB clients
Other Users?
Others lurking in the Monastery: (8)
As of 2024-03-28 17:07 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    No recent polls found