in reply to Re^2: Nonblocking read server
in thread Nonblocking read server
Just to show you why I recommended an Async package, here's your problem done using my own Async::Tiny. It should be similarly short in any of the other Async packages.
#!/usr/bin/perl

# Receive server: accepts TCP connections on PORT1, collects everything each
# client sends until it closes the connection, then stores that data in a
# file of its own.

use strict;
use warnings;

use Async::Tiny;
use Path::Tiny;

use constant PORT1 => 5000;

# Incremented once per accepted connection; becomes part of the file name.
my $connID;

my $t = Async::Tiny->new;

# For every new connection register process_message as the read callback,
# passing along the peer's address and this connection's number.
$t->addListenCallback( PORT1, sub
  {
  my $sock = shift;
  $t->addReadCallback($sock, \&process_message, $sock->peerhost, ++$connID);
  # 'full' mode: no callback until EOF, then a single call with all the data
  $t->changeReadMode($sock, 'full');
  });

$t->eventloop;

# process_message( DATA, PEERHOST, ID )
# Read callback.  Writes DATA unmodified to "rcv_<ID>_<PEERHOST>.txt" in the
# current directory and reports the file name on STDOUT.
sub process_message
  {
  my ($data, $peerhost, $id) = @_;
  my $filename = "rcv_${id}_$peerhost.txt";
  path($filename)->spew_raw($data);
  print "[save process_message] wrote $filename\n";
  }
See - nice, simple, clean, short :)
Where Async::Tiny is
package Async::Tiny; ##################################################

use Time::HiRes qw(time);
use IO::Select;
use IO::Socket;
use warnings;
use strict;

# All state lives in file-scoped lexicals, so it is shared by every
# Async::Tiny object created in the process.
#   @timerqueue - events of the form [ $when, $callback, @args ], sorted by $when
#   %reads      - read registrations, keyed by the (stringified) handle
#   %writes     - pending output, keyed by the (stringified) handle
#   %listens    - listen socket registrations, keyed by the (stringified) handle
#   $rsel/$wsel - IO::Select sets polled for readable / writable handles
#   $wait       - optional callback run before a select that may block
my (@timerqueue, %reads, %writes, %listens, $rsel, $wsel, $wait);

# Protocol number (IPPROTO_UDP) that isudp() compares against.  It is assigned
# here rather than in an INIT block because INIT blocks do not run when the
# module is loaded at run time (require), which would leave the value undef.
my $udp = 17;

