package Security::Monitoring::Messaging;
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Proc::Daemon;
use Carp qw(carp croak);
use Storable qw(fd_retrieve);
use POSIX qw(mkfifo);
use File::Spec::Functions;
use Cwd;
use IO::Socket::UNIX;
use IO::Socket::INET;
my $class = "Security::Monitoring::Messaging::Messenger";
=head1 NAME
Security::Monitoring::Messaging - The great new Security::Monitoring::Messaging!
=head1 VERSION
Version 0.01
=cut
our $VERSION = '0.01';
=head1 SYNOPSIS
you want to call this script using Proc::Daemon exec, that way it will be able
to do its work from the background. Whenever another script needs to ask for a
messenger (most of theses orders should be made at start time depending on the
configuration) it should nstore a hash to the messenger unit named pipe.
my %params = (
input=>{name=>'filename',
type=>'unix_socket|network_socket|named_pipe',
#where it will listen;
listen=>$queue_size_for_listen;#for sockets
Local=>'local_buffer_file_name',#unix sockets
local_addr=>'what it says on the can',#for network socket
local_port=>'idem',#for network socket
}
output1=>{name=>'filename',
type=>'unix_socket|network_socket|named_pipe,
Local=>'pathname to local buffer',#unix socket only
Listen=>$queue_size_for_listen',#for sockets
...
...
},
output2=>{...},
...
outputn=>{...},
);
run the daemon, with a name for the inputfile (it will be a named pipe)
then use that kind of code :
open my $fh,'>','listenfile'or croak"could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
the daemon will spawn the needed messenger subdaemon.
a messaging daemon should be started.
=head1 DESCRIPTION
this module is a daemon that listens for messenger requests and spawn little
messenger daemon to handle IPC between units (such as report communication
between the reporting unit and the monitoring unit) it is done so to allow
each unit to run on a different server.
=head1 SUBROUTINES/METHODS
run
=head2 run
this is the main function, even if this is a pm file it also is a self
contained program. One can run the whole module on one server or only part of
it and have it interact with other servers. An Instance of the Messaging
daemon has to run on each and every server that supports one of the main units
(ie reporting unit, config unit, mailer unit, monitoring unit and db
interaction unit)
=cut
=head3 sample param hash
my %params = (
input=>{name=>'filename',
type=>'unix_socket|network_socket|named_pipe',
#where it will listen;
listen=>$queue_size_for_listen;#for sockets
Local=>'local_buffer_file_name',#unix sockets
local_addr=>'what it says on the can',#for network socket
local_port=>'idem',#for network socket
}
output1=>{name=>'filename',
type=>'unix_socket|network_socket|named_pipe,
Local=>'pathname to local buffer',#unix socket only
Listen=>$queue_size_for_listen',#for sockets
...
...
},
output2=>{...},
...
outputn=>{...},
);
=cut
sub run($$) {
#setup part
my $request_file_name = shift;#named pipe to which the Messenging daemon will
#listen for messenger spawn requests.
my $log_file_name = shift;
if (!defined($log_file_name)){
$log_file_name = '/dev/null';
}
open my $log, '>>',$log_file_name or croak "could not open log_file";
if (!defined($request_file_name)){
croak "can not run without something to listen to!\n";
}
mkfifo($request_file_name,0777)||croak "could not open named pipe : $!";
my %children = ();
my $pid;
my %messenger_params;
print $log "I'm $$ and starting to run\n";
#running part
ABORT:while(1){
open my $fh,'<',$request_file_name;
my $params;
eval{$params = fd_retrieve($fh)};
if($@){
print $log "continuing after error $@";
close $fh;
redo;
}
close $fh;
print $log "params input name = $params->{input}->{name}\n";
$messenger_params{input}->{name} = $params->{input}->{name};
$messenger_params{input}->{type} = $params->{input}->{type};
#put the input data into the messenger params hash
given ($params->{input}->{type}){
when('unix_socket'){
$messenger_params{input}->{fh} = IO::Socket::UNIX::->new(
Type=>SOCK_STREAM,
Local=>$params->{input}->{name},
Listen=>$params->{input}->{Listen},
) or croak "$!";
}
when('network_socket'){
$messenger_params{input}->{fh} = IO::Socket::INET->new(
Listen=>$params->{input}->{Listen},
LocalAddr=>$params->{input}->{loca_addr},
Type=>SOCK_STREAM,
Proto=>'tcp',
Reuse=>1,
LocalPort=>$params->{input}->{local_port},
)or croak "$!";
}
when('named_pipe'){
mkfifo($params->{input}->{name},0777)||carp "could not open named pipe : $!";
open $messenger_params{input}->{fh},
'<',$params->{input}->{name} or carp "could not open namedpipe
for input: $!";
}
}
#putting the output subhashes into the messenger params hash
my @keys = keys %{$params->{output}};
OUTPUT: foreach my $key (@keys){
$messenger_params{output}->{$key}->{type} =
$params->{output}->{$key}->{type};
$messenger_params{output}->{$key}->{name} =
$params->{output}->{$key}->{name};
given ($params->{output}->{$key}->{type}){
when('unix_socket'){
$messenger_params{output}->{$key}->{fh}
= IO::Socket::UNIX->new(
Type=>SOCK_STREAM,
Local=>$params->{output}->{$key}->{name},
Listen=>$params->{output}->{$key}->{listen},
);
}
when('network_socket'){
$messenger_params{output}->{$key}->{fh}
= IO::Socket::INET->new(
LocalAddr=>$params->{output}->{$key}->{local_addr},
LocalPort=>$params->{output}->{$key}->{local_port},
Type=>SOCK_STREAM,
Proto=>'tcp',
Reuse=>1,
Listen=>$params->{output}->{$key}->{listen},
)or croak "$!";
}
when('named_pipe'){
mkfifo($messenger_params{output}->{$key}->{name},0777)||carp "could not open named pipe : $!";
}
}
}
$pid = fork();
if (!$pid){#I'm the son
my $messenger = $class->new(\%messenger_params);
print "starting to relay\n";
$messenger->relay;
}
else{
print "$pid forked\n";
$children{$pid} = 1;
}
}
}
if(!defined(caller())){
my $filename = shift;
my $logfile = shift;
&run($filename,$logfile);
}
####
package Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use IO::Socket::INET;
use IO::Socket::UNIX;
=head1 NAME
Security::Monitoring::Messaging::Messenger - The great new Security::Monitoring::Messaging::Messenger!
=head1 VERSION
Version 0.01
=cut
our $VERSION = '0.01';
=head1 SYNOPSIS
$messenger->new($param_hash_ref);
$messenger->relay;
=head1 DESCRIPTION
this module provides the messenger class, instances of this class will be used
in small daemon to handle message passing between the main units of the
program, thus allowing each unit to interact as if on the same machine of each
other.
the Main Messaging daemon creates smaller daemons when asked to do so (at
start time such order will come from the config files, during run time it will
listen to a named pipe for new orders)
=head1 SUBROUTINES/METHODS
=head2 new
summons a new instance of the Messenger class
=head3 use
my %outputs = (
$whatevername => $filehandle,
$whatevername2=>$anotherfilehandle,
);
my %params = (
input=>$filehandle,
output = \%output;
);
my $messenger = $class->new(\%params);
=cut
sub new(\%) {
my ($class,$params) = @_;
my $self = {};
bless $self, $class;
if (!defined $params){
croak "no params, cant create new messenger\n";
}
$self->_init($params);
return $self;
}
=head2 _init
init function for messenger instances
=cut
sub _init {
my($self,$params) = @_;
$self->{input}->{type} = $params->{input}->{type};
$self->{input}->{fh} = $params->{input}->{fh};
$self->{input}->{name} = $params->{input}->{name};
my @outputs = keys $params->{output};
foreach my $key (@outputs){
print "init for $params->{output}->{$key}->{name}";
$self->{output}->{$key}->{type} = $params->{output}->{$key}->{type};
$self->{output}->{$key}->{fh} = $params->{output}->{$key}->{fh};
$self->{output}->{$key}->{name} = $params->{output}->{$key}->{name};
}
}
=head2 _write
this function is used by the relay sub for the messaging job
=cut
sub _write{
my $self = shift;
if(!ref $self){
croak "can not do that as a class!";
}
my $input = shift;
#writes the input to every output using different method depending on the
#type of output
my @keys = keys $self->{output};
foreach my $key (@keys){
if(!defined($self->{output}->{$key}->{fh}) && !$self->{output}->{$key}->{type} eq
'named_pipe'){
croak "undefined fh for key $key\n";
}
given($self->{output}->{$key}->{type}){
when('named_pipe'){
open my $handle ,'>',$self->{output}->{$key}->{name} or croak "could
not open output handle";
print $handle $input;
close $handle;
}
when('unix_socket'){
my $server = $self->{output}->{$key}->{fh};
if(!exists $self->{output}->{$key}->{socket}){
$self->{output}->{$key}->{socket}= $server->accept() or croak "$!";
}
my $socket = $self->{output}->{$key}->{socket};
print $socket $input;
}
when('network_socket'){
my $server = $self->{output}->{$key}->{fh};
if(!exists $self->{output}->{$key}->{socket}){
$self->{output}->{$key}->{socket} = $server->accept() or croak "$!";
}
print "printing to output";
my $socket = $self->{output}->{$key}->{socket};
print $socket $input;
}
}
}
}
=head2 relay
this function is called just after forking : it starts the actual messaging,
the daemon waits for input and transmit to all its outputs
=head3 use
$messenger->relay;
=cut
sub relay{
my $self = shift;
if(!ref($self)){
croak "can not call relay from a class";
}
my $handle = $self->{input}->{fh};
#starts reading from input
given ($self->{input}->{type}){
when ('named_pipe'){
open $handle,'<',$self->{input}->{name};
while(<$handle>){
$self->_write($_);
}
}
when('unix_socket'){
my $socket = $handle->accept()||croak "$!";
while(<$socket>){
my $data = $_;
print "I receive $data\n";
$self->_write($data);
}
}
when('network_socket'){
print "gonna try and listen on network socket named
$self->{input}->{name}\n";
my $socket = $handle->accept()||croak"$!";
print "listening\n";
while(<$socket>){
my $data = $_;
print "I receive $data\n";
$self->_write($data);
}
}
}
}
####
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use Storable qw(nstore_fd);
use IO::Socket::UNIX;
print "I'm $$\n";
my $input =
{name=>'inputfile',type=>'network_socket',local_addr=>'localhost',local_port=>'1065',listen=>5};
my $output =
{name=>'outputfile',type=>'unix_socket'};
my %params = (
input=>$input,
output=>{out1=>$output}
);
LISTEN:if(! -e 'listenfile'){
goto LISTEN;
}
open my $fh,'>','listenfile'or croak"could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
my $pid;
$pid = fork();
if(!$pid){
my $recsock;
while(!defined $recsock){
eval{$recsock =
IO::Socket::UNIX->new(Peer=>'outputfile',Type=>SOCK_STREAM)||croak"$!"};
}
print "son here, opening inbound socket\n";
WHI:while(<$recsock>){
print "received $_";
}
goto WHI;
print "I die, son\n";
}
else{
print "I'm the father of $pid\n";
my $c = 0;
print"creating socket\n";
my $sock;
while(!defined($sock)){
eval{$sock =
IO::Socket::INET->new(PeerAddr=>'localhost',PeerPort=>1065,Proto=>'tcp')or
croak "$!"};
}
WHAR:
my $data = "trololol ".$c."\n";
print $sock $data or croak "$!";
$c++;
goto WHAR;
print "seems I'm done, father\n";
}
####
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use Storable qw(nstore_fd);
my $input = {name=>'inputfile',type=>'named_pipe'};
my $output =
{name=>'outputfile',type=>'network_socket',local_addr=>'localhost',local_port=>1066,Proto=>'tcp',listen=>1};
my %params = (
input=>$input,
output=>{out1=>$output}
);
open my $fh,'>','listenfile'or croak"could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
WAIT:if(!-e 'inputfile'){
goto WAIT;
}
my $pid;
$pid = fork();
if(!$pid){
my $recsock;
while(!defined $recsock){
eval{$recsock =
IO::Socket::INET->new(PeerAddr=>'localhost',PeerPort=>1066,Proto=>'tcp')||croak"$!"};
}
print "son here, opening inbound socket\n";
WHI:while(<$recsock>){
print "received $_";
}
goto WHI;
print "I die, son\n";
}
else{
print "I'm the father\n";
my $c = 0;
WAIT:
open $fh, '>','inputfile' or croak "could not open pipe :$!";
print $fh "trololol $c\n";
$c++;
goto WAIT;
}
####
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use Storable qw(nstore_fd);
my $input = {name=>'inputfile',type=>'named_pipe'};
my $output = {name=>'outputfile',type=>'named_pipe'};
my %params = (
input=>$input,
output=>{out1=>$output}
);
open my $fh,'>','listenfile'or croak"could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
WAIT:if(!-e 'inputfile'){
goto WAIT;
}
my $pid;
$pid = fork();
if(!$pid){
print "forked\n";
WAIT2:if(! -e 'outputfile'){
goto WAIT2;
}
open $fh,'<','outputfile';
print "son listening to outputfile\n";
while(<$fh>){
my $out = $_;
print "got : $out";
}
}
else{
print "I'm the father\n";
my $c = 0;
WAIT:
open $fh, '>','inputfile' or croak "could not open pipe :$!";
print $fh "trololol $c\n";
$c++;
goto WAIT;
}
####
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use Storable qw(nstore_fd);
use IO::Socket::UNIX;
print "I'm $$\n";
my $input =
{name=>'inputfile',type=>'unix_socket',listen=>1};
my $output = {name=>'outputfile',type=>'named_pipe'};
my %params = (
input=>$input,
output=>{out1=>$output}
);
open my $fh,'>','listenfile'or croak"could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
WAIT:if(!-e 'inputfile'){
goto WAIT;
}
my $pid;
$pid = fork();
if(!$pid){
WAIT2:if(! -e 'outputfile'){
goto WAIT2;
}
LOOP:open $fh,'<','outputfile';
while(<$fh>){
print "got $_";
}
goto LOOP;
}
else{
print "I'm the father of $pid\n";
my $c = 0;
WAIT:if(! -e 'inputfile'){
goto WAIT;
}
print"creating socket\n";
my $sock =
IO::Socket::UNIX->new(Peer=>'inputfile',Type=>SOCK_STREAM) or
croak"$!";
while(1){
$sock->print("$c\n") or croak"$!";
$c++;
sleep 1;
}
print "seems I'm done, father\n";
}
####
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use Storable qw(nstore_fd);
use IO::Socket::UNIX;
print "I'm $$\n";
my $input =
{name=>'inputfile',type=>'network_socket',local_addr=>'localhost',local_port=>'1065',listen=>5};
my $output =
{name=>'outputfile',type=>'network_socket',local_addr=>'localhost',local_port=>'1066',listen=>5};
my %params = (
input=>$input,
output=>{out1=>$output}
);
LISTEN:if(! -e 'listenfile'){
goto LISTEN;
}
open my $fh,'>','listenfile'or croak"could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
my $pid;
$pid = fork();
if(!$pid){
my $recsock;
while(!defined $recsock){
eval{$recsock =
IO::Socket::INET->new(PeerAddr=>'localhost',PeerPort=>1066,Proto=>'tcp')||croak"$!"};
}
print "son here, opening inbound socket\n";
WHI:while(<$recsock>){
print "received $_";
}
goto WHI;
print "I die, son\n";
}
else{
print "I'm the father of $pid\n";
my $c = 0;
print"creating socket\n";
my $sock;
while(!defined($sock)){
eval{$sock =
IO::Socket::INET->new(PeerAddr=>'localhost',PeerPort=>1065,Proto=>'tcp')or
croak "$!"};
}
WHAR:
my $data = "trololol ".$c."\n";
print $sock $data or croak "$!";
$c++;
goto WHAR;
print "seems I'm done, father\n";
}
####
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use Storable qw(nstore_fd);
use IO::Socket::UNIX;
print "I'm $$\n";
my $input =
{name=>'inputfile',type=>'network_socket',local_addr=>'localhost',local_port=>'1065',listen=>1};
my $output =
{name=>'outputfile',type=>'named_pipe',local=>'outputfile',listen=>1};
my %params = (
input=>$input,
output=>{out1=>$output}
);
LISTEN:if(! -e 'listenfile'){
goto LISTEN;
}
open my $fh,'>','listenfile'or croak"could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
my $pid;
$pid = fork();
if(!$pid){
WAITONOUTPUT:if(!-e 'outputfile'){
goto WAITONOUTPUT;
}
print "son here, opening outputfile\n";
open my $pipe, '<', 'outputfile';
WHI:while(<$pipe>){
print $_;
}
goto WHI;
print "I die, son\n";
}
else{
print "I'm the father of $pid\n";
my $c = 0;
print"creating socket\n";
my $sock =
IO::Socket::INET->new(PeerAddr=>'localhost',PeerPort=>1065,Proto=>'tcp') or
croak"$!";
WHAR:
my $data = "trololol ".$c."\n";
print $sock $data;
sleep 1;
$c++;
goto WHAR;
print "seems I'm done, father\n";
}
####
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use Storable qw(nstore_fd);
use IO::Socket::UNIX;
print "I'm $$\n";
my $input =
{name=>'inputfile',type=>'unix_socket',local=>'inputfile',listen=>1};
my $output =
{name=>'outputfile',type=>'named_pipe',local=>'outputfile',listen=>1};
my %params = (
input=>$input,
output=>{out1=>$output}
);
LISTEN:if(! -e 'listenfile'){
goto LISTEN;
}
open my $fh,'>','listenfile'or croak"could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
WAIT:if(!-e 'inputfile'){
goto WAIT;
}
my $pid;
$pid = fork();
if(!$pid){
WAITONOUTPUT:if(!-e 'outputfile'){
goto WAITONOUTPUT;
}
open my $pipe, '<', 'outputfile';
WHI:while(<$pipe>){
print $_;
}
goto WHI;
print "I die, son\n";
}
else{
print "I'm the father of $pid\n";
my $c = 0;
WAIT:if(! -e 'inputfile'){
goto WAIT;
}
print"creating socket\n";
WHAR:
my $sock = IO::Socket::UNIX->new(Peer=>'inputfile',Type=>SOCK_STREAM) or
croak"$!";
my $data = "trololol ".$c."\n";
print $sock $data;
sleep 1;
$c++;
goto WHAR;
print "seems I'm done, father\n";
}
####
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use Storable qw(nstore_fd);
use IO::Socket::UNIX;
print "I'm $$\n";
my $input =
{name=>'inputfile',type=>'unix_socket',local=>'inputfile',listen=>1};
my $output =
{name=>'outputfile',type=>'unix_socket',local=>'outputfile',listen=>1};
my %params = (
input=>$input,
output=>{out1=>$output}
);
LISTEN:if(! -e 'listenfile'){
goto LISTEN;
}
open my $fh,'>','listenfile'or croak"could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
WAIT:if(!-e 'inputfile'){
goto WAIT;
}
my $pid;
$pid = fork();
if(!$pid){
WAITONOUTPUT:if(!-e 'outputfile'){
goto WAITONOUTPUT;
}
WHI: my $stock = IO::Socket::UNIX->new(Peer=>'outputfile',Type=>SOCK_STREAM)||croak "$!";
while(<$stock>){
print "I receive $_";
}
goto WHI;
print "I die, son\n";
}
else{
print "I'm the father of $pid\n";
my $c = 0;
WAIT:if(! -e 'inputfile'){
goto WAIT;
}
print"creating socket\n";
WHAR:
my $sock = IO::Socket::UNIX->new(Peer=>'inputfile',Type=>SOCK_STREAM) or
croak"$!";
my $data = "trololol ".$c."\n";
print $sock $data;
$c++;
goto WHAR;
print "seems I'm done, father\n";
}
####
use Security::Monitoring::Messaging::Messenger;
use 5.14.2;
use strict;
use warnings;
use Carp qw(carp croak);
use Storable qw(nstore_fd);
use IO::Socket::UNIX;
print "I'm $$\n";
my $input =
{name=>'inputfile',type=>'unix_socket',local=>'inputfile',listen=>1};
my $output =
{name=>'outputfile',type=>'network_socket',local_addr=>'localhost',local_port=>'1066',listen=>1};
my %params = (
input=>$input,
output=>{out1=>$output}
);
LISTEN:if(! -e 'listenfile'){
goto LISTEN;
}
open my $fh,'>','listenfile'or croak"could not open pipe : $!";
nstore_fd(\%params, $fh);
close $fh;
WAIT:if(!-e 'inputfile'){
goto WAIT;
}
my $pid;
$pid = fork();
if(!$pid){
my $recsock;
while(!defined $recsock){
eval{$recsock =
IO::Socket::INET->new(PeerAddr=>'localhost',PeerPort=>1066,Proto=>'tcp')||croak"$!"};
}
print "son here, opening inbound socket\n";
WHI:while(<$recsock>){
print "received $_";
}
goto WHI;
print "I die, son\n";
}
else{
print "I'm the father of $pid\n";
my $c = 0;
WAIT:if(! -e 'inputfile'){
goto WAIT;
}
print"creating socket\n";
WHAR:
my $sock = IO::Socket::UNIX->new(Peer=>'inputfile',Type=>SOCK_STREAM) or
croak"$!";
my $data = "trololol ".$c."\n";
print $sock $data;
sleep 1;
$c++;
goto WHAR;
print "seems I'm done, father\n";
}