#!/usr/bin/env perl use 5.028; use Mojolicious::Lite -signatures; use Mojo::IOLoop::Stream::Role::LineBuffer; use Mojo::SQLite; use Mojo::JSON qw/encode_json/; use Mojo::Util qw/dumper/; use Fcntl qw/:DEFAULT/; use IO::Termios (); # NOTICE: This script is designed to work in a single-threaded, # single-process server only! (morbo or Mojolicious::Command::daemon) # disable template cache in development mode (e.g. under morbo): app->renderer->cache->max_keys(0) if app->mode eq 'development'; helper sql => sub { state $sql = Mojo::SQLite->new(':memory:') }; app->sql->migrations->from_string(<<'END_MIGRATIONS')->migrate; -- 1 up CREATE TABLE DataLog ( Timestamp DATETIME, Source TEXT, ValueA TEXT, ValueB TEXT ); -- 1 down DROP TABLE IF EXISTS DataLog; END_MIGRATIONS my %serports = ( # this is essentially the configuration for the ports '/tmp/fakepty1' => sub { sysopen my $fh, '/tmp/fakepty1', O_RDONLY or die "sysopen /tmp/fakepty1: $!"; my $hnd = IO::Termios->new($fh) or die "IO::Termios->new: $!"; $hnd->set_mode('19200,8,n,1'); #use IO::Stty; # if needed #IO::Stty::stty($hnd, qw/ cs8 -parenb raw -echo /); return $hnd; }, '/tmp/fakepty2' => sub { sysopen my $fh, '/tmp/fakepty2', O_RDONLY or die "sysopen /tmp/fakepty2: $!"; my $hnd = IO::Termios->new($fh) or die "IO::Termios->new: $!"; $hnd->set_mode('19200,8,n,1'); return $hnd; }, ); my $retry_interval_s = 10; my $parse_re = qr{ ^ (? \w+ ) , (? \d+ ) , (? \d+ ) $ }msx; my $serial = Mojo::EventEmitter->new; # serial event dispatcher sub setup_streams { state %streams; for my $k (sort keys %serports) { next if $streams{$k}; my $handle = eval { $serports{$k}->() }; if (!$handle) { $serial->emit(error => "Stream $k Open: $@"); Mojo::IOLoop->timer($retry_interval_s => \&setup_streams); next; } $streams{$k} = Mojo::IOLoop::Stream->new($handle) ->with_roles('+LineBuffer')->watch_lines; $streams{$k}->on(read_line => sub ($strm, $line, $sep) { if ( $line =~ $parse_re ) { my %rec = ( %+, Timestamp => time ); app->sql->db->insert('DataLog', \%rec); $serial->emit(data => \%rec); } else { $serial->emit(error => "Stream $k: failed to parse ".dumper($line)) } }); $streams{$k}->on(close => sub ($strm) { $serial->emit(error => "Stream $k Closed"); $streams{$k}->stop; delete $streams{$k}; Mojo::IOLoop->timer($retry_interval_s => \&setup_streams); }); $streams{$k}->on(error => sub ($strm, $err) { $serial->emit(error => "Stream $k Error: $err"); }); $streams{$k}->on(timeout => sub ($strm) { $serial->emit(error => "Stream $k Timeout"); # could possibly close & re-open stream here }); $streams{$k}->start; } } Mojo::IOLoop->next_tick(\&setup_streams); get '/' => sub ($c) { $c->render(template => 'index') } => 'index'; get '/q' => sub ($c) { $c->render(template => 'query') } => 'query'; get '/events' => sub ($c) { $c->inactivity_timeout(300); $c->res->headers->content_type('text/event-stream'); $c->write; my $cb_data = $serial->on(data => sub ($ser,$data) { my $json = encode_json($data) =~ s/\n//gr; $c->write("event: ser_data\ndata: $json\n\n"); } ); my $cb_err = $serial->on(error => sub ($ser,$err) { $err =~ s/\n//g; $c->write("event: ser_err\ndata: $err\n\n"); } ); $c->on(finish => sub ($c) { $serial->unsubscribe(data => $cb_data); $serial->unsubscribe(error => $cb_err); }); } => 'events'; app->start; __DATA__ @@ index.html.ep % layout 'main', title => 'Hello, World!';
Source Timestamp ValueA ValueB
Foo - - -
Bar - - -
@@ query.html.ep % layout 'main', title => 'Query';
% my @fields = qw/ Timestamp Source ValueA ValueB /; % for my $f (@fields) { % } %# NOTE: This database code doesn't normally belong in the view % my $results = sql->db->select('DataLog', % \@fields, undef, ['Timestamp','Source']); % while ( my $row = $results->hash ) { % for my $f (@fields) { % } % }
<%= $f %>
<%= $row->{$f} %>
@@ layouts/main.html.ep <%= title %> %= content