#!/usr/bin/perl
use 5.12.2;
# So that the script runs under cron with the right DB2 environment: when
# DB2INSTANCE is not set, source the db2inst1 profile in a shell and re-exec
# this same script with the same arguments.
BEGIN {
    unless ($ENV{DB2INSTANCE}) {
        # Single-quote every word handed to the shell so that spaces, double
        # quotes, '$' or backticks in the interpreter path, the script path
        # or the arguments are passed through literally instead of being
        # interpreted by the shell.
        my $shell_quote = sub {
            my ($word) = @_;
            $word =~ s/'/'\\''/g;
            return "'$word'";
        };
        my $command = join ' ', map { $shell_quote->($_) } $^X, $0, @ARGV;

        # exec only returns on failure; do not silently carry on without the
        # DB2 environment in that case.
        exec ". ~db2inst1/sqllib/db2profile; exec $command"
            or die "Cannot re-exec under the DB2 profile: $!\n";
    }
}
use strict;
use warnings;
# for easier testing in my module build tree.
use blib;
use CBStats::Fetch;
use Daemon::Generic;
# Start the daemon. newdaemon() is not defined in this file; presumably it is
# exported by Daemon::Generic (loaded just above) and calls back into the
# gd_* subs defined below -- confirm against the Daemon::Generic docs.
newdaemon();
# Accessor: returns the object's gd_foreground field.
sub isfg
{
    my ($self) = @_;
    return $self->{gd_foreground};
}
# Pre-configuration hook: this daemon has nothing to contribute, so it
# returns an empty list.
sub gd_preconfig
{
    my ($self) = @_;
    return ();
}
# Main body of the daemon: build a CBStats::Fetch object and hand control to
# its run() method. run() is told to be quiet whenever we are not in the
# foreground, and is given the daemon object itself.
sub gd_run
{
    my ($self) = @_;

    my $stats = CBStats::Fetch->new();
    my @run_args = (
        quiet  => !$self->isfg(),
        daemon => $self,
    );
    $stats->run(@run_args);
}
# Deliberately a no-op: overriding this hook keeps Daemon::Generic from
# installing its own signal handlers, so that the CBStats::Fetch module can
# set up the signals itself.
sub gd_setup_signals
{
    return;
}
# Output redirection hook (probably not needed anymore).
# In the foreground nothing is redirected. Otherwise the parent class does
# the redirection, after which STDOUT and STDERR are both switched to
# autoflush and the previously selected handle is restored.
sub gd_redirect_output
{
    my ($self) = @_;
    return if $self->isfg;

    $self->SUPER::gd_redirect_output();

    # $| applies to the currently selected handle, hence the select dance.
    my $previously_selected = select STDOUT;
    $| = 1;
    select STDERR;
    $| = 1;
    select $previously_selected;
}
####
package CBStats::Fetch;
use 5.12.2;
use Exporter qw(import);
our @EXPORT = qw(pm_get);
use IO::Handle;
use EV;
use AnyEvent;
use Coro;
use Coro::Debug;
#unlink '/tmp/cbstats-debug.sock';
#our $server = new_unix_server Coro::Debug '/tmp/cbstats-debug.sock';
use CBStats::db;
use CBStats::MultiChannel;
use POSIX qw(strftime);
use XML::Simple;
use Guard;
use Data::Dump qw(dump);
use AnyEvent::HTTP;
# Fetch a page from perlmonks.org from within a coroutine.
#
# Arguments: a key => value list that becomes the query string, joined as
# key=value pairs separated by ';'. Pairs with an undefined value are
# omitted. Values are inserted as-is (no URI escaping is done here).
#
# Returns the response body. Dies with "Failed to retrieve <url>" when the
# response status does not start with 2; fetch_em() matches on that text.
#
# The calling coroutine is suspended in Coro::rouse_wait() until the HTTP
# callback fires.
sub pm_get
{
    my %opts = @_;
    my $url = 'http://perlmonks.org/?';
    $url .= join ';', map { "$_=$opts{$_}" } grep { defined $opts{$_} } keys %opts;

    # Request headers have to go through the "headers" option; a bare
    # 'User-Agent' => ... pair is not a header and the custom agent string
    # would never be sent.
    http_get(
        $url,
        headers => { 'User-Agent' => 'Tanktalus CB Stat 0.1' },
        Coro::rouse_cb(),
    );
    my ($body, $hdr) = Coro::rouse_wait();
    $hdr->{Status} =~ /^2/ or die "Failed to retrieve $url";
    return $body;
}
# The most recently constructed object is also kept in this package
# variable. That simply makes it easier to inspect the object from within
# Coro::Debug; it is otherwise not really required.
our $fetcher;

