package Net::YAML_IPC;

# Net::YAML_IPC - exchange YAML-encoded hash messages over a TCP socket.
#
# Wire format: every message is the YAML dump of a hash ref that has a
# 'message_type' key, followed by a terminator line consisting of "...".
# poll() buffers incoming bytes and hands back one decoded message at a
# time; send() writes one message; run_server() is a simple single-client
# accept/dispatch loop built on top of those two.

use strict;
use warnings;

use YAML;
use IO::Socket;
use Net::hostent;
use Carp;

# new($class, $remote)
#
# Wrap an already-connected socket handle.  The handle is switched to
# autoflush and binary mode.  Returns the new connection object.
sub new {
    my ($class, $remote) = @_;

    $remote->autoflush(1);
    binmode $remote;

    my $self = {
        handle     => $remote,
        filenovec  => q{},    # select() bit vector containing only this handle
        rxd        => q{},    # received bytes not yet consumed as a document
        close_conn => 0,      # set once the connection should be dropped
    };
    vec($self->{filenovec}, fileno($remote), 1) = 1;

    return bless $self, $class;
}

# new_client($class, $PeerAddr, $PeerPort)
#
# Open a TCP connection to $PeerAddr:$PeerPort and wrap it with new().
# Dies if the connection cannot be established.
sub new_client {
    my ($class, $PeerAddr, $PeerPort) = @_;

    my $remote = IO::Socket::INET->new(
        Proto    => 'tcp',
        PeerAddr => $PeerAddr,
        PeerPort => $PeerPort,
    );
    if (!$remote) {
        # IO::Socket::INET leaves the reason for the failure in $@.
        my $reason = (defined $@ && length $@) ? ": $@" : q{};
        die "cannot connect to slave$reason";
    }

    return Net::YAML_IPC->new($remote);
}

# peername()
#
# Returns the peer's host name from a reverse lookup, falling back to the
# peer's address in text form when the lookup yields nothing.
sub peername {
    my $self = shift;

    my $handle   = $self->{handle};
    my $peeraddr = $handle->peeraddr;

    # Net::hostent's gethostbyaddr returns undef when the lookup fails, so
    # the result must be checked before calling ->name on it.
    my $hostinfo = defined $peeraddr ? gethostbyaddr($peeraddr) : undef;
    my $name     = $hostinfo ? $hostinfo->name : undef;

    return $name || $handle->peerhost;
}

# poll()
#
# Non-blocking receive.  Returns the next decoded message (a hash ref with
# a 'message_type' key), or undef when no complete message is available.
# Sets the close flag (see want_close) when the peer has gone away, the
# socket reports an error, or a malformed document was received.
#
# NOTE: explicit 'return undef' is kept on purpose so the return value is
# the same one-element shape as 'return $rxdoc' for existing callers.
sub poll {
    my $self = shift;

    if (!$self->{handle}->connected) {
        $self->{close_conn} = 1;
        return undef;
    }
    return undef if $self->{close_conn};

    # A previous read may have pulled in more than one document.  Deliver
    # anything already buffered before waiting for new bytes; otherwise a
    # trailing document would be stuck until more data arrives, or lost if
    # the peer closes first.
    my $rxdoc = $self->_extract_document;
    return $rxdoc if defined $rxdoc || $self->{close_conn};

    # Zero timeout: just ask whether the handle is readable / in error.
    my ($rbits, $ebits);
    select($rbits = $self->{filenovec}, undef, $ebits = $self->{filenovec}, 0);

    if ($rbits eq $self->{filenovec}) {
        my $newdata;
        recv($self->{handle}, $newdata, 256, 0);

        # Readable but nothing received means the peer closed the connection
        # (or the read failed).
        if (!defined $newdata || $newdata eq q{}) {
            $self->{close_conn} = 1;
            print "empty newdata: closing connection.\n";
            binmode STDOUT;
            return undef;
        }

        $self->{rxd} .= $newdata;
        $rxdoc = $self->_extract_document;
    }

    if ($ebits eq $self->{filenovec}) {
        warn "connection has error - closing\n";
        $self->{close_conn} = 1;
    }

    return $rxdoc;
}

# _extract_document()
#
# Private helper for poll(): remove the first terminated document from the
# receive buffer, decode and validate it.  Returns the decoded hash ref, or
# undef when the buffer holds no complete document.  On a malformed
# document it warns, sends an 'error' message to the peer, sets the close
# flag and returns undef.
sub _extract_document {
    my $self = shift;

    # $1: everything before the first line that starts with "...";
    # $2: everything after the end of that terminator line.
    my ($text, $rest)
        = $self->{rxd} =~ /\A (.*? ^) [.]{3} .*? $ (.*) \z/xms;
    return undef if !defined $text;
    $self->{rxd} = $rest;

    # NOTE(review): this decodes YAML that came straight off the network.
    # Depending on the installed YAML version and its $YAML::LoadBlessed
    # setting, Load may construct blessed objects - confirm the peer is
    # trusted or that object loading is disabled.
    my $doc;
    my $loaded     = eval { $doc = Load($text); 1 };
    my $load_error = $@;

    my $problem;
    if (!$loaded) {
        $problem = "bad YAML document received: $load_error";
    }
    elsif (ref $doc ne 'HASH') {
        $problem = "bad YAML document received: not a hash ref\n";
    }
    elsif (!exists $doc->{message_type}) {
        $problem = "bad YAML document received: no message_type key\n";
    }
    return $doc if !defined $problem;

    (my $warning = $problem) =~ s/\n*\z/\n/xms;
    warn $warning;

    # Reply through send() so the error carries the document terminator;
    # without it the peer's poll() never sees a complete message.
    $self->send({ message_type => 'error', errormessage => $problem });
    $self->{close_conn} = 1;

    return undef;
}

