in reply to multi thread problem in an simple HTTP server

I corrected it early on and then forgot about it.

This is what you would have seen had you had warnings enabled:

Argument "HTTP::Date" isn't numeric in gmtime at C:/Perl64/lib/HTTP/Date.pm line 26

The line in your code that causes this error is:

$res->push_header( 'Expires' => HTTP::Date->time2str( $stat{'data'}{'eph'}{'expi'} ) );

You are calling time2str as a class method, so the class name 'HTTP::Date' is passed as its first argument in place of the timestamp; it expects to be called as a plain function:

$res->push_header( 'Expires' => HTTP::Date::time2str( $stat{'data'}{'eph'}{'expi'} ) );

Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
"Science is about questioning the status quo. Questioning authority".
In the absence of evidence, opinion is indistinguishable from prejudice.
"Too many [] have been sedated by an oppressive environment of political correctness and risk aversion."

Replies are listed 'Best First'.
Re^2: multi thread problem in an simple HTTP server
by bravesoul (Initiate) on Apr 13, 2009 at 03:46 UTC
    You taught me an important lesson: '-w' should be switched on at the development stage. I used to force myself to add 'use warnings', but I can't remember when I gave up such a good habit. You saved me from things getting out of control.
Re^2: multi thread problem in an simple HTTP server (refactoring)
by bravesoul (Initiate) on Apr 14, 2009 at 14:32 UTC

    At last I decided to switch to the Cygwin environment, because it works perfectly there. I will continue to fix bugs, apply suggestions, and add new features. Here is the updated version; any suggestions will be appreciated. Thanks in advance.

    #!/usr/bin/perl
    # created by joe.zheng at Wed Apr 1 10:23:14 2009
    # tested in Cygwin, perl v5.8.7 built for cygwin-thread-multi-64int
    #
    # Pre-threaded HTTP server.
    #   * worker threads share one listening socket and serve the requests
    #   * a miner thread periodically fetches fresh data
    #   * the main thread reads status messages from a Thread::Queue, records
    #     them in %self and grows or shrinks the worker pool

    use strict;
    use warnings;
    use 5.008;

    use threads;            # multi-threads needed
    use threads::shared;    # need to share some variables
    use Thread::Queue;      # thread safe communication needed
    use Getopt::Long;
    use HTTP::Daemon;
    use SDBM_File;
    use Time::HiRes qw( time );
    use List::Util qw( min );

    # forward declaration, so that trc can be called without parentheses
    # before its definition has been seen
    sub trc (@);

    # global variables
    my $queue = Thread::Queue->new;    # worker/miner -> main thread messages
    my %self;                          # server state, filled in by init_srv()

    # configuration
    my %cnf = (

        # root server settings
        srv => 'agps.u-blox.com:46434',    # server URL
        cmd => 'eph',                      # command
        usr => 'usr',                      # username
        pwd => 'pwd',                      # password

        # default position, being used for data from the root server.
        lat => 31,                         # degrees
        lon => 121,                        # degrees
        acc => 1000,                       # meters

        # our server settings
        port    => 46434,                  # listen port
        queue   => 5,                      # listen queue
        timeout => 10,                     # socket timeout

        # the extent server redirect to when the main server
        # can't serve the request, such as reach its capacity,
        # the data requested is not available, etc.
        ext_srv => '',

        workers_ini     => 4,              # prethread count
        workers_max     => 20,             # max worker thread allowed
        workers_idle_lo => 2,              # low water mark
        workers_idle_hi => 8,              # high water mark

        period => 600,                     # period to check new data

        # directory
        doc => 'doc',    # the root directory where all the docs live in

        # misc
        dbg => 1,        # print log for debug purpose
    );

    # parse command line parameter
    GetOptions(
        \%cnf,       'usr=s',     'pwd=s',   'lat=f',
        'lon=f',     'acc=i',     'srv=s',   'cmd=s',
        'period=f',  'port=i',    'queue=i', 'timeout=f',
        'ext_srv=s', 'doc=s',     'dbg=i',
    );

    # -----------------------
    # main routine
    # -----------------------

    # set signal handler: ask the main loop to stop
    $SIG{'INT'} = $SIG{'TERM'} = sub {
        lock $self{'stat'};
        $self{'stat'} = 'done';
    };

    init_srv();
    spawn_worker() foreach ( 1 .. $cnf{'workers_ini'} );
    spawn_miner();

    trc 'dbg', 'start main loop';
    MAIN_LOOP:
    while ( $self{'stat'} ne 'done' ) {

        # dequeue() blocks until a message is available
        if ( my $msg = $queue->dequeue() ) {
            trc 'dbg', 'new msg';

            # message layout (see notify): "<class> <tid> <rest>"
            my ( $class, $payload ) = split /\s+/, $msg, 2;

            if ( $class eq 'worker' ) {
                my ( $t, $s ) = split /\s+/, $payload, 2;

                lock %{ $self{'worker'}{$t} };
                trc 'dbg', 'worker stat', $t, $s;
                if ( $s eq 'gone' ) {
                    delete $self{'worker'}{$t};
                }
                elsif ( $self{'worker'}{$t}{'stat'} eq 'done' ) {

                    # kill_worker() has asked this worker to stop and the
                    # worker polls this very field; keep 'done' so that a
                    # late busy/idle message cannot cancel the request
                }
                else {
                    $self{'worker'}{$t}{'stat'} = $s;
                    $self{'worker'}{$t}{'time'} = time();
                    $self{'worker'}{$t}{'work'}++ if $s eq 'idle';
                }
            }
            elsif ( $class eq 'miner' ) {
                my ( $t, $d ) = split /\s+/, $payload, 2;

                if ($d) {
                    lock %{ $self{'miner'}{$t} };
                    trc 'dbg', 'miner data', $t, length $d;
                    $self{'miner'}{$t}{'expi'} = time() + $cnf{'period'};
                    $self{'miner'}{$t}{'data'} = $d;
                }
            }
            elsif ( $class eq 'beat' ) {

                # heart beat event
                # now we can monitor the thread's activity
                trc 'dbg', 'heart beat', $payload;
            }
            else {

                # nothing to do
            }

            # shortcut
            # handle all the pending messages first
            next MAIN_LOOP if $queue->pending();
        }

        # reap worker if necessary

        # adjust worker
        my @worker_idle =
          sort { $a <=> $b }
          grep { $self{'worker'}{$_}{'stat'} eq 'idle' }
          keys %{ $self{'worker'} };
        my ( $worker_cnt_ttl, $worker_cnt_idl ) =
          ( scalar keys %{ $self{'worker'} }, scalar @worker_idle );
        trc 'dbg', 'idle worker', $worker_cnt_idl, 'total worker',
          $worker_cnt_ttl;

        if ( $worker_cnt_idl < $cnf{'workers_idle_lo'} ) {

            # too few idle workers: top up, but never beyond workers_max
            my $cnt = min(
                $cnf{'workers_idle_lo'} - $worker_cnt_idl,
                $cnf{'workers_max'} - $worker_cnt_ttl,
            );
            spawn_worker() foreach ( 1 .. $cnt );
        }
        elsif ( $worker_cnt_idl > $cnf{'workers_idle_hi'} ) {

            # too many idle workers: stop the ones with the lowest thread ids
            my $cnt = $worker_cnt_idl - $cnf{'workers_idle_hi'};
            kill_worker( $worker_idle[ $_ - 1 ] ) foreach ( 1 .. $cnt );
        }
    }
    trc 'dbg', 'stop main loop';

    END {
        trc 'dbg', 'start to terminate ...';

        # close all the worker
        foreach ( keys %{ $self{'worker'} } ) {
            kill_worker($_);
        }

        # need to wait?
        trc 'dbg', 'terminated';
    }

    # -----------------------
    # subroutine definitions
    # -----------------------

    # Fetch the data from the root server (placeholder for now).
    # Returns the data as a string.
    sub retrieve_data {

        # dummy data here
        # fetch from root server
        return 'dummy data, comming soon ...';
    }

    # Look up the data for one request (placeholder for now).
    # Returns a hash ref: { data => payload, expi => expiry time in epoch
    # seconds }.
    sub get_req_data {

        # dummy data here
        # fetch the request data from miner's repo
        return {
            data => 'dummy data, comming soon ...',
            expi => time() + $cnf{'period'},
        };
    }

    # Print one trace line: timestamp, then the arguments, joined by ', '.
    # When the first argument is 'dbg' the line is printed only if
    # $cnf{'dbg'} is true, and the thread id is inserted after the timestamp.
    sub trc (@) {
        local $, = ', ';
        if ( $_[0] eq 'dbg' ) {

            # shortcut
            return if !$cnf{'dbg'};
            print scalar(localtime), threads->self()->tid(), @_;
        }
        else {
            print scalar(localtime), @_;
        }
        print "\n";
    }

    # Create the listening socket and the shared state in %self.
    # Dies when the socket cannot be created.
    sub init_srv {

        # print out the configuration for debug purpose
        if ( $cnf{'dbg'} ) {
            print '<configuration>', "\n";
            foreach my $k ( sort keys %cnf ) {
                print $k, ': ', $cnf{$k}, "\n";
            }
            print '</configuration>', "\n";
        }

        # main thread
        my $srv = HTTP::Daemon->new(
            LocalPort => $cnf{'port'},
            Reuse     => 1,
            Listen    => $cnf{'queue'},
            Timeout   => $cnf{'timeout'},
        ) or die "can't create local socket: $@\n";
        trc 'log', "Accepting connections on",
          $srv->sockhost() . ':' . $srv->sockport();

        $self{'sock'} = $srv;

        $self{'stat'} = 'busy';
        share( $self{'stat'} );
        $self{'lock'} = 'lock';
        share( $self{'lock'} );

        # the leading & bypasses share()'s prototype, which would otherwise
        # reject an anonymous hash
        $self{'miner'}  = &share( {} );
        $self{'worker'} = &share( {} );
    }

    # Thread body of the miner: every $cnf{'period'} seconds fetch the data
    # and send it to the main thread.  Arguments are handed on to
    # retrieve_data().
    sub do_miner {
        my $t = threads->self()->tid();
        my @p = @_;

        trc 'dbg', 'do_miner';

        {
            lock( %{ $self{'miner'} } );
            $self{'miner'}{$t} = &share( {} );
        }

        # periodically check for ephemeris
        while ( $self{'stat'} ne 'done' ) {

            # Check for new ephemeris
            notify( 'miner', $t, retrieve_data(@p) );

            # sleep for $cnf{'period'} seconds (fractions allowed)
            select undef, undef, undef, $cnf{'period'};
        }

        trc 'dbg', 'miner gone';
    }

    # Start the detached miner thread; dies when it cannot be created.
    sub spawn_miner {
        if ( my $thread = threads->new( \&do_miner, @_ ) ) {
            trc 'dbg', 'new miner created';
            $thread->detach;
        }
        else {
            trc 'dbg', 'new miner create failed';

            # force to success, otherwise the service is not available
            die "new miner create failed\n";
        }
    }

    # Thread body of a worker: accept connections on the shared socket and
    # serve one request per connection until the worker's 'stat' is 'done'.
    sub do_worker {
        my $t = threads->self()->tid();

        trc 'dbg', 'do_worker';

        {
            lock %{ $self{'worker'} };
            $self{'worker'}{$t} = &share( {} );
            $self{'worker'}{$t}{'stat'} = 'idle';
        }

        my $c;
        ACCEPT:
        while ( $self{'worker'}{$t}{'stat'} ne 'done' ) {
            {
                trc 'dbg', 'current stat', $self{'worker'}{$t}{'stat'};
                notify( 'beat', $t, '' );

                # only one worker at a time waits in accept()
                lock $self{'lock'};
                next ACCEPT unless $c = $self{'sock'}->accept();
            }

            notify( 'worker', $t, 'busy' );
            trc 'log', "new connect accept",
              $c->peerhost() . ":" . $c->peerport();

            if ( my $r = $c->get_request() ) {
                handle_req( $c, $r );
            }
            $c->close();

            notify( 'worker', $t, 'idle' );
        }

        notify( 'worker', $t, 'gone' );
        trc 'dbg', 'worker gone';
    }

    # Start one detached worker thread; a failure is only logged.
    sub spawn_worker {
        if ( my $thread = threads->new( \&do_worker ) ) {
            trc 'dbg', 'new worker created';
            $thread->detach;
        }
        else {
            trc 'dbg', 'new worker create failed';
        }
    }

    # Ask worker thread $t to stop: it leaves its accept loop the next time
    # it checks its 'stat'.
    sub kill_worker {
        my ($t) = @_;

        trc 'dbg', 'kill worker', $t;
        lock %{ $self{'worker'}{$t} };
        $self{'worker'}{$t}{'stat'} = 'done';
    }

    # Send a message to the main thread: the arguments joined by one space.
    sub notify {
        $queue->enqueue( join ' ', @_ );
    }

    # Answer one request.
    #   $c - client connection, as returned by the daemon's accept()
    #   $r - request, as returned by $c->get_request()
    # GET /?cmd=eph...  -> the data from get_req_data()
    # GET /             -> the first file matching <doc>/index.*
    # GET /<path>       -> the file <doc>/<path>
    # any other method  -> error 501
    sub handle_req {
        my ( $c, $r ) = @_;

        trc 'dbg', 'new request', $r->as_string;

        if ( $r->method() ne 'GET' ) {
            $c->send_error( 501,
                'method ' . $r->method() . ' not supported' );
            return;
        }

        my $path = $r->uri()->path();

        if ( $path ne '/' ) {

            # the path comes from the client: refuse any '..' segment so that
            # nothing outside of the doc directory can be requested
            if ( grep { $_ eq '..' } split m{[/\\]}, $path ) {
                trc 'log', 'response', 'forbidden', $path;
                $c->send_error(403);
                return;
            }

            ( my $file = $path ) =~ s|^/|$cnf{'doc'}/|;
            trc 'dbg', 'send', $file;
            $c->send_file_response($file);
            return;
        }

        my %query = $r->uri()->query_form();

        if ( !exists $query{'cmd'} ) {

            # plain "/": send the first index file, if there is one
            my ($index) = glob "$cnf{'doc'}/index.*";
            if ( defined $index ) {
                trc 'dbg', 'send', $index;
                $c->send_file_response($index);
            }
            else {

                # always answer, otherwise the client gets an empty reply
                $c->send_error(404);
            }
            return;
        }

        my $data;
        if ( defined $query{'cmd'} && $query{'cmd'} =~ /eph/i ) {
            $data = get_req_data(%query);
        }

        if ($data) {
            my $res = HTTP::Response->new(200);
            my $exp = HTTP::Date::time2str( $data->{'expi'} );

            $res->push_header( 'Expires'      => $exp );
            $res->push_header( 'Content-Type' => 'application/ubx' );
            $res->content( $data->{'data'} );

            $c->send_response($res);
            trc 'log', 'response', 'ok', 'data len',
              length $data->{'data'};
        }
        else {
            $c->send_error();
            trc 'log', 'response', 'error';
        }
        return;
    }
      At last I decided to switch to the Cygwin environment, because it works perfectly there.

      Hm. Why you think you need to "switch to Cygwin environment" is beyond me. Your code as posted runs perfectly fine as designed under both Vista 64 and XP-32. And I see nothing in your code that would require a POSIX layer. All Cygwin is going to do is make it run even more sluggishly(*) than it already does.

      (*)Example 1: You've ignored my suggestion to use Thread::Queue::dequeue_nb() and substituted your 'beat' messages to try and ensure that your MAIN_LOOP wakes up in a timely fashion. But your beat message will only get sent each time one of your threads either accepts an in-bound connection; or its accept() times out! With your timeout set to 10 seconds, nothing will happen in your MAIN_LOOP for up to 10 seconds unless you are receiving connections. If you had substituted dequeue_nb() and a short sleep in the MAIN_LOOP, it would have remained as responsive as you chose--every 1 second or 1/10th second, regardless of your accept timeout or traffic load.

      But the real kicker is--as I identified earlier--your queue serves no purpose! Every action taken as a result of one of your notify() messages is to adjust values in thread specific shared variables. So, why send a message from your worker threads to your main thread to adjust those values, when those workers have direct access to those shared variables? All you are doing is delaying those actions through several layers of unnecessary locking--those associated with the queue itself, and those that prevent your main thread from accessing the thread specific shared data. If each thread adjusted its own state variables--neither type of locking would be required!

      (*)Example 2: In your do_miner(), you have this notify:

      while ( $self{'stat'} ne 'done' ) { # Check for new ephemeris notify( 'miner', $t, retrieve_data(@p) ); select undef, undef, undef, $cnf{'period'}; }

      which sends a message to your MAIN_LOOP that invokes this code:

      elsif ( $class eq 'miner' ) { my ( $t, $d ) = split /\s+/, $payload, 2; if ($d) { lock %{ $self{'miner'}{$t} }; trc 'dbg', 'miner data', $t, length $d; $self{'miner'}{$t}{'expi'} = time() + $cnf{'period'}; $self{'miner'}{$t}{'data'} = $d; } }

      But why have your miner thread send a message (via a queue with all the context switching and locking that involves), in order to set two shared variables (involving further locking), when that thread already has direct access to those two thread specific, shared variables?

      Both those lumps of code can be trivially reduced to:

      # periodically check for ephemeris while ( $self{'stat'} ne 'done' ) { # Check for new ephemeris $self{'miner'}{$t}{'expi'} = time() + $cnf{'period'}; $self{'miner'}{$t}{'data'} = retrieve_data(@p); select undef, undef, undef, $cnf{'period'}; }

      Removing two layers of locking and context switches. You may say that it only happens every 600 seconds, but why have a thread wake up every 600 seconds to send a message to the main loop asking it to break off from doing the other things it is charged with--when that thread can do those things itself?

      In addition, by having both your miner thread and all your worker threads take care of maintaining their own state, the reason for the queue disappears entirely. That leaves your MAIN_LOOP just one job to do: monitor the worker thread states and ramp their numbers up or down as required. Something that currently only gets done if the queue is empty:

      # shortcut # handle all the pending messages first next MAIN_LOOP if $queue->pending(); }

      Which it never will be at the times when it is most needed--when you are getting lots of inbound accepts--because your workers are tying the main loop up with pointless requests to have the MAIN_LOOP maintain their state.

      Here is the updated version, any suggestion will be appreciated,

      As you've ignored just about all the suggestions I already made, there seems little point in repeating them.

      However, as I've tested my 'no notify - no queue' theories, and found that even with 80 clients pounding away flat out, it can handle them perfectly whilst remaining responsive to both new clients and keyboard (sig int) under Vista and XP, whereas your version starts timing clients out after 60 seconds with only 15 clients trying to access it, you might as well have the modified code:

      BTW: A far better use of a queue in your server would be to queue all your trc() messages to a single thread for output rather than slowing all your workers down waiting for access to a global lock. But I left that as an exercise for the reader.


      Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
      "Science is about questioning the status quo. Questioning authority".
      In the absence of evidence, opinion is indistinguishable from prejudice.

        Thanks for your help.

        1. I am working on Windows 2000 and my script doesn't work there, but it runs perfectly on Cygwin. That is why I made that decision.
        2. I have no experience with multi-threaded programs, and I was not sure I could get the lock/unlock and wait/signal handling on shared variables right, so I turned to Thread::Queue for help. You taught me a good lesson. In order to eliminate the lock/unlock work, I think it is better to prevent different threads from modifying the same data. For the 'boss-worker' multi-thread architecture, the 'boss' should only make decisions and let the 'workers' do the job by themselves, right?
        3. I have now implemented a dedicated Thread::Queue to do the logging; here is my updated code.
        4. By the way, how did you achieve the "Read more... (9 kB)" link? It is really great! I am afraid I have no choice but to paste the big block of code here.

          #!/usr/bin/perl
          # created by joe.zheng at Wed Apr 1 10:23:14 2009
          #
          # Pre-threaded HTTP server.
          #   * worker threads share one listening socket, serve the requests
          #     and maintain their own entry in $self{'worker'}
          #   * a miner thread periodically fetches fresh data
          #   * a loger thread prints the trace messages queued by trc
          #   * the main thread only grows or shrinks the worker pool

          use strict;
          use warnings;
          use 5.008;

          use threads;            # multi-threads needed
          use threads::shared;    # need to share some variables
          use Thread::Queue;      # thread safe communication needed
          use Getopt::Long;
          use HTTP::Daemon;
          use Time::HiRes qw( time );
          use List::Util qw( min );

          # forward declaration, so that trc can be called without
          # parentheses before its definition has been seen
          sub trc (@);

          # global variables
          my $msgq = Thread::Queue->new;    # trace messages for the loger
          my %self;                         # server state, see init_srv()

          # configuration
          my %cnf = (

              # root server settings
              srv => 'agps.u-blox.com:46434',    # server URL
              cmd => 'eph',                      # command
              usr => 'usr',                      # username
              pwd => 'pwd',                      # password

              # default position, being used for data from the root server.
              lat => 31,                         # degrees
              lon => 121,                        # degrees
              acc => 1000,                       # meters

              # our server settings
              port    => 46434,                  # listen port
              queue   => 100,                    # listen queue
              timeout => 10,                     # socket timeout

              # the extent server redirect to when the main server
              # can't serve the request, such as reach its capacity,
              # the data requested is not available, etc.
              ext_srv => '',

              workers_ini     => 4,              # prethread count
              workers_max     => 20,             # max worker thread allowed
              workers_idle_lo => 2,              # low water mark
              workers_idle_hi => 8,              # high water mark

              period => 600,                     # period to check new data

              # directory
              doc => 'doc',    # the root directory where all the docs live in

              # misc
              dbg => 1,        # print log for debug purpose
          );

          # parse command line parameter
          GetOptions(
              \%cnf,       'usr=s',     'pwd=s',   'lat=f',
              'lon=f',     'acc=i',     'srv=s',   'cmd=s',
              'period=f',  'port=i',    'queue=i', 'timeout=f',
              'ext_srv=s', 'doc=s',     'dbg=i',
          );

          # -----------------------
          # main routine
          # -----------------------

          # set signal handler: ask the main loop to stop
          $SIG{'INT'} = $SIG{'TERM'} = sub {
              lock $self{'stat'};
              $self{'stat'} = 'done';
          };

          init_srv();
          spawn_loger();
          spawn_miner();
          spawn_worker() foreach ( 1 .. $cnf{'workers_ini'} );

          trc 'dbg', 'start main loop';
          MAIN_LOOP:
          while ( $self{'stat'} ne 'done' ) {

              # adjust worker
              my @worker_idle =
                sort { $a <=> $b }
                grep {

                  # a worker deletes its own entry when it exits and sets
                  # 'stat' only after registering, so fetch the entry once
                  # and tolerate both a missing entry and a missing 'stat'
                  my $w = $self{'worker'}{$_};
                  $w && defined $w->{'stat'} && $w->{'stat'} eq 'idle'
                }
                keys %{ $self{'worker'} };
              my ( $worker_cnt_ttl, $worker_cnt_idl ) =
                ( scalar keys %{ $self{'worker'} }, scalar @worker_idle );
              trc 'dbg', 'idle worker', $worker_cnt_idl, 'total worker',
                $worker_cnt_ttl;

              if ( $worker_cnt_idl < $cnf{'workers_idle_lo'} ) {

                  # too few idle workers: top up, never beyond workers_max
                  my $cnt = min(
                      $cnf{'workers_idle_lo'} - $worker_cnt_idl,
                      $cnf{'workers_max'} - $worker_cnt_ttl,
                  );
                  spawn_worker() foreach ( 1 .. $cnt );
              }
              elsif ( $worker_cnt_idl > $cnf{'workers_idle_hi'} ) {

                  # too many idle workers: stop the lowest thread ids
                  my $cnt = $worker_cnt_idl - $cnf{'workers_idle_hi'};
                  kill_worker( $worker_idle[ $_ - 1 ] ) foreach ( 1 .. $cnt );
              }

              sleep 1;
          }
          trc 'dbg', 'stop main loop';

          END {
              trc 'dbg', 'start to terminate ...';

              # close all the worker
              foreach ( keys %{ $self{'worker'} } ) {
                  kill_worker($_);
              }

              trc 'dbg', 'terminated';

              # now it is the time for loger exit
              $self{'stat'} = 'term';

              # wait for the loger to flush the buffer
              sleep 1;
          }

          # -----------------------
          # subroutine definitions
          # -----------------------

          # Fetch the data from the root server (placeholder for now).
          # Returns the data as a string.
          sub retrieve_data {

              # dummy data here
              # fetch from root server
              return 'dummy data, comming soon ...';
          }

          # Look up the data for one request (placeholder for now).
          # Returns a hash ref: { data => payload, expi => expiry time in
          # epoch seconds }.
          sub get_req_data {

              # dummy data here
              # fetch the request data from miner's repo
              return {
                  data => 'dummy data, comming soon ...',
                  expi => time() + $cnf{'period'},
              };
          }

          # Queue one trace line for the loger thread: timestamp, then the
          # arguments, joined by ', '.  When the first argument is 'dbg' the
          # line is queued only if $cnf{'dbg'} is true, and the thread id is
          # inserted after the timestamp.
          sub trc (@) {
              if ( $_[0] eq 'dbg' ) {

                  # shortcut
                  return if !$cnf{'dbg'};
                  $msgq->enqueue( join ', ', scalar(localtime),
                      threads->self()->tid(), @_ );
              }
              else {
                  $msgq->enqueue( join ', ', scalar(localtime), @_ );
              }
          }

          # Create the listening socket and the shared state in %self.
          # Dies when the socket cannot be created.
          sub init_srv {

              # print out the configuration for debug purpose
              if ( $cnf{'dbg'} ) {
                  print '<configuration>', "\n";
                  foreach my $k ( sort keys %cnf ) {
                      print $k, ': ', $cnf{$k}, "\n";
                  }
                  print '</configuration>', "\n";
              }

              # main thread
              my $srv = HTTP::Daemon->new(
                  LocalPort => $cnf{'port'},
                  Reuse     => 1,
                  Listen    => $cnf{'queue'},
                  Timeout   => $cnf{'timeout'},
              ) or die "can't create local socket: $@\n";
              trc 'log', "Accepting connections on",
                $srv->sockhost() . ':' . $srv->sockport();

              $self{'sock'} = $srv;

              $self{'stat'} = 'busy';
              share( $self{'stat'} );
              $self{'lock'} = 'lock';
              share( $self{'lock'} );

              # the leading & bypasses share()'s prototype, which would
              # otherwise reject an anonymous hash
              $self{'miner'}  = &share( {} );
              $self{'worker'} = &share( {} );
          }

          # Print every message that is currently waiting in $msgq.
          sub _flush_msgq {

              # dequeue_nb() returns undef when the queue is empty; test for
              # that explicitly so that a false message cannot end the loop
              while ( defined( my $msg = $msgq->dequeue_nb() ) ) {
                  print $msg, "\n";
              }
          }

          # Thread body of the loger: print the queued messages once per
          # second until $self{'stat'} is 'term', then print what is left.
          sub do_loger {
              while ( $self{'stat'} ne 'term' ) {
                  _flush_msgq();
                  sleep 1;
              }

              # flush the buffer
              _flush_msgq();
          }

          # Start the detached loger thread; dies when it cannot be created.
          sub spawn_loger {
              if ( my $thread = threads->new( \&do_loger, @_ ) ) {
                  trc 'dbg', 'new loger created';
                  $thread->detach;
              }
              else {

                  # force to success, otherwise the service is not available
                  die "new loger create failed";
              }
          }

          # Thread body of the miner: every $cnf{'period'} seconds fetch the
          # data and store it, with its expiry time, in this thread's own
          # entry of $self{'miner'}.  Arguments are handed on to
          # retrieve_data().
          sub do_miner {
              my $t = threads->self()->tid();
              my @p = @_;

              trc 'dbg', 'do_miner';

              $self{'miner'}{$t} = &share( {} );

              # periodically check for ephemeris
              while ( $self{'stat'} ne 'done' ) {

                  # Check for new ephemeris
                  $self{'miner'}{$t}{'expi'} = time() + $cnf{'period'};
                  $self{'miner'}{$t}{'data'} = retrieve_data(@p);

                  # sleep for $cnf{'period'} seconds (fractions allowed)
                  select undef, undef, undef, $cnf{'period'};
              }

              trc 'dbg', 'miner gone';
          }

          # Start the detached miner thread; dies when it cannot be created.
          sub spawn_miner {
              if ( my $thread = threads->new( \&do_miner, @_ ) ) {
                  trc 'dbg', 'new miner created';
                  $thread->detach;
              }
              else {
                  trc 'dbg', 'new miner create failed';

                  # force to success, otherwise the service is not available
                  die "new miner create failed\n";
              }
          }

          # Thread body of a worker: accept connections on the shared socket
          # and serve one request per connection until the worker's 'comd'
          # is 'kill'.  The worker keeps its own 'stat', 'time' and 'work'
          # fields up to date and removes its entry when it leaves.
          sub do_worker {
              my $t = threads->self()->tid();

              trc 'dbg', 'do_worker';

              $self{'worker'}{$t} = &share( {} );
              $self{'worker'}{$t}{'stat'} = 'idle';
              $self{'worker'}{$t}{'comd'} = 'none';

              my $c;
              ACCEPT:
              while ( $self{'worker'}{$t}{'comd'} ne 'kill' ) {
                  {
                      trc 'dbg', 'attempt to accept';

                      # only one worker at a time waits in accept()
                      lock $self{'lock'};
                      next ACCEPT unless $c = $self{'sock'}->accept();
                  }

                  $self{'worker'}{$t}{'stat'} = 'busy';
                  $self{'worker'}{$t}{'time'} = time();
                  trc 'log', "new connect accept",
                    $c->peerhost() . ":" . $c->peerport();

                  if ( my $r = $c->get_request() ) {
                      handle_req( $c, $r );
                  }
                  $c->close();

                  $self{'worker'}{$t}{'stat'} = 'idle';
                  $self{'worker'}{$t}{'time'} = time();
                  $self{'worker'}{$t}{'work'}++;
              }

              delete $self{'worker'}{$t};
              trc 'dbg', 'worker gone';
          }

          # Start one detached worker thread; a failure is only logged.
          sub spawn_worker {
              if ( my $thread = threads->new( \&do_worker ) ) {
                  trc 'dbg', 'new worker created';
                  $thread->detach;
              }
              else {
                  trc 'dbg', 'new worker create failed';
              }
          }

          # Ask worker thread $t to stop: it leaves its accept loop the next
          # time it checks its 'comd'.  Does nothing for an unknown $t.
          sub kill_worker {
              my ($t) = @_;

              trc 'dbg', 'kill worker', $t;

              # fetch the entry once: the worker may delete it at any time,
              # and writing through $self{'worker'}{$t} after that would
              # autovivify an unshared hash inside the shared one
              my $w = $self{'worker'}{$t};
              $w->{'comd'} = 'kill' if $w;
          }

          # Answer one request.
          #   $c - client connection, as returned by the daemon's accept()
          #   $r - request, as returned by $c->get_request()
          # GET /?cmd=eph...  -> the data from get_req_data()
          # GET /             -> the first file matching <doc>/index.*
          # GET /<path>       -> the file <doc>/<path>
          # any other method  -> error 501
          sub handle_req {
              my ( $c, $r ) = @_;

              trc 'dbg', 'new request', $r->as_string;

              if ( $r->method() ne 'GET' ) {
                  $c->send_error( 501,
                      'method ' . $r->method() . ' not supported' );
                  return;
              }

              my $path = $r->uri()->path();

              if ( $path ne '/' ) {

                  # the path comes from the client: refuse any '..' segment
                  # so that nothing outside of the doc directory can be
                  # requested
                  if ( grep { $_ eq '..' } split m{[/\\]}, $path ) {
                      trc 'log', 'response', 'forbidden', $path;
                      $c->send_error(403);
                      return;
                  }

                  ( my $file = $path ) =~ s|^/|$cnf{'doc'}/|;
                  trc 'dbg', 'send', $file;
                  $c->send_file_response($file);
                  return;
              }

              my %query = $r->uri()->query_form();

              if ( !exists $query{'cmd'} ) {

                  # plain "/": send the first index file, if there is one
                  my ($index) = glob "$cnf{'doc'}/index.*";
                  if ( defined $index ) {
                      trc 'dbg', 'send', $index;
                      $c->send_file_response($index);
                  }
                  else {

                      # always answer, otherwise the client gets an empty
                      # reply
                      $c->send_error(404);
                  }
                  return;
              }

              my $data;
              if ( defined $query{'cmd'} && $query{'cmd'} =~ /eph/i ) {
                  $data = get_req_data(%query);
              }

              if ($data) {
                  my $res = HTTP::Response->new(200);
                  my $exp = HTTP::Date::time2str( $data->{'expi'} );

                  $res->push_header( 'Expires'      => $exp );
                  $res->push_header( 'Content-Type' => 'application/ubx' );
                  $res->content( $data->{'data'} );

                  $c->send_response($res);
                  trc 'log', 'response', 'ok', 'data len',
                    length $data->{'data'};
              }
              else {
                  $c->send_error();
                  trc 'log', 'response', 'error';
              }
              return;
          }