use threads;
####
use strict;
use warnings;
# perl snmp_server.pl | wc -l
use Time::HiRes qw( sleep time );
use Net::SNMPTrapd;
use MCE::Flow;
use MCE::Queue;
my $queue = MCE::Queue->new( await => 1, fast => 1 );
my $max_consumers = 30;
my $start = 0;
MCE::Flow::init {
user_output => sub { print {*STDOUT} $_[0]; },
user_error => sub { print {*STDERR} $_[0]; },
};
mce_flow { max_workers => [ 1, $max_consumers ] }, \&listener, \&consumer;
printf {*STDERR} "duration : %0.03f seconds\n", time - $start;
exit(0);
sub set_start {
$start = $_[0];
}
sub listener {
my $snmptrapd = Net::SNMPTrapd->new( ReusePort => 1 );
my $count = 0;
my $start;
while ( 1 ) {
my $trap = $snmptrapd->get_trap();
if ( !defined $trap ) {
MCE->printf(\*STDERR, "$0: %s\n", Net::SNMPTrapd->error());
next;
}
elsif ( $trap == 0 ) {
next;
}
$start = time(), MCE->do('set_start', $start) unless $start;
# important, remove the file handle inside the object
delete $trap->{_UDPSERVER_};
# enqueue the trap for a consumer to process
$queue->enqueue($trap);
# leave the loop after 10,000 traps
last if ( ++$count >= 10000 );
# safety to prevent the queue from consuming memory out of control
# $queue->await(2000) if ( $count % 4000 == 0 );
# if ( ( $count % 4000 == 0 ) && $queue->pending() > 3000 ) {
# MCE->say(\*STDERR, "Warn: blocking temporarily");
# $queue->await(2000);
# }
# reset the counter to not overflow
$count = 0 if ( $count > 2e9 );
}
$queue->enqueue((undef) x $max_consumers);
MCE->printf(\*STDERR, "enqueue : %0.03f seconds\n", time - $start);
MCE->printf(\*STDERR, "pending : %d\n", $queue->pending());
}
sub consumer {
while ( defined ( my $trap = $queue->dequeue() ) ) {
$trap->process_trap();
MCE->printf( "[$$] %s\t%i\t%i\t%s\n",
$trap->remoteaddr, $trap->remoteport,
$trap->version, $trap->community );
sleep 0.004;
}
}
####
use strict;
use warnings;
# perl snmp_producer.pl
use Net::SNMP;
use MCE::Flow;
use MCE::Queue;
mce_flow { max_workers => 2 }, \&producer;
exit(0);
sub producer {
my ( $session, $error ) = Net::SNMP->session(
-hostname => '127.0.0.1',
-version => 2,
-community => 'public',
-port => 162
);
if ( !defined $session ) {
printf "Error: Starting SNMP session (v2c trap) - %s\n", $error;
return;
}
for my $i ( 1 .. 5000 ) {
my $result = $session->snmpv2_trap(
-varbindlist => [
'1.3.6.1.2.1.1.3.0', 0x43, int( time() ),
'1.3.6.1.6.3.1.1.4.1.0', 0x06, '1.3.6.1.4.1.50000',
'1.3.6.1.4.1.50000.1.3', 0x02, 1,
'1.3.6.1.4.1.50000.1.4', 0x04, 'String',
'1.3.6.1.4.1.50000.1.5', 0x06, '1.2.3.4.5.6.7.8.9',
'1.3.6.1.4.1.50000.1.6', 0x40, '10.10.10.'.((++$i % 100) + 1),
'1.3.6.1.4.1.50000.1.7', 0x41, 32323232,
'1.3.6.1.4.1.50000.1.8', 0x42, 42424242,
'1.3.6.1.4.1.50000.1.9', 0x43, int( time() ),
'1.3.6.1.4.1.50000.1.10', 0x44, 'opaque data'
]
);
}
$session->close;
}
####
Host A : perl snmp_server.pl | wc -l
Host B : perl snmp_producer.pl