#!/usr/bin/perl package SNMP::Query::AsynchMulti; # Pragmas use strict; use warnings; # Standard use Carp; # Cpan use SNMP; # This comes in handy so we don't pass bogus # parameters to SNMP::Session->new() our @valid_sess_params = qw( DestHost Community Version RemotePort Timeout Retries RetryNoSuch SecName SecLevel SecEngineId ContextEngineId Context AuthProto AuthPass PrivProto PrivPass AuthMasterKey PrivMasterKey AuthLocalizedKey PrivLocalizedKey VarFormats TypeFormats UseLongNames UseSprintValue UseEnums UseNumeric BestGuess NonIncreasing ); #---------------------------------------------------------- # Constructor sub new { my $class = shift; my $self = bless {}, $class; $self->{query_stack} = []; $self->{results} = []; $self->{max_in_flight} = 10; $self->{current_in_flight} = 0; $self->{total_this_run} = 0; $self->{grand_total} = 0; return $self; } # Verifies that the named parameter is a subref sub _check_param_callback { my $self = shift; my $param_name = shift; # string, hash key my $params = shift; # hashref return 1 unless exists $params->{$param_name}; croak "Bad parameter for [$param_name] - not a CODE ref" if ref $params->{$param_name} ne 'CODE'; return 1; } # TODO Fill in the code and use in the add() or _add_getbulk_query() m +ethods # Verifies that the named parameter is something the SNMP # module can use as a VarBind or VarBindList. sub _check_param_varbinds { my $self = shift; my $param_name = shift; # string, hash key my $params = shift; # hashref } sub add { my $self = shift; my $params = shift; my $query_stack = $self->{query_stack}; # These are required for all query ops so make sure they're presen +t. my $varbinds = $params->{VarBinds} || croak "Bad or missing parameter [VarBinds]"; my $query_type = $params->{QueryType} || croak "Bad or missing parameter [QueryType]"; # Make sure our callback params are valid. $self->_check_param_callback( 'PreCallback', $params ); $self->_check_param_callback( 'PostCallback', $params ); if ( $query_type eq 'getbulk' ) { my $query = $self->_make_getbulk_query($params); my $query_stack = $self->{query_stack}; push @$query_stack, $query; } else { croak "Attempt to add using unknown query type: $query_type\n" +; } return 1; } sub _make_getbulk_query { my $self = shift; my ($query_info) = @_; # These params are required for a getbulk query op. my $non_repeaters = exists $query_info->{NonRepeaters} ? $query_info->{NonRepeaters} : croak "Bad or missing parameter [NonRepeaters]"; my $max_repeaters = exists $query_info->{MaxRepeaters} ? $query_info->{MaxRepeaters} : croak "Bad or missing parameter [MaxRepeaters]"; # Currently, these are validated in the add() method, so no need h +ere. my $preop_callback = $query_info->{PreCallback}; my $postop_callback = $query_info->{PostCallback}; # TODO I may want to add a method to validate # and/or transform the VarBinds parameter my $varbinds = $query_info->{VarBinds}; # Parse out the parameters for creating the session. # I really think I should be validating them better here... # Maybe I need a separate subroutine... # TODO write the routine described above. my %sess_params; $sess_params{$_} = $query_info->{$_} for grep { exists $query_info->{$_} } @valid_sess_params; return sub { my $callback = sub { my $bulkwalk_results = shift; # Store the results and info about the query for later... push @{ $self->{results} }, $query_info, $bulkwalk_results +; $postop_callback->() if $postop_callback; $self->{current_in_flight}--; if ( scalar @{ $self->{query_stack} } ) { my $next_query = pop @{ $self->{query_stack} }; return $next_query->(); } $self->{current_in_flight} <= 0 ? return SNMP::finish() : +return 1; }; my $sess = SNMP::Session->new(%sess_params); $preop_callback->() if $preop_callback; $self->{current_in_flight}++; $self->{total_this_run}++; $self->{grand_total}++; return $sess->bulkwalk( $non_repeaters, $max_repeaters, $varbi +nds, [$callback] ); }; } sub current_in_flight { shift->{current_in_flight} } sub total_this_run { shift->{total_this_run} } sub grand_total { shift->{grand_total} } sub shuffle { } sub execute { my $self = shift; my $params = shift; # The KeepLast option can come in handy if, for example, another # thread or process is working on the contents of the results # array from a previous execution and may not finish before the # next execution. @{ $self->{results} } = () unless ( # I'll make my OWN idioms from now on, HAHA! (You can explicit +ly # set keeplast or it will use the object's default) defined $params->{KeepLast} ? $params->{KeepLast} : $self->{Ke +epLast} ); # Determine our maximum concurrency level for this run my $max_in_flight = $params->{InFlight} || $self->{max_in_flight}; # Make a copy of the stack in case we want to run the same query my $query_stack_ref = $self->{query_stack}; my @query_stack_copy = @{ $self->{query_stack} }; # Set some counters $self->{current_in_flight} = 0; $self->{total_this_run} = 0; # Begin issuing operations while ( scalar @$query_stack_ref ) { my $query = pop @$query_stack_ref; $query->(); last if $self->{current_in_flight} >= $max_in_flight; } # Wait for the ops to complete, or time-out (if specified) $params->{MasterTimeout} ? SNMP::MainLoop( $params->{MasterTimeout}, &SNMP::finish() ) : SNMP::MainLoop(); # Reset the stack for the next run. $self->{query_stack} = \@query_stack_copy; return $self->get_results(); } # Returns a reference to a copy of the results of the last query exec +ution sub get_results { return \@{ shift->{results} }; } 1; __END__
#!/usr/bin/perl use strict; use warnings; use Data::Dumper; use Carp; use Parse::CSV; use SNMP; use SNMP::Query::AsynchMulti; #--------------------------------------------------------- my $csv_file = shift || die "Please specify a CSV file with SNMP host +info!"; my $max_inflight = shift || 50; my $num_cycles = shift || 1; my $master_timeout = 0; # Set to number of seconds before # all queries are terminated. # 0 means no master timeout. my $batch_size = 50; # Run a callback whenever this many # results have been returned my @varbinds = qw( ifDescr ifInOctets ifOutOctets ifAlias ifType ifName ifInErrors ifOutErrors ifSpeed ifAdminStatus ifOperStatus ); my @reqired_fields = qw(HOSTIP COMMUNITY SNMPVER SNMPPORT); my @hosts = read_hosts_csv( $csv_file, @reqired_fields ); @hosts = clean_hosts_data( \@hosts ); # This object encapsulates the desired queries to run. my $query = SNMP::Query::AsynchMulti->new(); # This probably isn't necessary, but it's the Right Thing To Do. my $varlist = SNMP::VarList->new( map { [$_] } @varbinds ); my $preop_callback = sub { warn "+ IF/CT/GT: " . $query->current_in_flight() . "/" . $query->total_this_run() . "/" . $query->grand_total() . "\n"; }; my $postop_callback = sub { warn "- IF/CT/GT: " . $query->current_in_flight() . "/" . $query->total_this_run() . "/" . $query->grand_total() . "\n"; }; foreach my $host (@hosts) { $query->add( { # Params concerning the SNMP Session DestHost => $host->{HOSTIP}, Community => $host->{COMMUNITY}, Version => $host->{SNMPVER}, RemotePort => $host->{SNMPPORT}, #Timeout => $host->{SNMP_TIMEOUT}, #Retries => $host->{SNMP_RETRIES}, # Params concerning the type of query operation QueryType => 'getbulk', # See POD for explanation... MaxRepeaters => 20, # Additional options depend +on NonRepeaters => 0, # the QueryType... # The varbinds to be operated on VarBinds => $varlist, # Callbacks before and after this query op. PreCallback => $preop_callback, # Do this before the +op PostCallback => $postop_callback, # Do this after the o +p } ); warn "Added query to: $host->{HOSTIP}\n"; } warn "Shuffling queries (not yet implemented)\n"; $query->shuffle(); # Randomize order of queries...(not yet implemen +ted) # Run all the added queries with up to $max_inflight # asynchronous operations in-flight at any time. # Lather, rinse, repeat for $num_cycles. warn "Beginning polling cycle\n"; #use DDS; #warn Dump $query; exit; foreach my $iter ( 1 .. $num_cycles ) { sleep 30 unless $iter == 1; my $results = $query->execute( { InFlight => $max_inflight, MasterTimeout => $master_timeout, # TODO Implement the batching functionality BatchSize => 50, # Number of results that makes up a 'b +atch' BatchCallback => sub { 1 } , # Sub to call when a batch is done (or all results ar +e in) } ); print Dumper $results; } exit; #--------------------------------------------------------- # Read in the CSV file. sub read_hosts_csv { my $file = shift; my @required_fields = @_; # Parse entries from a CSV file into hashes hash my $csv_parser = Parse::CSV->new( file => $file, fields => 'auto', # Use the first line as column headers, # which become the hash keys. ); my @node_cfg; # Return a reference to this my $line_num = 0; while ( my $line = $csv_parser->fetch() ) { $line_num++; my $error_flag = 0; foreach my $field (@required_fields) { if ( !exists $line->{$field} ) { $error_flag = 1; carp "Missing field [$field] on line [$line_num] in CSV file [$file]"; } } croak "Terminating due to errors on line [$line_num] in CSV file [ +$file]" if $error_flag; push @node_cfg, $line; } if ( $csv_parser->errstr() ) { croak "Fatal error parsing [$file]: " . $csv_parser->errstr(); } return @node_cfg; } sub clean_hosts_data { my $hosts_data = shift; my @clean_hosts; foreach my $host (@$hosts_data) { # Maybe put in a loop to scrub leading and trailing # whitespace from each field? Yeah, I know. map in # void context is the devil's work, yadda, yadda. map { s/^\s*|\s*$//g } values %$host; if ( $host->{SNMPVER} == 2 #=~ /^1|2c?|3$/ && $host->{SNMPPORT} =~ /^\d+$/ && $host->{HOSTIP} =~ /^(?:\d{1,3}\.){3}\d{1,3}$/ # Flawed, but Good Enough. && $host->{COMMUNITY} ) { push @clean_hosts, $host; } else { warn "Invalid host data - skipping:\n" . " " . Dumper($host) . "\n"; } } return @clean_hosts; } 1; __END__
NODE_NAME,NODE_TYPE,COMMUNITY,HOSTIP,SNMPPORT,SNMPVER camel.mystuff.com,CPE-3810,foobar,10.1.15.62,161,2 somename.mystuff.com,CPE-2420,wheebaz,10.2.7.45,161,2 buhziuhuh.mystuff.com,CPE-2420,nipnop,10.2.7.40,161,2 salma-hayek.mystuff.com,CPE-2420,hummahumma,10.1.14.16,161,2 zippy.mystuff.com,CPE-2420,woo!woo,10.2.6.41,161,2 napoleon.mystuff.com,CPE-2420,vote4pedro,10.2.7.35,161,2 wjul.mystuff.com,CPE-3810,hoosurdaddy,10.1.15.72,161,2 telephone.mystuff.com,CPE-3660,yipyipyip,10.1.15.42,161,2 brrrrrring.mystuff.com,CPE-3660,uuuuuh-huh,10.1.15.73,161,2
In reply to UPDATED CODE: More features, faster, and more scalable
by Hercynium
in thread RFC: A module for SNMP polling of thousands of nodes simultaneously
by Hercynium
| For: | Use: | ||
| & | & | ||
| < | < | ||
| > | > | ||
| [ | [ | ||
| ] | ] |