Can't call method "reader" on an undefined value at /usr/local/lib/perl5/site_perl/5.8.6/Net/Traceroute.pm line 412.
####
Can't get tcp protocol by name at ...
####
#!/usr/local/bin/perl -w
package speak_db_names2;
our @ISA = qw{Net::Server::PreForkSimple};
use strict;
use Cache::FastMmap;
use Config::Auto;
use Data::Dumper;
use DBI;
use File::Basename;
use Net::Nslookup;
use Net::Ping;
use Net::Server::PreForkSimple;
use Net::Traceroute;
use POSIX qw{:signal_h};
use Proc::Simple;
use Storable qw{nstore_fd};
use Time::Format qw{%time};
use Time::HiRes qw{time sleep};
## configuration information {{{
my $SERVER_FILE = dirname($0) . "/DB_NAMES2";
my $config = Config::Auto::parse($SERVER_FILE);
my $servers;
my $preferred_order_prefs;
## Loop through the config values and ensure that each server-list config option is an array ref:
while ( my ( $key, $value ) = each %$config ) {
## only do this to server storage config options
if ( $key =~ /.+_servers$/ ) {
if ( ref($value) ) {
$servers->{$key} = $value;
}
else {
$servers->{$key} = [ $value ];
}
## Set a preferred server order pref for each known group
if ( exists $config->{$key . "_have_preferred_order"} ) {
$preferred_order_prefs->{$key} = $config->{$key . "_have_preferred_order"};
}
else {
$preferred_order_prefs->{$key} = 0;
}
}
}
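## For illustration only, a DB_NAMES2 file might contain entries along these lines
## (the exact syntax depends on the format Config::Auto guesses; these hosts and keys
## are assumptions, not real deployment values):
##   read_servers: db1.example.com db2.example.com db3.example.com
##   read_servers_have_preferred_order: 1
##   write_servers: db-master.example.com
## Single-host entries come back as plain scalars, which the loop above wraps in array refs.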
my $cache_timeout = 7; # Allow things to live this long in the cache
my $verbose = 0; # Be verbose to logfile
my $prefer = 5; # Weighting for preferred servers
my $DEBUG = 5; # Log debug messages whose level is at or below this value
## Problem weights. These are multipliers for each specific metric; a lower total score is better.
my %prob_wghts = (
checks => 100,
connect_time => 1000, ## This value is normally in seconds, e.g. .00823; the multiplier makes it significant
ping => 1000, ## Same here
hops => 1,
processes => .1,
slow_queries => 10,
);
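## For example (hypothetical numbers, just to show how the weights combine): a host with a
## 0.008s connect time, a 0.002s ping, 12 hops, 40 processes, 2 slow queries and 1 maintenance
## check scores roughly 8 + 2 + 12 + 4 + 20 + 100 = 146. The host with the lowest total wins
## in evaluate_server_order() below.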
## }}}
## Note: ICMP pings normally require root privileges
my $ping = Net::Ping->new('icmp', 2)
or die "Cannot initialize Net::Ping: $!";
$ping->hires(1);
## Make a server instance, and call the run method. From here forward, things are handled by the hooks provided by the Net::Server::PreForkSimple class
my $server = bless { }, 'speak_db_names2';
$server->run(conf_file => dirname($0) . '/speak_db_names2.conf');
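## Net::Server reads conf_file as simple "option value" lines. A minimal sketch of what
## speak_db_names2.conf might hold (these values are assumptions, not the real settings):
##   port       9191
##   log_file   /var/log/speak_db_names2.log
##   log_level  3
##   user       nobody
##   group      nobody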
## subs below
## sub check_health {{{
sub check_health {
my %healthy_servers_hash;
## run health checks {{{
while ( my ($group, $these_servers) = each %$servers ) {
## Health is evaluated per server group.
my %health;
foreach my $host ( @$these_servers ) {
$Net::Nslookup::TIMEOUT = 2;
if ( $DEBUG >= 4 ) {
$Net::Nslookup::DEBUG = 1;
}
my $ip = nslookup($host);
debug(3, "Nslookup $host: $ip");
if ( my $ping_time = check_ping($ip) ) {
$health{$host}{ping} = $ping_time;
}
else {
next;
}
if ( my $traceroute = check_traceroute($ip) ) {
$health{$host}{hops} = sprintf('%2d', $traceroute);
}
else {
next;
}
## Get a db handle and check the connection time
my ($dbh, $connect_time) = check_connect_time($ip);
debug(5, "$host DBH = " . Dumper($dbh));
debug(5, "$host DB connect_time = " . Dumper($connect_time));
## A false $dbh means that we were not able to connect within the allowed timeout
if ( $dbh ) {
$health{$host}{connect_time} = $connect_time;
}
else {
next;
}
## Check the running processlist on the server
if ( my $procs = check_processes($dbh) ) {
while ( my ($key, $value) = each %$procs ) {
$health{$host}{$key} = $value;
}
}
else {
next;
}
$health{$host}{host_ok} = 1;
}
my $health = "Health hash for group $group: \n" . Dumper(\%health);
debug(3, $health, 1);
my @ordered_servers = evaluate_server_order(\%health, $these_servers, $group);
$healthy_servers_hash{ $group } = \@ordered_servers;
}
## }}}
return \%healthy_servers_hash;
}
## }}}
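## The structure returned above looks like this (hypothetical group names and hosts):
##   { read_servers => [ 'db2', 'db1', 'db3' ], write_servers => [ 'db-master' ] }
## i.e. one ordered list of healthy servers per configured server group.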
## sub evaluate_server_order {{{
sub evaluate_server_order {
my $health = shift;
my $pref_order = shift;
my $group = shift;
my %server_order;
verbose(undef, $group);
## Loop through the metrics and apply weights
foreach my $metric ( qw{ connect_time checks
ping processes
slow_queries hops
} ) {
## Collect this metric for every host that passed its health checks
my %tmp_hash =
map {
$_ => $health->{$_}{$metric}
}
grep {
$health->{$_}{host_ok}
} keys %$health;
## Apply the weight for this metric and add it to each host's running score
foreach ( keys %tmp_hash ) {
$server_order{$_} += sprintf('%6d', ( $tmp_hash{$_} * $prob_wghts{$metric} ));
}
verbose(\%tmp_hash, $metric);
}
verbose(\%server_order, 'Server Order');
## Favour preferred servers: each host's score grows with its position in the configured order
if ( $preferred_order_prefs->{$group} ) {
if ($prefer) {
foreach (0 .. $#$pref_order) {
if ($server_order{$pref_order->[$_]}) {
$server_order{$pref_order->[$_]} += $_ * $prefer;
}
}
}
}
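## For illustration (hypothetical hosts): with $prefer = 5 and a configured order of
## (db1, db2, db3), db1 gets +0, db2 +5 and db3 +10 added to its score above, so servers
## listed earlier in the config win whenever the health metrics are otherwise close.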
my @server_order = sort { $server_order{$a} <=> $server_order{$b} } keys %server_order;
return @server_order;
}
## }}}
## sub process_request {{{
sub process_request {
my $self = shift;
my $serve_data;
## Poll the cache for up to 25 seconds (100 tries of 0.25s) while the updater fills it
for ( 1 .. 100 ) {
$serve_data = $self->{'cache'}->get('servers');
if ( defined $serve_data ) {
last;
}
elsif ( ! $self->{updater}->poll() ) {
debug(2, "The Updater is dead... long live the Updater!");
$self->{updater}->kill();
$self->{updater}->start(\&update_cache, $self);
}
sleep .25;
}
nstore_fd $serve_data, \*STDOUT;
}
## }}}
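## A client reads back the Storable stream that process_request() writes to STDOUT
## (which Net::Server ties to the client socket). A minimal client sketch, with the
## host and port assumed purely for illustration:
##   use IO::Socket::INET;
##   use Storable qw{fd_retrieve};
##   my $sock = IO::Socket::INET->new(PeerAddr => 'localhost', PeerPort => 9191)
##       or die "connect failed: $!";
##   my $servers = fd_retrieve($sock); # hashref: group name => ordered healthy servers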
## sub pre_server_close_hook {{{
sub pre_server_close_hook {
my $self = shift;
$self->{'updater'}->kill();
}
## }}}
## sub pre_loop_hook {{{
sub pre_loop_hook {
my $self = shift;
$self->{cache} = Cache::FastMmap->new( cache_size => '1k', init_file => 1, expire_time => $cache_timeout );
$self->{updater} = Proc::Simple->new();
$self->{updater}->start(\&update_cache, $self);
}
## }}}
## sub update_cache {{{
sub update_cache {
my $self = shift;
my $seconds_since_update = 0;
while (1) {
my $serve_data = $self->{'cache'}->get('servers');
## Refresh when the cache is empty or we are at least halfway to the cache expiry time
if ( ! defined $serve_data or $seconds_since_update >= $cache_timeout / 2 ) {
my $update_start = time;
debug(2, "Updater reloading cache....");
$serve_data = $self->check_health();
my $update_end = time;
my $update_duration = sprintf('%2.2f', $update_end - $update_start);
debug(2, "Update duration: $update_duration seconds.");
$seconds_since_update = 0;
debug(1, "Servers now: " . Dumper($serve_data) );
$self->{'cache'}->set('servers', $serve_data);
}
else {
$seconds_since_update += .5;
sleep .5;
debug(3, "Seconds since last update: $seconds_since_update.");
}
}
}
## }}}
## sub verbose {{{
sub verbose {
my $hash = shift;
my $type = shift;
if ($verbose) {
my $info = '';
if ( not defined($hash) ) {
$info = sprintf("%-20s %6s: %20s - \n", $time{'yyyy-mm-dd hh:mm:ss'}, $$, $type);
}
else {
$info .= sprintf("%10s\n", $type);
## Only walk the hash when one was actually passed in; dereferencing undef would die under strict
foreach my $key ( sort { $hash->{$a} <=> $hash->{$b} } keys %$hash ) {
$info .= sprintf("%40s => %6d\n", $key, $hash->{$key});
}
}
Net::Server::write_to_log_hook($server, 3, $info);
}
}
## }}}
## Health checking functions: {{{
## sub check_connect_time {{{
sub check_connect_time {
my $db_host = shift;
debug(4, "Checking connect time for $db_host");
my $dbh;
my $connect_time = '';
my $seconds = 2;
## POSIX::sigaction is used rather than local $SIG{ALRM} so the alarm can interrupt a
## blocking DBD::mysql connect; perl's deferred "safe" signals would otherwise not fire in time
my $mask = POSIX::SigSet->new( SIGALRM ); # signals to mask in the handler
my $action = POSIX::SigAction->new(
sub { die "connect timeout" }, # the handler code ref
$mask,
);
my $oldaction = POSIX::SigAction->new();
sigaction( &POSIX::SIGALRM, $action, $oldaction );
eval {
alarm($seconds);
my $start_time = time;
$dbh = DBI->connect_cached("DBI:mysql:database=test;host=$db_host", 'check_health', '*********',
{ RaiseError => 1, PrintError => 0 })
or die "Could not connect to $db_host: " . $DBI::errstr;
my $end_time = time;
$connect_time = $end_time - $start_time;
alarm 0; # cancel alarm (if code ran fast)
};
alarm 0; # make sure no alarm is left pending if the eval died before cancelling it
sigaction( &POSIX::SIGALRM, $oldaction ); # restore original signal handler
if ( $@ ) {
debug(4, "Problem!: $@");
return ( 0, undef );
}
else {
debug(4, "OK - $connect_time seconds");
return ( $dbh, $connect_time );
}
}
## }}}
## sub check_ping {{{
sub check_ping {
my $host = shift;
debug(4, "Running ping for $host: ");
my $round_trip;
eval {
## Ping the server for initial connectivity check
## In list context (with hires enabled), ping() returns (success, rtt_in_seconds, ip)
if ( my @ping_ret = $ping->ping($host) ) {
debug(4, "OK - $ping_ret[1] seconds");
$round_trip = $ping_ret[1];
}
};
if ( $@ ) {
debug(1, "Problem with ping: $@");
}
if ($round_trip) {
return $round_trip;
}
debug(4, "Not OK!");
return 0;
}
## }}}
## sub check_processes {{{
sub check_processes {
my $dbh = shift;
my %health;
debug(4, "Running process check: ");
if ( my $ret = $dbh->selectall_arrayref('show processlist') ) {
## SHOW PROCESSLIST columns: 0=Id 1=User 2=Host 3=db 4=Command 5=Time 6=State 7=Info
$health{processes} = scalar(@$ret);
$health{slow_queries} = scalar(
grep {
/(?:lock)/i
}
map {
$_->[6] if defined $_->[6]
}
@$ret );
## Count maintenance statements (check/alter/repair) seen in either the State or Info column
$health{checks} = scalar(
grep {
/(?:check|alter|repair)/i
}
map {
$_->[6] if defined $_->[6]
} @$ret );
$health{checks} += scalar(
grep {
/(?:check|alter|repair)/i
}
map {
$_->[7] if defined $_->[7]
} @$ret );
$dbh->disconnect;
debug(4, "OK");
return \%health;
}
debug(4, "Problem");
$dbh->disconnect;
return 0;
}
## }}}
## sub check_traceroute {{{
sub check_traceroute {
my $host = shift;
debug(4, "Running traceroute for $host: ");
my $tr;
my $hops;
eval {
## Let's see some traceroute info:
$tr = Net::Traceroute->new( host => $host, timeout => 2, queries => 1, use_icmp => 1 )
or die "Cannot initialize Net::Traceroute: $!";
if ( $tr->found ) {
debug(4, "OK - " . $tr->hops . " hops.");
$hops = $tr->hops;
}
};
if ($@) {
debug(1, "Problem with traceroute: $@");
}
if ( $hops ) {
return $hops;
}
debug(4, "Problem!");
return 0;
}
## }}}
## }}}
## sub debug {{{
sub debug {
my $level = shift;
my $message = shift;
my $nostamp = shift;
my $stamp = '';
unless ( $nostamp ) {
$stamp = "$time{'yyyy/mm/dd hh:mm:ss'} $$: ";
}
if ( $DEBUG >= $level ) {
Net::Server::write_to_log_hook($server, 3, $stamp . $message);
}
}
## }}}