use threads;

####
#### snmp_server.pl
####
#### One listener worker receives SNMP traps and hands them to a pool of
#### consumer workers through an MCE::Queue.  Processed traps are printed
#### to STDOUT (one line each); timing and diagnostics go to STDERR.
####
#### NOTE(review): this section and the producer section below appear to be
#### two separate scripts pasted together -- save each to its own file.
####

use strict;
use warnings;

# perl snmp_server.pl | wc -l

use Time::HiRes qw( sleep time );

use Net::SNMPTrapd;

use MCE::Flow;
use MCE::Queue;

my $queue = MCE::Queue->new( await => 1, fast => 1 );

my $max_consumers = 30;        # number of consumer workers
my $max_traps     = 10_000;    # the listener stops after this many traps
my $start         = 0;         # time of the first trap, set via set_start()

# Route worker output through the manager process.
MCE::Flow::init {
    user_output => sub { print {*STDOUT} $_[0]; },
    user_error  => sub { print {*STDERR} $_[0]; },
};

# One listener, $max_consumers consumers.
mce_flow { max_workers => [ 1, $max_consumers ] }, \&listener, \&consumer;

# $start stays 0 when the listener never saw a trap (or failed to start);
# subtracting it would print a meaningless "seconds since epoch" value.
if ($start) {
    printf {*STDERR} "duration : %0.03f seconds\n", time - $start;
}
else {
    print {*STDERR} "duration : n/a (no trap was received)\n";
}

exit(0);

# Callback invoked in the manager process (through MCE->do) so the manager
# learns when the listener received its first trap.
#   $_[0] - timestamp of the first trap
sub set_start {
    my ($first_trap_time) = @_;
    $start = $first_trap_time;
    return;
}

# Listener worker: receives traps and enqueues them for the consumers.
# Leaves its loop after $max_traps traps, then enqueues one undef per
# consumer so that every consumer's dequeue loop terminates.
sub listener {
    my $snmptrapd = Net::SNMPTrapd->new( ReusePort => 1 );

    if ( !defined $snmptrapd ) {
        # Without a socket there is nothing to listen on.  Report the
        # reason and release the consumers, which would otherwise block
        # in dequeue() forever.
        MCE->printf(\*STDERR, "$0: %s\n", Net::SNMPTrapd->error());
        $queue->enqueue((undef) x $max_consumers);
        return;
    }

    my $count = 0;
    my $first_trap_at;    # local timestamp; the manager gets its own copy

    while ( 1 ) {
        my $trap = $snmptrapd->get_trap();

        if ( !defined $trap ) {
            MCE->printf(\*STDERR, "$0: %s\n", Net::SNMPTrapd->error());
            next;
        }
        elsif ( $trap == 0 ) {
            # Nothing to process (a timeout, per the Net::SNMPTrapd docs).
            next;
        }

        # Record the arrival of the first trap and tell the manager.
        if ( !$first_trap_at ) {
            $first_trap_at = time();
            MCE->do('set_start', $first_trap_at);
        }

        # important, remove the file handle inside the object
        delete $trap->{_UDPSERVER_};

        # enqueue the trap for a consumer to process
        $queue->enqueue($trap);

        # leave the loop after $max_traps traps
        last if ( ++$count >= $max_traps );

        # safety to prevent the queue from consuming memory out of control
        # $queue->await(2000) if ( $count % 4000 == 0 );

        # if ( ( $count % 4000 == 0 ) && $queue->pending() > 3000 ) {
        #     MCE->say(\*STDERR, "Warn: blocking temporarily");
        #     $queue->await(2000);
        # }

        # reset the counter to not overflow
        # (only reachable if the $max_traps limit above is removed)
        $count = 0 if ( $count > 2e9 );
    }

    # One undef per consumer signals "no more work".
    $queue->enqueue((undef) x $max_consumers);

    MCE->printf(\*STDERR, "enqueue : %0.03f seconds\n", time - $first_trap_at);
    MCE->printf(\*STDERR, "pending : %d\n", $queue->pending());

    return;
}

# Consumer worker: dequeues traps until it receives undef, decodes each one
# and prints "[pid] address, port, version, community" as one line.
sub consumer {
    while ( defined ( my $trap = $queue->dequeue() ) ) {
        # process_trap() returns undef on failure; the accessors below
        # would then yield undefined fields, so report and skip the trap.
        if ( !defined $trap->process_trap() ) {
            MCE->printf(\*STDERR, "$0: %s\n", Net::SNMPTrapd->error());
            next;
        }

        MCE->printf(
            "[$$] %s\t%i\t%i\t%s\n",
            $trap->remoteaddr, $trap->remoteport,
            $trap->version,    $trap->community
        );

        sleep 0.004;
    }

    return;
}

####
#### snmp_producer.pl
####
#### Two workers each send 5,000 SNMPv2c traps to 127.0.0.1:162.
####

use strict;
use warnings;

# perl snmp_producer.pl

use Net::SNMP;

use MCE::Flow;
use MCE::Queue;

mce_flow { max_workers => 2 }, \&producer;

exit(0);

# Producer worker: opens an SNMPv2c session and sends 5,000 test traps.
# Returns early (after printing the reason) when the session cannot be
# created; a trap that fails to send is reported on STDERR.
sub producer {
    my ( $session, $error ) = Net::SNMP->session(
        -hostname  => '127.0.0.1',
        -version   => 2,
        -community => 'public',
        -port      => 162
    );

    if ( !defined $session ) {
        printf "Error: Starting SNMP session (v2c trap) - %s\n", $error;
        return;
    }

    for my $i ( 1 .. 5000 ) {
        # Last octet of the IpAddress varbind.  Computed from $i + 1
        # rather than ++$i so the loop variable is not modified in place.
        my $host_octet = ( ( $i + 1 ) % 100 ) + 1;

        # Each varbind is an (OID, ASN.1 type tag, value) triple.
        my $result = $session->snmpv2_trap(
            -varbindlist => [
                '1.3.6.1.2.1.1.3.0',       0x43, int( time() ),            # TimeTicks
                '1.3.6.1.6.3.1.1.4.1.0',   0x06, '1.3.6.1.4.1.50000',      # OID
                '1.3.6.1.4.1.50000.1.3',   0x02, 1,                        # Integer
                '1.3.6.1.4.1.50000.1.4',   0x04, 'String',                 # OctetString
                '1.3.6.1.4.1.50000.1.5',   0x06, '1.2.3.4.5.6.7.8.9',      # OID
                '1.3.6.1.4.1.50000.1.6',   0x40, '10.10.10.'.$host_octet,  # IpAddress
                '1.3.6.1.4.1.50000.1.7',   0x41, 32323232,                 # Counter32
                '1.3.6.1.4.1.50000.1.8',   0x42, 42424242,                 # Gauge32
                '1.3.6.1.4.1.50000.1.9',   0x43, int( time() ),            # TimeTicks
                '1.3.6.1.4.1.50000.1.10',  0x44, 'opaque data'             # Opaque
            ]
        );

        # Do not drop send failures silently.
        if ( !defined $result ) {
            printf {*STDERR} "Error: Sending SNMP trap - %s\n",
                $session->error();
        }
    }

    $session->close;

    return;
}

####
#### Usage
####
# Host A : perl snmp_server.pl | wc -l
# Host B : perl snmp_producer.pl