
"if I do not add the last line of code, the main thread do not spawn enough 'worker' threads at the first time, only 2 ones created, but it should be 8 at this time. I guess the current thread is switched to the main thread at that time so I add threads->yield(), then one more 'worker' threads is spawned at the first time, but it is still not what I want. At last I add sleep(1), it is OK then, why?"

You are misunderstanding what is going on here. Without the sleep, all the threads do get started--it just takes a minute or so for it to happen.

The reason for the delay comes down to the locking you are using, but I'll come back to that. There is a far more fundamental issue to deal with first: how you are creating your worker threads.

You are spawning each thread, using the returned thread handle to obtain the thread ID, and then using that ID to set up the control structures used by the thread:

    if ( my $thread = threads->new( \&do_worker ) ) {
        my $t = $thread->tid;
        lock %{ $stat{'work'} };
        $stat{'work'}{ $t } = &share( {} );
        $stat{'work'}{ $t }{'stat'} = 'idle';
        ...

But, the first thing your thread does is attempt to use the control structure:

    sub do_worker {
        my $t = threads->self()->tid();
        # sleep(1);
        my $c;
        LOOP: while ( !$stat{'work'}{$t}{'stat'} ne 'done' ) {
            ...

Which your main thread hasn't yet created!

And, you are also accessing that shared control structure without having locked it!

To correct these problems, you could move the per-thread control structure creation inside the thread itself, and apply locking to the accesses:

    sub do_worker {
        my $t = threads->self()->tid();
        {
            lock %{ $stat{'work'} };
            $stat{'work'}{ $t } = &share( {} );
            $stat{'work'}{ $t }{'stat'} = 'idle';
        }
        my $c;
        LOOP: while (
            do{ lock %{ $stat{'work'}{$t} }; $stat{'work'}{$t}{'stat'} } ne 'done'
        ) {
    ...
        if ( my $thread = threads->new( \&do_worker ) ) {
            my $t = $thread->tid;
            trc 'dbg', 'new worker created';
            $thread->detach;
    ...
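As a minimal, self-contained sketch of that arrangement: each worker registers its own entry before it ever reads it, so it can never run ahead of its control structure. The four workers and the join at the end are my additions for illustration and are not part of your server.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

my %stat : shared;
$stat{'work'} = &share( {} );

sub do_worker {
    my $t = threads->self()->tid();
    {
        # The worker creates its own entry under the container lock.
        lock %{ $stat{'work'} };
        $stat{'work'}{$t} = &share( {} );
        $stat{'work'}{$t}{'stat'} = 'idle';
    }
    # Read the status back under the per-thread lock, as the loop condition does.
    return do { lock %{ $stat{'work'}{$t} }; $stat{'work'}{$t}{'stat'} };
}

# Start four workers (list context, so join returns a list) and collect what each saw.
my @seen = map { $_->join } map { threads->new( \&do_worker ) } 1 .. 4;
print "worker saw: $_\n" for @seen;
```

Because the entry is created inside the thread, the main thread no longer needs the thread ID to set anything up.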

But that raises another question: As these are 'per-thread' control structures, why are you having to lock them?

Looking at the places outside of the worker thread where they are accessed, it all happens in your main thread loop. But your main thread loop is controlled by a message queue, and the messages are being sent from the worker threads.

So the question then becomes: why send a message from a worker thread to the main thread to have it modify the shared, per-thread control structures when the worker threads could make those modifications directly?
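To make that concrete, here is a rough sketch in which each worker records its own state directly, with no message to the main thread. The helper name set_status and the busy/idle round trip are hypothetical stand-ins for your connection handling, so treat this as an illustration rather than a drop-in replacement.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

my %stat : shared;
$stat{'work'} = &share( {} );

# Hypothetical helper: the worker updates its own control structure.
sub set_status {
    my ( $t, $s ) = @_;
    lock %{ $stat{'work'}{$t} };
    $stat{'work'}{$t}{'stat'} = $s;
    $stat{'work'}{$t}{'time'} = time();
    $stat{'work'}{$t}{'work'}++ if $s eq 'idle';
}

sub do_worker {
    my $t = threads->self()->tid();
    {
        lock %{ $stat{'work'} };
        $stat{'work'}{$t} = &share( {} );
    }
    set_status( $t, 'busy' );    # stand-in for accepting a connection
    set_status( $t, 'idle' );    # stand-in for finishing the request
    return $t;
}

my @tids = map { $_->join } map { threads->new( \&do_worker ) } 1 .. 3;
for my $t ( @tids ) {
    print "worker $t: stat=$stat{'work'}{$t}{'stat'} work=$stat{'work'}{$t}{'work'}\n";
}
```

The main thread can still read these entries whenever it wants to decide about spawning or reaping workers; it just does not have to be told about every change.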

Indeed, the more I think about that logic, the more questionable the reasons for having a queue at all become. There are other questions about your architecture that need answering, but I'll get back to them later.


Now back to the question of why worker threads start up so slowly unless you add the sleep 1;.

The problem arises here:

    LOOP: while ( $stat{'work'}{$t}{'stat'} ne 'done' ) {
        {
            lock %{ $stat{'lock'} };
            $c = $listener->accept() or next LOOP;
        }

That lock ($stat{'lock'}) is a global lock(*). And accept() is a blocking call. So, once one thread has entered that lock and is waiting for a connection, no other thread will be able to progress past that point until the first one has either: received a connection; or the accept() times out.

Basically, the workers start slowly because you have programmed them to do so.
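The effect is easy to reproduce without a socket. In this sketch a sleep stands in for the blocking accept(), and the 0.5 second value is an assumed stand-in for the timeout; neither is taken from your script.

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use Time::HiRes qw( time sleep );

my %stat : shared;
$stat{'lock'} = &share( {} );

my $timeout = 0.5;    # assumed stand-in for the accept() timeout, in seconds

sub do_worker {
    my ( $start ) = @_;
    {
        lock %{ $stat{'lock'} };    # every worker queues up here
        sleep $timeout;             # stand-in for accept() blocking until it times out
    }
    return time() - $start;         # seconds until this worker got past the lock
}

my $start  = time();
my @delays = sort { $a <=> $b }
             map  { $_->join }
             map  { threads->new( \&do_worker, $start ) } 1 .. 4;
printf "worker past the lock after %.1fs\n", $_ for @delays;
```

Each worker gets past the lock roughly one timeout later than the previous one, which is the same staircase you are seeing at start-up.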

You can demonstrate this to yourself by adjusting the timeout:

    timeout => 10,

If you remove the sleep 1; and set the timeout to 1, then you'll see that the 8 workers start much more quickly: one per second (as the accept() times out) instead of one every 10 seconds.

Other problems:

(*) Why are you using a shared hash for locking? You never put anything inside that hash, so a shared scalar would do the job just as well, and probably with less overhead.
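For illustration, a shared scalar used purely as a lock might look like the following. The names $accept_lock and $counter are hypothetical, and the counter only exists to show that the lock really does serialise access.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

my $accept_lock : shared;        # holds no data; it exists only to be locked
my $counter     : shared = 0;

sub do_worker {
    for ( 1 .. 1000 ) {
        lock $accept_lock;       # released at the end of each loop iteration
        $counter++;
    }
}

$_->join for map { threads->new( \&do_worker ) } 1 .. 4;
print "counter = $counter\n";
```

With the lock in place, the four workers' increments cannot interleave, so the final count is 4000.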

Your whole locking strategy needs review. For example, you are using code like this:

    lock %{ $stat{'work'} };
    ...
    $stat{'work'}{$t}{'stat'} = $s;
    $stat{'work'}{$t}{'time'} = time();
    $stat{'work'}{$t}{'work'}++ if $s eq 'idle';

You are applying a global lock when you only need to access per-thread data. That means all threads will be blocked for the duration of that lock, which will have a detrimental effect on the overall performance.

You could avoid that by only applying a lock to the per-thread data that you wish to modify:

    lock %{ $stat{'work'}{$t} };
    ...
    $stat{'work'}{$t}{'stat'} = $s;
    $stat{'work'}{$t}{'time'} = time();
    $stat{'work'}{$t}{'work'}++ if $s eq 'idle';

which would leave all the other threads free to run.
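A small sketch of the difference, under the assumption of two hypothetical per-thread entries 'a' and 'b': one thread holds the lock on 'a' for a full second, while another updates 'b' and reports how long it had to wait for its own lock.

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use Time::HiRes qw( time sleep );

my %work : shared;
$work{$_} = &share( {} ) for qw( a b );    # two hypothetical per-thread entries

# Hold the lock on one entry for a while, as a slow update would.
sub hold {
    my ( $k, $secs ) = @_;
    lock %{ $work{$k} };
    sleep $secs;
    return;
}

# Update a different entry and report how long the lock took to obtain.
sub touch {
    my ( $k ) = @_;
    my $t0 = time();
    lock %{ $work{$k} };
    $work{$k}{'stat'} = 'idle';
    return time() - $t0;
}

my $holder = threads->new( \&hold, 'a', 1 );
sleep 0.2;                                  # give the holder time to take its lock
my $toucher = threads->new( \&touch, 'b' );
my $waited  = $toucher->join;
$holder->join;
printf "waited %.3fs for the lock on 'b'\n", $waited;
```

The wait should be close to zero, because the lock on 'a' has no bearing on 'b'. Had both threads locked the enclosing %work instead, the second would have had to sit out the remainder of the first one's second.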

However, there is still the question of whether this per-thread data needs to be shared in the first place.

Enough for this post. More to follow.



Re^4: multi thread problem in an simple HTTP server (Q.6)
by bravesoul (Initiate) on Apr 13, 2009 at 16:02 UTC

    At last it works perfectly in my Cygwin environment, with perl v5.8.7 built for cygwin-thread-multi-64int, so I think the 'per thread accept' architecture can't run well on Windows 2000/XP systems.

    Applying your suggestions, I have restructured the script as below.
    use strict;
    use warnings;
    use 5.008;
    use threads;                 # multi-threads needed
    use threads::shared;         # need to share some variables
    use Thread::Queue;           # thread safe communication needed
    use Getopt::Long;
    use HTTP::Daemon;
    use Time::HiRes qw( time );  # If available....

    # subroutine proto type
    sub trc (@);

    # global variables
    my $queue = Thread::Queue->new;
    my %self : shared;

    # configuration
    my %cnf = (
        # Username/Password.
        usr => 'usr',
        pwd => 'pwd',
        # Appr. Position, being used for Ephemeris transfer from server.
        lat => 47.28,   # Degrees
        lon => 8.56,    # Degrees
        acc => 1000,    # meters
        srv => 'agps.u-blox.com:46434',
        cmd => 'eph',
        # our server settings
        port    => 46434,   # listen port
        queue   => 5,       # listen queue
        timeout => 10,      #
        rest    => 10,      # the time value to rest after previous connect from the same addr
        conn    => 3,       # the allowed max connection per addr
        max_con => 20000,   # max connection simultaneously
        ext_srv => '',      # extent server to handle requests when the main server reach its capacity
        workers_ini     => 2,#8,  # prethread count
        workers_max     => 20,    # max worker thread allowed
        workers_idle_lo => 1,     # low water mark
        workers_idle_hi => 3,#10, # high water mark
        period => 600,      # period to check for new ephemeris
        # directory
        doc => 'doc',       # the root directory where all the docs live in
        # misc
        dbg => 1,           # print log for debug purpose
    );

    # parse command line parameter
    GetOptions(\%cnf,
        'usr=s', 'pwd=s', 'lat=f', 'lon=f', 'acc=i', 'srv=s', 'cmd=s', 'period=f',
        'port=i', 'queue=i', 'timeout=f', 'rest=f', 'conn=i', 'max_con=i', 'ext_srv=s',
        'doc=s', 'dbg=i',
    );

    # -----------------------
    # main routine
    # -----------------------
    $SIG{'INT'} = $SIG{'TERM'} = sub { lock %self; $self{'stat'} = 'done'; };

    my $listener = init_srv();
    spawn_worker() foreach ( 1 .. $cnf{'workers_ini'} );
    spawn_miner();
    trc 'dbg', 'start main loop';
    while ( $self{'stat'} ne 'done' ) {
        if ( my $msg = $queue->dequeue() ) {
            trc 'dbg', 'new msg';
            my ( $class, $payload ) = split /\s+/, $msg, 2;
            if ( $class eq 'worker' ) {
                my ( $t, $s ) = split /\s+/, $payload, 2;
                lock %{ $self{'worker'}{$t} };
                trc 'dbg', 'worker stat', $t, $s;
                if ( $s eq 'gone' ) {
                    delete $self{'worker'}{$t};
                } else {
                    $self{'worker'}{$t}{'stat'} = $s;
                    $self{'worker'}{$t}{'time'} = time();
                    $self{'worker'}{$t}{'work'}++ if $s eq 'idle';
                }
            } elsif ( $class eq 'miner' ) {
                my ( $t, $d ) = split /\s+/, $payload, 2;
                if ( $d ) {
                    lock %{ $self{'miner'}{$t} };
                    trc 'dbg', 'miner data', $t, length $d;
                    $self{'miner'}{$t}{'expi'} = time() + $cnf{'period'};
                    $self{'miner'}{$t}{'data'} = $d;
                }
            } elsif ( $class eq 'beat' ) {
                # heart beat event
                # now we can monitor the thread's activity
                trc 'dbg', 'heart beat', $payload;
            } else {
                # nothing to do
            }
        }
        # reap worker if necessary
        # adjust worker
        my @worker_idle = sort { $a <=> $b }
                          grep { $self{'worker'}{$_}{'stat'} eq 'idle' }
                          keys %{$self{'worker'}};
        my ( $worker_cnt_ttl, $worker_cnt_idl )
            = ( scalar keys %{$self{'worker'}}, scalar @worker_idle );
        trc 'dbg', 'idle worker', $worker_cnt_idl, 'total worker', $worker_cnt_ttl;
        if ( $worker_cnt_idl < $cnf{'workers_idle_lo'} ) {
            foreach ( 1 .. $cnf{'workers_idle_lo'} - $worker_cnt_idl ) {
                last if $_ > $cnf{'workers_max'} - $worker_cnt_ttl;
                spawn_worker();
            }
        } elsif ( $worker_cnt_idl > $cnf{'workers_idle_hi'} ) {
            foreach my $t ( @worker_idle[ 0 .. $worker_cnt_idl - $cnf{'workers_idle_hi'} - 1 ] ) {
                kill_worker( $t );
            }
        }
    }

    END {
        trc 'dbg', 'main thread start to terminate ...';
        # close all the worker
        foreach ( keys %{$self{'worker'}} ) {
            lock %{ $self{'worker'}{$_} };
            $self{'worker'}{$_}{'stat'} = 'done';
        }
        # need to wait?
        trc 'dbg', 'terminated';
    }

    # -----------------------
    # subroutine definitions
    # -----------------------
    sub retrieve_data {
        # dummy data here
        # fetch from root server
        return 'dummy data, comming soon ...';
    }

    sub get_req_data {
        # dummy data here
        # fetch the request data from miner's repo
        return {
            data => 'dummy data, comming soon ...',
            expi => time() + $cnf{'period'},
        };
    }

    sub trc (@) {
        local $, = ', ';
        if ( $_[0] eq 'dbg' ) {
            # shortcut
            return if !$cnf{'dbg'};
            print scalar(localtime), threads->self()->tid(), @_;
        } else {
            print scalar(localtime), @_;
        }
        print "\n";
    }

    sub init_srv {
        # print out the configuration for debug purpose
        if ( $cnf{'dbg'} ) {
            print '<configuration>', "\n";
            foreach my $k ( sort keys %cnf ) {
                print $k, ': ', $cnf{ $k }, "\n";
            }
            print '</configuration>', "\n";
        }
        # main thread
        my $srv = HTTP::Daemon->new(
            LocalPort => $cnf{'port'},
            Reuse     => 1,
            Listen    => $cnf{'queue'},
            Timeout   => $cnf{'timeout'},
        ) or die "can't create local socket: $@\n";
        trc 'log', "Accepting connections on", $srv->sockhost().':'.$srv->sockport();
        {
            lock %self;
            $self{'stat'}   = 'busy';
            $self{'lock'}   = &share( {} );
            $self{'miner'}  = &share( {} );
            $self{'worker'} = &share( {} );
        }
        return $srv;
    }

    sub do_miner {
        my $t = threads->self()->tid();
        my @p = @_;
        trc 'dbg', 'do_miner';
        {
            lock( %{ $self{'miner'} } );
            $self{'miner'}{$t} = &share( {} );
        }
        # threads->yield();
        # periodically check for ephemeris
        while ( $self{'stat'} ne 'done' ) {
            # Check for new ephemeris
            notify( 'miner', $t, retrieve_data( @p ) );
            select undef, undef, undef, $cnf{'period'};
        }
        trc 'dbg', 'miner gone';
    }

    sub spawn_miner {
        if ( my $thread = threads->new( \&do_miner, @_ ) ) {
            trc 'dbg', 'new miner created';
            $thread->detach;
        } else {
            trc 'dbg', 'new miner create failed';
            # force to success, otherwise the service is not available
            die;
        }
    }

    sub do_worker {
        my $t = threads->self()->tid();
        trc 'dbg', 'do_worker';
        {
            lock %{ $self{'worker'} };
            $self{'worker'}{ $t } = &share( {} );
            $self{'worker'}{ $t }{'stat'} = 'idle';
        }
        # threads->yield();
        my $c;
        LOOP: while ( $self{'worker'}{$t}{'stat'} ne 'done' ) {
            {
                trc 'dbg', 'current stat', $self{'worker'}{$t}{'stat'};
                notify( 'beat', $t, '' );
                lock %{ $self{'lock'} };
                $c = $listener->accept() or next LOOP;
            }
            notify( 'worker', $t, 'busy' );
            trc 'log', "new connect accept", $c->peerhost().":".$c->peerport();
            if ( my $r = $c->get_request() ) {
                trc 'dbg', 'new request', $r->as_string;
                if ( $r->method() eq 'GET' ) {
                    if ( $r->uri()->path() eq '/' ) {
                        my %query = $r->uri()->query_form();
                        if ( exists $query{'cmd'} ) {
                            my $data;
                            for ( $query{'cmd'} ) {
                                /eph/i and do { $data = get_req_data( %query ); last };
                            }
                            if ( $data ) {
                                my $res = HTTP::Response->new( 200 );
                                $res->push_header( 'Expires' => HTTP::Date::time2str( $data->{'expi'} ) );
                                $res->push_header( 'Content-Type' => 'application/ubx' );
                                $res->content( $data->{'data'} );
                                $c->send_response( $res );
                                trc 'log', 'response', 'ok', "send @{ [ int( length $data->{'data'} ) ] } bytes";
                            } else {
                                $c->send_error();
                                trc 'log', 'response', 'error';
                            }
                        } else {
                            foreach my $f ( glob "$cnf{'doc'}/index.*" ) {
                                trc 'dbg', 'send', $f;
                                $c->send_file_response( $f );
                                last;
                            }
                        }
                    } else {
                        ( my $path = $r->uri()->path() ) =~ s|^/|$cnf{'doc'}/|;
                        trc 'dbg', 'send', $path;
                        $c->send_file_response( $path );
                    }
                }
            }
            $c->close();
            notify( 'worker', $t, 'idle' );
        }
        notify( 'worker', $t, 'gone' );
        trc 'dbg', 'worker gone';
    }

    sub spawn_worker {
        if ( my $thread = threads->new( \&do_worker ) ) {
            trc 'dbg', 'new worker created';
            $thread->detach;
        } else {
            trc 'dbg', 'new worker create failed';
        }
    }

    sub kill_worker {
        my ( $t ) = @_;
        trc 'dbg', 'kill worker', $t;
        lock %{ $self{'worker'}{$t} };
        $self{'worker'}{$t}{'stat'} = 'done';
    }

    sub notify {
        $queue->enqueue( join ' ', @_ );
    }

    I am still confused by threads::shared: how can I share an item in the shared hash? For example, if I change the code to

    $self{'lock'} = 'lock';
    share( $self{'lock'} );

    it causes 'thread failed to start: lock can only be used on shared values' at lock $self{'lock'}.