http://qs1969.pair.com?node_id=11111532


in reply to Architecture advice, proxy or rebroadcast websocket

Personally, I'd implement your second suggestion (a multiplexer in Mojo), with the thirdfourth suggestion (DB) in second place, and the other two sound too complicated for my tastes ;-) Here's how I might have implemented the second option, with the sample server and client based on your code in 11110853:

serv.pl:

#!/usr/bin/env perl use Mojolicious::Lite -signatures; use Data::Dump qw/dd pp/; # morbo --listen='http://localhost:3000' serv.pl # This code assumes a single thread / no forking! my $single_client; websocket '/' => sub ($c) { if ( defined $single_client ) { $c->app->log->debug('WebSocket opened, rejecting'); $c->send("Too many client connections"); $c->tx->finish; return; } $c->app->log->debug('WebSocket opened'); $c->inactivity_timeout(45); $single_client = $c->tx; my $timer_id; $c->on(message => sub ($c, $msg) { if ( uc $msg eq "RUN" ) { if ( !defined $timer_id ) { $c->send("Starting to run"); $timer_id = Mojo::IOLoop->recurring( 1 => sub { # pretend to be a fragile server if ( rand > 0.97 ) { $c->app->log->warn('Oops, disconnecting!'); $single_client->finish; } elsif ( rand > 0.95 ) { $c->app->log->warn('Oops, freezing!'); Mojo::IOLoop->remove($timer_id); } else { $c->send("Running ".(time-$^T)) } } ); } else { $c->send("Already running") } } elsif ( uc $msg eq "STOP" ) { if ( defined $timer_id ) { $c->send("Stopping..."); Mojo::IOLoop->remove($timer_id); $timer_id = undef; } else { $c->send("Not running") } } elsif ( uc $msg eq "HEARTBEAT" ) { } else { $c->send("Unknown command: $msg") } }); $c->on(finish => sub ($c, $code, $reason=undef) { $c->app->log->debug("WebSocket closed ".pp($code, $reason)); Mojo::IOLoop->remove($timer_id); $single_client = undef; }); $c->send("Connected... RUN|STOP"); }; app->start

cli.pl:

#!/usr/bin/env perl use Mojo::Base -strict, -signatures; use Mojo::UserAgent; use Mojo::IOLoop; use Data::Dump qw/dd pp/; # To connect to mux.pl: perl cli.pl ws://localhost:3001 my $ADDR = @ARGV ? $ARGV[0] : 'ws://localhost:3000'; say "Opening WebSocket..."; my $ua = Mojo::UserAgent->new; $ua->inactivity_timeout(10); # must be more than mux's timeout $ua->websocket($ADDR => sub ($ua, $tx) { die 'WebSocket handshake failed!'.pp($tx) unless $tx->is_websocket; my $timer_id = Mojo::IOLoop->recurring( 30 => sub { say "Sending heartbeat"; $tx->send('HEARTBEAT'); } ); $tx->on(finish => sub ($tx, $code, $reason=undef) { say "WebSocket closed ".pp($code, $reason); Mojo::IOLoop->remove($timer_id); }); $tx->on(message => sub ($tx, $msg) { say "RX: ".pp($msg); }); $tx->send('RUN'); }); Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

mux.pl:

#!/usr/bin/env perl use Mojolicious::Lite -signatures; use Mojo::UserAgent; use Mojo::EventEmitter; use Data::Dump qw/dd pp/; # morbo --listen='http://localhost:3001' mux.pl # This code assumes a single thread / no forking! my $event = Mojo::EventEmitter->new; sub open_server_conn { state $ua = Mojo::UserAgent->new; $ua->inactivity_timeout(5); # must be less than client's timeout app->log->debug("Opening WebSocket to server..."); $ua->websocket('ws://localhost:3000' => sub ($ua, $tx) { die 'WebSocket handshake failed!'.pp($tx) unless $tx->is_websocket; my $timer_id = Mojo::IOLoop->recurring( 30 => sub { app->log->debug("Sending heartbeat"); $tx->send('HEARTBEAT'); } ); $tx->on(finish => sub ($tx, $code, $reason=undef) { app->log->debug("Server WebSocket closed " .pp($code, $reason)); Mojo::IOLoop->remove($timer_id); open_server_conn(); # reopen }); $tx->on(message => sub ($tx, $msg) { # message from server return if $msg=~/^Connected\b|^Starting\b/; return unless $event->has_subscribers('message'); app->log->debug("Relaying ".pp($msg)); $event->emit( message => $msg ); }); $tx->send('RUN'); }); } Mojo::IOLoop->next_tick(\&open_server_conn); websocket '/' => sub ($c) { # client connections $c->app->log->debug('Client connected'); $c->inactivity_timeout(45); my $tx = $c->tx; # client transaction my $cb = $event->on(message => sub ($e,$msg) { $tx->send($msg) }); $c->on(message => sub ($c, $msg) { # message from client if ( uc $msg eq "RUN" ) { } # ignore client messages (for now) elsif ( uc $msg eq "STOP" ) { } elsif ( uc $msg eq "HEARTBEAT" ) { } else { $tx->send("Unknown command: $msg") } }); $c->on(finish => sub ($c, $code, $reason=undef) { $c->app->log->debug("Client disconn ".pp($code, $reason)); $event->unsubscribe(message => $cb); ($tx,$cb) = (); }); $c->send("Connected... RUN|STOP"); }; app->start

Note how in the multiplexer, I'm using a Mojo::EventEmitter to decouple the server messages from the client connections. Of course the code has a few assumptions that you are free to adapt, like whether you want to ignore or forward messages coming from each client to the server (in the latter case, you could also relay them via the event emitter using a different event name, for example), or and more generally how transparent you want to multiplexer to be (or not).