After writing something that used cbstream over IRC to collect CB messages for pseudo-statistical analysis, I finally tried to rewrite the fetcher to use PM's XML CB ticker directly. A fair bit of pain later, I got somewhere - but, as with last time, I had to learn something from it, or it would be no fun. Last time, it was SQL, specifically DB2's SQL. This time? Something probably a lot more esoteric. Finding documentation of successes, or even problems, with this technology was difficult, so I'm putting this out there as an example; whether it's a good one or a bad one will be determined by the comments :-)

The technology is, as you probably guessed from the subject line, Coro. Though I haven't gotten it to compile on AIX (which is important if I'm going to use it at $work), the CB stats are on my Linux box, so I had no issues with that part of it. However, the rest proved more difficult.

In order to learn Coro, I also had to learn a bit of AnyEvent, and even a tiny bit of EV. Things generally worked better once I figured out that I should run my event loop explicitly (EV::loop) instead of relying on Coro to figure it out and run it for me. That's okay; the docs basically say as much. Coro also worked a bit better when I recompiled it after installing EV. It seems like an issue to me that Coro won't install Coro::EV unless EV is already installed; I apparently did things wrong by installing them in the reverse order at first.

I also availed myself of Daemon::Generic, which provided a bit of canned code for handling things like pid files, automatically going into the background (or not), sending HUPs, etc. It even integrates with Event via Daemon::Generic::Event which, though I didn't use it, gave me some insight into how to integrate everything.

My last problem was with the socket code. Allowing listeners to use socat to listen through a socket (for now, a unix socket, 'cuz it's easier) was one thing; I used the code in Coro::Debug as an example. But when a socket disconnected, its weakened reference wasn't going away. Eventually I tracked it down to the @$self = grep { defined } @$self line unweakening everything. Once I re-weakened everything, the extra listeners went away after the next item was posted to them. I still need to see if I can find a way to have those threads wait for more than one event at a time: a semaphore, the filehandle closing, or (in the future) something becoming readable, i.e., the other end sending a request of some sort. If I could be woken on either the semaphore or the closed filehandle, that thread could go away faster. I may have to rewrite Coro::Channel to allow it... but I don't even know how yet.
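To see the unweakening for yourself, here's a small stand-alone sketch that uses only Scalar::Util (no Coro; the hashes stand in for listener channels, and the variable names are made up for the demo). It relies on the documented Scalar::Util behaviour that a copy of a weak reference is a strong reference, which is exactly what a list assignment from grep produces.

```perl
use strict;
use warnings;
use Scalar::Util qw(weaken isweak);

# Two stand-in "listeners"; the array only holds weak references to them.
my $keep = { name => 'kept' };
my $drop = { name => 'dropped' };
my @listeners = ($keep, $drop);
weaken($_) for @listeners;

# The second listener goes away, so its weak slot becomes undef.
undef $drop;

# Filtering via list assignment copies each surviving scalar,
# and a copy of a weak reference is a strong reference.
@listeners = grep { defined } @listeners;
my $weak_after_grep = isweak($listeners[0]) ? 1 : 0;

# Re-weakening restores the intended ownership.
weaken($_) for @listeners;
my $weak_after_fix = isweak($listeners[0]) ? 1 : 0;

print "after grep: $weak_after_grep, after re-weaken: $weak_after_fix\n";
```

Run as-is, it should print `after grep: 0, after re-weaken: 1`.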

tye suggested posting the code, with no explanation of why (not that I asked), so I'm merely assuming it's to provide some example code for Coro, something to critique, or both. I do look forward to critiques: I'm just learning the Coro way of doing things right now, and it looks cool, but I could be doing it wrong.

I'm not going to go into great detail about Coro itself; I figure you're in a browser and can click over to the Coro docs yourself. Instead, here's most of the code. I'm going to skip the database back-end, as it's not terribly interesting and almost completely unchanged. Not that I posted it before, but there's plenty of example code out there for dealing with DBI, SQL, etc., and I don't need to add to that body of code. So, first is the Daemon::Generic-based front-end, which I called, boringly, fetch_cb:

#!/usr/bin/perl

use 5.12.2;

# so that it should run under cron with the right DB2 environment
BEGIN {
    unless ($ENV{DB2INSTANCE}) {
        exec ( ". ~db2inst1/sqllib/db2profile; exec $^X $0" . join '', map { qq[ "$_"] } @ARGV );
    }
}

use strict;
use warnings;

# for easier testing in my module build tree.
use blib;

use CBStats::Fetch;
use Daemon::Generic;

newdaemon();

sub isfg { shift->{gd_foreground} }

sub gd_preconfig {
    my $self = shift;
    ()
}

sub gd_run {
    my $self = shift;
    # CBStats::Fetch reads "quiet" from the object built by new();
    # run() ignores its arguments, so the options go to the constructor.
    my $fetcher = CBStats::Fetch->new(
        quiet  => !$self->isfg(),
        daemon => $self,
    );
    $fetcher->run();
}

# let the CBStats::Fetch module set up its own
# signals by preventing Daemon::Generic from doing so.
sub gd_setup_signals { }

# probably not needed anymore.
sub gd_redirect_output {
    my $self = shift;
    return if $self->isfg;
    $self->SUPER::gd_redirect_output();
    my $s = select STDOUT; $|=1;
    select STDERR; $|=1;
    select $s;
}
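One thing worth flagging in the BEGIN block: the arguments are wrapped in double quotes before /bin/sh sees them, so an argument containing $, a backtick, a backslash or a double quote would be reinterpreted by the shell, and $^X and $0 aren't quoted at all. Here's a sketch of a stricter variant using single-quote escaping. sh_quote and reexec_command are hypothetical helper names (not part of Daemon::Generic or any module), and the db2profile path is simply copied from the script above.

```perl
use strict;
use warnings;

# Hypothetical helper: wrap a string in single quotes for /bin/sh,
# turning each embedded ' into '\'' so nothing inside is interpreted.
sub sh_quote {
    my ($s) = @_;
    $s =~ s/'/'\\''/g;
    return "'$s'";
}

# Hypothetical helper: build the same re-exec command as the BEGIN
# block, but with the interpreter, script and every argument quoted.
sub reexec_command {
    my ($perl, $script, @args) = @_;
    return '. ~db2inst1/sqllib/db2profile; exec '
        . join ' ', map { sh_quote($_) } $perl, $script, @args;
}

# Prints: . ~db2inst1/sqllib/db2profile; exec '/usr/bin/perl' 'fetch_cb' 'it'\''s here'
print reexec_command('/usr/bin/perl', 'fetch_cb', "it's here"), "\n";

# In the script itself, something like:
#   exec(reexec_command($^X, $0, @ARGV)) unless $ENV{DB2INSTANCE};
```

Because named subs are compiled as they're parsed, the helpers would have to appear above the BEGIN block for the exec in it to see them.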
And then CBStats::Fetch, the main code:
package CBStats::Fetch;

use 5.12.2;
use Exporter qw(import);
our @EXPORT = qw(pm_get);

use IO::Handle;
use EV;
use AnyEvent;
use Coro;
use Coro::Debug;
#unlink '/tmp/cbstats-debug.sock';
#our $server = new_unix_server Coro::Debug '/tmp/cbstats-debug.sock';
use CBStats::db;
use CBStats::MultiChannel;
use POSIX qw(strftime);
use XML::Simple;
use Guard;
use Data::Dump qw(dump);
use AnyEvent::HTTP;

sub pm_get {
    my %opts = @_;
    my $url = 'http://perlmonks.org/?';
    $url .= join ';', map { "$_=$opts{$_}" } grep { defined $opts{$_} } keys %opts;

    # AnyEvent::HTTP takes request headers in a "headers" hashref.
    http_get($url, headers => { 'User-Agent' => 'Tanktalus CB Stat 0.1' }, Coro::rouse_cb());
    my ($body, $hdr) = Coro::rouse_wait();
    $hdr->{Status} =~ /^2/ or die "Failed to retrieve $url";
    return $body;
}

# parameters:
#   quiet => true/false

# this simply makes it easier to check the object from within Coro::Debug
# otherwise not really required.
our $fetcher;

sub new {
    my $class = shift;
    my $self = bless {@_}, $class;
    $fetcher = $self;

    $self->{multichannel} = CBStats::MultiChannel->new();
    $self->setup();
    $self;
}

sub setup {
    my $self = shift;

    # here I create all my threads and signals.
    # I keep track of them so I can ->ready() them later if need be.
    $self->{do_fetch} = async { $self->fetch_em() };
    $self->{do_save}  = async { $self->save_em(@_) } $self->{multichannel}->add();
    $self->{do_scan}  = async { $self->scan_em() };

    # set up some priorities - more important to fetch than do anything
    # else.
    $self->{do_fetch}->nice(-1);
    #$self->{do_save}->nice(1);
    $self->{do_scan}->nice(1);

    $self->{fetch_hup} = AnyEvent->signal(signal => 'HUP', cb => sub {
        # if I get a HUP signal, we'll
        # mark fetch/scan to run at their
        # next opportunity.
        $self->update_status("HUP received...");
        $self->{do_fetch}->ready() if $self->{do_fetch};
        $self->{do_scan}->ready() if $self->{do_scan};
    });
    $self->{fetch_int} = AnyEvent->signal(signal => 'INT', cb => sub {
        $self->update_status("INT received, quitting...");
        EV::unloop;
    });
}

sub run {
    my $self = shift;

    # "main" program. Set up a periodic timer that will
    # get called every 5 * 60 seconds, i.e., 5 minutes. This
    # will kick off the fetch.
    my $f = EV::periodic 0, 5 * 60, 0, sub {
        $self->{do_fetch}->ready();
    };

    # start the socket-listening.
    $self->sockit();

    EV::loop;
}

sub is_quiet { shift->{quiet} }

sub timestamp {
    my $time = strftime "[%Y/%m/%d %H:%M:%S]", localtime;
}

sub update_status {
    my $self = shift;
    return if $self->is_quiet();
    say timestamp, ' ', @_;
}

sub sem {
    my $self = shift;
    my $sem = shift;
    $self->{fetch_sem}{$sem} ||= Coro::Semaphore->new();
    $self->{fetch_sem}{$sem}->guard();
}

# Data::Dump output from the xml ticker's conversion to
# perl data structure via XML::Simple. Keep it for easy
# use later.
# Fetch.pm:84: {
#   info    => {
#                content    => "Rendered by the New Chatterbox XML Ticker",
#                count      => 1,
#                fromid     => 1308780,
#                gentimeGMT => "2011-05-10 23:00:33",
#                lastid     => 1308788,
#                site       => "http://perlmonks.org/",
#                sitename   => "PerlMonks",
#                ticker_id  => 207304,
#                xmlmaker   => "XML::Fling 1.001",
#                xmlstyle   => "clean,modern",
#              },
#   message => [
#                {
#                  author        => "Tanktalus",
#                  author_user   => 421114,
#                  createdepoch  => 1305068429,
#                  createdgmtime => "2011-05-10 23:00:29",
#                  message_id    => 1308788,
#                  msgtext       => "/me gets lonely",
#                  parsed        => "/me <i> gets lonely</i>",
#                },
#              ],
# }

sub fetch_em {
    my $self = shift;

    my $db = CBStats::db->new();
    my $logs = $db->get_table('Logs');
    my $last_id = do {
        my $guard = $self->sem('db');
        $logs->find_where('PMID IN (SELECT MAX(PMID) FROM !!!)');
    };
    $last_id = $last_id->pmid() if $last_id;

    while (1) {
        $self->update_status("Fetching...");

        # debugging aids: see how many threads are listening to this.
        $self->update_status($self->{multichannel}->count(), " listeners...");
        $self->update_status($self->{multichannel}->status());

        # eval so that pm_get can die when PM returns an error, thus
        # skipping everything here.
        eval {
            my $xml = pm_get(
                node_id  => 207304,
                xmlstyle => 'modern',
                fromid   => $last_id,
            );
            my $cb = XMLin(
                $xml,
                ForceArray => ['message'],
            );
            if ($cb and $cb->{message}) {
                $self->{multichannel}->put(@{$cb->{message}});
                $last_id = $cb->{message}[-1]{message_id};
            }
            1;
        } or do {
            my $e = $@;
            if ($e !~ /Failed to retrieve/) {
                say "Fetch died with: $e";
            }
        };

        # when we're done, we could sleep - but the EV::periodic is just
        # easier.
        Coro::schedule();
    }
}

sub save_em {
    my $self = shift;
    my $channel = shift;

    my $db = CBStats::db->new();
    my $logs = $db->get_table('Logs');

    # the channel->get waits on a semaphore for something to be ready.
    # Thus, we go to sleep until something to do.
    while (my $msg = $channel->get()) {
        $self->update_status("... saving $msg->{message_id} by $msg->{author}");
        {
            # here we call the code to transform the input into our db
            # format.
            my $guard = $self->sem('db');
            $logs->log_from_msg($msg)
        }

        # if there's nothing else to save, kick off the scan.
        $self->{do_scan}->ready() unless $channel->size();
    }
}

sub scan_em {
    my $self = shift;

    my $db = CBStats::db->new();
    my @tbls = map { $db->get_table($_) } qw/Logs ReferU ReferW Karma/;

    while (1) {
        $self->update_status("Scanning...");
        for my $tbl (@tbls) {
            my $guard = $self->sem('db');
            $tbl->scan($self->{quiet});
            Coro::cede(); # in case there's something more important to do.
        }
        Coro::schedule();
    }
}

use AnyEvent::Socket;
use Coro::Handle;
use Guard;

sub sockit {
    my $self = shift;

    unlink '/tmp/cbstats.sock';
    AnyEvent::Socket::tcp_server "unix/", "/tmp/cbstats.sock", sub {
        my ($fh) = unblock shift;
        async_pool {
            my $channel = $self->{multichannel}->add();
            scope_guard { undef $channel; $self->update_status("Guard exit"); };

            $fh->autoflush(1);
            $fh->print("Welcome!\n\n");
            $self->update_status("Socket connection added");

            while (my $msg = $channel->get()) {
                last unless $fh->writable();
                last unless $fh->print($msg->{author}, " said '", $msg->{msgtext}, "'\n");
            }
            $self->update_status("Socket connection ended");
        };
        return;
    };
    return;
}

1;
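The sem helper leans on Coro::Semaphore's guard method, which calls down and returns an object that calls up when it is destroyed, so the lock is released as soon as $guard goes out of scope. Below is a Coro-free sketch of the same shape, lazily creating one lock per name. ToySemaphore and ToyGuard are hypothetical stand-ins that only count and never block, so this shows the release-on-scope-exit part, not the waiting.

```perl
use strict;
use warnings;

# Hypothetical stand-in for Coro::Semaphore: it counts but never blocks.
package ToySemaphore;
sub new   { my ($class, $n) = @_; bless { count => $n // 1 }, $class }
sub down  { $_[0]{count}-- }
sub up    { $_[0]{count}++ }
sub count { $_[0]{count} }

# Like Coro::Semaphore->guard: take the lock, return an object
# whose destruction gives it back.
sub guard {
    my $self = shift;
    $self->down;
    return ToyGuard->new(sub { $self->up });
}

# Hypothetical guard: runs its callback when the last reference goes away.
package ToyGuard;
sub new     { my ($class, $cb) = @_; bless { cb => $cb }, $class }
sub DESTROY { my $self = shift; $self->{cb}->() if $self->{cb} }

package main;

# Same shape as CBStats::Fetch::sem: one lazily created lock per name.
my %sems;
sub sem {
    my ($name) = @_;
    $sems{$name} ||= ToySemaphore->new();
    return $sems{$name}->guard();
}

my $inside;
{
    my $guard = sem('db');
    $inside = $sems{db}->count;   # 0 while $guard is alive
}
my $after = $sems{db}->count;     # 1 again once $guard left scope

print "inside: $inside, after: $after\n";
```

It should print `inside: 0, after: 1`. Perl's reference counting is what makes the release happen right at the closing brace rather than at some later collection.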
And, finally, CBStats::MultiChannel:
package CBStats::MultiChannel;

use 5.12.2;
use Coro::Channel;
use Scalar::Util qw(weaken);

sub new {
    my $class = shift;
    my $self = bless [], $class;
    $self;
}

# clear out any channels that have gone away
sub clean {
    my $self = shift;
    @$self = grep { defined } @$self;

    # when we pull out the refs this way, they're
    # no longer weakened, so re-weaken everything.
    # (easier than using splice to pull undef items out -
    # if we get too many readers, we'll re-evaluate if this
    # is slow.)
    weaken($_) for @$self;
}

sub count {
    my $self = shift;
    $self->clean();
    scalar @$self;
}

sub status {
    my $self = shift;
    $self->clean();
    'Channel=size :: ' . join ":", map { $_ . "=" . $self->[$_]->size() } 0..$#$self;
}

# create new channel, add it to $self, ensure it's weakened, and return
# the non-weak version.
sub add {
    my $self = shift;
    my $channel = Coro::Channel->new();
    push @$self, $channel;
    $self->clean();
    $channel;
}

# pass a message to all listeners.
sub put {
    my $self = shift;
    $self->clean();
    for my $msg (@_) {
        # if we were really multi-threaded, we'd still
        # have to check if $_ was defined, but Coro eliminates
        # that possibility since nothing else really runs between
        # the clean() above and this (we don't cede)
        $_->put($msg) for (@$self);
    }
}

1;
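The comment in clean mentions splice as the alternative to grep plus re-weaken. Here's a minimal sketch of that variant, assuming plain array refs as queues instead of Coro::Channel (so there's no blocking get); ToyMultiChannel is a hypothetical name. Removing dead slots with splice, walking from the end, shifts the surviving elements rather than copying them, so they stay weak and no second pass is needed.

```perl
use strict;
use warnings;
use Scalar::Util qw(isweak);

# Hypothetical stand-in for CBStats::MultiChannel. Each listener's
# "channel" is a plain array ref used as a FIFO (no blocking get).
package ToyMultiChannel;
use Scalar::Util qw(weaken);

sub new { bless [], shift }

# Drop dead (undef) slots in place. splice moves the surviving
# scalars instead of copying them, so they remain weak references.
sub clean {
    my $self = shift;
    for my $i (reverse 0 .. $#$self) {
        splice @$self, $i, 1 unless defined $self->[$i];
    }
}

# Register a listener: keep a weak ref here, hand back the strong one.
sub add {
    my $self  = shift;
    my $queue = [];
    push @$self, $queue;
    weaken($self->[-1]);
    $self->clean;
    return $queue;
}

sub count { my $self = shift; $self->clean; scalar @$self }

# Copy each message onto every live listener's queue.
sub put {
    my $self = shift;
    $self->clean;
    for my $msg (@_) {
        push @$_, $msg for @$self;
    }
}

package main;

my $mc   = ToyMultiChannel->new;
my $gone = $mc->add;   # listener that will disconnect
my $kept = $mc->add;   # listener that stays connected
$mc->put('first');
undef $gone;           # first listener disconnects; its slot becomes undef
$mc->put('second');    # clean() splices the dead slot out first

print "listeners: ", $mc->count, ", kept got: @$kept, still weak: ",
    (isweak($mc->[0]) ? 'yes' : 'no'), "\n";
```

It should print `listeners: 1, kept got: first second, still weak: yes`; the remaining listener moved from slot 1 to slot 0 without losing its weak flag.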
Hopefully that comes in handy for someone - maybe even ambrus and his vapourware :-)