# send($message)
#
# Write one message to the peer.  $message must be a hash ref containing a
# 'message_type' key; croaks otherwise.  Returns nothing.
sub send {
    my ($self, $message) = @_;

    croak 'Net::YAML_IPC->send must be passed a hash ref'
        if ref $message ne 'HASH';
    croak 'message passed to Net::YAML_IPC->send must have a message_type key'
        if !exists $message->{message_type};

    # The "..." line is the terminator poll() looks for.
    print {$self->{handle}} Dump($message) . "\n...\n";

    return;
}

# want_close()
#
# True once poll() has decided the connection should be dropped.
sub want_close {
    return $_[0]->{close_conn};
}

# DESTROY
#
# Tell the peer we are going away (best effort) and close the socket.
sub DESTROY {
    my $self = shift;

    # A destructor must not disturb the error state of whatever code
    # happened to trigger it.
    local ($@, $!, $?);

    my $handle = $self->{handle};
    return if !defined $handle;    # e.g. already torn down at global destruction

    # Only announce the close on a socket that is still open and connected.
    if (defined fileno($handle) && $handle->connected) {
        eval { $self->send({ message_type => '_close' }); 1 }
            or warn "could not send _close message: $@";
        $handle->flush;
    }
    close $handle;

    return;
}

# run_server($ListenPort, $message_handler, $idle_handler)
#
# Listen on $ListenPort and serve clients one at a time, forever.
#
#   $message_handler  either a sub ref called for every message, or a hash
#                     ref mapping message types to sub refs (with an
#                     optional '_other' catch-all).  Handlers are called as
#                     $handler->($client, $message).
#   $idle_handler     optional sub ref called whenever poll() has nothing
#                     to deliver.
#
# A synthetic '_open' message is dispatched when a client connects.  The
# client is dropped when it sends '_close' or its connection wants closing.
# Croaks on invalid handler arguments or if the listening socket cannot be
# created.
sub run_server {
    my ($ListenPort, $message_handler, $idle_handler) = @_;

    croak "message-handler argument must be a hash ref or a subroutine ref"
        if ref $message_handler ne 'HASH' && ref $message_handler ne 'CODE';
    croak "idle-handler argument must be a subroutine ref or undefined"
        if defined $idle_handler && ref $idle_handler ne 'CODE';

    if (ref $message_handler eq 'HASH') {
        # keys() rather than each(): croaking out of an each() loop would
        # leave the caller's hash iterator half-way through.
        for my $mt (keys %{$message_handler}) {
            my $handler = $message_handler->{$mt};
            croak "invalid message-handler for '$mt': all handlers must be a subroutine ref or undefined"
                if defined $handler && ref $handler ne 'CODE';
        }
    }

    my $server = IO::Socket::INET->new(
        Proto     => "tcp",
        LocalPort => $ListenPort,
        Listen    => 1,
        ReuseAddr => 1,
    );
    croak "cannot set up listening socket" if !$server;

    while (1) {
        my $client;
        while (!defined $client) {
            $client = $server->accept();
        }
        $client = Net::YAML_IPC->new($client);

        _handle_message($client, { message_type => '_open' }, $message_handler);

        my $close_conn = 0;
        while (!$client->want_close && !$close_conn) {
            my $rxmess = $client->poll;
            if (!defined $rxmess) {
                $idle_handler->() if defined $idle_handler;
                next;
            }

            _handle_message($client, $rxmess, $message_handler);

            my $message_type = $rxmess->{message_type};
            $close_conn = 1
                if defined $message_type && $message_type eq '_close';
        }

        # Dropping the last reference runs DESTROY, which closes the socket.
        $client = undef;
    }

    return;
}

# _handle_message($client, $rxmess, $message_handler)
#
# Private helper for run_server(): pick the handler for $rxmess and call
# it.  If the handler dies, the error is warned about and reported back to
# the client as an '_error' message.  Messages with no matching handler
# are silently ignored.
sub _handle_message {
    my ($client, $rxmess, $message_handler) = @_;

    my $use_handler;
    if (ref $message_handler eq 'HASH') {
        my $message_type = $rxmess->{message_type};

        # An undefined type cannot name a handler; let it fall to '_other'.
        if (defined $message_type && defined $message_handler->{$message_type}) {
            $use_handler = $message_handler->{$message_type};
        }
        elsif (defined $message_handler->{_other}) {
            $use_handler = $message_handler->{_other};
        }
    }
    else {
        # if it's not a hash ref it should be a sub ref
        $use_handler = $message_handler;
    }
    return if !defined $use_handler;

    eval { $use_handler->($client, $rxmess); };
    if ($@) {
        my $errmess = $@;
        warn $errmess;
        $client->send({
            message_type => '_error',
            message      => $errmess,
            handling     => $rxmess,
        });
    }

    return;
}

1;