Greetings

Update: this can be used to make interprocess communication transparent. With the appropriate setup, it should let processes that may be on different machines communicate as if they were on the same one.

Next thing up: change the way the summoner works so he can return file handles to his callers, and make it so he can have any number of callers and treat their requests one after another.

To do that, I was thinking about putting a "call me back" file handle inside the parameter hash passed to him, so he can send a reply with the needed file handle.
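A minimal sketch of the "call me back" idea, under one loud assumption: Storable cannot serialize an open file handle, so the caller passes the path of a reply pipe instead of the handle itself. The `reply_to` key and the helpers `send_request`, `read_request` and `send_reply` are hypothetical names for illustration; none of them exist in the module yet.

```perl
use strict;
use warnings;
use Storable qw(nstore_fd fd_retrieve);

# Hypothetical helper: the caller adds the path of its own reply pipe
# to the request under an assumed 'reply_to' key, then serializes it.
sub send_request {
    my ($out_fh, $params, $reply_path) = @_;
    my %request = (%$params, reply_to => $reply_path);
    nstore_fd(\%request, $out_fh) or die "could not store request: $!";
    return;
}

# Hypothetical helper: the summoner reads one request per call, so
# callers are served in the order their requests arrive.
sub read_request {
    my ($in_fh) = @_;
    return fd_retrieve($in_fh);
}

# Hypothetical helper: the summoner answers on the caller's reply pipe.
# $reply must be a reference, because Storable only stores references.
sub send_reply {
    my ($request, $reply) = @_;
    open my $fh, '>', $request->{reply_to}
        or die "could not open $request->{reply_to}: $!";
    nstore_fd($reply, $fh) or die "could not store reply: $!";
    close $fh or die "could not close reply pipe: $!";
    return;
}
```

In this sketch the caller would create its reply pipe with mkfifo before sending the request, then block on fd_retrieve until the summoner answers, for instance with the name of the socket or pipe the new messenger listens on.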

After much nagging and harassing this community with questions regarding IPC, I finally have some code to show you.

Even better, this time it's code that works!
Please keep in mind that I have not yet gotten rid of my eye-gouging coding style (though I did make some effort on this one to provide a coherent synopsis, a description and better documentation).

So, here is the archdaemon, sitting upon his throne of undelivered messages. I give you Messaging!

package Security::Monitoring::Messaging;
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Proc::Daemon;
use Carp qw(carp croak);
use Storable qw(fd_retrieve);
use POSIX qw(mkfifo);
use File::Spec::Functions;
use Cwd;
use IO::Socket::UNIX;
use IO::Socket::INET;

my $class = "Security::Monitoring::Messaging::Messenger";

=head1 NAME

Security::Monitoring::Messaging - The great new Security::Monitoring::Messaging!

=head1 VERSION

Version 0.01

=cut

our $VERSION = '0.01';

=head1 SYNOPSIS

You want to call this script using Proc::Daemon exec; that way it will be able
to do its work from the background. Whenever another script needs to ask for a
messenger (most of these orders should be made at start time, depending on the
configuration), it should nstore a hash to the messaging daemon's named pipe.

    my %params = (
        input => {
            name       => 'filename',
            type       => 'unix_socket|network_socket|named_pipe', # where it will listen
            listen     => $queue_size_for_listen,    # for sockets
            Local      => 'local_buffer_file_name',  # unix sockets
            local_addr => 'what it says on the can', # for network socket
            local_port => 'idem',                    # for network socket
        },
        output => {
            output1 => {
                name   => 'filename',
                type   => 'unix_socket|network_socket|named_pipe',
                Local  => 'pathname to local buffer', # unix socket only
                listen => $queue_size_for_listen,     # for sockets
                ...
            },
            output2 => {...},
            ...
            outputn => {...},
        },
    );

Run the daemon with a name for the input file (it will be a named pipe), then
use this kind of code:

    open my $fh, '>', 'listenfile' or croak "could not open pipe : $!";
    nstore_fd(\%params, $fh);
    close $fh;

The daemon will spawn the needed messenger subdaemon. A messaging daemon
should be started first.

=head1 DESCRIPTION

This module is a daemon that listens for messenger requests and spawns little
messenger daemons to handle IPC between units (such as report communication
between the reporting unit and the monitoring unit). It is done this way to
allow each unit to run on a different server.

=head1 SUBROUTINES/METHODS

=head2 run

This is the main function; even though this is a pm file, it is also a
self-contained program. One can run the whole module on one server, or only
part of it, and have it interact with other servers. An instance of the
Messaging daemon has to run on each and every server that supports one of the
main units (i.e. reporting unit, config unit, mailer unit, monitoring unit and
db interaction unit).

=head3 sample param hash

See the SYNOPSIS above for a sample parameter hash.

=cut

sub run($$) {
    # setup part
    my $request_file_name = shift;    # named pipe on which the Messaging daemon
                                      # listens for messenger spawn requests
    my $log_file_name = shift;
    if (!defined($log_file_name)) {
        $log_file_name = '/dev/null';
    }
    open my $log, '>>', $log_file_name or croak "could not open log file: $!";
    if (!defined($request_file_name)) {
        croak "can not run without something to listen to!\n";
    }
    mkfifo($request_file_name, 0777) || croak "could not create named pipe: $!";
    my %children = ();
    my $pid;
    my %messenger_params;
    print $log "I'm $$ and starting to run\n";

    # running part
    ABORT: while (1) {
        open my $fh, '<', $request_file_name
            or croak "could not open request pipe: $!";
        my $params;
        eval { $params = fd_retrieve($fh) };
        if ($@) {
            print $log "continuing after error $@";
            close $fh;
            redo;
        }
        close $fh;

        # start from a clean slate so the outputs of a previous request
        # do not leak into this one
        %messenger_params = ();
        print $log "params input name = $params->{input}->{name}\n";
        $messenger_params{input}->{name} = $params->{input}->{name};
        $messenger_params{input}->{type} = $params->{input}->{type};

        # put the input data into the messenger params hash
        given ($params->{input}->{type}) {
            when ('unix_socket') {
                $messenger_params{input}->{fh} = IO::Socket::UNIX->new(
                    Type   => SOCK_STREAM,
                    Local  => $params->{input}->{name},
                    Listen => $params->{input}->{listen},
                ) or croak "$!";
            }
            when ('network_socket') {
                $messenger_params{input}->{fh} = IO::Socket::INET->new(
                    Listen    => $params->{input}->{listen},
                    LocalAddr => $params->{input}->{local_addr},
                    Type      => SOCK_STREAM,
                    Proto     => 'tcp',
                    Reuse     => 1,
                    LocalPort => $params->{input}->{local_port},
                ) or croak "$!";
            }
            when ('named_pipe') {
                mkfifo($params->{input}->{name}, 0777)
                    || carp "could not create named pipe : $!";
                open $messenger_params{input}->{fh}, '<', $params->{input}->{name}
                    or carp "could not open named pipe for input: $!";
            }
        }

        # putting the output subhashes into the messenger params hash
        my @keys = keys %{ $params->{output} };
        OUTPUT: foreach my $key (@keys) {
            $messenger_params{output}->{$key}->{type} = $params->{output}->{$key}->{type};
            $messenger_params{output}->{$key}->{name} = $params->{output}->{$key}->{name};
            given ($params->{output}->{$key}->{type}) {
                when ('unix_socket') {
                    $messenger_params{output}->{$key}->{fh} = IO::Socket::UNIX->new(
                        Type   => SOCK_STREAM,
                        Local  => $params->{output}->{$key}->{name},
                        Listen => $params->{output}->{$key}->{listen},
                    ) or croak "$!";
                }
                when ('network_socket') {
                    $messenger_params{output}->{$key}->{fh} = IO::Socket::INET->new(
                        LocalAddr => $params->{output}->{$key}->{local_addr},
                        LocalPort => $params->{output}->{$key}->{local_port},
                        Type      => SOCK_STREAM,
                        Proto     => 'tcp',
                        Reuse     => 1,
                        Listen    => $params->{output}->{$key}->{listen},
                    ) or croak "$!";
                }
                when ('named_pipe') {
                    mkfifo($messenger_params{output}->{$key}->{name}, 0777)
                        || carp "could not create named pipe : $!";
                }
            }
        }

        $pid = fork();
        croak "could not fork: $!" unless defined $pid;
        if (!$pid) {    # I'm the son
            my $messenger = $class->new(\%messenger_params);
            print "starting to relay\n";
            $messenger->relay;
            exit 0;     # never fall back into the parent's request loop
        }
        else {
            print "$pid forked\n";
            $children{$pid} = 1;
        }
    }
}

# run as a program when the file is executed rather than loaded
if (!defined(caller())) {
    my $filename = shift;
    my $logfile  = shift;
    &run($filename, $logfile);
}

1;
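The run loop records each forked messenger in %children but never reaps or stops one. A minimal sketch of how that bookkeeping could be used, assuming SIGTERM is an acceptable way to dismiss a messenger; `reap_children` and `dismiss_child` are hypothetical helpers, not part of the module.

```perl
use strict;
use warnings;
use POSIX qw(WNOHANG);

# Hypothetical helper: without blocking, collect every recorded child
# that is no longer running and forget its pid. Returns the forgotten pids.
sub reap_children {
    my ($children) = @_;
    my @gone;
    for my $pid (keys %$children) {
        my $status = waitpid($pid, WNOHANG);
        next if $status == 0;    # still running
        # $status is the pid (just reaped) or -1 (no such child any more)
        delete $children->{$pid};
        push @gone, $pid;
    }
    return @gone;
}

# Hypothetical helper: ask one messenger to stop, then wait for it.
# Returns 1 if the pid was known, 0 otherwise.
sub dismiss_child {
    my ($children, $pid) = @_;
    return 0 unless exists $children->{$pid};
    kill 'TERM', $pid;
    waitpid($pid, 0);
    delete $children->{$pid};
    return 1;
}
```

Calling reap_children(\%children) once per pass through the request loop would keep finished messengers from lingering as zombies.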

Now the class he uses to summon his little slaves, the messenger daemons. Each summoned daemon has his True Name (i.e. his pid) written in the archdaemon's Great Book Of Names (i.e. his %children hash), so he can be sent back whence he came if the need arises.

package Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use IO::Socket::INET;
use IO::Socket::UNIX;

=head1 NAME

Security::Monitoring::Messaging::Messenger - The great new Security::Monitoring::Messaging::Messenger!

=head1 VERSION

Version 0.01

=cut

our $VERSION = '0.01';

=head1 SYNOPSIS

    my $messenger = Security::Monitoring::Messaging::Messenger->new($param_hash_ref);
    $messenger->relay;

=head1 DESCRIPTION

This module provides the messenger class. Instances of this class are used in
small daemons to handle message passing between the main units of the program,
thus allowing the units to interact as if they were on the same machine.

The main Messaging daemon creates smaller daemons when asked to do so (at
start time such orders will come from the config files; during run time it
will listen to a named pipe for new orders).

=head1 SUBROUTINES/METHODS

=head2 new

summons a new instance of the Messenger class

=head3 use

    my %outputs = (
        $whatevername  => { type => $type,  name => $name,  fh => $filehandle },
        $whatevername2 => { type => $type2, name => $name2, fh => $anotherfilehandle },
    );
    my %params = (
        input  => { type => $input_type, name => $input_name, fh => $input_filehandle },
        output => \%outputs,
    );
    my $messenger = $class->new(\%params);

=cut

sub new {
    my ($class, $params) = @_;
    my $self = {};
    bless $self, $class;
    if (!defined $params) {
        croak "no params, cant create new messenger\n";
    }
    $self->_init($params);
    return $self;
}

=head2 _init

init function for messenger instances

=cut

sub _init {
    my ($self, $params) = @_;
    $self->{input}->{type} = $params->{input}->{type};
    $self->{input}->{fh}   = $params->{input}->{fh};
    $self->{input}->{name} = $params->{input}->{name};
    # explicit dereference: keys on a bare hash reference no longer works
    # on recent perls
    my @outputs = keys %{ $params->{output} };
    foreach my $key (@outputs) {
        print "init for $params->{output}->{$key}->{name}";
        $self->{output}->{$key}->{type} = $params->{output}->{$key}->{type};
        $self->{output}->{$key}->{fh}   = $params->{output}->{$key}->{fh};
        $self->{output}->{$key}->{name} = $params->{output}->{$key}->{name};
    }
}

=head2 _write

this function is used by the relay sub for the messaging job

=cut

sub _write {
    my $self = shift;
    if (!ref $self) {
        croak "can not do that as a class!";
    }
    my $input = shift;

    # writes the input to every output, using a different method depending
    # on the type of output
    my @keys = keys %{ $self->{output} };
    foreach my $key (@keys) {
        # named pipes are opened on demand below, so only sockets need a handle
        if (!defined($self->{output}->{$key}->{fh})
            && $self->{output}->{$key}->{type} ne 'named_pipe') {
            croak "undefined fh for key $key\n";
        }
        given ($self->{output}->{$key}->{type}) {
            when ('named_pipe') {
                open my $handle, '>', $self->{output}->{$key}->{name}
                    or croak "could not open output handle";
                print $handle $input;
                close $handle;
            }
            when ('unix_socket') {
                my $server = $self->{output}->{$key}->{fh};
                if (!exists $self->{output}->{$key}->{socket}) {
                    $self->{output}->{$key}->{socket} = $server->accept()
                        or croak "$!";
                }
                my $socket = $self->{output}->{$key}->{socket};
                print $socket $input;
            }
            when ('network_socket') {
                my $server = $self->{output}->{$key}->{fh};
                if (!exists $self->{output}->{$key}->{socket}) {
                    $self->{output}->{$key}->{socket} = $server->accept()
                        or croak "$!";
                }
                print "printing to output";
                my $socket = $self->{output}->{$key}->{socket};
                print $socket $input;
            }
        }
    }
}

=head2 relay

this function is called just after forking: it starts the actual messaging;
the daemon waits for input and transmits it to all its outputs

=head3 use

    $messenger->relay;

=cut

sub relay {
    my $self = shift;
    if (!ref($self)) {
        croak "can not call relay from a class";
    }
    my $handle = $self->{input}->{fh};

    # starts reading from input
    given ($self->{input}->{type}) {
        when ('named_pipe') {
            open $handle, '<', $self->{input}->{name}
                or croak "could not open input pipe: $!";
            while (<$handle>) {
                $self->_write($_);
            }
        }
        when ('unix_socket') {
            my $socket = $handle->accept() || croak "$!";
            while (<$socket>) {
                my $data = $_;
                print "I receive $data\n";
                $self->_write($data);
            }
        }
        when ('network_socket') {
            print "gonna try and listen on network socket named $self->{input}->{name}\n";
            my $socket = $handle->accept() || croak "$!";
            print "listening\n";
            while (<$socket>) {
                my $data = $_;
                print "I receive $data\n";
                $self->_write($data);
            }
        }
    }
}

1;

And now, ladies and gentlemen, have a look at this beast, this entity summoned from the deepest pits of madness, as it goes about jumping through those poorly cobbled together testing hoops and does (hopefully) not crash and burn with an otherworldly Croak of despair!

*Cracks his whip*

# test: network socket in, unix socket out
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use Storable qw(nstore_fd);
use IO::Socket::UNIX;
use IO::Socket::INET;
print "I'm $$\n";
my $input = {name=>'inputfile',type=>'network_socket',local_addr=>'localhost',local_port=>'1065',listen=>5};
my $output = {name=>'outputfile',type=>'unix_socket'};
my %params = (
    input=>$input,
    output=>{out1=>$output}
);
LISTEN:if(! -e 'listenfile'){
    goto LISTEN;
}
open my $fh,'>','listenfile' or croak "could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
my $pid;
$pid = fork();
if(!$pid){
    my $recsock;
    while(!defined $recsock){
        eval{$recsock = IO::Socket::UNIX->new(Peer=>'outputfile',Type=>SOCK_STREAM)||croak "$!"};
    }
    print "son here, opening inbound socket\n";
    WHI:while(<$recsock>){
        print "received $_";
    }
    goto WHI;
    print "I die, son\n";
}
else{
    print "I'm the father of $pid\n";
    my $c = 0;
    print "creating socket\n";
    my $sock;
    while(!defined($sock)){
        eval{$sock = IO::Socket::INET->new(PeerAddr=>'localhost',PeerPort=>1065,Proto=>'tcp') or croak "$!"};
    }
    WHAR:
    my $data = "trololol ".$c."\n";
    print $sock $data or croak "$!";
    $c++;
    goto WHAR;
    print "seems I'm done, father\n";
}

# test: named pipe in, network socket out
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use Storable qw(nstore_fd);
use IO::Socket::INET;
my $input = {name=>'inputfile',type=>'named_pipe'};
my $output = {name=>'outputfile',type=>'network_socket',local_addr=>'localhost',local_port=>1066,Proto=>'tcp',listen=>1};
my %params = (
    input=>$input,
    output=>{out1=>$output}
);
open my $fh,'>','listenfile' or croak "could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
WAIT:if(!-e 'inputfile'){
    goto WAIT;
}
my $pid;
$pid = fork();
if(!$pid){
    my $recsock;
    while(!defined $recsock){
        eval{$recsock = IO::Socket::INET->new(PeerAddr=>'localhost',PeerPort=>1066,Proto=>'tcp')||croak "$!"};
    }
    print "son here, opening inbound socket\n";
    WHI:while(<$recsock>){
        print "received $_";
    }
    goto WHI;
    print "I die, son\n";
}
else{
    print "I'm the father\n";
    my $c = 0;
    # label renamed from WAIT so it no longer clashes with the one above
    SEND:
    open $fh, '>','inputfile' or croak "could not open pipe :$!";
    print $fh "trololol $c\n";
    $c++;
    goto SEND;
}

# test: named pipe in, named pipe out
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use Storable qw(nstore_fd);
my $input = {name=>'inputfile',type=>'named_pipe'};
my $output = {name=>'outputfile',type=>'named_pipe'};
my %params = (
    input=>$input,
    output=>{out1=>$output}
);
open my $fh,'>','listenfile' or croak "could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
WAIT:if(!-e 'inputfile'){
    goto WAIT;
}
my $pid;
$pid = fork();
if(!$pid){
    print "forked\n";
    WAIT2:if(! -e 'outputfile'){
        goto WAIT2;
    }
    open $fh,'<','outputfile';
    print "son listening to outputfile\n";
    while(<$fh>){
        my $out = $_;
        print "got : $out";
    }
}
else{
    print "I'm the father\n";
    my $c = 0;
    # label renamed from WAIT so it no longer clashes with the one above
    SEND:
    open $fh, '>','inputfile' or croak "could not open pipe :$!";
    print $fh "trololol $c\n";
    $c++;
    goto SEND;
}

# test: unix socket in, named pipe out (one connection, one message per second)
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use Storable qw(nstore_fd);
use IO::Socket::UNIX;
print "I'm $$\n";
my $input = {name=>'inputfile',type=>'unix_socket',listen=>1};
my $output = {name=>'outputfile',type=>'named_pipe'};
my %params = (
    input=>$input,
    output=>{out1=>$output}
);
open my $fh,'>','listenfile' or croak "could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
WAIT:if(!-e 'inputfile'){
    goto WAIT;
}
my $pid;
$pid = fork();
if(!$pid){
    WAIT2:if(! -e 'outputfile'){
        goto WAIT2;
    }
    LOOP:open $fh,'<','outputfile';
    while(<$fh>){
        print "got $_";
    }
    goto LOOP;
}
else{
    print "I'm the father of $pid\n";
    my $c = 0;
    WAIT:if(! -e 'inputfile'){
        goto WAIT;
    }
    print "creating socket\n";
    my $sock = IO::Socket::UNIX->new(Peer=>'inputfile',Type=>SOCK_STREAM) or croak "$!";
    while(1){
        $sock->print("$c\n") or croak "$!";
        $c++;
        sleep 1;
    }
    print "seems I'm done, father\n";
}

# test: network socket in, network socket out
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use Storable qw(nstore_fd);
use IO::Socket::UNIX;
use IO::Socket::INET;
print "I'm $$\n";
my $input = {name=>'inputfile',type=>'network_socket',local_addr=>'localhost',local_port=>'1065',listen=>5};
my $output = {name=>'outputfile',type=>'network_socket',local_addr=>'localhost',local_port=>'1066',listen=>5};
my %params = (
    input=>$input,
    output=>{out1=>$output}
);
LISTEN:if(! -e 'listenfile'){
    goto LISTEN;
}
open my $fh,'>','listenfile' or croak "could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
my $pid;
$pid = fork();
if(!$pid){
    my $recsock;
    while(!defined $recsock){
        eval{$recsock = IO::Socket::INET->new(PeerAddr=>'localhost',PeerPort=>1066,Proto=>'tcp')||croak "$!"};
    }
    print "son here, opening inbound socket\n";
    WHI:while(<$recsock>){
        print "received $_";
    }
    goto WHI;
    print "I die, son\n";
}
else{
    print "I'm the father of $pid\n";
    my $c = 0;
    print "creating socket\n";
    my $sock;
    while(!defined($sock)){
        eval{$sock = IO::Socket::INET->new(PeerAddr=>'localhost',PeerPort=>1065,Proto=>'tcp') or croak "$!"};
    }
    WHAR:
    my $data = "trololol ".$c."\n";
    print $sock $data or croak "$!";
    $c++;
    goto WHAR;
    print "seems I'm done, father\n";
}

# test: network socket in, named pipe out
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use Storable qw(nstore_fd);
use IO::Socket::UNIX;
use IO::Socket::INET;
print "I'm $$\n";
my $input = {name=>'inputfile',type=>'network_socket',local_addr=>'localhost',local_port=>'1065',listen=>1};
my $output = {name=>'outputfile',type=>'named_pipe',local=>'outputfile',listen=>1};
my %params = (
    input=>$input,
    output=>{out1=>$output}
);
LISTEN:if(! -e 'listenfile'){
    goto LISTEN;
}
open my $fh,'>','listenfile' or croak "could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
my $pid;
$pid = fork();
if(!$pid){
    WAITONOUTPUT:if(!-e 'outputfile'){
        goto WAITONOUTPUT;
    }
    print "son here, opening outputfile\n";
    open my $pipe, '<', 'outputfile';
    WHI:while(<$pipe>){
        print $_;
    }
    goto WHI;
    print "I die, son\n";
}
else{
    print "I'm the father of $pid\n";
    my $c = 0;
    print "creating socket\n";
    my $sock = IO::Socket::INET->new(PeerAddr=>'localhost',PeerPort=>1065,Proto=>'tcp') or croak "$!";
    WHAR:
    my $data = "trololol ".$c."\n";
    print $sock $data;
    sleep 1;
    $c++;
    goto WHAR;
    print "seems I'm done, father\n";
}

# test: unix socket in, named pipe out (new connection for every message)
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use Storable qw(nstore_fd);
use IO::Socket::UNIX;
print "I'm $$\n";
my $input = {name=>'inputfile',type=>'unix_socket',local=>'inputfile',listen=>1};
my $output = {name=>'outputfile',type=>'named_pipe',local=>'outputfile',listen=>1};
my %params = (
    input=>$input,
    output=>{out1=>$output}
);
LISTEN:if(! -e 'listenfile'){
    goto LISTEN;
}
open my $fh,'>','listenfile' or croak "could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
WAIT:if(!-e 'inputfile'){
    goto WAIT;
}
my $pid;
$pid = fork();
if(!$pid){
    WAITONOUTPUT:if(!-e 'outputfile'){
        goto WAITONOUTPUT;
    }
    open my $pipe, '<', 'outputfile';
    WHI:while(<$pipe>){
        print $_;
    }
    goto WHI;
    print "I die, son\n";
}
else{
    print "I'm the father of $pid\n";
    my $c = 0;
    WAIT:if(! -e 'inputfile'){
        goto WAIT;
    }
    print "creating socket\n";
    WHAR:
    my $sock = IO::Socket::UNIX->new(Peer=>'inputfile',Type=>SOCK_STREAM) or croak "$!";
    my $data = "trololol ".$c."\n";
    print $sock $data;
    sleep 1;
    $c++;
    goto WHAR;
    print "seems I'm done, father\n";
}

# test: unix socket in, unix socket out
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use Storable qw(nstore_fd);
use IO::Socket::UNIX;
print "I'm $$\n";
my $input = {name=>'inputfile',type=>'unix_socket',local=>'inputfile',listen=>1};
my $output = {name=>'outputfile',type=>'unix_socket',local=>'outputfile',listen=>1};
my %params = (
    input=>$input,
    output=>{out1=>$output}
);
LISTEN:if(! -e 'listenfile'){
    goto LISTEN;
}
open my $fh,'>','listenfile' or croak "could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
WAIT:if(!-e 'inputfile'){
    goto WAIT;
}
my $pid;
$pid = fork();
if(!$pid){
    WAITONOUTPUT:if(!-e 'outputfile'){
        goto WAITONOUTPUT;
    }
    WHI:
    my $stock = IO::Socket::UNIX->new(Peer=>'outputfile',Type=>SOCK_STREAM)||croak "$!";
    while(<$stock>){
        print "I receive $_";
    }
    goto WHI;
    print "I die, son\n";
}
else{
    print "I'm the father of $pid\n";
    my $c = 0;
    WAIT:if(! -e 'inputfile'){
        goto WAIT;
    }
    print "creating socket\n";
    WHAR:
    my $sock = IO::Socket::UNIX->new(Peer=>'inputfile',Type=>SOCK_STREAM) or croak "$!";
    my $data = "trololol ".$c."\n";
    print $sock $data;
    $c++;
    goto WHAR;
    print "seems I'm done, father\n";
}

# test: unix socket in, network socket out
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use Storable qw(nstore_fd);
use IO::Socket::UNIX;
use IO::Socket::INET;
print "I'm $$\n";
my $input = {name=>'inputfile',type=>'unix_socket',local=>'inputfile',listen=>1};
my $output = {name=>'outputfile',type=>'network_socket',local_addr=>'localhost',local_port=>'1066',listen=>1};
my %params = (
    input=>$input,
    output=>{out1=>$output}
);
LISTEN:if(! -e 'listenfile'){
    goto LISTEN;
}
open my $fh,'>','listenfile' or croak "could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
WAIT:if(!-e 'inputfile'){
    goto WAIT;
}
my $pid;
$pid = fork();
if(!$pid){
    my $recsock;
    while(!defined $recsock){
        eval{$recsock = IO::Socket::INET->new(PeerAddr=>'localhost',PeerPort=>1066,Proto=>'tcp')||croak "$!"};
    }
    print "son here, opening inbound socket\n";
    WHI:while(<$recsock>){
        print "received $_";
    }
    goto WHI;
    print "I die, son\n";
}
else{
    print "I'm the father of $pid\n";
    my $c = 0;
    WAIT:if(! -e 'inputfile'){
        goto WAIT;
    }
    print "creating socket\n";
    WHAR:
    my $sock = IO::Socket::UNIX->new(Peer=>'inputfile',Type=>SOCK_STREAM) or croak "$!";
    my $data = "trololol ".$c."\n";
    print $sock $data;
    sleep 1;
    $c++;
    goto WHAR;
    print "seems I'm done, father\n";
}

Those slaves will happily pass around simple text messages, but what about more complicated serialized data containing newline characters?

I am still looking for advice on how to handle those cases. Thank you for reading this far :)
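One common way to carry payloads that contain newlines is to stop relying on line boundaries and prefix every message with its length. The following is a minimal sketch of that technique, not something the Messenger class does today; `write_frame`, `read_exactly` and `read_frame` are hypothetical helper names, and the 4-byte big-endian header is an assumed wire format.

```perl
use strict;
use warnings;

# Write one message as a 4-byte big-endian length followed by the payload.
# The payload is expected to be a byte string (e.g. Storable output).
sub write_frame {
    my ($fh, $payload) = @_;
    print {$fh} pack('N', length $payload), $payload
        or die "write failed: $!";
    return;
}

# Read exactly $len bytes; returns undef if the stream ends first.
sub read_exactly {
    my ($fh, $len) = @_;
    my $buf = '';
    while (length($buf) < $len) {
        my $got = read($fh, $buf, $len - length($buf), length($buf));
        die "read failed: $!" unless defined $got;
        return undef if $got == 0;    # end of stream
    }
    return $buf;
}

# Read one framed message; returns undef when the stream is exhausted.
sub read_frame {
    my ($fh) = @_;
    my $header = read_exactly($fh, 4);
    return undef unless defined $header;
    my $len = unpack('N', $header);
    return read_exactly($fh, $len);
}
```

With Storable, the sender could call write_frame($socket, nfreeze(\%data)) and the receiver thaw(read_frame($socket)). For a messenger to preserve message boundaries, its relay loop would have to read and forward whole frames instead of lines.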


In reply to Messenger Daemon summoner by QuillMeantTen
