use strict; use warnings; use 5.008; use threads; # multi-threads needed use threads::shared; # need to share some variables use Thread::Queue; # thread safe communication needed use Getopt::Long; use HTTP::Daemon; use Time::HiRes qw( time ); # If available.... # subroutine proto type sub trc (@); # global variables my $queue = Thread::Queue->new; my %self : shared; # configuration my %cnf = ( # Username/Password. usr => 'usr', pwd => 'pwd', # Appr. Position, being used for Ephemeris transfer from server. lat => 47.28, # Degrees lon => 8.56, # Degrees acc => 1000, # meters srv => 'agps.u-blox.com:46434', cmd => 'eph', # our server settings port => 46434, # listen port queue => 5, # listen queue timeout => 10, # rest => 10, # the time value to rest after previous connect from the same addr conn => 3, # the allowed max connection per addr max_con => 20000, # max connection simultaneously ext_srv => '', # extent server to handle requests when the main server reach its capacity workers_ini => 2,#8, # prethread count workers_max => 20, # max worker thread allowed workers_idle_lo => 1, # low water mark workers_idle_hi => 3,#10, # high water mark period => 600, # period to check for new ephemeris # directory doc => 'doc', # the root directory where all the docs live in # misc dbg => 1, # print log for debug purpose ); # parse command line parameter GetOptions(\%cnf, 'usr=s', 'pwd=s', 'lat=f', 'lon=f', 'acc=i', 'srv=s', 'cmd=s', 'period=f', 'port=i', 'queue=i', 'timeout=f', 'rest=f', 'conn=i', 'max_con=i', 'ext_srv=s', 'doc=s', 'dbg=i', ); # ----------------------- # main routine # ----------------------- $SIG{'INT'} = $SIG{'TERM'} = sub { lock %self; $self{'stat'} = 'done'; }; my $listener = init_srv(); spawn_worker() foreach ( 1 .. $cnf{'workers_ini'} ); spawn_miner(); trc 'dbg', 'start main loop'; while ( $self{'stat'} ne 'done' ) { if ( my $msg = $queue->dequeue() ) { trc 'dbg', 'new msg'; my ( $class, $payload ) = split /\s+/, $msg, 2; if ( $class eq 'worker' ) { my ( $t, $s ) = split /\s+/, $payload, 2; lock %{ $self{'worker'}{$t} }; trc 'dbg', 'worker stat', $t, $s; if ( $s eq 'gone' ) { delete $self{'worker'}{$t}; } else { $self{'worker'}{$t}{'stat'} = $s; $self{'worker'}{$t}{'time'} = time(); $self{'worker'}{$t}{'work'}++ if $s eq 'idle'; } } elsif ( $class eq 'miner' ) { my ( $t, $d ) = split /\s+/, $payload, 2; if ( $d ) { lock %{ $self{'miner'}{$t} }; trc 'dbg', 'miner data', $t, length $d; $self{'miner'}{$t}{'expi'} = time() + $cnf{'period'}; $self{'miner'}{$t}{'data'} = $d; } } elsif ( $class eq 'beat' ) { # heart beat event # now we can monitor the thread's activity trc 'dbg', 'heart beat', $payload; } else { # nothing to do } } # reap worker if necessary # adjust worker my @worker_idle = sort { $a <=> $b } grep { $self{'worker'}{$_}{'stat'} eq 'idle' } keys %{$self{'worker'}}; my ( $worker_cnt_ttl, $worker_cnt_idl ) = ( scalar keys %{$self{'worker'}}, scalar @worker_idle ); trc 'dbg', 'idle worker', $worker_cnt_idl, 'total worker', $worker_cnt_ttl; if ( $worker_cnt_idl < $cnf{'workers_idle_lo'} ) { foreach ( 1 .. $cnf{'workers_idle_lo'} - $worker_cnt_idl ) { last if $_ > $cnf{'workers_max'} - $worker_cnt_ttl; spawn_worker(); } } elsif ( $worker_cnt_idl > $cnf{'workers_idle_hi'} ) { foreach my $t ( @worker_idle[ 0 .. $worker_cnt_idl - $cnf{'workers_idle_hi'} - 1 ] ) { kill_worker( $t ); } } } END { trc 'dbg', 'main thread start to terminate ...'; # close all the worker foreach ( keys %{$self{'worker'}} ) { lock %{ $self{'worker'}{$_} }; $self{'worker'}{$_}{'stat'} = 'done'; } # need to wait? trc 'dbg', 'terminated'; } # ----------------------- # subroutine definitions # ----------------------- sub retrieve_data { # dummy data here # fetch from root server return 'dummy data, comming soon ...'; } sub get_req_data { # dummy data here # fetch the request data from miner's repo return { data => 'dummy data, comming soon ...', expi => time() + $cnf{'period'}, }; } sub trc (@) { local $, = ', '; if ( $_[0] eq 'dbg' ) { # shortcut return if !$cnf{'dbg'}; print scalar(localtime), threads->self()->tid(), @_; } else { print scalar(localtime), @_; } print "\n"; } sub init_srv { # print out the configuration for debug purpose if ( $cnf{'dbg'} ) { print '', "\n"; foreach my $k ( sort keys %cnf ) { print $k, ': ', $cnf{ $k }, "\n"; } print '', "\n"; } # main thread my $srv = HTTP::Daemon->new( LocalPort => $cnf{'port'}, Reuse => 1, Listen => $cnf{'queue'}, Timeout => $cnf{'timeout'}, ) or die "can't create local socket: $@\n"; trc 'log', "Accepting connections on", $srv->sockhost().':'.$srv->sockport(); { lock %self; $self{'stat'} = 'busy'; $self{'lock'} = &share( {} ); $self{'miner'} = &share( {} ); $self{'worker'} = &share( {} ); } return $srv; } sub do_miner { my $t = threads->self()->tid(); my @p = @_; trc 'dbg', 'do_miner'; { lock( %{ $self{'miner'} } ); $self{'miner'}{$t} = &share( {} ); } # threads->yield(); # periodically check for ephemeris while ( $self{'stat'} ne 'done' ) { # Check for new ephemeris notify( 'miner', $t, retrieve_data( @p ) ); select undef, undef, undef, $cnf{'period'}; } trc 'dbg', 'miner gone'; } sub spawn_miner { if ( my $thread = threads->new( \&do_miner, @_ ) ) { trc 'dbg', 'new miner created'; $thread->detach; } else { trc 'dbg', 'new miner create failed'; # force to success, otherwise the service is not available die; } } sub do_worker { my $t = threads->self()->tid(); trc 'dbg', 'do_worker'; { lock %{ $self{'worker'} }; $self{'worker'}{ $t } = &share( {} ); $self{'worker'}{ $t }{'stat'} = 'idle'; } # threads->yield(); my $c; LOOP: while ( $self{'worker'}{$t}{'stat'} ne 'done' ) { { trc 'dbg', 'current stat', $self{'worker'}{$t}{'stat'}; notify( 'beat', $t, '' ); lock %{ $self{'lock'} }; $c = $listener->accept() or next LOOP; } notify( 'worker', $t, 'busy' ); trc 'log', "new connect accept", $c->peerhost().":".$c->peerport(); if ( my $r = $c->get_request() ) { trc 'dbg', 'new request', $r->as_string; if ( $r->method() eq 'GET' ) { if ( $r->uri()->path() eq '/' ) { my %query = $r->uri()->query_form(); if ( exists $query{'cmd'} ) { my $data; for ( $query{'cmd'} ) { /eph/i and do { $data = get_req_data( %query ); last }; } if ( $data ) { my $res = HTTP::Response->new( 200 ); $res->push_header( 'Expires' => HTTP::Date::time2str( $data->{'expi'} ) ); $res->push_header( 'Content-Type' => 'application/ubx' ); $res->content( $data->{'data'} ); $c->send_response( $res ); trc 'log', 'response', 'ok', "send @{ [ int( length $data->{'data'} ) ] } bytes"; } else { $c->send_error(); trc 'log', 'response', 'error'; } } else { foreach my $f ( glob "$cnf{'doc'}/index.*" ) { trc 'dbg', 'send', $f; $c->send_file_response( $f ); last; } } } else { ( my $path = $r->uri()->path() ) =~ s|^/|$cnf{'doc'}/|; trc 'dbg', 'send', $path; $c->send_file_response( $path ); } } } $c->close(); notify( 'worker', $t, 'idle' ); } notify( 'worker', $t, 'gone' ); trc 'dbg', 'worker gone'; } sub spawn_worker { if ( my $thread = threads->new( \&do_worker ) ) { trc 'dbg', 'new worker created'; $thread->detach; } else { trc 'dbg', 'new worker create failed'; } } sub kill_worker { my ( $t ) = @_; trc 'dbg', 'kill worker', $t; lock %{ $self{'worker'}{$t} }; $self{'worker'}{$t}{'stat'} = 'done'; } sub notify { $queue->enqueue( join ' ', @_ ); } #### $self{'lock'} = 'lock'; share( $self{'lock'} );