# Constructor.
# Parameters: a key => value list that becomes the object's initial fields,
# e.g.
#   quiet => true/false
# Creates the CBStats::MultiChannel used to distribute messages, runs
# setup(), and returns the new object.
sub new
{
    my ($class, @fields) = @_;

    my $self = bless {@fields}, $class;
    $fetcher = $self;

    $self->{multichannel} = CBStats::MultiChannel->new();
    $self->setup();

    return $self;
}
# Create the worker coroutines and the signal watchers.
# Everything is stored on the object so it can be ->ready()'d later:
#   do_fetch / do_save / do_scan  - coroutines running fetch_em, save_em
#                                   (fed by its own channel) and scan_em
#   fetch_hup / fetch_int         - AnyEvent signal watchers
sub setup
{
    my ($self) = @_;

    $self->{do_fetch} = async { $self->fetch_em() };
    $self->{do_save}  = async { $self->save_em(@_) } $self->{multichannel}->add();
    $self->{do_scan}  = async { $self->scan_em() };

    # Priorities: fetching is more important than anything else.
    $self->{do_fetch}->nice(-1);
    #$self->{do_save}->nice(1);
    $self->{do_scan}->nice(1);

    # HUP: mark fetch and scan to run at their next opportunity.
    my $on_hup = sub {
        $self->update_status("HUP received...");
        if ($self->{do_fetch}) {
            $self->{do_fetch}->ready();
        }
        if ($self->{do_scan}) {
            $self->{do_scan}->ready();
        }
    };

    # INT: leave the event loop.
    my $on_int = sub {
        $self->update_status("INT received, quitting...");
        EV::unloop;
    };

    $self->{fetch_hup} = AnyEvent->signal(signal => 'HUP', cb => $on_hup);
    $self->{fetch_int} = AnyEvent->signal(signal => 'INT', cb => $on_int);
}
# "Main" program: runs the event loop until it is stopped.
#
# Parameters (optional key => value list, merged into the object):
#   quiet  => true/false   suppress update_status() output (see is_quiet)
#   daemon => object       the calling daemon; only stored, not used here
# Callers pass these to run() rather than to new(), so they have to be
# recorded here or they would be silently ignored.
sub run
{
    my ($self, %opts) = @_;
    @{$self}{keys %opts} = values %opts;

    # Periodic timer that fires every 5 * 60 seconds, i.e., 5 minutes, and
    # kicks off the fetch. The watcher has to stay referenced while the loop
    # runs, hence the otherwise unused lexical.
    my $f = EV::periodic 0, 5 * 60, 0, sub {
        $self->{do_fetch}->ready();
    };

    # start the socket-listening.
    $self->sockit();

    EV::loop;
}
# Accessor: returns the object's quiet field.
sub is_quiet
{
    my ($self) = @_;
    return $self->{quiet};
}
# Returns the current local time formatted as "[YYYY/MM/DD HH:MM:SS]".
sub timestamp
{
    return strftime("[%Y/%m/%d %H:%M:%S]", localtime);
}
# Print a status line, prefixed with the current timestamp and a space,
# unless the object is in quiet mode. The arguments are printed as given,
# with nothing inserted between them.
sub update_status
{
    my ($self, @parts) = @_;
    return if $self->is_quiet();
    say timestamp(), ' ', @parts;
}
# Acquire the named semaphore and return its guard object.
# Semaphores are created on first use and cached under
# $self->{fetch_sem}{<name>}. The result comes from Coro::Semaphore's
# guard(); callers in this file hold it in a lexical for as long as they
# need the lock.
sub sem
{
    my ($self, $name) = @_;

    my $semaphores = ($self->{fetch_sem} ||= {});
    unless ($semaphores->{$name}) {
        $semaphores->{$name} = Coro::Semaphore->new();
    }
    return $semaphores->{$name}->guard();
}
# Data::Dump output from the xml ticker's conversion to
# perl data structure via XML::Simple. Keep it for easy
# use later.
# Fetch.pm:84: {
# info => {
# content => "Rendered by the New Chatterbox XML Ticker",
# count => 1,
# fromid => 1308780,
# gentimeGMT => "2011-05-10 23:00:33",
# lastid => 1308788,
# site => "http://perlmonks.org/",
# sitename => "PerlMonks",
# ticker_id => 207304,
# xmlmaker => "XML::Fling 1.001",
# xmlstyle => "clean,modern",
# },
# message => [
# {
# author => "Tanktalus",
# author_user => 421114,
# createdepoch => 1305068429,
# createdgmtime => "2011-05-10 23:00:29",
# message_id => 1308788,
# msgtext => "/me gets lonely",
# parsed => "/me gets lonely",
# },
# ],
# }
# Body of the fetch coroutine; never returns.
#
# On start-up it looks up the row of the Logs table with the highest PMID
# (under the 'db' semaphore) and uses its pmid() as the starting point.
# Each pass then asks the chatterbox XML ticker for messages from that id
# on, hands every message to all multichannel listeners, remembers the id
# of the last message, and goes to sleep with Coro::schedule() until
# something calls ->ready() on this coroutine again.
sub fetch_em
{
    my $self = shift;
    my $db = CBStats::db->new();
    my $logs = $db->get_table('Logs');

    # The guard only lives for the duration of the do block.
    # NOTE(review): '!!!' is presumably a placeholder that the table object
    # replaces with its own table name -- confirm in CBStats::db.
    my $last_id = do {
        my $guard = $self->sem('db');
        $logs->find_where('PMID IN (SELECT MAX(PMID) FROM !!!)');
    };
    $last_id = $last_id->pmid() if $last_id;

    while (1)
    {
        $self->update_status("Fetching...");
        # debugging aids: see how many threads are listening to this.
        $self->update_status($self->{multichannel}->count() , " listeners...");
        $self->update_status($self->{multichannel}->status());

        # eval so that pm_get can die when PM returns an error, thus
        # skipping everything here.
        eval {
            my $xml = pm_get(
                node_id  => 207304,
                xmlstyle => 'modern',
                fromid   => $last_id,
            );
            # ForceArray keeps 'message' an array even for a single message.
            my $cb = XMLin(
                $xml,
                ForceArray => ['message'],
            );
            if ($cb and $cb->{message})
            {
                $self->{multichannel}->put(@{$cb->{message}});
                $last_id = $cb->{message}[-1]{message_id};
            }
            1;
        } or do {
            # Work from the saved copy: $@ is a global that any later eval
            # can overwrite.
            my $e = $@;
            # Retrieval failures are expected now and then and are skipped
            # silently; anything else is reported.
            if ($e !~ /Failed to retrieve/)
            {
                say "Fetch died with: $e";
            }
        };

        # when we're done, we could sleep - but the EV::periodic is just
        # easier.
        Coro::schedule();
    }
}
# Body of the save coroutine.
# Parameter: the channel to read messages from.
# Every message taken from the channel is written to the Logs table; once
# the channel has been drained, the scan coroutine is marked ready.
sub save_em
{
    my ($self, $channel) = @_;

    my $db = CBStats::db->new();
    my $logs = $db->get_table('Logs');

    # get() waits until something is available, so this coroutine sleeps
    # whenever there is nothing to do.
    while (my $msg = $channel->get())
    {
        $self->update_status("... saving $msg->{message_id} by $msg->{author}");

        {
            # Hold the db semaphore only while the message is transformed
            # into our db format and stored.
            my $db_lock = $self->sem('db');
            $logs->log_from_msg($msg);
        }

        # Nothing else waiting to be saved: kick off the scan.
        $self->{do_scan}->ready() if !$channel->size();
    }
}
# Body of the scan coroutine; never returns.
# Each pass calls scan() on the Logs, ReferU, ReferW and Karma tables in
# that order, then sleeps with Coro::schedule() until readied again.
sub scan_em
{
    my ($self) = @_;

    my $db = CBStats::db->new();
    my @tables;
    push @tables, $db->get_table($_) for qw/Logs ReferU ReferW Karma/;

    for (;;)
    {
        $self->update_status("Scanning...");

        foreach my $table (@tables)
        {
            my $db_lock = $self->sem('db');
            $table->scan($self->{quiet});

            # Give way in case there's something more important to do.
            # NOTE(review): the db guard is still held at this point, so a
            # coroutine that needs the 'db' semaphore cannot make progress
            # during this cede -- confirm that this is intended.
            Coro::cede();
        }

        Coro::schedule();
    }
}
use AnyEvent::Socket;
use Coro::Handle;
use Guard;
# Start listening on the unix socket /tmp/cbstats.sock.
# Any existing socket file is removed first. Every connection gets its own
# coroutine and its own multichannel listener; each message is written to
# the client as "<author> said '<msgtext>'" until the client can no longer
# be written to.
sub sockit
{
    my ($self) = @_;

    unlink '/tmp/cbstats.sock';

    my $on_connect = sub {
        my ($raw_fh) = @_;
        my $fh = unblock $raw_fh;

        async_pool {
            my $channel = $self->{multichannel}->add();
            # Drop our reference to the channel however this block exits.
            scope_guard { undef $channel; $self->update_status("Guard exit"); };

            $fh->autoflush(1);
            $fh->print("Welcome!\n\n");
            $self->update_status("Socket connection added");

            while (my $msg = $channel->get())
            {
                $fh->writable() or last;
                $fh->print($msg->{author}, " said '", $msg->{msgtext}, "'\n") or last;
            }

            $self->update_status("Socket connection ended");
        };
        return;
    };

    # Called in void context on purpose; the return value is not kept.
    AnyEvent::Socket::tcp_server("unix/", "/tmp/cbstats.sock", $on_connect);
    return;
}
1;
####
package CBStats::MultiChannel;
use 5.12.2;
use Coro::Channel;
use Scalar::Util qw(weaken);
# Constructor: the object is a blessed array that will hold the listener
# channels. It starts out empty.
sub new
{
    my ($class) = @_;
    return bless [], $class;
}
# Clear out any channels that have gone away.
# Slots whose channel no longer exists are undefined and get dropped; the
# remaining entries are then weakened again.
sub clean
{
    my ($self) = @_;

    @{$self} = grep { defined $_ } @{$self};

    # Copying the refs out and back in leaves them no longer weak, so
    # re-weaken everything. (Easier than using splice to pull the undef
    # items out - if we get too many readers, we'll re-evaluate whether this
    # is slow.)
    for my $slot (@{$self})
    {
        weaken($slot);
    }
}
# Number of channels currently registered, after dropping any that have
# gone away.
sub count
{
    my ($self) = @_;
    $self->clean();
    return scalar @{$self};
}
# One-line summary of the registered channels, after dropping any that have
# gone away: "Channel=size :: " followed by "<index>=<size>" for every
# channel, joined with ":".
sub status
{
    my ($self) = @_;
    $self->clean();

    my @pairs = map { $_ . "=" . $self->[$_]->size() } 0 .. $#{$self};
    return 'Channel=size :: ' . join(":", @pairs);
}
# Register a new listener.
# Creates a Coro::Channel, appends it to the list, lets clean() weaken the
# stored copy, and returns the non-weak reference. The channel therefore
# stays registered only for as long as the caller keeps that reference.
sub add
{
    my ($self) = @_;

    my $listener = Coro::Channel->new();
    push @{$self}, $listener;
    $self->clean();

    return $listener;
}
# Pass every given message to all listeners, one message at a time.
sub put
{
    my ($self, @messages) = @_;
    $self->clean();

    for my $msg (@messages)
    {
        # If we were really multi-threaded we would still have to check
        # that each listener is defined, but Coro rules that out: nothing
        # else runs between the clean() above and this loop (we don't cede).
        for my $listener (@{$self})
        {
            $listener->put($msg);
        }
    }
}
1;