#!/usr/bin/perl
use 5.12.2;

# Re-exec through the DB2 profile so that the script runs under cron with
# the right DB2 environment.
BEGIN {
    unless ($ENV{DB2INSTANCE}) {
        # If sourcing db2profile still leaves DB2INSTANCE unset, the
        # re-exec'd process would arrive here again and loop forever.  The
        # marker variable is inherited across exec and stops that.
        die "DB2INSTANCE is still unset after sourcing db2profile\n"
            if $ENV{CBSTATS_DB2_REEXEC}++;

        # Single-quote every word for /bin/sh (an embedded ' becomes '\'')
        # so arguments containing spaces, quotes, $ or backticks arrive
        # verbatim instead of being interpreted by the shell.
        my $sh_quote = sub {
            (my $word = shift) =~ s/'/'\\''/g;
            return "'$word'";
        };
        my $command = '. ~db2inst1/sqllib/db2profile; exec '
            . join ' ', map { $sh_quote->($_) } $^X, $0, @ARGV;
        exec $command
            or die "Cannot exec /bin/sh to load the DB2 profile: $!\n";
    }
}

use strict;
use warnings;

# For easier testing in my module build tree.
# NOTE(review): as far as I know `use blib` dies at compile time when no
# blib/ directory can be found -- confirm before running from an install.
use blib;
use CBStats::Fetch;
use Daemon::Generic;

newdaemon();

# True when Daemon::Generic was told to stay in the foreground.
sub isfg { shift->{gd_foreground} }

# Daemon::Generic hook: this daemon returns an empty configuration.
sub gd_preconfig {
    my $self = shift;
    return ();
}

# Daemon::Generic hook: build the fetcher and enter its event loop.
# Output is quiet unless running in the foreground.  quiet is given to
# new(), where is_quiet() reads it from the object, and is also passed
# to run() as before.
sub gd_run {
    my $self  = shift;
    my $quiet = !$self->isfg();
    my $fetcher = CBStats::Fetch->new(quiet => $quiet);
    $fetcher->run(
        quiet  => $quiet,
        daemon => $self,
    );
}

# Let the CBStats::Fetch module set up its own signals by preventing
# Daemon::Generic from doing so.
sub gd_setup_signals { }

# Probably not needed anymore.  In the background, defer to
# Daemon::Generic's redirection and then unbuffer STDOUT and STDERR.
sub gd_redirect_output {
    my $self = shift;
    return if $self->isfg;
    $self->SUPER::gd_redirect_output();
    my $s = select STDOUT; $| = 1;
    select STDERR; $| = 1;
    select $s;
}

####
package CBStats::Fetch;
use 5.12.2;
use Exporter qw(import);
our @EXPORT = qw(pm_get);
use IO::Handle;
use EV;
use AnyEvent;
use Coro;
use Coro::Debug;
#unlink '/tmp/cbstats-debug.sock';
#our $server = new_unix_server Coro::Debug '/tmp/cbstats-debug.sock';
use CBStats::db;
use CBStats::MultiChannel;
use POSIX qw(strftime);
use XML::Simple;
use Guard;
use Data::Dump qw(dump);
use AnyEvent::HTTP;

