while ( $self{'stat'} ne 'done' ) {
# Check for new ephemeris
notify( 'miner', $t, retrieve_data(@p) );
select undef, undef, undef, $cnf{'period'};
}
####
elsif ( $class eq 'miner' ) {
my ( $t, $d ) = split /\s+/, $payload, 2;
if ($d) {
lock %{ $self{'miner'}{$t} };
trc 'dbg', 'miner data', $t, length $d;
$self{'miner'}{$t}{'expi'} = time() + $cnf{'period'};
$self{'miner'}{$t}{'data'} = $d;
}
}
####
# periodically check for ephemeris
while ( $self{'stat'} ne 'done' ) {
# Check for new ephemeris
$self{'miner'}{$t}{'expi'} = time() + $cnf{'period'};
$self{'miner'}{$t}{'data'} = retrieve_data(@p);
select undef, undef, undef, $cnf{'period'};
}
####
# shortcut
# handle all the pending messages first
next MAIN_LOOP if $queue->pending();
}
####
#!/usr/bin/perl
# created by joe.zheng at Wed Apr 1 10:23:14 2009
# tested in Cygwin, perl v5.8.7 built for cygwin-thread-multi-64int
use strict;
use warnings;
use 5.008;
use threads; # multi-threads needed
use threads::shared; # need to share some variables
use Getopt::Long;
use HTTP::Daemon;
use SDBM_File;
use Time::HiRes qw( time );
use List::Util qw( min );
# subroutine proto type
sub trc (@);
my %self;
# configuration
my %cnf = (
# root server settings
srv => 'agps.u-blox.com:46434', # server URL
cmd => 'eph', # command
usr => 'usr', # username
pwd => 'pwd', # password
# default position, being used for data from the root server.
lat => 31, # degrees
lon => 121, # degrees
acc => 1000, # meters
# our server settings
port => 46434, # listen port
queue => 100, # listen queue
timeout => 10, # socket timeout
# the extent server redirect to when the main server
# can't serve the request, such as reach its capacity,
# the data requested is not available, etc.
ext_srv => '',
workers_ini => 4, # prethread count
workers_max => 20, # max worker thread allowed
workers_idle_lo => 2, # low water mark
workers_idle_hi => 8, # high water mark
period => 600, # period to check new data
# directory
doc => 'doc', # the root directory where all the docs live in
# misc
dbg => 1, # print log for debug purpose
);
# parse command line parameter
GetOptions(
\%cnf, 'usr=s', 'pwd=s', 'lat=f',
'lon=f', 'acc=i', 'srv=s', 'cmd=s',
'period=f', 'port=i', 'queue=i', 'timeout=f',
'ext_srv=s', 'doc=s', 'dbg=i',
);
# -----------------------
# main routine
# -----------------------
# set signal handler
$SIG{'INT'} = $SIG{'TERM'}
= sub { lock $self{'stat'}; $self{'stat'} = 'done'; };
init_srv();
spawn_worker() foreach ( 1 .. $cnf{'workers_ini'} );
spawn_miner();
trc 'dbg', 'start main loop';
MAIN_LOOP:
while ( $self{'stat'} ne 'done' ) {
# adjust worker
my @worker_idle = sort { $a <=> $b }
grep { $self{'worker'}{$_}{'stat'} eq 'idle' }
keys %{ $self{'worker'} };
my ( $worker_cnt_ttl, $worker_cnt_idl )
= ( scalar keys %{ $self{'worker'} }, scalar @worker_idle );
trc 'dbg', 'idle worker', $worker_cnt_idl,
'total worker', $worker_cnt_ttl;
if ( $worker_cnt_idl < $cnf{'workers_idle_lo'} ) {
my $cnt = min(
$cnf{'workers_idle_lo'} - $worker_cnt_idl,
$cnf{'workers_max'} - $worker_cnt_ttl,
);
spawn_worker() foreach ( 1 .. $cnt );
}
elsif ( $worker_cnt_idl > $cnf{'workers_idle_hi'} ) {
my $cnt = $worker_cnt_idl - $cnf{'workers_idle_hi'};
kill_worker( $worker_idle[ $_ - 1 ] ) foreach ( 1 .. $cnt );
}
sleep 1;
}
trc 'dbg', 'stop main loop';
END {
trc 'dbg', 'start to terminate ...';
# close all the worker
foreach ( keys %{ $self{'worker'} } ) {
kill_worker($_);
}
# need to wait?
trc 'dbg', 'terminated';
}
# -----------------------
# subroutine definitions
# -----------------------
sub retrieve_data {
# dummy data here
# fetch from root server
return 'dummy data, comming soon ...';
}
sub get_req_data {
# dummy data here
# fetch the request data from miner's repo
return {
data => 'dummy data, comming soon ...',
expi => time() + $cnf{'period'},
};
}
sub trc (@) {
local $, = ', ';
if ( $_[0] eq 'dbg' ) {
# shortcut
return if !$cnf{'dbg'};
print scalar(localtime), threads->self()->tid(), @_;
}
else {
print scalar(localtime), @_;
}
print "\n";
}
sub init_srv {
# print out the configuration for debug purpose
if ( $cnf{'dbg'} ) {
print '', "\n";
foreach my $k ( sort keys %cnf ) {
print $k, ': ', $cnf{$k}, "\n";
}
print '', "\n";
}
# main thread
my $srv = HTTP::Daemon->new(
LocalPort => $cnf{'port'},
Reuse => 1,
Listen => $cnf{'queue'},
Timeout => $cnf{'timeout'},
) or die "can't create local socket: $@\n";
trc 'log', "Accepting connections on",
$srv->sockhost() . ':' . $srv->sockport();
{
$self{'sock'} = $srv;
$self{'stat'} = 'busy', share( $self{'stat'} );
$self{'lock'} = 'lock', share( $self{'lock'} );
$self{'miner'} = &share( {} );
$self{'worker'} = &share( {} );
}
}
sub do_miner {
my $t = threads->self()->tid();
my @p = @_;
trc 'dbg', 'do_miner';
{
lock( %{ $self{'miner'} } );
$self{'miner'}{$t} = &share( {} );
}
# periodically check for ephemeris
while ( $self{'stat'} ne 'done' ) {
# Check for new ephemeris
$self{'miner'}{$t}{'expi'} = time() + $cnf{'period'};
$self{'miner'}{$t}{'data'} = retrieve_data(@p);
select undef, undef, undef, $cnf{'period'};
}
trc 'dbg', 'miner gone';
}
sub spawn_miner {
if ( my $thread = threads->new( \&do_miner, @_ ) ) {
trc 'dbg', 'new miner created';
$thread->detach;
}
else {
trc 'dbg', 'new miner create failed';
# force to success, otherwise the service is not available
die;
}
}
sub do_worker {
my $t = threads->self()->tid();
trc 'dbg', 'do_worker';
$self{'worker'}{$t} = &share( {} );
$self{'worker'}{$t}{'stat'} = 'idle';
my $c;
ACCEPT:
while ( $self{'worker'}{$t}{'stat'} ne 'done' ) {
{
trc 'dbg', 'current stat', $self{'worker'}{$t}{'stat'};
lock $self{'lock'};
next ACCEPT unless $c = $self{'sock'}->accept();
}
$self{'worker'}{$t}{'stat'} = 'busy';
$self{'worker'}{$t}{'time'} = time();
trc 'log', "new connect accept",
$c->peerhost() . ":" . $c->peerport();
if ( my $r = $c->get_request() ) {
handle_req( $c, $r );
}
$c->close();
$self{'worker'}{$t}{'stat'} = 'idle';
$self{'worker'}{$t}{'time'} = time();
$self{'worker'}{$t}{'work'}++;
}
delete $self{'worker'}{$t};
trc 'dbg', 'worker gone';
}
sub spawn_worker {
if ( my $thread = threads->new( \&do_worker ) ) {
trc 'dbg', 'new worker created';
$thread->detach;
}
else {
trc 'dbg', 'new worker create failed';
}
}
sub kill_worker {
my ($t) = @_;
trc 'dbg', 'kill worker', $t;
lock %{ $self{'worker'}{$t} };
$self{'worker'}{$t}{'stat'} = 'done';
}
sub handle_req {
my ( $c, $r ) = @_;
trc 'dbg', 'new request', $r->as_string;
if ( $r->method() eq 'GET' ) {
if ( $r->uri()->path() eq '/' ) {
my %query = $r->uri()->query_form();
if ( exists $query{'cmd'} ) {
my $data;
for ( $query{'cmd'} ) {
/eph/i
and do { $data = get_req_data(%query); last };
}
if ($data) {
my $res = HTTP::Response->new(200);
my $exp = HTTP::Date::time2str( $data->{'expi'} );
$res->push_header( 'Expires' => $exp );
$res->push_header( 'Content-Type' => 'application/ubx' );
$res->content( $data->{'data'} );
$c->send_response($res);
trc 'log', 'response', 'ok', 'data len',
length $data->{'data'};
}
else {
$c->send_error();
trc 'log', 'response', 'error';
}
}
else {
foreach my $f ( glob "$cnf{'doc'}/index.*" ) {
trc 'dbg', 'send', $f;
$c->send_file_response($f);
last;
}
}
}
else {
( my $path = $r->uri()->path() ) =~ s|^/|$cnf{'doc'}/|;
trc 'dbg', 'send', $path;
$c->send_file_response($path);
}
}
else {
$c->send_error( 503, 'method' . $r->method() . 'not supported' );
}
}