in reply to Re^2: multi thread problem in an simple HTTP server (refactoring)
in thread multi thread problem in an simple HTTP server
At last I decide to switch to Cywin environment, because it can work perfectly there.
Hm. Why you think you need to "switch to Cygwin environment" is beyond me. Your code as posted runs perfectly fine as designed under both Vista 64 and XP-32, and I see nothing in your code that would require a POSIX layer. All Cygwin is going to do is make it run even more sluggishly(*) than it already does.
(*)Example 1: You've ignored my suggestion to use Thread::Queue::dequeue_nb() and substituted your 'beat' messages to try to ensure that your MAIN_LOOP wakes up in a timely fashion. But your beat message only gets sent each time one of your threads either accepts an inbound connection or its accept() times out! With your timeout set to 10 seconds, nothing will happen in your MAIN_LOOP for up to 10 seconds unless you are receiving connections. If you had substituted dequeue_nb() and a short sleep in the MAIN_LOOP, it would have remained as responsive as you chose--every 1 second or 1/10th second--regardless of your accept timeout or traffic load.
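To make the dequeue_nb() suggestion concrete, here is a minimal, self-contained sketch of a polling loop. It is not taken from your server: the worker, the message text and the 0.1 second tick are all made up for illustration, and it assumes a threads-enabled perl.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use Time::HiRes qw( sleep );

my $queue = Thread::Queue->new();

# Hypothetical worker: posts three messages, one every 0.3 seconds.
my $worker = threads->create( sub {
    for my $n ( 1 .. 3 ) {
        sleep 0.3;
        $queue->enqueue("msg $n");
    }
} );

my @seen;
my $ticks = 0;
while ( @seen < 3 ) {

    # Drain whatever is pending; dequeue_nb() never blocks and
    # returns undef when the queue is empty.
    while ( defined( my $msg = $queue->dequeue_nb() ) ) {
        push @seen, $msg;
    }

    $ticks++;     # periodic housekeeping would go here
    sleep 0.1;    # this value alone sets how responsive the loop is
}

$worker->join();
print "received '@seen' after $ticks ticks\n";
```

The loop wakes up every tick whether or not anything was queued, so its responsiveness no longer depends on the accept timeout or on traffic.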
But the real kicker is--as I identified earlier--your queue serves no purpose! Every action taken as a result of one of your notify() messages is to adjust values in thread-specific shared variables. So, why send a message from your worker threads to your main thread to adjust those values, when those workers have direct access to those shared variables? All you are doing is delaying those actions through several layers of unnecessary locking--those associated with the queue itself, and those that prevent your main thread from accessing the thread-specific shared data. If each thread adjusted its own state variables, neither type of locking would be required!
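A stripped-down sketch of that idea, assuming nothing beyond threads and threads::shared: each thread writes only to its own slot of a shared hash and the main thread only reads. The %state hash, the field names and the loop counts are hypothetical, not lifted from your code.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

# Hypothetical registry: one shared sub-hash per worker, keyed by thread id.
my %state : shared;

sub worker {
    my $tid = threads->tid();
    {
        lock %state;    # only held while the new key is added
        $state{$tid} = &share( {} );
    }
    $state{$tid}{work} = 0;
    for ( 1 .. 5 ) {
        $state{$tid}{stat} = 'busy';    # direct write, no message sent
        $state{$tid}{work}++;
        $state{$tid}{stat} = 'idle';
    }
}

my @threads = map { threads->create( \&worker ) } 1 .. 3;
$_->join() for @threads;

# The main thread only ever reads the per-thread state.
my $total = 0;
$total += $state{$_}{work} for keys %state;
print "jobs handled: $total\n";
```

Because no two threads ever write to the same slot, the only lock left is the brief one taken while a new key is added to the outer hash.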
(*)Example 2: In your do_miner(), you have this notify:
    while ( $self{'stat'} ne 'done' ) {

        # Check for new ephemeris
        notify( 'miner', $t, retrieve_data(@p) );

        select undef, undef, undef, $cnf{'period'};
    }
which sends a message to your MAIN_LOOP that invokes this code:
    elsif ( $class eq 'miner' ) {
        my ( $t, $d ) = split /\s+/, $payload, 2;

        if ($d) {
            lock %{ $self{'miner'}{$t} };
            trc 'dbg', 'miner data', $t, length $d;

            $self{'miner'}{$t}{'expi'} = time() + $cnf{'period'};
            $self{'miner'}{$t}{'data'} = $d;
        }
    }
But why have your miner thread send a message (via a queue, with all the context switching and locking that involves) in order to set two shared variables (involving further locking), when that thread already has direct access to those two thread-specific, shared variables?
Both those lumps of code can be trivially reduced to:
    # periodically check for ephemeris
    while ( $self{'stat'} ne 'done' ) {

        # Check for new ephemeris
        $self{'miner'}{$t}{'expi'} = time() + $cnf{'period'};
        $self{'miner'}{$t}{'data'} = retrieve_data(@p);

        select undef, undef, undef, $cnf{'period'};
    }
That removes two layers of locking and context switches. You may say that it only happens every 600 seconds, but why have a thread wake up every 600 seconds to send a message to the main loop asking it to break off from doing the other things it is charged with--when that thread can do those things itself?
In addition, by having both your miner thread and all your worker threads take care of maintaining their own state, the reason for the queue disappears entirely. That leaves your MAIN_LOOP just one job to do: monitor the worker thread states and ramp their numbers up or down as required. Something that currently only gets done if the queue is empty:
        # shortcut
        # handle all the pending messages first
        next MAIN_LOOP if $queue->pending();
    }
Which it never will be at the times when it is most needed--when you are getting lots of inbound accepts--because your workers are tying the main loop up with pointless requests to have the MAIN_LOOP maintain their state.
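For what it's worth, the ramp up/down decision itself needs no queue at all; it is just arithmetic on two counts. A rough sketch, using the same water-mark setting names as your configuration (workers_idle_lo, workers_idle_hi, workers_max); the pool_adjustment() helper and its return convention are my own invention for illustration:

```perl
use strict;
use warnings;
use List::Util qw( min );

# Hypothetical helper: returns how many workers to add (positive),
# how many idle workers to retire (negative), or 0 to leave the pool alone.
sub pool_adjustment {
    my ( $idle, $total, $cnf ) = @_;

    if ( $idle < $cnf->{workers_idle_lo} ) {
        my $n = min(
            $cnf->{workers_idle_lo} - $idle,
            $cnf->{workers_max} - $total,
        );
        return $n > 0 ? $n : 0;    # never exceed workers_max
    }

    if ( $idle > $cnf->{workers_idle_hi} ) {
        return -( $idle - $cnf->{workers_idle_hi} );
    }

    return 0;
}

my %cnf = ( workers_idle_lo => 2, workers_idle_hi => 8, workers_max => 20 );

print pool_adjustment( 0,  4,  \%cnf ), "\n";    # too few idle: spawn
print pool_adjustment( 10, 12, \%cnf ), "\n";    # too many idle: retire
print pool_adjustment( 5,  9,  \%cnf ), "\n";    # within the marks
```

Run once per tick of the main loop, something like this keeps the pool sized to the load regardless of how busy the workers are.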
Here is the updated version, any suggestion will be appriciated,
As you've ignored just about all the suggestions I already made, there seems little point in repeating them.
However, as I've tested my 'no notify - no queue' theories, and found that even with 80 clients pounding away flat out, it can handle them perfectly whilst remaining responsive to both new clients and keyboard (sig int) under Vista and XP, whereas your version starts timing clients out after 60 seconds with only 15 clients trying to access it, you might as well have the modified code:
    #!/usr/bin/perl

    # created by joe.zheng at Wed Apr  1 10:23:14 2009
    # tested in Cygwin, perl v5.8.7 built for cygwin-thread-multi-64int

    use strict;
    use warnings;

    use 5.008;
    use threads;            # multi-threads needed
    use threads::shared;    # need to share some variables

    use Getopt::Long;
    use HTTP::Daemon;
    use SDBM_File;
    use Time::HiRes qw( time );
    use List::Util qw( min );

    # subroutine proto type
    sub trc (@);

    my %self;

    # configuration
    my %cnf = (

        # root server settings
        srv => 'agps.u-blox.com:46434',    # server URL
        cmd => 'eph',                      # command
        usr => 'usr',                      # username
        pwd => 'pwd',                      # password

        # default position, being used for data from the root server.
        lat => 31,                         # degrees
        lon => 121,                        # degrees
        acc => 1000,                       # meters

        # our server settings
        port    => 46434,                  # listen port
        queue   => 100,                    # listen queue
        timeout => 10,                     # socket timeout

        # the extent server redirect to when the main server
        # can't serve the request, such as reach its capacity,
        # the data requested is not available, etc.
        ext_srv => '',

        workers_ini     => 4,              # prethread count
        workers_max     => 20,             # max worker thread allowed
        workers_idle_lo => 2,              # low water mark
        workers_idle_hi => 8,              # high water mark

        period => 600,                     # period to check new data

        # directory
        doc => 'doc',    # the root directory where all the docs live in

        # misc
        dbg => 1,        # print log for debug purpose
    );

    # parse command line parameter
    GetOptions(
        \%cnf,       'usr=s',   'pwd=s',     'lat=f',
        'lon=f',     'acc=i',   'srv=s',     'cmd=s',
        'period=f',  'port=i',  'queue=i',   'timeout=f',
        'ext_srv=s', 'doc=s',   'dbg=i',
    );

    # -----------------------
    # main routine
    # -----------------------

    # set signal handler
    $SIG{'INT'} = $SIG{'TERM'} = sub {
        lock $self{'stat'};
        $self{'stat'} = 'done';
    };

    init_srv();
    spawn_worker() foreach ( 1 .. $cnf{'workers_ini'} );
    spawn_miner();

    trc 'dbg', 'start main loop';

    MAIN_LOOP:
    while ( $self{'stat'} ne 'done' ) {

        # adjust worker
        my @worker_idle = sort { $a <=> $b }
          grep { $self{'worker'}{$_}{'stat'} eq 'idle' }
          keys %{ $self{'worker'} };

        my ( $worker_cnt_ttl, $worker_cnt_idl ) =
          ( scalar keys %{ $self{'worker'} }, scalar @worker_idle );

        trc 'dbg', 'idle worker', $worker_cnt_idl, 'total worker',
          $worker_cnt_ttl;

        if ( $worker_cnt_idl < $cnf{'workers_idle_lo'} ) {
            my $cnt = min(
                $cnf{'workers_idle_lo'} - $worker_cnt_idl,
                $cnf{'workers_max'} - $worker_cnt_ttl,
            );
            spawn_worker() foreach ( 1 .. $cnt );
        }
        elsif ( $worker_cnt_idl > $cnf{'workers_idle_hi'} ) {
            my $cnt = $worker_cnt_idl - $cnf{'workers_idle_hi'};
            kill_worker( $worker_idle[ $_ - 1 ] ) foreach ( 1 .. $cnt );
        }
        sleep 1;
    }

    trc 'dbg', 'stop main loop';

    END {
        trc 'dbg', 'start to terminate ...';

        # close all the worker
        foreach ( keys %{ $self{'worker'} } ) {
            kill_worker($_);
        }

        # need to wait?
        trc 'dbg', 'terminated';
    }

    # -----------------------
    # subroutine definitions
    # -----------------------

    sub retrieve_data {

        # dummy data here
        # fetch from root server
        return 'dummy data, comming soon ...';
    }

    sub get_req_data {

        # dummy data here
        # fetch the request data from miner's repo
        return {
            data => 'dummy data, comming soon ...',
            expi => time() + $cnf{'period'},
        };
    }

    sub trc (@) {
        local $, = ', ';

        if ( $_[0] eq 'dbg' ) {

            # shortcut
            return if !$cnf{'dbg'};
            print scalar(localtime), threads->self()->tid(), @_;
        }
        else {
            print scalar(localtime), @_;
        }
        print "\n";
    }

    sub init_srv {

        # print out the configuration for debug purpose
        if ( $cnf{'dbg'} ) {
            print '<configuration>', "\n";
            foreach my $k ( sort keys %cnf ) {
                print $k, ': ', $cnf{$k}, "\n";
            }
            print '</configuration>', "\n";
        }

        # main thread
        my $srv = HTTP::Daemon->new(
            LocalPort => $cnf{'port'},
            Reuse     => 1,
            Listen    => $cnf{'queue'},
            Timeout   => $cnf{'timeout'},
        ) or die "can't create local socket: $@\n";

        trc 'log', "Accepting connections on",
          $srv->sockhost() . ':' . $srv->sockport();

        {
            $self{'sock'} = $srv;
            $self{'stat'} = 'busy', share( $self{'stat'} );
            $self{'lock'} = 'lock', share( $self{'lock'} );

            $self{'miner'}  = &share( {} );
            $self{'worker'} = &share( {} );
        }
    }

    sub do_miner {
        my $t = threads->self()->tid();
        my @p = @_;

        trc 'dbg', 'do_miner';

        {
            lock( %{ $self{'miner'} } );
            $self{'miner'}{$t} = &share( {} );
        }

        # periodically check for ephemeris
        while ( $self{'stat'} ne 'done' ) {

            # Check for new ephemeris
            $self{'miner'}{$t}{'expi'} = time() + $cnf{'period'};
            $self{'miner'}{$t}{'data'} = retrieve_data(@p);

            select undef, undef, undef, $cnf{'period'};
        }

        trc 'dbg', 'miner gone';
    }

    sub spawn_miner {
        if ( my $thread = threads->new( \&do_miner, @_ ) ) {
            trc 'dbg', 'new miner created';
            $thread->detach;
        }
        else {
            trc 'dbg', 'new miner create failed';

            # force to success, otherwise the service is not available
            die;
        }
    }

    sub do_worker {
        my $t = threads->self()->tid();

        trc 'dbg', 'do_worker';

        $self{'worker'}{$t} = &share( {} );
        $self{'worker'}{$t}{'stat'} = 'idle';

        my $c;
      ACCEPT:
        while ( $self{'worker'}{$t}{'stat'} ne 'done' ) {
            {
                trc 'dbg', 'current stat', $self{'worker'}{$t}{'stat'};
                lock $self{'lock'};
                next ACCEPT unless $c = $self{'sock'}->accept();
            }

            $self{'worker'}{$t}{'stat'} = 'busy';
            $self{'worker'}{$t}{'time'} = time();

            trc 'log', "new connect accept",
              $c->peerhost() . ":" . $c->peerport();

            if ( my $r = $c->get_request() ) {
                handle_req( $c, $r );
            }
            $c->close();

            $self{'worker'}{$t}{'stat'} = 'idle';
            $self{'worker'}{$t}{'time'} = time();
            $self{'worker'}{$t}{'work'}++;
        }

        delete $self{'worker'}{$t};
        trc 'dbg', 'worker gone';
    }

    sub spawn_worker {
        if ( my $thread = threads->new( \&do_worker ) ) {
            trc 'dbg', 'new worker created';
            $thread->detach;
        }
        else {
            trc 'dbg', 'new worker create failed';
        }
    }

    sub kill_worker {
        my ($t) = @_;

        trc 'dbg', 'kill worker', $t;

        lock %{ $self{'worker'}{$t} };
        $self{'worker'}{$t}{'stat'} = 'done';
    }

    sub handle_req {
        my ( $c, $r ) = @_;

        trc 'dbg', 'new request', $r->as_string;

        if ( $r->method() eq 'GET' ) {
            if ( $r->uri()->path() eq '/' ) {
                my %query = $r->uri()->query_form();

                if ( exists $query{'cmd'} ) {
                    my $data;
                    for ( $query{'cmd'} ) {
                        /eph/i and do { $data = get_req_data(%query); last };
                    }

                    if ($data) {
                        my $res = HTTP::Response->new(200);
                        my $exp = HTTP::Date::time2str( $data->{'expi'} );

                        $res->push_header( 'Expires'      => $exp );
                        $res->push_header( 'Content-Type' => 'application/ubx' );
                        $res->content( $data->{'data'} );

                        $c->send_response($res);
                        trc 'log', 'response', 'ok', 'data len',
                          length $data->{'data'};
                    }
                    else {
                        $c->send_error();
                        trc 'log', 'response', 'error';
                    }
                }
                else {
                    foreach my $f ( glob "$cnf{'doc'}/index.*" ) {
                        trc 'dbg', 'send', $f;
                        $c->send_file_response($f);
                        last;
                    }
                }
            }
            else {
                ( my $path = $r->uri()->path() ) =~ s|^/|$cnf{'doc'}/|;

                trc 'dbg', 'send', $path;
                $c->send_file_response($path);
            }
        }
        else {
            $c->send_error( 503,
                'method' . $r->method() . 'not supported' );
        }
    }
BTW: A far better use of a queue in your server would be to queue all your trc() messages to a single thread for output rather than slowing all your workers down waiting for access to a global lock. But I left that as an exercise for the reader.
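If you want a starting point for that exercise, something along these lines might do. It is only a sketch: trc_q() is a hypothetical stand-in for your trc(), the worker bodies are dummies, and undef is used as an arbitrary stop marker for the logger thread.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $log_q = Thread::Queue->new();

# Single writer thread: the only place that prints log lines.
my $logger = threads->create( sub {
    my $count = 0;
    while ( defined( my $line = $log_q->dequeue() ) ) {
        print $line, "\n";
        $count++;
    }
    return $count;    # number of lines written
} );

# Hypothetical replacement for trc(): format, enqueue, return at once.
sub trc_q {
    $log_q->enqueue( join ', ', scalar(localtime), threads->tid(), @_ );
}

# Dummy workers, each logging three lines.
my @workers = map {
    threads->create( sub { trc_q( 'dbg', 'hello from worker' ) for 1 .. 3 } )
} 1 .. 4;
$_->join() for @workers;

$log_q->enqueue(undef);    # undef tells the logger to finish
my $written = $logger->join();
```

The workers only pay for an enqueue(); the printing, and any waiting on the output handle, happens in the one logger thread.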
Re^4: multi thread problem in an simple HTTP server (refactoring)
by bravesoul (Initiate) on Apr 15, 2009 at 09:19 UTC