in reply to Re: IPC Design problem
in thread IPC Design problem

So I think I made some headway. I have most of the main messaging daemon code down but I'm having a bit of a problem with nstore...

Here is Messaging.pm:

package Security::Monitoring::Messaging;
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Proc::Daemon;
use Carp qw(carp croak);
use Storable qw(fd_retrieve);
use POSIX qw(mkfifo);
use File::Spec::Functions;
use Cwd;
use IO::Socket::UNIX;
use IO::Socket::INET;

my $class = "Security::Monitoring::Messaging::Messenger";

=head1 NAME

Security::Monitoring::Messaging - The great new Security::Monitoring::Messaging!

=head1 VERSION

Version 0.01

=cut

our $VERSION = '0.01';

=head1 SYNOPSIS

you want to call this script using Proc::Daemon exec, that way it will be able
to do its work from the background. Whenever another script needs to ask for a
messenger (most of theses orders should be made at start time depending on the
configuration) it should nstore a hash to the messenger unit named pipe.

    my %params = (
        input=>{name=>'filename',
            type=>'unix_socket|network_socket|named_pipe', #where it will listen;
            listen=>$queue_size_for_listen;#for sockets
            local_addr=>'what it says on the can',#for network socket
            local_port=>'idem',#for network socket
            proto=>'tcp|udp',#for network socket
        }
        output1=>{name=>'filename',
            type=>'unix_socket|network_socket|named_pipe,
            bind=>address,#if network socket
            Local=>'pathname to local buffer',#unix socket only
            Listen=>$queue_size_for_listen',#for sockets
            peer_addr=>'address",
            ...
            ...
        },
        output2=>{...},
        ...
        outputn=>{...},
    );
    nstore_fd \%params $messaging_named_pipe;

a messaging daemon should be started.

=head1 DESCRIPTION

this module is a daemon that listens for messenger requests and spawn little
messenger daemon to handle IPC between units (such as report communication
between the reporting unit and the monitoring unit) it is done so to allow each
unit to run on a different server.

=head1 SUBROUTINES/METHODS

run

=head2 run

this is the main function, even if this is a pm file it also is a self
contained program.
One can run the whole module on one server or only part of it and have it
interact with other servers. An Instance of the Messaging daemon has to run on
each and every server that supports one of the main units (ie reporting unit,
config unit, mailer unit, monitoring unit and db interaction unit)

=cut

=whatever

=cut

sub run {
    #setup part
    my $request_file_name = shift;#named pipe to which the Messenging daemon will
                                  #listen for messenger spawn requests.
    my $log_file_name = shift;
    if (!defined($log_file_name)){
        $log_file_name = '/dev/null';
    }
    open my $log, '>>',$log_file_name or croak "could not open log_file";
    if (!defined($request_file_name)){
        croak "can not run without something to listen to!\n";
    }
    mkfifo($request_file_name,0777)||croak "could not open named pipe : $!";
    my %children = ();
    my $pid;
    open my $fh,'<',$request_file_name;
    local $/;
    my %messenger_params;
    print $log "starting to run\n";

    #running part
    ABORT:while(<$fh>){
        my $data = $_;
        my $params = fd_retrieve($fh);
        print "params input name = $params->{input}->{name}\n";
        $messenger_params{input} = {
            name=>$params->{input}->{name},
        };
        given ($params->{input}->{type}){
            when('unix_socket'||'network_socket'){
                $messenger_params{input}->{type} = 'socket';
            }
            default{
                $messenger_params{input}->{type} = 'named_pipe';
            }
        }
        #put the input data into the messenger params hash
        given ($params->{input}->{type}){
            when('unix_socket'){
                if(!exists($params->{input}->{peer})){
                    carp "can not summon unix socket demon without a peer buffer";
                    goto ABORT;
                }
                else{
                    $messenger_params{input}->{fh} = IO::Socket::UNIX::new(
                        Type=>SOCK_STREAM,
                        Local=>$params->{input}->{name},
                        Listen=>$params->{input}->{Listen},
                    );
                }
            }
            when('network_socket'){
                if(!exists($params->{input}->{local_addr})||!exists($params->{input}->{local_port})){
                    carp "I need a peer addr and a peer port to create a socket!";
                    goto ABORT;
                }
                $messenger_params{input}->{fh} = IO::Socket::INET->new(
                    Listen=>$params->{input}->{Listen},
                    LocalAddr=>$params->{input}->{loca__addr},
                    Proto=>$params->{input}->{proto},
                    LocalPort=>$params->{input}->{local_port},
                );
            }
            when('named_pipe'){
                mkfifo($params->{input}->{name},0777)||carp "could not open named pipe : $!";
                open $messenger_params{input}->{fh},
                    '<',$params->{input}->{name} or carp "could not open namedpipe for input";
            }
        }
        #putting the output subhashes into the messenger params hash
        my @keys = keys %$params;
        OUTPUT: foreach my $key (@keys){
            if($key =~ /input/){
                next OUTPUT;
            }
            else{
                $messenger_params{output}->{$key}->{type} = $params->{$key}->{type};
                $messenger_params{output}->{$key}->{name} = $params->{$key}->{name};
                given ($params->{$key}->{type}){
                    when('unix_socket'){
                        $messenger_params{output}->{$key}->{fh} = IO::Socket::UNIX->new(
                            Type=>SOCK_STREAM(),
                            Local=>$params->{$key}->{name},
                            Listen=>$params->{$key}->{Listen},
                        );
                    }
                    when('network_socket'){
                        $messenger_params{output}->{$key}->{fh} = IO::Socket::INET->new(
                            PeerAddr=>$params->{$key}->{peer_addr},
                            PeerPort=>$params->{$key}->{peer_port},
                            Proto=>$params->{$key}->{proto},
                        );
                    }
                    when('named_pipe'){
                        mkfifo($params->{output}->{$key}->{name},0777)||carp "could not open named pipe : $!";
                        open $messenger_params{output}->{$key}->{fh},
                            '<',$params->{output}->{$key}->{name};
                    }
                }
            }
        }
        $pid = fork();
        if (!$pid){#I'm the son
            my $messenger = $class->new(\%messenger_params);
            $messenger->relay;
        }
        else{
            $children{$pid} = 1;
        }
    }
}

if(!defined(caller())){
    my $filename = shift;
    my $logfile = shift;
    print "filename = $filename, logfile = $logfile\n";
    &run($filename,$logfile);
}

1; # End of Security::Monitoring::Messaging

Here is Messenger (the little daemon class):

package Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use IO::Socket::INET;
use IO::Socket::UNIX;

=head1 NAME

Security::Monitoring::Messaging::Messenger - The great new Security::Monitoring::Messaging::Messenger!

=head1 VERSION

Version 0.01

=cut

our $VERSION = '0.01';

=head1 DESCRIPTION

this module provides the messenger class, instances of this class will be used
in small daemon to handle message passing between the main units of the
program, thus allowing each unit to interact as if on the same machine of each
other.
the Main Messaging daemon creates smaller daemons when asked to do so (at start
time such order will come from the config files, during run time it will listen
to a named pipe for new orders)

=head1 SUBROUTINES/METHODS

=head2 new

summons a new instance of the Messenger class

=head3 use

    my %outputs = (
        $whatevername => $filehandle,
        $whatevername2=>$anotherfilehandle,
    );
    my %params = (
        input=>$filehandle,
        output = \%output;
    );
    my $messenger = $class->new(\%params);

=cut

sub new {
    my ($class,$params) = @_;
    my $self = {};
    bless $self, $class;
    if (!defined $params){
        croak "no params, cant create new messenger\n";
    }
    $self->_init($params);
    return $self;
}

=head2 _init

init function for messenger instances

=cut

sub _init {
    my($self,$params) = @_;
    $self->{input}->{type} = $params->{input}->{type};
    $self->{input}->{fh} = $params->{input}->{fh};
    my @outputs = keys $params->{output};
    foreach my $key (@outputs){
        $self->{output}->{$key}->{type} = $params->{output}->{$key}->{type};
        $self->{output}->{$key}->{fh} = $params->{output}->{$key}->{fh};
    }
}

=head2 _read

this function is used by the relay sub to start doing the messaging job

=cut

sub _read{
    my $self = shift;
    if(!ref $self){
        croak "can not do that as a class!";
    }
    my $handle = $self->{input}->{fh};
    given ($self->{input}->{type}){
        when ('named_pipe'){
            local $/;
            while(<$handle>){
                my $output = $_;
                return $output;
            }
        }
        when('socket'){
            my $socket = $handle->accept or croak ("could not accept connection");
            local $/;
            while(<$socket>){
                my $output=$_;
                return $output;
            }
        }
    }
}

=head2 _write

this function is used by the relay sub for the messaging job

=cut

sub _write{
    my $self = shift;
    if(!ref $self){
        croak "can not do that as a class!";
    }
    my $input = shift;
    my @keys = keys $self->{output};
    foreach my $key (@keys){
        my $handle = $self->{output}->{$key}->{fh};
        if(!defined($handle)){
            print "undefined fh for key $key\n";
        }
        print $handle $input;
    }
}

=head2 relay

this function is called just after forking : it starts the actual messaging,
the daemon waits for input and transmit to all its outputs

=head3 use

    $messenger->relay;

=cut

sub relay{
    my $self = shift;
    if(!ref($self)){
        croak "can not call relay from a class";
    }
    $self->_write($self->_read);
}

1; # End of Security::Monitoring::Messaging::Messenger

And finally, the code I'm using to test them. I first run Messaging.pm with one parameter: the filename for the named pipe.

Then I run the following script:

use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use Storable qw(nstore_fd);

my $input = {name=>'inputfile',type=>'named_pipe'};
my $output = {name=>'outputfile',type=>'named_pipe'};
my %params = (
    input=>$input,
    output=>{out1=>$output}
);
open my $fh,'>','listenfile'or croak"could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
while(! -e 'inputfile' && ! -e 'outputfile'){
    print "waiting\n";
}
open my $write, '>','inputfile';
print $write "trololol";
`/bin/tailf outputfile`;

As soon as it is done writing to the pipe and has closed it, I receive the following error message from my messaging daemon:

Magic number checking on storable file failed at /usr/local/lib/perl/5.14.2/Storable.pm line 401, <$fh> chunk 1, at ../lib/Security/Monitoring/Messaging.pm line 115

I find this most disturbing, because it means that my earlier tests on Messaging.pm must have been cheating somehow: if they had not, they would have hit the same error, yet they run fine! These are the tests I mean:

#!perl -T
use 5.14.2;
use strict;
use warnings;
use Test::More;
use Test::Exception;
use Security::Monitoring::Messaging;
use Storable qw(nstore_fd);
use POSIX qw(mkfifo);
use Carp qw(croak);
use Security::Monitoring::Utils;

BEGIN {
    plan tests => 5;
    use_ok( 'Security::Monitoring::Messaging' ) || print "Bail out!\n";
}

diag( "Testing Security::Monitoring::Messaging $Security::Monitoring::Messaging::VERSION, Perl $], $^X" );

my $pid;
sub testing{
    open my $fh, '>','testlog.log';
    my %outputs = (
        out1 => "sillyvar",
    );
    my %params = (
        input=>"anothersillyvar",
        output=>\%outputs,
    );
    $pid = fork;
    if(!$pid){
        Security::Monitoring::Messaging::run("listenfile"); #I'll listen
    }
    else{
        TEST:while(1){
            if(! -e 'listenfile'){
                goto TEST;
            }
            print $fh "father : gonna write to listenfile\n";
            open my $sfh, '>', 'listenfile';
            nstore_fd \%params, $sfh;
            print $fh "father :stored to listenfile\n";
            print $fh "father :now I wait for my son $pid to die\n";
            kill 'KILL', $pid;
            last;
        }
        wait;
        print $fh "father :my son is dead\n";
        kill 'KILL',$pid;
        die;
    }
}

dies_ok(sub{testing;},"the testing sub has run its course pid id");
dies_ok(sub{Security::Monitoring::Messaging::run('')},"messaging demon wont run with empty listenfile, pid = $pid");
dies_ok(sub{Security::Monitoring::Messaging::run(undef)},"messaging daemon wont run with undef filename");
unlink 'listenfile';
open my $fh, '>', 'listenfile';
print $fh "trololol";
close $fh;
dies_ok(sub{Security::Monitoring::Messaging::run('listenfile')},"messaging daemon wont open already existing named piped ");
unlink 'listenfile';
unlink 'testlog.log';


If you have an inkling of what's wrong, or even just an intuition, I would be most grateful, because I'm quite lost...

Also, in the test file I took a shotgun approach and scattered kill commands all over the subroutine, because without them the test output comes out of sequence and appears three times.
I tried to find out how to prevent my children from doing that regardless of what the scheduler decides, but I could not, so if you have a better or more elegant way to do it, I'm all ears!

Thank you for reading :)

Re: IPC named pipe issue with nstore_fd
by sbakker (Initiate) on Aug 26, 2015 at 12:15 UTC

    I think your read loop in the Messaging class is a little too greedy:

    ABORT:while(<$fh>){
        my $data = $_;
        my $params = fd_retrieve($fh);
        ...

    This first reads a line, then does an fd_retrieve(). Your test script first does an nstore_fd(), then writes some text, i.e. exactly the other way around.
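Note that `run` sets `local $/` just before the loop, so the `<$fh>` in the loop condition reads up to end of file rather than a single line. The whole Storable image ends up in `$data`, and `fd_retrieve` then finds nothing left to parse. A minimal sketch of that effect, assuming an anonymous pipe stands in for the `listenfile` named pipe (the pipe and the variable names are mine, not from the original code):

```perl
use strict;
use warnings;
use Storable qw(nstore_fd fd_retrieve);

# Assumption: an anonymous pipe stands in for the 'listenfile' FIFO.
pipe(my $reader, my $writer) or die "pipe failed: $!";
binmode $_ for $reader, $writer;

# Writer side: one Storable image, then close (as the test script does).
nstore_fd({ input => { name => 'inputfile', type => 'named_pipe' } }, $writer)
    or die "nstore_fd failed";
close $writer;

my $slurped;
{
    local $/;                # slurp mode, as in run()
    $slurped = <$reader>;    # swallows the complete Storable image
}

# Nothing is left on the handle, so fd_retrieve has no header to check.
my $params = eval { fd_retrieve($reader) };
my $error  = $@;

printf "slurped %d bytes\n", length $slurped;
print "fd_retrieve failed: $error" if $error;
```

Dropping the `<$fh>` read and calling `fd_retrieve($fh)` directly keeps the Storable header intact for the reader.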

    This is not the only problem with your code. Apart from style/idiom issues, mixing plain text with binary data in the same channel is asking for trouble. If you reverse the writing order in your test script, the text string in your test script does not contain a newline, so it will most likely trip up the <$fh> statement, which will slurp up the nstore-d data as well. (And if you have a string with more than one newline, you also have a problem.) You should really consider using some appropriate encapsulation, like a hash with both the data and the parameters, written out with nstore:

    # Writing a message:
    my $message = {
        data => "Some message\nthat can contain newlines\n",
        params => \%params,
    };
    nstore_fd($message, $fh);

    # Reading a message:
    my $message = fd_retrieve($fh);
    my $params = $message->{'params'};
    my $data = $message->{'data'};

    ... Or something like that.
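For something you can run as-is, here is a minimal round-trip sketch of that encapsulation. It assumes an anonymous pipe in place of your named pipe, and the `%params` contents are just copied from your test script; adapt both to your setup.

```perl
use strict;
use warnings;
use Storable qw(nstore_fd fd_retrieve);

# Assumption: an anonymous pipe stands in for the named pipe.
pipe(my $reader, my $writer) or die "pipe failed: $!";
binmode $_ for $reader, $writer;

my %params = (
    input  => { name => 'inputfile', type => 'named_pipe' },
    output => { out1 => { name => 'outputfile', type => 'named_pipe' } },
);

# Writer side: data and parameters travel together in one Storable image.
my $message = {
    data   => "Some message\nthat can contain newlines\n",
    params => \%params,
};
nstore_fd($message, $writer) or die "nstore_fd failed";
close $writer;

# Reader side: no <$fh> before fd_retrieve, so the header is read intact.
my $received = fd_retrieve($reader);
close $reader;

print "input name: $received->{params}{input}{name}\n";
print "data: $received->{data}";
```

Since the text never touches the channel on its own, the newlines inside `data` are just bytes in the serialized image and cannot confuse the reader.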

    Hope this helps!

    -- Steven

      Thanks for the tips. Would you please expand on the "style/idiom issues"?

      I did not put much effort into applying best practices in the test script, but I tried to do better in the module code. What should I try to improve?

      Also, I think I have finally nailed it down. I am still looking for a way to send a serialized hash without bad things happening due to newlines, though...
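
One common way to sidestep the newline concern is to stop reading by lines altogether and frame each serialized hash with its byte length. The sketch below is only an illustration under assumptions: `send_frame`, `recv_frame` and `read_exactly` are hypothetical helper names, the 4-byte big-endian length prefix is an arbitrary choice, and an anonymous pipe stands in for the named pipe.

```perl
use strict;
use warnings;
use Storable qw(nfreeze thaw);

# Hypothetical helper: serialize $data and write it as one length-prefixed frame.
sub send_frame {
    my ($fh, $data) = @_;
    my $payload = nfreeze($data);
    print {$fh} pack('N', length $payload), $payload
        or die "write failed: $!";
    return;
}

# Hypothetical helper: read exactly $len bytes; returns undef on end of file.
sub read_exactly {
    my ($fh, $len) = @_;
    my $buf = '';
    while (length($buf) < $len) {
        my $got = read($fh, $buf, $len - length($buf), length($buf));
        die "read failed: $!" unless defined $got;
        return if $got == 0;    # EOF before the frame was complete
    }
    return $buf;
}

# Hypothetical helper: read one frame and rebuild the data structure.
sub recv_frame {
    my ($fh) = @_;
    my $header = read_exactly($fh, 4);
    return unless defined $header;    # clean EOF: no more messages
    my $payload = read_exactly($fh, unpack('N', $header));
    die "truncated frame" unless defined $payload;
    return thaw($payload);
}

# Demo over an anonymous pipe (assumption: stands in for the named pipe).
pipe(my $reader, my $writer) or die "pipe failed: $!";
binmode $_ for $reader, $writer;

send_frame($writer, { data => "line one\nline two\n" });
send_frame($writer, { data => "second message" });
close $writer;

my @received;
while (defined(my $msg = recv_frame($reader))) {
    push @received, $msg;
}
print scalar(@received), " messages received\n";
```

The reader never looks for a newline, so the payload can contain any bytes, and several messages can follow each other on the same handle.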