package CBStats::Fetch;

# Fetches the PerlMonks chatterbox XML ticker on a timer, fans the messages
# out over a CBStats::MultiChannel, saves them to the database, scans the
# tables, and relays messages to clients connected on a unix socket.
# Everything runs as Coro threads on top of the EV event loop.

use 5.12.2;
use warnings;

use Exporter qw(import);
our @EXPORT = qw(pm_get);

use IO::Handle;

use EV;
use AnyEvent;
use Coro;
use Coro::Semaphore;    # sem() calls Coro::Semaphore->new and ->guard
use Coro::Debug;
#unlink '/tmp/cbstats-debug.sock';
#our $server = new_unix_server Coro::Debug '/tmp/cbstats-debug.sock';

use CBStats::db;
use CBStats::MultiChannel;

use POSIX qw(strftime);
use XML::Simple;
use Guard;
use Data::Dump qw(dump);

use AnyEvent::HTTP;

# pm_get(%opts)
#
# Issues an HTTP GET against perlmonks.org, using every key of %opts whose
# value is defined as a "key=value" query parameter (joined with ';').
# Blocks only the calling Coro thread until the response arrives.
#
# Returns the response body.
# Dies with "Failed to retrieve $url" unless the status starts with 2.
#
# NOTE(review): parameter values are interpolated into the URL without any
# escaping - callers presumably pass only simple ids/words; confirm.
sub pm_get {
    my %opts = @_;

    my $url = 'http://perlmonks.org/?';
    $url .= join ';',
        map  { "$_=$opts{$_}" }
        grep { defined $opts{$_} }
        keys %opts;

    # rouse_cb/rouse_wait turn the callback-style http_get into a call that
    # suspends this thread until the callback has fired.
    http_get(
        $url,
        'User-Agent' => 'Tanktalus CB Stat 0.1',
        Coro::rouse_cb()
    );
    my ($body, $hdr) = Coro::rouse_wait();

    $hdr->{Status} =~ /^2/
        or die "Failed to retrieve $url";

    return $body;
}

# parameters:
#   quiet => true/false

# this simply makes it easier to check the object from within Coro::Debug
# otherwise not really required.
our $fetcher;

# new(%args)
#
# Constructor.  Stores %args in the object, records the object in $fetcher,
# creates the multichannel and starts all worker threads via setup().
# Returns the new object.
sub new {
    my $class = shift;
    my $self  = bless {@_}, $class;

    $fetcher = $self;

    $self->{multichannel} = CBStats::MultiChannel->new();
    $self->setup();

    return $self;
}

# setup()
#
# Creates the fetch/save/scan threads and the HUP/INT signal watchers, and
# stores them in the object.
sub setup {
    my $self = shift;

    # here I create all my threads and signals.
    # I keep track of them so I can ->ready() them later if need be.
    $self->{do_fetch} = async { $self->fetch_em() };

    # The list after an async block is handed to the thread as @_, so the
    # freshly added channel becomes save_em's argument.
    $self->{do_save} = async { $self->save_em(@_) } $self->{multichannel}->add();

    $self->{do_scan} = async { $self->scan_em() };

    # set up some priorities - more important to fetch than do anything
    # else.
    $self->{do_fetch}->nice(-1);
    #$self->{do_save}->nice(1);
    $self->{do_scan}->nice(1);

    $self->{fetch_hup} = AnyEvent->signal(
        signal => 'HUP',
        cb     => sub {
            # if I get a HUP signal, we'll
            # mark fetch/scan to run at their
            # next opportunity.
            $self->update_status("HUP received...");
            $self->{do_fetch}->ready() if $self->{do_fetch};
            $self->{do_scan}->ready()  if $self->{do_scan};
        }
    );

    $self->{fetch_int} = AnyEvent->signal(
        signal => 'INT',
        cb     => sub {
            $self->update_status("INT received, quitting...");
            EV::unloop;
        }
    );
}

# run()
#
# "main" program.  Set up a periodic timer that will
# get called every 5 * 60 seconds, i.e., 5 minutes.  This
# will kick off the fetch.  Then starts the socket listener and enters the
# EV loop; returns once the loop is left.
sub run {
    my $self = shift;

    # The watcher object is held in a lexical for the duration of the loop.
    my $f = EV::periodic 0, 5 * 60, 0, sub {
        $self->{do_fetch}->ready();
    };

    # start the socket-listening.
    $self->sockit();

    EV::loop;
}

# is_quiet()
#
# Returns the value of the 'quiet' constructor parameter.
sub is_quiet {
    my $self = shift;
    return $self->{quiet};
}

# timestamp()
#
# Returns the current local time formatted as "[YYYY/MM/DD HH:MM:SS]".
sub timestamp {
    return strftime "[%Y/%m/%d %H:%M:%S]", localtime;
}

# update_status(@text)
#
# Prints a timestamped status line made of @text, unless the object is
# quiet.
sub update_status {
    my $self = shift;
    return if $self->is_quiet();

    say timestamp(), ' ', @_;
}

# sem($name)
#
# Acquires the named semaphore, creating it on first use, and returns the
# guard object from Coro::Semaphore's guard().  Callers keep the guard in a
# lexical for as long as they need the semaphore.
sub sem {
    my $self = shift;
    my $sem  = shift;

    $self->{fetch_sem}{$sem} ||= Coro::Semaphore->new();
    return $self->{fetch_sem}{$sem}->guard();
}

