#!/usr/local/bin/perl -w package speak_db_names2; @ISA = qw{Net::Server::PreForkSimple}; use strict; use Cache::FastMmap; use Config::Auto; use Data::Dumper; use DBI; use File::Basename; use Net::Nslookup; use Net::Ping; use Net::Server::PreForkSimple; use Net::Traceroute; use POSIX qw{:signal_h}; use Proc::Simple; use Storable qw{nstore_fd}; use Time::Format qw{%time}; use Time::HiRes qw{time sleep}; ## configuration information {{{ my $SERVER_FILE = dirname($0) . "/DB_NAMES2"; my $config = Config::Auto::parse($SERVER_FILE); my $servers; my $preferred_order_prefs; ## Loop through the config values and ensure that each config option (servers) are array refs: while ( my ( $key, $value ) = each %$config ) { ## only do this to server storage config options if ( $key =~ /.+_servers$/ ) { if ( ref($value) ) { $servers->{$key} = $value; } else { $servers->{$key} = [ $value ]; } ## Set a preferred server order pref for each known group if ( exists $config->{$key . "_have_preferred_order"} ) { $preferred_order_prefs->{$key} = $config->{$key . "_have_preferred_order"}; } else { $preferred_order_prefs->{$key} = 0; } } } my $cache_timeout = 7; # Allow things to live this long in the cache my $verbose = 0; # Be verbose to logfile my $prefer = 5; # Weighting for preferred servers my $DEBUG = 5; ## Problem Weights. These are multipliers for each specific metric. Lower is better on the check scale my %prob_wghts = ( checks => 100, connect_time => 1000, ## This number is normally in seconds, ie .00823. This makes it significant ping => 1000, ## Same here hops => 1, processes => .1, slow_queries => 10, ); ## }}} my $ping = Net::Ping->new('icmp', 2) or die "Cannot initialize Net::Ping: $!"; $ping->hires(1); ## Make a server instance, and call the run method. From here forward, things are handled by the hooks provided by the Net::Server::PreForkSimple class my $server = bless { }, 'speak_db_names2'; $server->run(conf_file => dirname($0) . '/speak_db_names2.conf'); ## subs below ## sub check_health {{{ sub check_health { my %healthy_servers_hash; ## run health checks {{{ while ( my ($group, $these_servers) = each %$servers ) { ## health is evaluated for the group. my %health; foreach my $host ( @$these_servers ) { $Net::Nslookup::TIMEOUT = 2; if ( $DEBUG >= 4 ) { $Net::Nslookup::DEBUG = 1; } my $ip = nslookup($host); debug(3, "Nslookup $host: $ip"); if ( my $ping_time = check_ping($ip) ) { $health{$host}{ping} = $ping_time; } else { next; } if ( my $traceroute = check_traceroute($ip) ) { $health{$host}{hops} = sprintf('%2d', $traceroute); } else { next; } ## Get a db handle and check the connection time my ($dbh, $connect_time) = check_connect_time($ip); debug(5, "$host DBH = " . Dumper($dbh)); debug(5, "$host DB connect_time = " . Dumper($connect_time)); ## Return of -1 means that we were not able to connect after the allowed timeout if ( $dbh ) { $health{$host}{connect_time} = $connect_time; } else { next; } ## Check the running processlist on the server if ( my $procs = check_processes($dbh) ) { while ( my ($key, $value) = each %$procs ) { $health{$host}{$key} = $value; } } else { next; } $health{$host}{host_ok} = 1; } my $health = "Health hash for group $group: \n" . join(" ", Dumper(%health)); debug(3, $health, 1); my @ordered_servers = evaluate_server_order(\%health, $these_servers, $group); $healthy_servers_hash{ $group } = \@ordered_servers; } ## }}} return \%healthy_servers_hash; } ## }}} ## sub evaluate_server_order {{{ sub evaluate_server_order { my $health = shift; my $pref_order = shift; my $group = shift; my %server_order; verbose(undef, $group); ## Loop through the metrics and apply weights foreach my $metric ( qw{ connect_time checks ping processes slow_queries hops } ) { ## Order the metrics from lowest to greatest my %tmp_hash = map { $_ => $health->{$_}{$metric} } grep { $health->{$_}{host_ok} } keys %$health; foreach ( keys %tmp_hash ) { $server_order{$_} += sprintf('%6d', ( $tmp_hash{$_} * $prob_wghts{$metric} )); } verbose(\%tmp_hash, $metric) } verbose(\%server_order, 'Server Order'); ## Add a multiplier for preferred servers if ( $preferred_order_prefs->{$group} ) { if ($prefer) { foreach (0 .. $#$pref_order) { if ($server_order{$pref_order->[$_]}) { $server_order{$pref_order->[$_]} += $_ * $prefer; } } } } my @server_order = sort { $server_order{$a} <=> $server_order{$b} } keys %server_order; return @server_order; } ## }}} ## sub process_request {{{ sub process_request { my $self = shift; my $serve_data; for ( 1 .. 100 ) { $serve_data = $self->{'cache'}->get('servers'); if ( defined $serve_data ) { last; } elsif ( ! $self->{updater}->poll() ) { debug(2, "The Updater is dead... long live the Updater!"); $self->{updater}->kill(); $self->{updater}->start(\&update_cache, $self); } sleep .25 } nstore_fd $serve_data, \*STDOUT; } ## }}} ## sub pre_server_close_hook {{{ sub pre_server_close_hook { my $self = shift; $self->{'updater'}->kill(); } ## }}} ## sub pre_loop_hook {{{ sub pre_loop_hook { my $self = shift; $self->{cache} = Cache::FastMmap->new( cache_size => '1k', init_file => 1, expire_time => $cache_timeout ); $self->{updater} = new Proc::Simple(); $self->{updater}->start(\&update_cache, $self); } ## }}} ## sub update_cache {{{ sub update_cache { my $self = shift; my $seconds_since_update = 0; while (1) { my $serve_data = $self->{'cache'}->get('servers'); if ( ! defined $serve_data or $seconds_since_update >= $cache_timeout / 2 ) { my $update_start = time; debug(2, "Updater reloading cache...."); $serve_data = $self->check_health(); my $update_end = time; my $update_duration = sprintf('%2.2f', $update_end - $update_start); debug(2, "Update duration: $update_duration seconds."); $seconds_since_update = 0; debug(1, "Servers now: " . Dumper($serve_data) ); $self->{'cache'}->set('servers', $serve_data); } else { $seconds_since_update += .5; sleep .5; debug(3, "Seconds since last update: $seconds_since_update."); } } } ## }}} ## sub verbose {{{ sub verbose { my $hash = shift; my $type = shift; if ($verbose) { my $info; if ( not defined($hash) ) { $info = sprintf("%-20s %6s: %20s - \n", $time{'yyyy-mm-dd hh:mm:ss'},$$,$type); } else { $info .= sprintf("%10s\n", $type); } foreach my $key ( sort { $hash->{$a} <=> $hash->{$b} } keys %$hash ) { $info .= sprintf("%40s => %6d\n", $key, $hash->{$key}); } Net::Server::write_to_log_hook($server, 3, $info); } } ## }}} ## Health checking functions: {{{ ## sub check_connect_time {{{ sub check_connect_time { my $db_host = shift; debug(4, "Checking connect time for $db_host"); my $dbh; my $connect_time = ''; my $seconds = 2; my $mask = POSIX::SigSet->new( SIGALRM ); # signals to mask in the handler my $action = POSIX::SigAction->new( sub { die "connect timeout" }, # the handler code ref $mask, ); my $oldaction = POSIX::SigAction->new(); sigaction( &POSIX::SIGALRM, $action, $oldaction ); eval { alarm($seconds); my $start_time = time; $dbh = DBI->connect_cached("DBI:mysql:database=test;host=$db_host", 'check_health', '*********', { RaiseError => 1, PrintError => 0 }) or die "Could not connect to $db_host: " . $DBI::errstr; my $end_time = time; $connect_time = $end_time - $start_time; alarm 0; # cancel alarm (if code ran fast) }; sigaction( &POSIX::SIGALRM, $oldaction ); # restore original signal handler if ( $@ ) { debug(4, "Problem!: $@"); return ( 0, undef ); } else { debug(4, "OK - $connect_time seconds"); return ( $dbh, $connect_time ); } } ## }}} ## sub check_ping {{{ sub check_ping { my $host = shift; debug(4, "Running ping for $host: "); my $round_trip; eval { ## Ping the server for initial connectivity check if ( my @ping_ret = $ping->ping($host) ) { debug(4, "OK - $ping_ret[1] seconds"); $round_trip = $ping_ret[1]; } }; if ( $@ ) { debug(1, "Problem with ping: $@"); } if ($round_trip) { return $round_trip; } debug(4, "Not OK!"); return 0; } ## }}} ## sub check_processess {{{ sub check_processes { my $dbh = shift; my %health; debug(4, "Running process check: "); if ( my $ret = $dbh->selectall_arrayref('show processlist') ) { $health{processes} = scalar(@$ret); $health{slow_queries} = scalar( grep { /(?:lock)/i } map { $_->[6] if defined $_->[6] } @$ret ); $health{checks} = scalar( grep { /(?:check|alter|repair)/i } map { $_->[6] if defined $_->[6] } @$ret ); $health{checks} += scalar( grep { /(?:check|alter|repair)/i } map { $_->[7] if defined $_->[7] } @$ret ); $dbh->disconnect; debug(4, "OK"); return \%health; } debug(4, "Problem"); $dbh->disconnect; return 0; } ## }}} ## sub check_traceroute {{{ sub check_traceroute { my $host = shift; debug(4, "Running traceroute for $host: "); my $tr; my $hops; eval { ## Lets see some traceroute info: $tr = Net::Traceroute->new( host => $host, timeout => 2, queries => 1, use_icmp => 1 ) or die "Cannot initialize Net::Traceroute: $!"; if ( $tr->found ) { debug(4, "OK - " . $tr->hops . " hops."); $hops = $tr->hops; } }; if ($@) { debug(1, "Problem with traceroute: $@"); } if ( $hops ) { return $hops; } debug(4, "Problem!"); return 0; } ## }}} ## }}} sub debug { my $level = shift; my $message = shift; my $nostamp = shift; my $stamp = ''; unless ( $nostamp ) { $stamp = "$time{'yyyy/mm/dd hh:mm:ss'} $$: "; } if ( $DEBUG >= $level ) { Net::Server::write_to_log_hook($server, 3, $stamp . $message); } }