use strict;
use warnings;
use 5.008;
use threads; # multi-threads needed
use threads::shared; # need to share some variables
use Thread::Queue; # thread safe communication needed
use Getopt::Long;
use HTTP::Daemon;
use Time::HiRes qw( time ); # If available....
# subroutine proto type
sub trc (@);
# global variables
my $queue = Thread::Queue->new;
my %self : shared;
# configuration
my %cnf = (
# Username/Password.
usr => 'usr',
pwd => 'pwd',
# Appr. Position, being used for Ephemeris transfer from server.
lat => 47.28, # Degrees
lon => 8.56, # Degrees
acc => 1000, # meters
srv => 'agps.u-blox.com:46434',
cmd => 'eph',
# our server settings
port => 46434, # listen port
queue => 5, # listen queue
timeout => 10, #
rest => 10, # the time value to rest after previous conn
+ect from the same addr
conn => 3, # the allowed max connection per addr
max_con => 20000, # max connection simultaneously
ext_srv => '', # extent server to handle requests when t
+he main server reach its capacity
workers_ini => 2,#8, # prethread count
workers_max => 20, # max worker thread allowed
workers_idle_lo => 1, # low water mark
workers_idle_hi => 3,#10, # high water mark
period => 600, # period to check for new ephemeris
# directory
doc => 'doc', # the root directory where all the docs live in
# misc
dbg => 1, # print log for debug purpose
);
# parse command line parameter
GetOptions(\%cnf,
'usr=s', 'pwd=s',
'lat=f', 'lon=f', 'acc=i',
'srv=s', 'cmd=s', 'period=f',
'port=i', 'queue=i', 'timeout=f',
'rest=f', 'conn=i', 'max_con=i', 'ext_srv=s',
'doc=s',
'dbg=i',
);
# -----------------------
# main routine
# -----------------------
$SIG{'INT'} = $SIG{'TERM'} = sub { lock %self; $self{'stat'} = 'done';
+ };
my $listener = init_srv();
spawn_worker() foreach ( 1 .. $cnf{'workers_ini'} );
spawn_miner();
trc 'dbg', 'start main loop';
while ( $self{'stat'} ne 'done' ) {
if ( my $msg = $queue->dequeue() ) {
trc 'dbg', 'new msg';
my ( $class, $payload ) = split /\s+/, $msg, 2;
if ( $class eq 'worker' ) {
my ( $t, $s ) = split /\s+/, $payload, 2;
lock %{ $self{'worker'}{$t} };
trc 'dbg', 'worker stat', $t, $s;
if ( $s eq 'gone' ) {
delete $self{'worker'}{$t};
} else {
$self{'worker'}{$t}{'stat'} = $s;
$self{'worker'}{$t}{'time'} = time();
$self{'worker'}{$t}{'work'}++ if $s eq 'idle';
}
} elsif ( $class eq 'miner' ) {
my ( $t, $d ) = split /\s+/, $payload, 2;
if ( $d ) {
lock %{ $self{'miner'}{$t} };
trc 'dbg', 'miner data', $t, length $d;
$self{'miner'}{$t}{'expi'} = time() + $cnf{'period'};
$self{'miner'}{$t}{'data'} = $d;
}
} elsif ( $class eq 'beat' ) {
# heart beat event
# now we can monitor the thread's activity
trc 'dbg', 'heart beat', $payload;
} else {
# nothing to do
}
}
# reap worker if necessary
# adjust worker
my @worker_idle = sort { $a <=> $b } grep { $self{'worker'}{$_}{'s
+tat'} eq 'idle' } keys %{$self{'worker'}};
my ( $worker_cnt_ttl, $worker_cnt_idl ) = ( scalar keys %{$self{'w
+orker'}}, scalar @worker_idle );
trc 'dbg', 'idle worker', $worker_cnt_idl, 'total worker', $worke
+r_cnt_ttl;
if ( $worker_cnt_idl < $cnf{'workers_idle_lo'} ) {
foreach ( 1 .. $cnf{'workers_idle_lo'} - $worker_cnt_idl ) {
last if $_ > $cnf{'workers_max'} - $worker_cnt_ttl;
spawn_worker();
}
} elsif ( $worker_cnt_idl > $cnf{'workers_idle_hi'} ) {
foreach my $t ( @worker_idle[ 0 .. $worker_cnt_idl - $cnf{'wor
+kers_idle_hi'} - 1 ] ) {
kill_worker( $t );
}
}
}
END {
trc 'dbg', 'main thread start to terminate ...';
# close all the worker
foreach ( keys %{$self{'worker'}} ) {
lock %{ $self{'worker'}{$_} };
$self{'worker'}{$_}{'stat'} = 'done';
}
# need to wait?
trc 'dbg', 'terminated';
}
# -----------------------
# subroutine definitions
# -----------------------
sub retrieve_data {
# dummy data here
# fetch from root server
return 'dummy data, comming soon ...';
}
sub get_req_data {
# dummy data here
# fetch the request data from miner's repo
return {
data => 'dummy data, comming soon ...',
expi => time() + $cnf{'period'},
};
}
sub trc (@) {
local $, = ', ';
if ( $_[0] eq 'dbg' ) {
# shortcut
return if !$cnf{'dbg'};
print scalar(localtime), threads->self()->tid(), @_;
} else {
print scalar(localtime), @_;
}
print "\n";
}
sub init_srv {
# print out the configuration for debug purpose
if ( $cnf{'dbg'} ) {
print '<configuration>', "\n";
foreach my $k ( sort keys %cnf ) {
print $k, ': ', $cnf{ $k }, "\n";
}
print '</configuration>', "\n";
}
# main thread
my $srv = HTTP::Daemon->new(
LocalPort => $cnf{'port'},
Reuse => 1,
Listen => $cnf{'queue'},
Timeout => $cnf{'timeout'},
) or die "can't create local socket: $@\n";
trc 'log', "Accepting connections on", $srv->sockhost().':'.$srv->
+sockport();
{
lock %self;
$self{'stat'} = 'busy';
$self{'lock'} = &share( {} );
$self{'miner'} = &share( {} );
$self{'worker'} = &share( {} );
}
return $srv;
}
sub do_miner {
my $t = threads->self()->tid();
my @p = @_;
trc 'dbg', 'do_miner';
{
lock( %{ $self{'miner'} } );
$self{'miner'}{$t} = &share( {} );
}
# threads->yield();
# periodically check for ephemeris
while ( $self{'stat'} ne 'done' ) {
# Check for new ephemeris
notify( 'miner', $t, retrieve_data( @p ) );
select undef, undef, undef, $cnf{'period'};
}
trc 'dbg', 'miner gone';
}
sub spawn_miner {
if ( my $thread = threads->new( \&do_miner, @_ ) ) {
trc 'dbg', 'new miner created';
$thread->detach;
} else {
trc 'dbg', 'new miner create failed';
# force to success, otherwise the service is not available
die;
}
}
sub do_worker {
my $t = threads->self()->tid();
trc 'dbg', 'do_worker';
{
lock %{ $self{'worker'} };
$self{'worker'}{ $t } = &share( {} );
$self{'worker'}{ $t }{'stat'} = 'idle';
}
# threads->yield();
my $c;
LOOP: while ( $self{'worker'}{$t}{'stat'} ne 'done' ) {
{
trc 'dbg', 'current stat', $self{'worker'}{$t}{'stat'};
notify( 'beat', $t, '' );
lock %{ $self{'lock'} };
$c = $listener->accept() or next LOOP;
}
notify( 'worker', $t, 'busy' );
trc 'log', "new connect accept", $c->peerhost().":".$c->peerpo
+rt();
if ( my $r = $c->get_request() ) {
trc 'dbg', 'new request', $r->as_string;
if ( $r->method() eq 'GET' ) {
if ( $r->uri()->path() eq '/' ) {
my %query = $r->uri()->query_form();
if ( exists $query{'cmd'} ) {
my $data;
for ( $query{'cmd'} ) {
/eph/i and do { $data = get_req_data( %que
+ry ); last };
}
if ( $data ) {
my $res = HTTP::Response->new( 200 );
$res->push_header( 'Expires' => HTTP::Date
+::time2str( $data->{'expi'} ) );
$res->push_header( 'Content-Type' => 'appl
+ication/ubx' );
$res->content( $data->{'data'} );
$c->send_response( $res );
trc 'log', 'response', 'ok', "send @{ [ i
+nt( length $data->{'data'} ) ] } bytes";
} else {
$c->send_error();
trc 'log', 'response', 'error';
}
} else {
foreach my $f ( glob "$cnf{'doc'}/index.*" ) {
trc 'dbg', 'send', $f;
$c->send_file_response( $f );
last;
}
}
} else {
( my $path = $r->uri()->path() ) =~ s|^/|$cnf{'doc
+'}/|;
trc 'dbg', 'send', $path;
$c->send_file_response( $path );
}
}
}
$c->close();
notify( 'worker', $t, 'idle' );
}
notify( 'worker', $t, 'gone' );
trc 'dbg', 'worker gone';
}
sub spawn_worker {
if ( my $thread = threads->new( \&do_worker ) ) {
trc 'dbg', 'new worker created';
$thread->detach;
} else {
trc 'dbg', 'new worker create failed';
}
}
sub kill_worker {
my ( $t ) = @_;
trc 'dbg', 'kill worker', $t;
lock %{ $self{'worker'}{$t} };
$self{'worker'}{$t}{'stat'} = 'done';
}
sub notify {
$queue->enqueue( join ' ', @_ );
}
I still confused with threads::share, how can I share an item in the shared hash? For example, if I change as
$self{'lock'} = 'lock';
share( $self{'lock'} );
it cause 'thread failed to start: lock can only be used on shared values', at lock $self{'lock'}
|