# Fetch a PerlMonks page and return its body.
#
# %opts become the query string; options whose value is undef are
# skipped, e.g.
#     pm_get(node_id => 207304, xmlstyle => 'modern', fromid => $last_id)
# Must be called from a Coro thread: it blocks in Coro::rouse_wait() until
# AnyEvent::HTTP delivers the response.
# Dies with a message starting "Failed to retrieve" on a non-2xx status.
sub pm_get {
    my %opts = @_;

    # Percent-encode keys and values so characters such as ';', '&', '='
    # or spaces cannot corrupt the query string.
    my $escape = sub {
        my $text = shift;
        utf8::encode($text);
        $text =~ s/([^A-Za-z0-9\-._~])/sprintf '%%%02X', ord $1/ge;
        return $text;
    };

    my $url = 'http://perlmonks.org/?'
        . join ';',
          map  { $escape->($_) . '=' . $escape->($opts{$_}) }
          grep { defined $opts{$_} }
          sort keys %opts;

    http_get($url, 'User-Agent' => 'Tanktalus CB Stat 0.1', Coro::rouse_cb());
    my ($body, $hdr) = Coro::rouse_wait();
    $hdr->{Status} =~ /^2/
        or die "Failed to retrieve $url ($hdr->{Status} "
             . ($hdr->{Reason} // '') . ')';
    return $body;
}

# Constructor parameters:
#     quiet => true/false   (suppresses update_status() output)
# The package global $fetcher simply makes it easier to check the object
# from within Coro::Debug; it is otherwise not really required.
our $fetcher;    # last object built by new(); handy from Coro::Debug

# Build a fetcher from the key/value pairs in @_, remember it in $fetcher,
# and start its Coro threads and signal watchers.
sub new {
    my $class = shift;
    my $self  = bless {@_}, $class;
    $fetcher = $self;
    $self->{multichannel} = CBStats::MultiChannel->new();
    $self->setup();
    return $self;
}

# Create the fetch/save/scan threads and the HUP/INT signal watchers.
# The thread objects are kept in $self so they can be ->ready()'d later.
sub setup {
    my $self = shift;

    $self->{do_fetch} = async { $self->fetch_em() };
    # save_em() receives its own freshly added channel as @_.
    $self->{do_save}  = async { $self->save_em(@_) } $self->{multichannel}->add();
    $self->{do_scan}  = async { $self->scan_em() };

    # Set up some priorities: fetching matters more than anything else.
    $self->{do_fetch}->nice(-1);
    #$self->{do_save}->nice(1);
    $self->{do_scan}->nice(1);

    # On HUP, mark fetch and scan to run at their next opportunity.
    $self->{fetch_hup} = AnyEvent->signal(
        signal => 'HUP',
        cb     => sub {
            $self->update_status("HUP received...");
            $self->{do_fetch}->ready() if $self->{do_fetch};
            $self->{do_scan}->ready()  if $self->{do_scan};
        },
    );

    # On INT, leave the event loop so run() returns.
    $self->{fetch_int} = AnyEvent->signal(
        signal => 'INT',
        cb     => sub {
            $self->update_status("INT received, quitting...");
            EV::unloop;
        },
    );
}

# "Main" program: periodically ready the fetch thread, open the listening
# socket, and run the EV loop until INT.
#
# Options passed here (e.g. quiet => ..., daemon => ...) are merged into
# the object.  The daemon wrapper passes quiet to run(), while is_quiet()
# and scan_em() read $self->{quiet}.  The merge happens before any of the
# threads get a chance to run.
sub run {
    my ($self, %opts) = @_;
    $self->{$_} = $opts{$_} for keys %opts;

    # Ready the fetch every 5 * 60 seconds, i.e. every 5 minutes.  $timer
    # must stay alive for as long as the loop runs.
    my $timer = EV::periodic 0, 5 * 60, 0, sub {
        $self->{do_fetch}->ready();
    };

    # Start the socket listener.
    $self->sockit();

    EV::loop;
}

sub is_quiet { shift->{quiet} }

# Current local time formatted as "[YYYY/MM/DD HH:MM:SS]".
sub timestamp {
    return strftime "[%Y/%m/%d %H:%M:%S]", localtime;
}

# Print a timestamped status line unless running quietly.
sub update_status {
    my $self = shift;
    return if $self->is_quiet();
    say timestamp(), ' ', @_;
}

# Acquire the named Coro::Semaphore (created on first use) and return its
# guard; the semaphore is released when the guard goes out of scope.
sub sem {
    my ($self, $name) = @_;
    my $sem = $self->{fetch_sem}{$name} //= Coro::Semaphore->new();
    return $sem->guard();
}

# Data::Dump output from the xml ticker's conversion to a perl data
# structure via XML::Simple.  Keep it for easy use later.
# Fetch.pm:84: {
#   info => {
#     content => "Rendered by the New Chatterbox XML Ticker",
#     count => 1,
#     fromid => 1308780,
#     gentimeGMT => "2011-05-10 23:00:33",
#     lastid => 1308788,
#     site => "http://perlmonks.org/",
#     sitename => "PerlMonks",
#     ticker_id => 207304,
#     xmlmaker => "XML::Fling 1.001",
#     xmlstyle => "clean,modern",
#   },
#   message => [
#     {
#       author => "Tanktalus",
#       author_user => 421114,
#       createdepoch => 1305068429,
#       createdgmtime => "2011-05-10 23:00:29",
#       message_id => 1308788,
#       msgtext => "/me gets lonely",
#       parsed => "/me gets lonely",
#     },
#   ],
# }

# Fetch thread.  Starting after the highest PMID already in Logs, it polls
# the chatterbox XML ticker and broadcasts new messages to every listener
# channel.  After each poll it waits in Coro::schedule() until it is
# readied again (by the EV::periodic in run() or by a HUP).
sub fetch_em {
    my $self = shift;
    my $db   = CBStats::db->new();
    my $logs = $db->get_table('Logs');

    my $last_id = do {
        my $guard = $self->sem('db');
        $logs->find_where('PMID IN (SELECT MAX(PMID) FROM !!!)');
    };
    $last_id = $last_id->pmid() if $last_id;

    while (1) {
        $self->update_status("Fetching...");
        # Debugging aids: see how many threads are listening to this.
        $self->update_status($self->{multichannel}->count(), " listeners...");
        $self->update_status($self->{multichannel}->status());

        # eval so that pm_get can die when PM returns an error, thus
        # skipping everything else in this pass.
        eval {
            my $xml = pm_get(
                node_id  => 207304,
                xmlstyle => 'modern',
                fromid   => $last_id,
            );
            my $cb = XMLin($xml, ForceArray => ['message']);
            my $messages = $cb && $cb->{message};
            # Guard the [-1] lookup against an empty message list.
            if ($messages and @$messages) {
                $self->{multichannel}->put(@$messages);
                $last_id = $messages->[-1]{message_id};
            }
            1;
        } or do {
            my $e = $@;
            # Retrieval failures are expected occasionally; report the rest.
            say "Fetch died with: $e" if $e !~ /Failed to retrieve/;
        };

        # When we're done we could sleep, but the EV::periodic is just
        # easier: wait here until something readies us.
        Coro::schedule();
    }
}

# Save thread: store every message that arrives on $channel into Logs.
# $channel->get() blocks until a message is available, so the thread
# sleeps while there is nothing to do.  Once the channel has drained, the
# scan thread is readied.
sub save_em {
    my ($self, $channel) = @_;
    my $db   = CBStats::db->new();
    my $logs = $db->get_table('Logs');

    while (my $msg = $channel->get()) {
        $self->update_status("... saving $msg->{message_id} by $msg->{author}");
        {
            # Transform the message into our db format and store it while
            # holding the db semaphore.
            my $guard = $self->sem('db');
            $logs->log_from_msg($msg);
        }
        # If there's nothing else to save, kick off the scan.
        $self->{do_scan}->ready() unless $channel->size();
    }
}

# Scan thread: run scan() on each table, then wait in Coro::schedule()
# until readied again (by save_em or a HUP).
sub scan_em {
    my $self = shift;
    my $db   = CBStats::db->new();
    my @tbls = map { $db->get_table($_) } qw/Logs ReferU ReferW Karma/;

    while (1) {
        $self->update_status("Scanning...");
        for my $tbl (@tbls) {
            {
                my $guard = $self->sem('db');
                $tbl->scan($self->is_quiet());
            }
            # The db semaphore is released above *before* yielding, so a
            # more important thread (e.g. save_em) can use the db now.
            Coro::cede();
        }
        Coro::schedule();
    }
}

use AnyEvent::Socket;
use Coro::Handle;
use Guard;

# Listen on the unix socket /tmp/cbstats.sock.  Each connection gets its
# own channel from the multichannel and a pooled Coro thread that writes
# every message as "author said 'text'" until a write fails.
sub sockit {
    my $self = shift;
    unlink '/tmp/cbstats.sock';

    # Called in void context, so no server guard is kept and the listener
    # stays up for the life of the process.
    AnyEvent::Socket::tcp_server "unix/", "/tmp/cbstats.sock", sub {
        my ($fh) = unblock shift;
        async_pool {
            my $channel = $self->{multichannel}->add();
            # On any exit, drop our strong ref so the multichannel's weak
            # ref to this channel clears and it stops receiving messages.
            scope_guard {
                undef $channel;
                $self->update_status("Guard exit");
            };
            $fh->autoflush(1);
            $fh->print("Welcome!\n\n");
            $self->update_status("Socket connection added");
            while (my $msg = $channel->get()) {
                last unless $fh->writable();
                last unless $fh->print($msg->{author}, " said '",
                                       $msg->{msgtext}, "'\n");
            }
            $self->update_status("Socket connection ended");
        };
        return;
    };
    return;
}

1;

####
package CBStats::MultiChannel;
use 5.12.2;
use Coro::Channel;
use Scalar::Util qw(weaken);

# Broadcast fan-out over Coro::Channels.  The object is an array of *weak*
# channel refs; add() hands the strong ref to the listener.  When a
# listener drops its channel, the slot becomes undef and clean() prunes it.

sub new {
    my $class = shift;
    return bless [], $class;
}

# Clear out any channels that have gone away.  Copying the refs out with
# grep yields strong copies, so every surviving slot is re-weakened.
# (Easier than using splice to pull undef items out; if we get too many
# readers, we'll re-evaluate whether this is slow.)
sub clean {
    my $self = shift;
    @$self = grep { defined } @$self;
    weaken($_) for @$self;
}

# Number of live listener channels.
sub count {
    my $self = shift;
    $self->clean();
    return scalar @$self;
}

# Debug string listing each live channel's index and backlog, e.g.
# 'Channel=size :: 0=3:1=0'.
sub status {
    my $self = shift;
    $self->clean();
    return 'Channel=size :: '
        . join ':', map { "$_=" . $self->[$_]->size() } 0 .. $#$self;
}

# Create a new channel, register it here (weakened by clean()), and return
# the non-weak version; the caller must hold it to keep receiving.
sub add {
    my $self    = shift;
    my $channel = Coro::Channel->new();
    push @$self, $channel;
    $self->clean();
    return $channel;
}

# Pass each message to all listeners.  If we were really multi-threaded,
# we'd still have to check whether each slot was defined, but Coro only
# switches threads when we cede, and we don't between clean() and this loop.
sub put {
    my $self = shift;
    $self->clean();
    for my $msg (@_) {
        $_->put($msg) for @$self;
    }
}

1;