# isudp( HANDLE )
# True when HANDLE is an IO::Socket object whose protocol is UDP.  A socket
# that reports no protocol at all is treated as "not UDP".
sub isudp
  {
  "$_[0]" =~ /IO::Socket/ && ($_[0]->protocol // -1) == $udp
  }

# new()
# Constructor.  Creates fresh read and write select sets (the other shared
# state is left untouched) and returns an empty blessed hash.
sub new
  {
  $rsel = IO::Select->new;
  $wsel = IO::Select->new;
  my $self = shift;
  bless { }, ref $self || $self;
  }

# status()
# Returns a one line summary of how many timers, read/listen/write
# registrations and select handles currently exist.
sub status
  {
  my $t = @timerqueue;
  my $r = keys %reads;
  my $l = keys %listens;
  my $w = keys %writes;
  my $h = $rsel->handles + $wsel->handles;
  return "timers: $t reads: $r listens: $l writes: $w handles: $h\n";
  }

# addWaitCallback( CALLBACK )
# Remember CALLBACK; eventloop calls it (without arguments) before any select
# that has a non-zero timeout.
sub addWaitCallback { (undef, $wait) = @_ }

# addReadCallback( HANDLE, CALLBACK, ARGS )
# Register CALLBACK for input on HANDLE.  An undefined CALLBACK removes the
# registration.  UDP sockets get a packet list, everything else a string
# buffer; the read mode starts out as '' (line by line).
sub addReadCallback
  {
  my ($self, $handle, $callback, @args) = @_;
  if( defined $callback )
    {
    $rsel->add($handle);
    $reads{$handle} = { callback => $callback, args => \@args, mode => '',
      ( isudp($handle) ? (packets => []) : (data => '')) };
    }
  else
    {
    delete $reads{$handle};
    $rsel->remove($handle);
    }
  }

# changeReadMode( HANDLE, MODE )
# Set the read mode of an already registered HANDLE; silently does nothing
# for unknown handles.  eventloop recognises modes starting with "char" or
# "full" (case-insensitive); anything else means line by line.
sub changeReadMode
  {
  my ($self, $handle, $mode) = @_;
  $reads{$handle} and $reads{$handle}{mode} = $mode;
  }

# addListenCallback( SOCKETorPORT, CALLBACK, ARGS )
# Register CALLBACK for new connections.  The first argument may be a listen
# socket, a hash ref of IO::Socket::INET arguments, or a plain scalar (taken
# as LocalAddr when it contains a ':', otherwise as LocalPort).  Dies if the
# socket cannot be created.  An undefined CALLBACK removes the registration.
sub addListenCallback
  {
  my ($self, $socket, $callback, @args) = @_;
  if( defined $callback )
    {
    ref $socket or $socket =
      { ($socket =~ /:/ ? 'LocalAddr' : 'LocalPort') => $socket }; # scalar is port
    if( ref $socket eq 'HASH' )
      {
      my %args = ( Listen => 10, Reuse => 1, %$socket ); # add defaults
      $socket = IO::Socket::INET->new(%args) or die $@;
      }
    $rsel->add($socket);
    $listens{$socket} = { callback => $callback, args => \@args};
    }
  else
    {
    $rsel->remove($socket);
    delete $listens{$socket};
    }
  }

# queue( EVENT, ... )
# Insert each EVENT ( [ $when, $callback, @args ] ) into @timerqueue so that
# the queue stays sorted by $when.  Events with equal times keep their
# insertion order.
sub queue # binary insertion into timer queue
  {
  for my $event ( @_ )
    {
    my $time = $event->[0];
    my $low = 0;
    my $high = @timerqueue;
    my $mid;
    $timerqueue[$mid = $low + $high >> 1][0] <= $time ?
      ($low = $mid + 1) : ($high = $mid) while $low < $high;
    splice @timerqueue, $low, 0, $event;
    }
  }

# repeater( DELAY, CALLBACK, ARGS )
# Timer handler behind the repeat callbacks: runs CALLBACK(ARGS) and queues
# itself again DELAY seconds from now, unless the callback's return value
# matches /end *repeat/i.
sub repeater
  {
  my ($delay, $callback, @args) = @_;
  ($callback->(@args) // '') =~ /end *repeat/i or
    queue [ $delay + time, \&repeater, $delay, $callback, @args ];
  }

# addRepeatCallback( DELAY, CALLBACK, ARGS )
# Run CALLBACK(ARGS) every DELAY seconds, first after DELAY seconds.
sub addRepeatCallback
  {
  my ($self, $delay, $callback, @args) = @_;
  queue [ $delay + time, \&repeater, $delay, $callback, @args ];
  }

# addNowAndRepeatCallback( DELAY, CALLBACK, ARGS )
# Like addRepeatCallback, but the first run is queued for the current time.
sub addNowAndRepeatCallback
  {
  my ($self, $delay, $callback, @args) = @_;
  queue [ time, \&repeater, $delay, $callback, @args ];
  }

# addDelayCallback( DELAY, CALLBACK, ARGS )
# Run CALLBACK(ARGS) once, DELAY seconds from now.
sub addDelayCallback
  {
  my ($self, $delay, $callback, @args) = @_;
  queue [ $delay + time, $callback, @args ];
  }

# write( HANDLE or [HANDLES], DATA ... )
# Queue DATA for each handle; eventloop sends it when the handle becomes
# writable.  UDP sockets queue every DATA element as a separate packet, other
# handles get all elements appended to one buffer.  Calling write without any
# DATA on an IO::Socket requests shutdown(1) once the buffer has drained.
sub write
  {
  my ($self, $handle, @data) = @_;
  for my $fh (ref $handle eq 'ARRAY' ? @$handle : $handle)
    {
    if( isudp($fh) )
      {
      $writes{$fh} ||= {packets => []};
      push @{ $writes{$fh}{packets} }, @data;
      @{ $writes{$fh}{packets} } and $wsel->add($fh);
      }
    else
      {
      $writes{$fh} ||= {data => ''};
      $writes{$fh}{data} .= join '', @data;
      $writes{$fh}{shutdown} = @data == 0 && "$fh" =~ /IO::Socket/;
      # A shutdown request must be polled for writability even when the
      # buffer is empty; otherwise eventloop never sees the handle, the
      # shutdown never happens and the %writes entry keeps the loop alive.
      (length $writes{$fh}{data} or $writes{$fh}{shutdown})
        and $wsel->add($fh);
      }
    }
  }

# addErrorCallback( HANDLE, CALLBACK, ARGS )
# Attach a write error callback to the output currently queued for HANDLE.
# Does nothing unless HANDLE has a %writes entry, i.e. write must be called
# first.  On a failed write the callback receives the error text, then ARGS.
sub addErrorCallback
  {
  my ($self, $handle, $callback, @args) = @_;
  if( exists $writes{$handle} )
    {
    $writes{$handle}{error} = $callback;
    $writes{$handle}{errorargs} = \@args;
    }
  }

# mtimecheck( INTERVAL, FILE, OLDTIME, CALLBACK, ARGS )
# Timer handler behind addMtimeCallback: runs CALLBACK(ARGS) when FILE's
# modification time is non-zero and newer than OLDTIME, then queues itself
# again with the time just seen, unless the callback's return value matches
# /end *repeat/i.
sub mtimecheck
  {
  my ($interval, $file, $oldtime, $callback, @args) = @_;
  my $mtime = (stat $file)[9] || 0;
  my $return = $mtime && $oldtime < $mtime && $callback->(@args);
  defined $return && $return =~ /end *repeat/i or
    queue [ $interval + time, \&mtimecheck, $interval, $file, $mtime,
      $callback, @args ];
  }

# addMtimeCallback( INTERVAL, FILE, CALLBACK, ARGS )
# Check FILE's modification time every INTERVAL seconds and run
# CALLBACK(ARGS) whenever it has increased.
sub addMtimeCallback
  {
  my ($self, $interval, $file, $callback, @args) = @_;
  # a missing file counts as mtime 0 (as in mtimecheck) so that the later
  # numeric comparison never sees undef
  my $mtime = (stat $file)[9] || 0;
  queue [ $interval + time, \&mtimecheck, $interval, $file, $mtime,
    $callback, @args ];
  }

# eventloop()
# Runs until no timers, read, write or listen registrations are left.  Each
# pass waits in select until the next timer is due, services the writable
# handles, then the readable ones, and finally fires at most one expired
# timer.
sub eventloop #########################################################
  {
  while( @timerqueue || %reads || %writes || %listens )
    {
    my $waitfor = @timerqueue ? $timerqueue[0][0] - time : 1e6;
    $waitfor < 0 and $waitfor = 0;
    defined $wait and $waitfor > 0 and $wait->();
    ############################################################
    my ($reads, $writes) = IO::Select->select($rsel, $wsel, undef, $waitfor);
    ############################################################
    for my $fh ( @{ $writes // [] } ) # ready writes
      {
      if( !defined $fh or not exists $writes{$fh} )
        {
        warn "leftover write handle";
        $wsel->remove($fh);
        }
      elsif( isudp($fh) )
        {
        if( @{ $writes{$fh}{packets} } )
          {
          $fh->send( shift @{ $writes{$fh}{packets} } ) or
            die "send error $!";
          }
        else
          {
          $wsel->remove($fh);
          delete $writes{$fh}; # no packet to write
          }
        }
      else # tcp or other
        {
        my $have = length $writes{$fh}{data};
        if(defined $have and $have > 0)
          {
          my $len = syswrite $fh, $writes{$fh}{data};
          if( not defined $len )
            {
            # transient conditions: keep the data and try again later
            next if $!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR};
            # Anything else: drop the queued output before reporting.
            # Leaving it queued would make select return this handle again
            # immediately and repeat the same error on every pass.
            my $error = "$!";
            my $entry = delete $writes{$fh};
            $wsel->remove($fh);
            if( $entry->{error} )
              {
              $entry->{error}->($error, @{ $entry->{errorargs} // [] } );
              }
            else
              {
              warn "write error $error";
              }
            }
          elsif( $len == $have )
            {
            $writes{$fh}{shutdown} and $fh->shutdown(1);
            $wsel->remove($fh);
            delete $writes{$fh}; # all data has been written
            }
          elsif( $len > 0 )
            {
            substr $writes{$fh}{data}, 0, $len, '';
            }
          else
            {
            die "zero length write";
            }
          }
        else
          {
          $writes{$fh}{shutdown} and $fh->shutdown(1);
          $wsel->remove($fh);
          delete $writes{$fh}; # no data to write
          #die "had no data to write";
          }
        }
      }
    for my $fh ( @{ $reads // [] } ) # ready reads
      {
      if( not (exists $reads{$fh} || exists $listens{$fh}) )
        {
        warn "leftover read handle";
        $rsel->remove($fh);
        }
      elsif( isudp($fh) )
        {
        $fh->recv(my $packet, 1500);
        $reads{$fh}{callback}->( $packet, @{ $reads{$fh}{args} } );
        }
      elsif( exists $listens{$fh} ) # new tcp connection
        {
        # accept can fail; never hand an undefined connection to the callback
        my $connection = $fh->accept;
        if( defined $connection )
          {
          $listens{$fh}{callback}->
            ( $connection, @{ $listens{$fh}{args} } );
          }
        else
          {
          warn "accept error $!";
          }
        }
      elsif(sysread($fh, $reads{$fh}{data}, 8192, length $reads{$fh}{data}))
        {
        if( $reads{$fh}{mode} =~ /^char/i )
          {
          $reads{$fh}{callback}->(
            $reads{$fh}{data}, @{ $reads{$fh}{args} } );
          # the callback may have removed this registration; assigning
          # unconditionally would recreate a %reads entry that never goes away
          exists $reads{$fh} and $reads{$fh}{data} = '';
          }
        elsif( $reads{$fh}{mode} =~ /^full/i )
          {
          # no callbacks until eof
          }
        else
          {
          # one callback per complete line; stop as soon as a callback has
          # removed the registration
          $reads{$fh}{callback}->( $1, @{ $reads{$fh}{args} } )
            while exists $reads{$fh} and $reads{$fh}{data} =~ s/(.*\n)//;
          }
        }
      else # end of file
        {
        # 'full' mode delivers the whole buffer now; the other modes signal
        # EOF with an empty string
        $reads{$fh}{callback}->(
          $reads{$fh}{mode} =~ /^full/i ? $reads{$fh}{data} : '',
          @{ $reads{$fh}{args} } );
        $rsel->remove($fh);
        delete $reads{$fh};
        delete $writes{$fh}; # dump output if EOF
        # also stop polling for writability, which would otherwise only
        # produce a "leftover write handle" warning on a later pass
        $wsel->remove($fh);
        }
      }
    $waitfor = @timerqueue ? $timerqueue[0][0] - time : 1e6;
    if($waitfor <= 0) # a timer has expired
      {
      my (undef, $callback, @args) = @{ shift @timerqueue };
      $callback->(@args);
      }
    }
  }

1; # return true for this module

__END__

=head1 NAME

Async::Tiny - Tiny? async eventloop module mostly for example purposes

=head1 SYNOPSIS

  use Async::Tiny;
  my $tiny = Async::Tiny->new;
  $tiny->addReadCallback( *STDIN, sub { print "input: @_" } );
  $tiny->addDelayCallback( 10, sub { print "stop\n" } );
  $tiny->addDelayCallback( 0, sub { print "start\n" } );
  $tiny->eventloop;

=head1 DESCRIPTION

Async::Tiny implements a simple "select" based async/callback style
kernel (sort of like POE or AnyEvent, maybe) that handles the messy
work and lets the user just define callbacks to handle his special needs.

=head1 METHODS

=over 4

=item new

Constructor.

=item addDelayCallback ( DELAY, CALLBACK, ARGS )

Add a timer to expire after DELAY seconds.

  CALLBACK(ARGS)

=item addRepeatCallback ( INTERVAL, CALLBACK, ARGS )

Add a timer that does callback every INTERVAL seconds.
The repetition stops when the callback returns a string matching
C</end *repeat/i>.

  CALLBACK(ARGS)

=item addNowAndRepeatCallback ( INTERVAL, CALLBACK, ARGS )

Same as addRepeatCallback, but the first callback is queued for the
current time instead of INTERVAL seconds from now.

  CALLBACK(ARGS)

=item addMtimeCallback ( INTERVAL, FILE, CALLBACK, ARGS )

Check the modification time of FILE every INTERVAL seconds and do the
callback whenever it has increased. The checking stops when the callback
returns a string matching C</end *repeat/i>.

  CALLBACK(ARGS)

=item addReadCallback ( HANDLE, CALLBACK, ARGS )

Add a callback for each line of text (but see below) input on HANDLE.
At end of file the callback is called one last time (with an empty
string, except in 'full' mode) and the handle is removed.
An undefined CALLBACK removes the handle.

  CALLBACK(LINE, ARGS)

=item changeReadMode ( HANDLE, MODE )

There are three read modes:

  'character' - return all new characters
  'linebyline' - return each "\n" terminated line (default)
  'full' - only return once at EOF, with complete buffer

=item addListenCallback ( HANDLEorPORT, CALLBACK, ARGS )

Add a callback for new connections on a listen socket.

  CALLBACK(NEWCONNECTION, ARGS)

=item addErrorCallback ( HANDLE, CALLBACK, ARGS )

Add a callback for a failed write of the data currently queued for
HANDLE. Has no effect unless data has been queued with write first.

  CALLBACK(ERRORTEXT, ARGS)

=item status

Returns a one line summary of the number of timers, reads, listens,
writes and handles.

=item write ( HANDLE, DATA )

Queues DATA for writing to HANDLE.

=item write ( [HANDLES], DATA )

Queues DATA for writing to all HANDLES in []. (Think IRCD)

=item write ( HANDLE )

Perform HANDLE->shutdown(1) after all queued data is written.

=item addWaitCallback ( CALLBACK )

Specify a callback to be called just before the select call. This can
be used to update the display in a GUI.

  CALLBACK()

=item eventloop

Start the event loop processing. The event loop ends when no callbacks
are left.

=back

=head1 EXAMPLE tinyirc.pl

  #!/usr/bin/perl
  # tinyirc.pl - simple tiny irc client

  use Async::Tiny;
  use IO::Socket;
  use strict;

  my $t = Async::Tiny->new;

  sub sendline # works for both directions :)
    {
    my ($line, $to) = @_;
    $line eq '' ? exit : $t->write( $to, $line );
    }

  my $s = IO::Socket::INET->new(shift // 'localhost:6667') or die $@;
  $t->addReadCallback( *STDIN, \&sendline, $s );
  $t->addReadCallback( $s, \&sendline, *STDOUT );
  $t->eventloop;

=head1 EXAMPLE tinyircd.pl

  #!/usr/bin/perl
  # tinyircd.pl - minimal tiny ircd with Async::Tiny
  # multiple telnets or tinyirc.pl to it, echos to the others (*very* basic ircd)

  use Async::Tiny;
  use strict;

  my %clients;
  my $t = Async::Tiny->new;

  for my $port (@ARGV ? @ARGV : 6667)
    {
    $t->addListenCallback($port, sub
      {
      my ($socket) = @_;
      $clients{$socket} = $socket;
      $t->addReadCallback($socket, \&clientline, $socket );
      });
    $t->addDelayCallback(0, sub{print "listening on $port...\n"} );
    }

  $t->addRepeatCallback( 600, sub { print localtime() .
    " clients: @{[ scalar keys %clients, times ]}\n"});

  $t->eventloop;

  sub clientline
    {
    my ($line, $me) = @_;
    if( $line eq '' )
      {
      delete $clients{$me}; # was closed
      }
    else
      {
      $t->write( [grep $me != $_, values %clients], $line);
      }
    }

=head1 EXAMPLE math.pl

  #!/usr/bin/perl
  #
  # math.pl - a math server using Async::Tiny

  use Async::Tiny;
  use strict;

  my $t = Async::Tiny->new;

  $t->addListenCallback( shift // 8081, sub
    {
    my ($socket) = @_;
    $t->addReadCallback( $socket, sub
      {
      my ($line) = @_;
      tr#0-9()*/+-# #c for $line; # strip non-math for safety :)
      $t->write( $socket,
        eval $line // "$line is an invalid expression $@", "\n");
      });
    });

  $t->eventloop;

=head1 EXAMPLE key.pl

  #!/usr/bin/perl
  #
  # key.pl - read single key-at-a-time with Async::Tiny

  use Term::ReadKey;
  use Async::Tiny;
  use Data::Dump;
  use strict;

  $| = 1;

  my $t = Async::Tiny->new;

  $t->addReadCallback( *STDIN, sub
    {
    my ($string) = @_;
    dd $string;
    #printf "%vd\n", $string;
    $string =~ /^[q\e]$/i and die "quit character entered\n";
    });
  $t->changeReadMode( *STDIN, 'character' );

  $t->addListenCallback( 8081, sub
    {
    $t->addReadCallback( $_[0], sub { print @_ });
    });

  my $count = 0;
  $t->addRepeatCallback( 60, sub { print ++$count, " tick\n" });

  ReadMode 'raw';
  eval { $t->eventloop };
  my $answer = $@;
  ReadMode 'restore';
  print $answer;

=head1 EXAMPLE curses.pl

  #!/usr/bin/perl
  #
  # curses.pl - client using Async::Tiny

  use Curses;
  use Term::ReadKey;
  use Async::Tiny;
  use IO::Socket;
  use Data::Dump qw(pp dd);
  use strict;

  $| = 1;

  my @lines;
  my $input = '';
  my ($width, $height) = GetTerminalSize;

  my $t = Async::Tiny->new;

  $t->addWaitCallback( sub
    {
    ($width, $height) = GetTerminalSize;
    my $row = 0;
    for (
      @lines = ( ('') x $height, @lines )[2 - $height .. -1], # top part
      '#' x ($width - 25) . ' ' . localtime(),                # divider line
      $input)                                                 # bottom line
      {
      addstr($row++, 0, substr $_, 0, $width);
      clrtoeol;
      }
    refresh;
    });

  my $s = IO::Socket::INET->new(shift // 'arch:8081') or die $@;
  $t->addReadCallback( $s, sub
    {
    my ($line) = @_;
    $line eq '' and die "server closed connection\n";
    push @lines, "< $line" =~ s/\n//r;
    });

  $t->addReadCallback( *STDIN, sub
    {
    my ($string) = @_;
    #dd $string;
    for ($string =~ /\e(?:\[M...|[O\[][0-9;]*[A-~])|./gs ) # keep esc seq together
      {
      /^[ -~]\z/ ? $input .= $_ :
      /^[\cc\cd\e]\z/ ? die "quit character @{[ pp $_ ]} entered\n" :
      /^[\010\177]\z/ ? chop $input :
      /^[\r\n]\z/ ? do
        {
        if( length $input )
          {
          push @lines, " > $input";
          $t->write( $s, $input, "\n" );
          $input = '';
          }
        } :
      do { $input .= pp $_ };
      $input &= "\xff" x ($width - 3); # trim to size
      }
    });
  $t->changeReadMode( *STDIN, 'character' );

  $t->addRepeatCallback( 1, sub { }); # for incrementing clock
  $t->addRepeatCallback( 3600, sub { push @lines, ' ' . localtime() } );

  initscr();
  clear;
  ReadMode 'cbreak';
  mousemask(BUTTON1_CLICKED, my $oldmask);
  eval { $t->eventloop };
  my $errormsg = $@;
  ReadMode 'restore';
  endwin();
  print $errormsg;

=head1 EXAMPLE animate.pl

  #!/usr/bin/perl
  #
  # animate.pl - spin things

  use Async::Tiny;
  use strict;

  $| = 1;
  my $t = Async::Tiny->new;

  my $test = shift // 5;
  my $count = 0;

  if($test == 1 || $test > 4)
    {
    print "test 1 - auto repeat\n";
    $t->addRepeatCallback( 0.05, sub
      {
      print "\r", qw(- \\ | /)[$count % 4];
      ++$count < 60 or 'endrepeat';
      });
    }

  if($test == 2 || $test > 4)
    {
    print "test 2 - chained delays\n";
    sub tick
      {
      print "\r", qw(- \\ | /)[$count % 4];
      ++$count < 60 and $t->addDelayCallback( .05, \&tick );
      }
    $t->addDelayCallback( 0, \&tick );
    }

  if($test == 3 || $test > 4)
    {
    print "test 3 - queue all up beforehand\n";
    for (0..59)
      {
      my $char = qw(- \\ | /)[$_ % 4];
      $t->addDelayCallback( $_ / 20, sub { print "\r$char" });
      }
    }

  if($test == 4 || $test > 4)
    {
    print "test 4 - dual auto repeat\n";
    my $secondcount = 0;
    $t->addRepeatCallback( 0.12, sub
      {
      print "\r", map qw(- \\ | /)[$_ % 4], $count, $secondcount;
      ++$count < 3/.12 or 'endrepeat';
      });
    $t->addRepeatCallback( 0.05, sub
      {
      print "\b", qw(- \\ | /)[$secondcount % 4];
      ++$secondcount < 3/.05 or 'endrepeat';
      });
    }

  $t->eventloop;
  print "\n";

=head1 EXAMPLE bars.pl

  #!/usr/bin/perl
  #
  # bars.pl - multiple independent timers using Async::Tiny

  use Curses;
  use Term::ReadKey;
  use Async::Tiny;
  use strict;

  $| = 1;

  my ($width, $height) = GetTerminalSize;
  my @lines = ( '-' x $width ) x $height;

  my $t = Async::Tiny->new;

  $t->addWaitCallback( sub
    {
    my $row = 0;
    for ( @lines )
      {
      addstr($row++, 0, substr $_, 0, $width);
      clrtoeol;
      }
    refresh;
    });

  for my $line (@lines)
    {
    $t->addRepeatCallback( (3 + rand 20) / 50 , sub
      {
      s/-/#/ or s/#(?!#)/=/ or tr/=/-/ for $line;
      (times)[0] + rand() > 2 and 'end repeat';
      } );
    }

  initscr();
  clear;
  $t->eventloop;
  endwin();

=head1 EXAMPLE echoall.pl

  #!/usr/bin/perl

  # Async::Tiny version of
  # https://blog.afoolishmanifesto.com/posts/concurrency-and-async-in-perl/

  use experimental 'signatures';
  use Async::Tiny;
  use strict;

  my %clients;
  my $t = Async::Tiny->new;

  $t->addListenCallback( 9935, sub ($socket)
    {
    $t->addReadCallback( $clients{$socket} = $socket, sub ($line, $me)
      {
      $t->write( $me, $line);
      $line eq '' and delete $clients{$me}; # was closed
      }, $socket );
    });

  $t->addRepeatCallback(5, sub {$t->write([ values %clients ], "ping!\n" )});
  $t->addDelayCallback(0, sub { warn "ready on port 9935\n" });
  #$t->addRepeatCallback(60, sub { warn $t->status });

  $t->eventloop;

=cut
|
|---|
| Replies are listed 'Best First'. | |
|---|---|
|
Re^4: Nonblocking read server
by choroba (Cardinal) on Aug 30, 2019 at 15:03 UTC | |
by tybalt89 (Monsignor) on Aug 30, 2019 at 15:22 UTC | |
by perl-diddler (Chaplain) on Aug 31, 2019 at 00:28 UTC |