use strict; use warnings; use feature 'say'; use Time::HiRes qw( sleep time ); use Net::SNMPTrapd; use MCE::Flow; use MCE::Queue; my $queue = MCE::Queue->new( await => 1, fast => 1 ); my $start = time; # floating seconds # from left to right: 1 listener, 30 consumers, and 2 producers mce_flow { max_workers => [ 1, 30, 2 ] }, \&listener, \&consumer, \&producer; printf {*STDERR} "duration: %0.3f seconds\n", time() - $start; exit(0); # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # listener and consumer roles # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ sub listener { my $snmptrapd = Net::SNMPTrapd->new( ReusePort => 1 ); my $count = 0; while ( 1 ) { my $trap = $snmptrapd->get_trap(); if ( !defined $trap ) { printf {*STDERR} "$0: %s\n", Net::SNMPTrapd->error(); next; } elsif ( $trap == 0 ) { next; } # important, remove the file handle inside the object # so that serialization into the queue succeeds delete $trap->{_UDPSERVER_}; # enqueue the trap for a consumer to process $queue->enqueue($trap); # await will block if the queue has more than 2000 pending # this prevents the queue from consuming memory out of control # simply increase the number of consumers to not block # # $queue->await(2000) if ( ++$count % 4000 == 0 ); if ( ( ++$count % 4000 == 0 ) && $queue->pending() > 3000 ) { say {*STDERR} "$0: WARN: blocking temporarily"; $queue->await(2000); } # reset the counter to not overflow $count = 0 if ( $count > 2e9 ); # for benchmarking, leave the loop after 4000 traps last if ( $count == 4000 ); } # notify the manager process to stop the producer and queue MCE->do('quit_producers'); } sub consumer { while ( defined ( my $trap = $queue->dequeue() ) ) { $trap->process_trap(); # printf "[$$] %s\t%i\t%i\t%s\n", # $trap->remoteaddr, $trap->remoteport, # $trap->version, $trap->community; say "[$$] ".$trap->varbinds->[5]->{'1.3.6.1.4.1.50000.1.6'}; sleep 0.004; } } # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # producer role for generating traps # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ my @producer_pids; sub set_producer_pid { push @producer_pids, $_[0]; } sub quit_producers { kill 'QUIT', @producer_pids; $queue->end(); } sub producer { require Net::SNMP; my ( $run, $i ) = ( 1, 0 ); $SIG{QUIT} = sub { $run = 0 }; # notify the manager process my pid MCE->do('set_producer_pid', $$); my ( $session, $error ) = Net::SNMP->session( -hostname => 'localhost', -version => 2, -community => 'public', -port => 162 ); if ( !defined $session ) { printf "Error: Starting SNMP session (v2c trap) - %s\n", $error; return; } while ( $run ) { my $result = $session->snmpv2_trap( -varbindlist => [ '1.3.6.1.2.1.1.3.0', 0x43, int( time() ), '1.3.6.1.6.3.1.1.4.1.0', 0x06, '1.3.6.1.4.1.50000', '1.3.6.1.4.1.50000.1.3', 0x02, 1, '1.3.6.1.4.1.50000.1.4', 0x04, 'String', '1.3.6.1.4.1.50000.1.5', 0x06, '1.2.3.4.5.6.7.8.9', '1.3.6.1.4.1.50000.1.6', 0x40, '10.10.10.'.((++$i % 100) + 1), '1.3.6.1.4.1.50000.1.7', 0x41, 32323232, '1.3.6.1.4.1.50000.1.8', 0x42, 42424242, '1.3.6.1.4.1.50000.1.9', 0x43, int( time() ), '1.3.6.1.4.1.50000.1.10', 0x44, 'opaque data' ] ); } $session->close; }