# Data::Dump output from the xml ticker's conversion to
# perl data structure via XML::Simple.  Keep it for easy
# use later.
# Fetch.pm:84: {
#   info => {
#     content => "Rendered by the New Chatterbox XML Ticker",
#     count => 1,
#     fromid => 1308780,
#     gentimeGMT => "2011-05-10 23:00:33",
#     lastid => 1308788,
#     site => "http://perlmonks.org/",
#     sitename => "PerlMonks",
#     ticker_id => 207304,
#     xmlmaker => "XML::Fling 1.001",
#     xmlstyle => "clean,modern",
#   },
#   message => [
#     {
#       author => "Tanktalus",
#       author_user => 421114,
#       createdepoch => 1305068429,
#       createdgmtime => "2011-05-10 23:00:29",
#       message_id => 1308788,
#       msgtext => "/me gets lonely",
#       parsed => "/me gets lonely",
#     },
#   ],
# }

# fetch_em()
#
# Body of the fetch thread.  Looks up the highest PMID already stored in the
# Logs table, then loops forever: fetch the ticker starting from that id,
# put every message on the multichannel, remember the id of the last
# message, and go to sleep until something readies the thread again.
sub fetch_em {
    my $self = shift;

    my $db   = CBStats::db->new();
    my $logs = $db->get_table('Logs');

    my $last_id = do {
        my $guard = $self->sem('db');
        $logs->find_where('PMID IN (SELECT MAX(PMID) FROM !!!)');
    };
    $last_id = $last_id->pmid() if $last_id;

    while (1) {
        $self->update_status("Fetching...");

        # debugging aids: see how many threads are listening to this.
        $self->update_status($self->{multichannel}->count(), " listeners...");
        $self->update_status($self->{multichannel}->status());

        # eval so that pm_get can die when PM returns an error, thus
        # skipping everything here.
        eval {
            my $xml = pm_get(
                node_id  => 207304,
                xmlstyle => 'modern',
                fromid   => $last_id,
            );

            my $cb = XMLin(
                $xml,
                ForceArray => ['message'],
            );

            if ($cb and $cb->{message}) {
                $self->{multichannel}->put(@{ $cb->{message} });
                $last_id = $cb->{message}[-1]{message_id};
            }
            1;
        } or do {
            # Report from the saved copy: $@ is a global and may have been
            # changed since the eval failed.
            my $e = $@;
            if ($e !~ /Failed to retrieve/) {
                say "Fetch died with: $e";
            }
        };

        # when we're done, we could sleep - but the EV::periodic is just
        # easier.
        Coro::schedule();
    }
}

# save_em($channel)
#
# Body of the save thread.  Takes messages off $channel and logs each one
# to the Logs table; once the channel is empty it readies the scan thread.
sub save_em {
    my $self    = shift;
    my $channel = shift;

    my $db   = CBStats::db->new();
    my $logs = $db->get_table('Logs');

    # the channel->get waits on a semaphore for something to be ready.
    # Thus, we go to sleep until something to do.
    while (my $msg = $channel->get()) {
        $self->update_status("... saving $msg->{message_id} by $msg->{author}");

        {
            # here we call the code to transform the input into our db
            # format.
            my $guard = $self->sem('db');
            $logs->log_from_msg($msg)
        }

        # if there's nothing else to save, kick off the scan.
        $self->{do_scan}->ready() unless $channel->size();
    }
}

# scan_em()
#
# Body of the scan thread.  Each time it is readied it calls scan() on the
# Logs, ReferU, ReferW and Karma tables in turn, then goes back to sleep.
sub scan_em {
    my $self = shift;

    my $db   = CBStats::db->new();
    my @tbls = map { $db->get_table($_) } qw/Logs ReferU ReferW Karma/;

    while (1) {
        $self->update_status("Scanning...");

        for my $tbl (@tbls) {
            {
                my $guard = $self->sem('db');
                $tbl->scan($self->{quiet});
            }

            # The db semaphore is released before ceding, so that a thread
            # waiting on it can make use of the yield.
            Coro::cede();    # in case there's something more important to do.
        }

        Coro::schedule();
    }
}

use AnyEvent::Socket;
use Coro::Handle;
use Guard;

# sockit()
#
# Starts a server on the unix socket /tmp/cbstats.sock.  Every connection
# gets its own channel on the multichannel and is sent one line per message
# ("<author> said '<msgtext>'") until the handle stops being writable or a
# print fails.
sub sockit {
    my $self = shift;

    # remove any existing socket file before listening on the path.
    unlink '/tmp/cbstats.sock';

    AnyEvent::Socket::tcp_server "unix/", "/tmp/cbstats.sock", sub {
        my ($fh) = unblock shift;

        async_pool {
            my $channel = $self->{multichannel}->add();

            # runs when this block is left, however that happens.
            scope_guard {
                undef $channel;
                $self->update_status("Guard exit");
            };

            $fh->autoflush(1);
            $fh->print("Welcome!\n\n");
            $self->update_status("Socket connection added");

            while (my $msg = $channel->get()) {
                last unless $fh->writable();
                last unless $fh->print($msg->{author}, " said '", $msg->{msgtext}, "'\n");
            }

            $self->update_status("Socket connection ended");
        };

        return;
    };

    return;
}

1;