Hi talexb,
Regarding the MCE module, the main process enters a loop to handle IPC events while running. The following is an MCE::Hobo and MCE::Shared demonstration, based on the MCE::Flow solution. Here, MCE::Shared spawns a background process to handle IPC events, which frees the main process to listen for traps.
use strict;
use warnings;
# perl snmp_server.pl | wc -l
use Time::HiRes qw( sleep time );
use Net::SNMPTrapd;
use MCE::Hobo;
use MCE::Shared;
my $queue = MCE::Shared->queue( await => 1, fast => 1 );
my $start = 0;
# construct two shared handles, prevents garbled output
# consumers send output to the shared-manager process
mce_open my $outfh, '>>', \*STDOUT;
mce_open my $errfh, '>>', \*STDERR;
MCE::Hobo->create(\&consumer) for 1 .. 30;
listener();
MCE::Hobo->waitall();
printf {$errfh} "duration : %0.03f seconds\n", time - $start;
exit(0);
sub listener {
    my $snmptrapd = Net::SNMPTrapd->new( ReusePort => 1 );
    my $count = 0;
    while ( 1 ) {
        my $trap = $snmptrapd->get_trap();
        if ( !defined $trap ) {
            printf {$errfh} "$0: %s\n", Net::SNMPTrapd->error();
            next;
        }
        elsif ( $trap == 0 ) {
            next;
        }
        $start = time() unless $start;
        # important, remove the file handle inside the object
        delete $trap->{_UDPSERVER_};
        # enqueue the trap for a consumer to process
        $queue->enqueue($trap);
        # leave the loop after 10,000 traps
        last if ( ++$count >= 10000 );
        # reset the counter to not overflow
        $count = 0 if ( $count > 2e9 );
    }
    $queue->end();
    printf {$errfh} "enqueue : %0.03f seconds\n", time - $start;
    printf {$errfh} "pending : %d\n", $queue->pending();
}
sub consumer {
    while ( defined ( my $trap = $queue->dequeue() ) ) {
        $trap->process_trap();
        printf {$outfh} "[$$] %s\t%i\t%i\t%s\n",
            $trap->remoteaddr, $trap->remoteport,
            $trap->version, $trap->community;
        sleep 0.004;
    }
}
For comparison, the following provides a threads and Thread::Queue demonstration. Fortunately, one may run MCE::Shared alongside threads to get shared-handles support. Please note that this demonstration requires freezing and thawing at the application level. Serialization is typically automatic for MCE and MCE::Shared solutions.
use strict;
use warnings;
# perl snmp_server.pl | wc -l
use threads;
use Thread::Queue;
use Time::HiRes qw( sleep time );
use Storable qw( freeze thaw );
use Net::SNMPTrapd;
use MCE::Shared;
my $queue = Thread::Queue->new();
my $max_consumers = 30;
my $start = 0;
mce_open my $outfh, '>>', \*STDOUT;
mce_open my $errfh, '>>', \*STDERR;
threads->create(\&consumer) for 1 .. $max_consumers;
listener();
$_->join for threads->list();
printf {$errfh} "duration : %0.03f seconds\n", time - $start;
exit(0);
sub listener {
    my $snmptrapd = Net::SNMPTrapd->new( ReusePort => 1 );
    my $count = 0;
    while ( 1 ) {
        my $trap = $snmptrapd->get_trap();
        if ( !defined $trap ) {
            printf {$errfh} "$0: %s\n", Net::SNMPTrapd->error();
            next;
        }
        elsif ( $trap == 0 ) {
            next;
        }
        $start = time() unless $start;
        # important, remove the file handle inside the object
        delete $trap->{_UDPSERVER_};
        # enqueue the trap for a consumer to process
        $queue->enqueue( freeze($trap) );
        # leave the loop after 10,000 traps
        last if ( ++$count >= 10000 );
        # reset the counter to not overflow
        $count = 0 if ( $count > 2e9 );
    }
    # $queue->end(); # newer Thread::Queue and MCE::Shared releases
    $queue->enqueue((undef) x $max_consumers); # older releases
    printf {$errfh} "enqueue : %0.03f seconds\n", time - $start;
    printf {$errfh} "pending : %d\n", $queue->pending();
}
sub consumer {
    while ( defined ( my $item = $queue->dequeue() ) ) {
        my $trap = thaw($item);
        $trap->process_trap();
        printf {$outfh} "[$$] %s\t%i\t%i\t%s\n",
            $trap->remoteaddr, $trap->remoteport,
            $trap->version, $trap->community;
        sleep 0.004;
    }
}
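The application-level freeze and thaw step can be tried in isolation. The following is only a minimal sketch: My::Trap is a hypothetical stand-in class, not the real Net::SNMPTrapd object, and the handle stored under _UDPSERVER_ is simply STDOUT. It suggests one likely reason for deleting the handle before enqueueing, namely that Storable refuses to serialize a GLOB.

```perl
use strict;
use warnings;
use Storable qw( freeze thaw );

# Hypothetical stand-in for a trap object (assumption, for illustration only).
{
    package My::Trap;
    sub new {
        my ( $class, %args ) = @_;
        return bless { %args }, $class;
    }
    sub remoteaddr { $_[0]{addr} }
}

my $trap = My::Trap->new( addr => '10.10.10.1', _UDPSERVER_ => \*STDOUT );

# Storable cannot store GLOB items, so freezing with the handle in place dies.
my $frozen = eval { freeze($trap) };
print "freeze with handle failed\n" if !defined $frozen;

# Remove the handle; the round trip now works and the class is preserved.
delete $trap->{_UDPSERVER_};
my $copy = thaw( freeze($trap) );
printf "%s %s\n", ref($copy), $copy->remoteaddr;
```

The thawed copy is a separate object blessed into the same class, so method calls keep working on the consumer side.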
On my Linux box, the MCE::Hobo and MCE::Shared demonstration completes 10k traps in 2.2 seconds. The threads and Thread::Queue demonstration unexpectedly needs more time and completes in 13.4 seconds. Notice the difference in the number of traps pending in the queue.
# perl snmp_hobo.pl | wc -l
enqueue : 1.982 seconds
pending : 1085
duration : 2.229 seconds
10000
# perl snmp_thr.pl | wc -l
enqueue : 1.987 seconds
pending : 8510
duration : 13.372 seconds
10000
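To put those figures in perspective, here is a small back-of-the-envelope calculation. It is a rough sketch only: the numbers are copied from the two Linux runs above, the rates() helper is a name made up for this example, and the drain rate is an estimate that treats "duration minus enqueue" as the time spent emptying the backlog.

```perl
use strict;
use warnings;

# Figures copied from the two Linux runs.
my %run = (
    hobo    => { enqueue => 1.982, pending => 1085, duration => 2.229  },
    threads => { enqueue => 1.987, pending => 8510, duration => 13.372 },
);
my ( $traps, $consumers, $sleep ) = ( 10_000, 30, 0.004 );

# Hypothetical helper: overall rate and estimated backlog drain rate.
sub rates {
    my ( $r, $total ) = @_;
    return (
        $total / $r->{duration},
        $r->{pending} / ( $r->{duration} - $r->{enqueue} ),
    );
}

# Upper bound if the 4 ms sleep were the only cost per trap.
printf "ceiling : %.0f traps/s\n", $consumers / $sleep;

for my $name ( sort keys %run ) {
    my ( $overall, $drain ) = rates( $run{$name}, $traps );
    printf "%-7s : %.0f traps/s overall, %.0f traps/s draining\n",
        $name, $overall, $drain;
}
```

With 30 consumers each sleeping 4 ms per trap, the ceiling is 7500 traps per second. By this estimate the Hobo run drains its backlog at roughly 4400 traps per second, while the threads run manages only about 750.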
The following is the trap generator used to feed both demonstrations. To avoid impacting the listener/consumer script, run this from another host.
use strict;
use warnings;
use Net::SNMP;
use MCE::Flow;
use MCE::Queue;
mce_flow { max_workers => 2 }, \&producer;
exit(0);
sub producer {
    my ( $session, $error ) = Net::SNMP->session(
        -hostname => '192.168.0.16',
        -version => 2,
        -community => 'public',
        -port => 162
    );
    if ( !defined $session ) {
        printf "Error: Starting SNMP session (v2c trap) - %s\n", $error;
        return;
    }
    for my $i ( 1 .. 5000 ) {
        my $result = $session->snmpv2_trap(
            -varbindlist => [
                '1.3.6.1.2.1.1.3.0', 0x43, int( time() ),
                '1.3.6.1.6.3.1.1.4.1.0', 0x06, '1.3.6.1.4.1.50000',
                '1.3.6.1.4.1.50000.1.3', 0x02, 1,
                '1.3.6.1.4.1.50000.1.4', 0x04, 'String',
                '1.3.6.1.4.1.50000.1.5', 0x06, '1.2.3.4.5.6.7.8.9',
                '1.3.6.1.4.1.50000.1.6', 0x40, '10.10.10.'.((++$i % 100) + 1),
                '1.3.6.1.4.1.50000.1.7', 0x41, 32323232,
                '1.3.6.1.4.1.50000.1.8', 0x42, 42424242,
                '1.3.6.1.4.1.50000.1.9', 0x43, int( time() ),
                '1.3.6.1.4.1.50000.1.10', 0x44, 'opaque data'
            ]
        );
    }
    $session->close;
}
Regarding IPC fetch requests, I've tried to bring MCE and MCE::Shared on Linux closer to BSD levels. Threads can be a mystery sometimes: I'm not sure why they run slowly under Red Hat / CentOS 7.3 with Perl v5.16.3.
# Linux: MCE::Hobo - MCE::Shared->queue, Perl v5.16.3
enqueue : 1.982 seconds
pending : 1085
duration : 2.229 seconds
10000
# Linux: threads - Thread::Queue, Perl v5.16.3
enqueue : 1.987 seconds
pending : 8510
duration : 13.372 seconds
10000
# Mac OS X: MCE::Hobo - MCE::Shared->queue, Perl v5.18.2
enqueue : 1.589 seconds
pending : 191
duration : 1.632 seconds
10000
# Mac OS X: threads - Thread::Queue, Perl v5.18.2
enqueue : 1.620 seconds
pending : 128
duration : 1.867 seconds
10000
Regards, Mario