in reply to Re^3: multi thread problem in an simple HTTP server (refactoring)
in thread multi thread problem in an simple HTTP server
Thanks for your help.
By the way, how did you achieve the "Read more... (9 kB)" link? It is really great! I am afraid I have no choice but to paste the whole big block of code here.
#!/usr/bin/perl
# created by joe.zheng at Wed Apr 1 10:23:14 2009
#
# Pre-threaded HTTP server built on HTTP::Daemon.
#
# Thread layout:
#   * main thread - owns the listening socket and, once per second, grows or
#                   shrinks the worker pool between the configured water marks
#   * loger       - drains the message queue filled by trc() and prints it
#   * miner       - periodically refreshes the data obtained by retrieve_data()
#   * worker(s)   - take turns accepting connections and answering requests

use strict;
use warnings;
use 5.008;

use threads;            # multi-threads needed
use threads::shared;    # need to share some variables
use Thread::Queue;      # thread safe communication needed
use Getopt::Long;
use HTTP::Daemon;
use Time::HiRes qw( time );
use List::Util qw( min );

# subroutine prototype; lets trc be called without parentheses before its
# definition is seen
sub trc (@);

# global variables
my $msgq = Thread::Queue->new;    # log messages travelling to the loger thread
my %self;                         # server state, partly shared (see init_srv)

# configuration
my %cnf = (

    # root server settings
    srv => 'agps.u-blox.com:46434',    # server URL
    cmd => 'eph',                      # command
    usr => 'usr',                      # username
    pwd => 'pwd',                      # password

    # default position, being used for data from the root server.
    lat => 31,                         # degrees
    lon => 121,                        # degrees
    acc => 1000,                       # meters

    # our server settings
    port    => 46434,                  # listen port
    queue   => 100,                    # listen queue
    timeout => 10,                     # socket timeout

    # the extent server redirect to when the main server
    # can't serve the request, such as reach its capacity,
    # the data requested is not available, etc.
    ext_srv => '',

    workers_ini     => 4,              # prethread count
    workers_max     => 20,             # max worker thread allowed
    workers_idle_lo => 2,              # low water mark
    workers_idle_hi => 8,              # high water mark

    period => 600,                     # period to check new data

    # directory
    doc => 'doc',    # the root directory where all the docs live in

    # misc
    dbg => 1,        # print log for debug purpose
);

# parse command line parameter
GetOptions(
    \%cnf,       'usr=s',   'pwd=s',     'lat=f',
    'lon=f',     'acc=i',   'srv=s',     'cmd=s',
    'period=f',  'port=i',  'queue=i',   'timeout=f',
    'ext_srv=s', 'doc=s',   'dbg=i',
);

# -----------------------
# main routine
# -----------------------

# set signal handler: INT/TERM only flag the state, the main loop notices it
$SIG{'INT'} = $SIG{'TERM'} = sub {
    lock $self{'stat'};
    $self{'stat'} = 'done';
};

init_srv();
spawn_loger();
spawn_miner();
spawn_worker() foreach ( 1 .. $cnf{'workers_ini'} );

trc 'dbg', 'start main loop';
MAIN_LOOP:
while ( $self{'stat'} ne 'done' ) {

    # adjust worker
    my @worker_idle =
      sort { $a <=> $b }
      grep { worker_is_idle($_) } keys %{ $self{'worker'} };

    my ( $worker_cnt_ttl, $worker_cnt_idl ) =
      ( scalar keys %{ $self{'worker'} }, scalar @worker_idle );
    trc 'dbg', 'idle worker', $worker_cnt_idl, 'total worker',
      $worker_cnt_ttl;

    if ( $worker_cnt_idl < $cnf{'workers_idle_lo'} ) {

        # too few idle workers: top up, but never beyond workers_max
        # (a non-positive count makes the range below empty)
        my $cnt = min(
            $cnf{'workers_idle_lo'} - $worker_cnt_idl,
            $cnf{'workers_max'} - $worker_cnt_ttl,
        );
        spawn_worker() foreach ( 1 .. $cnt );
    }
    elsif ( $worker_cnt_idl > $cnf{'workers_idle_hi'} ) {

        # too many idle workers: retire the surplus, lowest thread ids first
        my $cnt = $worker_cnt_idl - $cnf{'workers_idle_hi'};
        kill_worker( $worker_idle[ $_ - 1 ] ) foreach ( 1 .. $cnt );
    }

    sleep 1;
}
trc 'dbg', 'stop main loop';

END {
    trc 'dbg', 'start to terminate ...';

    # close all the worker; the fallback hash covers an exit that happens
    # before init_srv() has created the worker table
    foreach ( keys %{ $self{'worker'} || {} } ) {
        kill_worker($_);
    }
    trc 'dbg', 'terminated';

    # now it is the time for loger exit
    $self{'stat'} = 'term';

    # wait for the loger to flush the buffer
    sleep 1;
}

