use strict; use warnings; # perl snmp_server.pl | wc -l use Time::HiRes qw( sleep time ); use Net::SNMPTrapd; use MCE::Flow; use MCE::Queue; my $queue = MCE::Queue->new( await => 1, fast => 1 ); my $max_consumers = 30; my $start = 0; MCE::Flow::init { user_output => sub { print {*STDOUT} $_[0]; }, user_error => sub { print {*STDERR} $_[0]; }, }; mce_flow { max_workers => [ 1, $max_consumers ] }, \&listener, \&consumer; printf {*STDERR} "duration : %0.03f seconds\n", time - $start; exit(0); sub set_start { $start = $_[0]; } sub listener { my $snmptrapd = Net::SNMPTrapd->new( ReusePort => 1 ); my $count = 0; my $start; while ( 1 ) { my $trap = $snmptrapd->get_trap(); if ( !defined $trap ) { MCE->printf(\*STDERR, "$0: %s\n", Net::SNMPTrapd->error()); next; } elsif ( $trap == 0 ) { next; } $start = time(), MCE->do('set_start', $start) unless $start; # important, remove the file handle inside the object delete $trap->{_UDPSERVER_}; # enqueue the trap for a consumer to process $queue->enqueue($trap); # leave the loop after 10,000 traps last if ( ++$count >= 10000 ); # safety to prevent the queue from consuming memory out of control # $queue->await(2000) if ( $count % 4000 == 0 ); # if ( ( $count % 4000 == 0 ) && $queue->pending() > 3000 ) { # MCE->say(\*STDERR, "Warn: blocking temporarily"); # $queue->await(2000); # } # reset the counter to not overflow $count = 0 if ( $count > 2e9 ); } $queue->enqueue((undef) x $max_consumers); MCE->printf(\*STDERR, "enqueue : %0.03f seconds\n", time - $start); MCE->printf(\*STDERR, "pending : %d\n", $queue->pending()); } sub consumer { while ( defined ( my $trap = $queue->dequeue() ) ) { $trap->process_trap(); MCE->printf( "[$$] %s\t%i\t%i\t%s\n", $trap->remoteaddr, $trap->remoteport, $trap->version, $trap->community ); sleep 0.004; } }