#!/usr/bin/env perl
# NOTE(review): this paste appears to hold three separate scripts (serv.pl,
# cli.pl and mux.pl, going by the usage comments below), separated by '####'
# lines -- presumably each one is meant to be saved and run on its own.
#
# serv.pl: WebSocket server that accepts a single client at a time and, on
# "RUN", sends a "Running <seconds since start>" message once per second.
# It deliberately misbehaves at random (disconnects or stops sending).
use Mojolicious::Lite -signatures;
use Data::Dump qw/dd pp/;

# morbo --listen='http://localhost:3000' serv.pl

# This code assumes a single thread / no forking!
my $single_client;    # transaction of the one connected client, or undef

websocket '/' => sub ($c) {

    # Only one client may be connected at any time; turn everyone else away.
    if ( defined $single_client ) {
        $c->app->log->debug('WebSocket opened, rejecting');
        $c->send("Too many client connections");
        $c->tx->finish;
        return;
    }

    $c->app->log->debug('WebSocket opened');
    $c->inactivity_timeout(45);
    $single_client = $c->tx;

    my $timer_id;     # id of the recurring "Running" timer, undef when idle

    $c->on(message => sub ($c, $msg) {
        if ( uc($msg) eq "RUN" ) {
            if ( !defined $timer_id ) {
                $c->send("Starting to run");
                $timer_id = Mojo::IOLoop->recurring( 1 => sub {
                    # pretend to be a fragile server
                    if ( rand() > 0.97 ) {
                        $c->app->log->warn('Oops, disconnecting!');
                        $single_client->finish;
                    }
                    elsif ( rand() > 0.95 ) {
                        # The timer is removed but $timer_id stays set, so
                        # a later "RUN" is answered with "Already running"
                        # while nothing is sent any more: a simulated freeze.
                        $c->app->log->warn('Oops, freezing!');
                        Mojo::IOLoop->remove($timer_id);
                    }
                    else {
                        $c->send("Running ".(time-$^T));
                    }
                } );
            }
            else {
                $c->send("Already running");
            }
        }
        elsif ( uc($msg) eq "STOP" ) {
            if ( defined $timer_id ) {
                $c->send("Stopping...");
                Mojo::IOLoop->remove($timer_id);
                $timer_id = undef;
            }
            else {
                $c->send("Not running");
            }
        }
        elsif ( uc($msg) eq "HEARTBEAT" ) {
            # nothing to do; heartbeats are accepted silently
        }
        else {
            $c->send("Unknown command: $msg");
        }
    });

    $c->on(finish => sub ($c, $code, $reason=undef) {
        $c->app->log->debug("WebSocket closed ".pp($code, $reason));

        # The timer only exists after a "RUN"; do not hand an undefined id
        # to the event loop.
        Mojo::IOLoop->remove($timer_id) if defined $timer_id;
        $timer_id      = undef;
        $single_client = undef;    # free the slot for the next client
    });

    $c->send("Connected... RUN|STOP");
};

app->start
####
#!/usr/bin/env perl
# cli.pl: WebSocket client that connects to the given address (default
# ws://localhost:3000), sends "RUN" and prints every message it receives.
use Mojo::Base -strict, -signatures;
use Mojo::UserAgent;
use Mojo::IOLoop;
use Data::Dump qw/dd pp/;

# To connect to mux.pl: perl cli.pl ws://localhost:3001

my $ADDR = @ARGV ? $ARGV[0] : 'ws://localhost:3000';

say "Opening WebSocket...";
my $ua = Mojo::UserAgent->new;
$ua->inactivity_timeout(10); # must be more than mux's timeout

$ua->websocket($ADDR => sub ($ua, $tx) {
    die 'WebSocket handshake failed!'.pp($tx) unless $tx->is_websocket;

    # Periodic heartbeat towards the remote end.
    my $timer_id = Mojo::IOLoop->recurring( 30 => sub {
        say "Sending heartbeat";
        $tx->send('HEARTBEAT');
    } );

    $tx->on(finish => sub ($tx, $code, $reason=undef) {
        say "WebSocket closed ".pp($code, $reason);
        Mojo::IOLoop->remove($timer_id);
    });

    $tx->on(message => sub ($tx, $msg) {
        say "RX: ".pp($msg);
    });

    $tx->send('RUN');
});

Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
####
#!/usr/bin/env perl
# mux.pl: keeps one WebSocket connection open to the server at
# ws://localhost:3000 and relays the server's messages to every WebSocket
# client connected to this application.
use Mojolicious::Lite -signatures;
use Mojo::UserAgent;
use Mojo::EventEmitter;
use Data::Dump qw/dd pp/;

# morbo --listen='http://localhost:3001' mux.pl

# This code assumes a single thread / no forking!
my $event = Mojo::EventEmitter->new;    # fan-out of server messages to clients

# Open the upstream WebSocket to the server and ask it to "RUN".
# The connection is reopened whenever it closes; a failed handshake is
# logged and retried after a short delay. Takes no arguments (anything
# passed by the event loop is ignored) and returns nothing meaningful.
sub open_server_conn {
    state $ua = Mojo::UserAgent->new;
    $ua->inactivity_timeout(5); # must be less than client's timeout
    app->log->debug("Opening WebSocket to server...");

    $ua->websocket('ws://localhost:3000' => sub ($ua, $tx) {

        # Dying here would only be caught by the event loop and leave the
        # mux without an upstream connection for good, so retry instead.
        unless ( $tx->is_websocket ) {
            my $err = $tx->error;
            app->log->error( 'WebSocket handshake failed! '
                . ( $err ? $err->{message} : 'not a WebSocket response' ) );
            Mojo::IOLoop->timer( 1 => \&open_server_conn );
            return;
        }

        my $timer_id = Mojo::IOLoop->recurring( 30 => sub {
            app->log->debug("Sending heartbeat");
            $tx->send('HEARTBEAT');
        } );

        $tx->on(finish => sub ($tx, $code, $reason=undef) {
            app->log->debug("Server WebSocket closed "
                .pp($code, $reason));
            Mojo::IOLoop->remove($timer_id);
            open_server_conn(); # reopen
        });

        $tx->on(message => sub ($tx, $msg) { # message from server
            # Greeting and start acknowledgement are not relayed.
            return if $msg=~/^Connected\b|^Starting\b/;
            return unless $event->has_subscribers('message');
            app->log->debug("Relaying ".pp($msg));
            $event->emit( message => $msg );
        });

        $tx->send('RUN');
    });
}
Mojo::IOLoop->next_tick(\&open_server_conn);

websocket '/' => sub ($c) { # client connections
    $c->app->log->debug('Client connected');
    $c->inactivity_timeout(45);

    my $tx = $c->tx; # client transaction

    # Forward every relayed server message to this client.
    my $cb = $event->on(message => sub ($e,$msg) { $tx->send($msg) });

    $c->on(message => sub ($c, $msg) { # message from client
        if    ( uc($msg) eq "RUN" )       { } # ignore client messages (for now)
        elsif ( uc($msg) eq "STOP" )      { }
        elsif ( uc($msg) eq "HEARTBEAT" ) { }
        else { $tx->send("Unknown command: $msg") }
    });

    $c->on(finish => sub ($c, $code, $reason=undef) {
        $c->app->log->debug("Client disconn ".pp($code, $reason));
        $event->unsubscribe(message => $cb);
        ($tx,$cb) = ();    # drop the references held by the closures
    });

    $c->send("Connected... RUN|STOP");
};

app->start