# -----------------------
# subroutine definitions
# -----------------------

# Placeholder for fetching fresh data from the root server.
# Returns the data as a string.
sub retrieve_data {

    # dummy data here
    # fetch from root server
    return 'dummy data, comming soon ...';
}

# Placeholder for looking up the data answering a request.
# Returns a hash ref with the payload ('data') and its expiry time ('expi',
# epoch seconds).
sub get_req_data {

    # dummy data here
    # fetch the request data from miner's repo
    return {
        data => 'dummy data, comming soon ...',
        expi => time() + $cnf{'period'},
    };
}

# Queue one log line for the loger thread.
# The first argument is the message kind. 'dbg' messages are dropped unless
# $cnf{'dbg'} is true, and they additionally carry the calling thread's id.
# All arguments are joined with ', ' behind a timestamp.
sub trc (@) {
    if ( $_[0] eq 'dbg' ) {

        # shortcut
        return if !$cnf{'dbg'};
        $msgq->enqueue( join ', ', scalar(localtime),
            threads->self()->tid(), @_ );
    }
    else {
        $msgq->enqueue( join ', ', scalar(localtime), @_ );
    }
    return;
}

# Tell whether the worker with thread id $t is currently idle.
# The worker's record is fetched exactly once, so a worker that has not yet
# published its state, or that removes itself concurrently, is simply
# reported as "not idle" instead of triggering an undef warning or an
# attempt to autovivify an entry inside the shared table.
sub worker_is_idle {
    my ($t) = @_;

    my $worker = $self{'worker'}{$t};
    return 0 unless $worker;

    my $stat = $worker->{'stat'};
    return ( defined $stat && $stat eq 'idle' ) ? 1 : 0;
}

# Print the configuration (debug mode only), create the listening socket and
# set up the state shared between threads. Dies when the socket cannot be
# created.
sub init_srv {

    # print out the configuration for debug purpose
    if ( $cnf{'dbg'} ) {
        print '<configuration>', "\n";
        foreach my $k ( sort keys %cnf ) {
            print $k, ': ', $cnf{$k}, "\n";
        }
        print '</configuration>', "\n";
    }

    # main thread
    my $srv = HTTP::Daemon->new(
        LocalPort => $cnf{'port'},
        Reuse     => 1,
        Listen    => $cnf{'queue'},
        Timeout   => $cnf{'timeout'},
    ) or die "can't create local socket: $@\n";

    trc 'log', "Accepting connections on",
      $srv->sockhost() . ':' . $srv->sockport();

    $self{'sock'} = $srv;

    # server state: 'busy' while serving, 'done' to stop, 'term' to end the
    # loger
    $self{'stat'} = 'busy';
    share( $self{'stat'} );

    # only ever used as the mutex serialising accept() among the workers
    $self{'lock'} = 'lock';
    share( $self{'lock'} );

    # per-thread records, keyed by thread id
    $self{'miner'}  = &share( {} );
    $self{'worker'} = &share( {} );

    return;
}

# Body of the loger thread: print queued messages once per second until the
# server state becomes 'term', then drain whatever is left.
sub do_loger {
    while ( $self{'stat'} ne 'term' ) {
        while ( defined( my $msg = $msgq->dequeue_nb() ) ) {
            print $msg, "\n";
        }
        sleep 1;
    }

    # flush the buffer
    while ( defined( my $msg = $msgq->dequeue_nb() ) ) {
        print $msg, "\n";
    }
    return;
}

# Start the detached loger thread; dies when it cannot be created.
sub spawn_loger {
    if ( my $thread = threads->new( \&do_loger, @_ ) ) {
        trc 'dbg', 'new loger created';
        $thread->detach;
    }
    else {

        # force to success, otherwise the service is not available
        die "new loger create failed";
    }
    return;
}

# Body of the miner thread: every $cnf{'period'} seconds store fresh data
# and its expiry time in this thread's record, until the server is 'done'.
# Arguments are passed through to retrieve_data().
sub do_miner {
    my $t = threads->self()->tid();
    my @p = @_;

    trc 'dbg', 'do_miner';

    $self{'miner'}{$t} = &share( {} );

    # periodically check for ephemeris
    while ( $self{'stat'} ne 'done' ) {

        # Check for new ephemeris
        $self{'miner'}{$t}{'expi'} = time() + $cnf{'period'};
        $self{'miner'}{$t}{'data'} = retrieve_data(@p);

        # four-argument select is used as a (fractional) sleep
        select undef, undef, undef, $cnf{'period'};
    }

    trc 'dbg', 'miner gone';
    return;
}

