in reply to How to share a non-blocking IO::Socket::INET server/pass "listening" control?
For the sake of completeness, I want to share the resolution I ended up using POE (worlds of gratitude to Rocco for his input and willingness to answer my questions).
Here is the boilerplate code for the solution I created and I hope others find this useful as well! Side note: For my purposes, I needed to use POE::Filter::Stream but the default is POE::Filter::Line
Though this is my current fix to the problem, I'm not very satisfied with the workaround using log files which can be cumbersome. If anyone has any suggestions/tips how I could improve it and/or make it more efficient (pipes, etc.) I would love to hear ideas!
#!/usr/bin/perl -w ## This example establishes a full duplex ## TCP Client/Server connection in one file ## ## In order to facilitate two-way communication, ## log files located in /var/log/ are written to by ## a client program and POE uses POE::Wheel::FollowTail ## to signal that there is an information update ## (and suggestions on how to improve/optimize that ## would be very welcome) use strict; use warnings; use POE qw(Component::Server::TCP Component::Client::TCP Wheel::FollowTail Filter::Stream); use constant INACTIVITY_TIMEOUT => 10; # seconds use constant RECONNECT_TIMEOUT => 15; # seconds # this layout was inspired by this entry # from the POE Cookbook: # http://poe.perl.org/?POE_Cookbook/Watching_Logs my %logs_to_watch = ( log_one => "/var/log/log_one.log"; log_two => "/var/log/log_two.log"; ); POE::Session->create( inline_states => { _start => \&begin_watchers, log_one_record => \&log_one_got_record, log_two_record => \&log_two_got_record, log_reset => \&generic_log_reset, log_error => \&generic_log_error, }, ); my $server = POE::Component::Server::TCP->new( Alias =>'TCPServer', Port => 8000, ClientConnected => sub { print 'Client connected! (' . localtime . ")\n"; }, ClientDisconnected => sub { print 'Client disconnected! (' . localtime . "\n"; }, ClientInput => sub { my $input = $_[ARG0]; print "Server got: $input\n"; my $reply = '000'; # example successful ACK response $_[HEAP{client}->put($reply); }, # any of the POE::Filter options can be used here # depending on remote input ClientFilter => "POE::Filter::Stream", Stopped => sub { my ($kernel) = $_[KERNEL]; # terminates TCPClient's connection with # remote server if the local server shuts down $kernel->post('TCPClient', 'shutdown'); }, ); my $client = POE::Component::Client::TCP->new( Alias => 'TCPClient', RemoteAddress => 'example.com', RemotePort => 8001, Connected => sub { my ($kernel) = $_[KERNEL]; # triggers a keepalive timer for a custom keepalive method # to be fired after the set interval while connected $kernel->delay(keepalive => INACTIVITY_TIMEOUT); }, Disconnected => sub { my ($kernel) = $_[KERNEL]; # sets keepalive to undef to stop the delay timer $kernel->delay(keepalive => undef); # sets timer to attempt reconnection to address $kernel->delay(reconnect => RECONNECT_TIMEOUT); }, Filter => "POE::Filter::Stream", InlineStates => { keepalive => \&keepalive, send_request => \&send_request, }, ServerInput => sub { my ($kernel, $input) = $_[KERNEL, ARG0]; # $input would be the remote server's ACK/NAK response..handle + accordingly $kernel->delay(keepalive => INACTIVITY_TIMEOUT); }, ServerError => sub { my ($operation, $errno, $errstr, $id) = @_[ARG0..ARG3]; if ($operation eq "read" and $errno == 0 { print "EOF encountered!\n"; } else { print "$id encountered $operation error $errno: $errstr\n" +; } }, ); sub begin_watchers { my ($kernel, $heap) = @_[KERNEL, HEAP]; while (my ($service, $log_file) each %logs_to_watch) { my $log_watcher = POE::Wheel::FollowTail->new( Filename => $log_file, # based on what the purpose is Filter => POE::Filter::Stream->new(); InputEvent => $service . "_record", ResetEvent => "log_reset", ErrorEvent => "log_error", ); $heap->{services}->{$log_watcher->ID} = $service; $heap->{watchers}->{$log_watcher->ID} = $log_watcher; } } sub generic_log_reset { my ($heap, $wheel_id) = @_[HEAP, ARG0]; my $service = $heap->{services}->{$wheel_id}; print "--- $service log reset at ", scalar(gmtime), " GMT\n"; } sub generic_log_error { my ($heap, $operation, $errno, $error_string, $wheel_id) = @_[HEAP +, ARG0, ARG1, ARG2, ARG3]; my $service = $heap->{services}->{$wheel_id}; print "--- $service log $operation error $errno: $error_string\n"; print "--- Shutting down $service log watcher.\n"; delete $heap->{services}->{$wheel_id}; delete $heap->{watchers}->{$wheel_id}; } sub log_one_got_record { my $log_record = $_[ARG0]; "log_one_got_record received: $log_record\n"; if ($log_record =~ /log_one_string/) { # notifies $client of the information added my $postrequest = POE::Kernel->post('TCPClient', "send_request +", $log_record); } } sub log_two_got_record { my $log_record = $_[ARG0]; "log_two_got_record received: $log_record\n"; if ($log_record =~ /log_two_string/) { # notifies $client of the information added my $postrequest = POE::Kernel->post('TCPClient', "send_request +", $log_record); } } sub keepalive { #format keepalive(i.e. ping) message here my $keepalive = ''; $_[HEAP{server}->put($keepalive); $KERNEL->delay(keepalive => INACTIVITY_TIMEOUT); } sub send_request { my $request = $_[ARG0]; print "Sent from KohaClient to EMS: $request\n"; $_[HEAP]{server}->put($request); } POE::Kernel->run();
|
|---|