# Start the detached miner thread; dies when it cannot be created.
sub spawn_miner {
    if ( my $thread = threads->new( \&do_miner, @_ ) ) {
        trc 'dbg', 'new miner created';
        $thread->detach;
    }
    else {
        trc 'dbg', 'new miner create failed';

        # force to success, otherwise the service is not available
        die "new miner create failed";
    }
    return;
}

# Body of a worker thread: register in the worker table, then accept and
# serve one connection at a time until told to 'kill' itself, and finally
# unregister.
sub do_worker {
    my $t = threads->self()->tid();

    trc 'dbg', 'do_worker';

    $self{'worker'}{$t} = &share( {} );
    $self{'worker'}{$t}{'stat'} = 'idle';
    $self{'worker'}{$t}{'comd'} = 'none';

    my $c;
  ACCEPT:
    while ( $self{'worker'}{$t}{'comd'} ne 'kill' ) {
        {
            trc 'dbg', 'attempt to accept';

            # only one worker at a time sits in accept(); a failed accept
            # (e.g. the socket timeout) re-checks the kill command
            lock $self{'lock'};
            next ACCEPT unless $c = $self{'sock'}->accept();
        }

        $self{'worker'}{$t}{'stat'} = 'busy';
        $self{'worker'}{$t}{'time'} = time();

        trc 'log', "new connect accept",
          $c->peerhost() . ":" . $c->peerport();

        if ( my $r = $c->get_request() ) {
            handle_req( $c, $r );
        }
        $c->close();

        $self{'worker'}{$t}{'stat'} = 'idle';
        $self{'worker'}{$t}{'time'} = time();
        $self{'worker'}{$t}{'work'}++;
    }

    delete $self{'worker'}{$t};
    trc 'dbg', 'worker gone';
    return;
}

# Start one detached worker thread; a failure is only logged.
sub spawn_worker {
    if ( my $thread = threads->new( \&do_worker ) ) {
        trc 'dbg', 'new worker created';
        $thread->detach;
    }
    else {
        trc 'dbg', 'new worker create failed';
    }
    return;
}

# Ask the worker with thread id $t to stop after its current iteration.
# Unknown or already departed workers are ignored.
sub kill_worker {
    my ($t) = @_;

    trc 'dbg', 'kill worker', $t;

    # fetch the record once: the worker may delete its own entry at any
    # moment, and the held reference stays usable even then
    my $worker = $self{'worker'}{$t};
    $worker->{'comd'} = 'kill' if $worker;
    return;
}

# Answer one request $r on client connection $c.
#   GET /?cmd=...  - data response (only 'eph' commands yield data)
#   GET /          - the first "index.*" file below the doc root
#   GET /<path>    - the matching file below the doc root
#   anything else  - 501
sub handle_req {
    my ( $c, $r ) = @_;

    trc 'dbg', 'new request', $r->as_string;

    if ( $r->method() ne 'GET' ) {

        # 501 (Not Implemented) rather than 503: the server is available,
        # it just does not implement the method
        $c->send_error( 501,
            'method ' . $r->method() . ' not supported' );
        return;
    }

    my $uri_path = $r->uri()->path();

    if ( $uri_path ne '/' ) {

        # the path comes from the client: refuse to leave the doc root
        if ( grep { $_ eq '..' } split m{/}, $uri_path ) {
            trc 'log', 'response', 'forbidden', $uri_path;
            $c->send_error(403);
            return;
        }

        ( my $path = $uri_path ) =~ s|^/|$cnf{'doc'}/|;
        trc 'dbg', 'send', $path;
        $c->send_file_response($path);
        return;
    }

    my %query = $r->uri()->query_form();

    if ( !exists $query{'cmd'} ) {

        # plain "/": serve the first index file there is
        my ($index) = glob "$cnf{'doc'}/index.*";
        if ( defined $index ) {
            trc 'dbg', 'send', $index;
            $c->send_file_response($index);
        }
        else {

            # without this the client would get no response at all
            trc 'log', 'response', 'no index file';
            $c->send_error(404);
        }
        return;
    }

    my $data;
    if ( defined $query{'cmd'} && $query{'cmd'} =~ /eph/i ) {
        $data = get_req_data(%query);
    }

    if ($data) {

        # HTTP::Response and HTTP::Date are not loaded explicitly in this
        # file; presumably they come in with HTTP::Daemon - TODO confirm
        my $res = HTTP::Response->new(200);
        my $exp = HTTP::Date::time2str( $data->{'expi'} );

        $res->push_header( 'Expires'      => $exp );
        $res->push_header( 'Content-Type' => 'application/ubx' );
        $res->content( $data->{'data'} );
        $c->send_response($res);

        trc 'log', 'response', 'ok', 'data len', length $data->{'data'};
    }
    else {
        $c->send_error();
        trc 'log', 'response', 'error';
    }
    return;
}
|
|---|