
Meh, I just went with a totally different route..

1. Setting up IO::Socket::INET in main, and passing the $main_sock to a pool of pre-forked threads, which all handle the accept()

2. Multiplexing with IO::Select in each thread to avoid the blocking nature of the accept() call. This gives us the advantage of picking up the faster client (the one that is able to send its request sooner) instead of waiting on a lazy client that takes its sweet time before sending its request (in the case of simultaneous connections/bursts)

I mentioned previously that I simulated a pair of sequential connections 100 times with the non-threaded/non-select script, which took about 1.990 seconds.

Doing the same simulation test on the new version of the script (a pre-forked pool of 3 threads and using the select method in each thread) took about 2.063 seconds.

Therefore, IMHO, the overhead is minimal (about 0.07 seconds over 100 runs), and quite a fair trade-off for having the ability to handle multiple clients simultaneously.

If only I weren't so tired now, I would've quickly coded a script using threads to simulate maybe 20 or 30 simultaneous (asynchronous) connections a few times in a row and compared the results of the two versions of the script, even though I expect the threaded/non-blocking version would be way faster.
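For anyone who wants to run that comparison, here is a minimal sketch of such a simultaneous-connection test. It is untested against the daemon and rests on assumptions: the host 127.0.0.1, the client count of 20, and the helper name one_client are all made up for illustration; only the port and the null-terminated policy request come from the script itself. Note that the read has no timeout, so a server that accepts but never answers would make a client hang.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use IO::Socket;
use Time::HiRes qw(time);

my $host    = '127.0.0.1';   # assumption: the daemon runs on this machine
my $port    = 9999;          # default port of the daemon script
my $clients = 20;            # assumption: number of simultaneous clients

# Hypothetical helper: one client connects, sends a policy request,
# and reports 1 if it received any null-terminated reply, 0 otherwise.
sub one_client {
    my $sock = IO::Socket::INET->new(
        PeerAddr => $host,
        PeerPort => $port,
        Proto    => 'tcp',
        Timeout  => 5,       # applies to the connect only
    ) or return 0;

    print $sock "<policy-file-request/>\0";

    local $/ = "\0";         # replies are terminated by a null byte
    my $reply = <$sock>;
    close $sock;

    return defined $reply ? 1 : 0;
}

my $start = time;

# Start all clients first, then collect their results.
my @workers = map {
    threads->create( { context => 'scalar' }, \&one_client )
} 1 .. $clients;

my $ok = 0;
$ok += $_->join for @workers;

printf "%d of %d requests answered in %.3f seconds\n",
    $ok, $clients, time - $start;
```

Running it a few times against each version of the daemon and comparing the elapsed times would give the comparison described above.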

Here's my new threaded version of the script. Comments/Suggestions welcome..

#!/usr/bin/perl -w

use strict;
use warnings;

use IO::Socket;
use IO::Select;
use threads;
use POSIX qw(setsid);
use Proc::PID::File;
use Log::Dispatch;
use Log::Dispatch::File;
use Date::Format;
use File::Spec;
use FindBin qw($Bin);

sub dienice ($);

###
### Change default configuration here if needed
###

# Our server port (Any number between 1024 and 65535)
my $port = 9999;

# Number of listener threads to spawn
# (2 or 3 threads are sufficient to handle 100 concurrent connections since our duty cycle is a few milliseconds)
my $listeners = 3;

# Want to log connections? 1 = yes, 0 = no
my $log_connections = 1;

# Want to log if script is executed when it's already running?
# (You might want to set it to 0 if you run periodically from cron - too redundant)
my $log_is_running = 1;

#
# Do not change the following unless you know what you're doing!!!
#
my $content = '<?xml version="1.0"?><cross-domain-policy><allow-access-from domain="*" to-ports="' . $port . '" /></cross-domain-policy>';
my $NULLBYTE = pack( 'c', 0 );

###
### Detect our base directory where we create the log and pid files
###
our $BASE_DIR = $Bin;

# Get script name, chop off any preceding path it was called with and chop off its extension (ex: 'some/dir/script.pl' becomes 'script')
our $ME = $0; $ME =~ s|.*/||; $ME =~ s|\..*||;
our $LOG_FILE = "$ME.log";

###
### Setup a logging agent
###
my $log = new Log::Dispatch(
    callbacks => sub { my %h=@_; return Date::Format::time2str('%B %e %T', time)." $ME\[$$]: ".$h{message}."\n"; }
);
$log->add(
    Log::Dispatch::File->new(
        name      => 'file1',
        min_level => 'warning',
        mode      => 'append',
        filename  => File::Spec->catfile($BASE_DIR, $LOG_FILE),
    )
);

###
### Fork and background daemon process
###
startDaemon();
$log->warning("Logging Started");

###
### Setup signal handlers to give us time to cleanup (and log) before shutting down
###
my $running = 1;
$SIG{HUP}  = sub { $log->warning("Caught SIGHUP: exiting gracefully"); $running = 0; };
$SIG{INT}  = sub { $log->warning("Caught SIGINT: exiting gracefully"); $running = 0; };
$SIG{QUIT} = sub { $log->warning("Caught SIGQUIT: exiting gracefully"); $running = 0; };
$SIG{TERM} = sub { $log->warning("Caught SIGTERM: exiting gracefully"); $running = 0; };
$SIG{PIPE} = sub { $log->warning("Caught SIGPIPE (Ignoring): $!"); $running = 1; };

if ( $running == 1 ) {
    ###
    ### BEGIN LISTENING
    ###
    my $main_sock = new IO::Socket::INET(
        LocalPort => $port,
        Proto     => 'tcp',
        Listen    => SOMAXCONN,
        ReuseAddr => 1);

    $main_sock or dienice("Socket error :$!");
    $log->warning("Listening on port $port");

    ###
    ### Spawn our listener threads and detach them since we don't want return values and don't want to wait for them to finish
    ### "detach" also allows automatic cleanup of the thread and recycles its memory
    ###
    for (1..$listeners) {
        threads->create(\&handleConnection, $main_sock)->detach;
    }

    while ($running) {
        # Keep running
    }
}

###
### Mark a clean exit in the log
###
$log->warning("Logging Stopped");

###
### startDaemon
###
sub startDaemon {
    # fork a child process and have the parent process exit to disassociate the process from controlling terminal or login shell
    defined(my $pid = fork) or dienice("Can't fork: $!");
    exit if $pid;

    # setsid turns the process into a session and group leader to ensure our process doesn't have a controlling terminal
    POSIX::setsid() or dienice("Can't start a new session: $!");

    # Get a PID file - or exit without error in case we're running periodically from cron
    # if ( Proc::PID::File->running(dir => "$BASE_DIR", name => "$ME", verify => "1") )
    # {
    #     $log->warning("Daemon Already Running!") if ($log_is_running);
    #     exit(0);
    # }
}

###
### handleConnection
###
sub handleConnection {
    my $main_sock = shift;
    my $tid = threads->tid();

    $log->warning("Thread ($tid) started");

    # Multiplexing Using select (See: http://www.unix.com.ua/orelly/perl/advprog/ch12_03.htm)
    my $s = new IO::Select( $main_sock );

    my ($new_sock, $sock, $fno, $clientAddr, $clientPort, $clientIp, $clientIpStr, $request);
    my %clients = ();
    my @ready = ();

    while ( @ready = $s->can_read ) {
        foreach $new_sock (@ready) {
            if ($new_sock == $main_sock) {
                ($sock, $clientAddr) = $main_sock->accept;
                ( $clientPort, $clientIp ) = sockaddr_in( $clientAddr );
                $clientIpStr = inet_ntoa( $clientIp );
                $fno = fileno($sock);
                $clients{ $fno } = $clientIpStr;
                $s->add($sock);
            }
            else {
                $fno = fileno($new_sock);
                $clientIpStr = $clients{ $fno };
                delete $clients{ $fno };

                local $/ = $NULLBYTE;

                if ( defined ( $request = <$new_sock> ) ) {
                    chomp $request;

                    if ( $request eq '<policy-file-request/>' ) {
                        $log->warning("($tid) : XML Policy file request from: $clientIpStr") if ($log_connections);
                        print $new_sock $content.$NULLBYTE;
                    }
                    elsif ( $request =~ /<request>getmyip<\/request>/ ) {
                        $log->warning("($tid) : XML IP request from: $clientIpStr") if ($log_connections);
                        print $new_sock "<data><ip>$clientIpStr</ip></data>".$NULLBYTE;
                    }
                    else {
                        $log->warning("($tid) : Ignoring unrecognized request from: $clientIpStr") if ($log_connections);
                    }
                }

                $s->remove($new_sock);
                close $new_sock;
            }
        }
    }
}

###
### dienice
###
sub dienice ($) {
    my ($package, $filename, $line) = caller;
    # write die messages to the log before die'ing
    $log->critical("$_[0] at line $line in $filename");
    die $_[0];
}

Re^5: Trying to thread a daemon with threads and Thread::Queue
by BrowserUk (Patriarch) on Aug 30, 2008 at 14:55 UTC

    This empty loop in your main thread:

    while ($running) {
        # Keep running
    }

    will consume as much cpu as the scheduler can give it. In essence, this means that when the main thread gets a timeslice, it will consume 100% of your cpu for its entire timeslice, doing absolutely nothing and preventing any other threads or processes getting a look in (unless you have multiple cores). Even if you have multiple cores, this would just be 'busy work' consuming resources and power for no good reason.

    At the very least you should slow that loop down:

    sleep 1 while $running;

    Beyond that, what you have seems way too complex--mixing as it does fork, threads and select--and I seriously doubt that it will perform any better than a simple, single-threaded process. Of course, you can run your tests and 'prove' me wrong and I cannot argue with you. And after all, it's your code and you will have to maintain it.


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
      The empty loop in the main thread:

      while ($running) {
          # Keep running
      }

      was there to keep the script from ending (or the code falling off the end of the script) soon after it spawns the threads... guess you're right..

      So what do you suggest I do to prevent the script from ending after the main thread spawns the threads ? sleep while ($running); ?
        So what do you suggest I do to prevent the script from ending after the main thread spawns the threads ? sleep while ($running); ?

        That is the simplest option.

        Better would be to not detach your threads and use join:

        my @threads;
        for (1..$listeners) {
            push @threads, threads->create(\&handleConnection, $main_sock);
        }
        $_->join for @threads;

        But for that to work, you need to ensure that all your threads will terminate at the appropriate time. That means making your $running variable shared, and then ensuring that it will be checked and acted upon within your select loop. And for that to work, you need to ensure that it will be checked regularly, and not only when the select returns some IO activity, and that means using a timeout on the select. In turn, that will probably require making your sockets non-blocking.
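        Put together, those steps might look roughly like the sketch below. It is an illustration under stated assumptions, not a drop-in replacement: the 0.25 second select timeout, the listener sub name, port 0 (so the OS picks a free port for the demo), and the one-second sleep standing in for a signal handler are all assumptions, and the request handling is left out.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;
use IO::Socket;
use IO::Select;

# Shared flag: a change made in the main thread is seen by every listener.
my $running :shared = 1;

my $main_sock = IO::Socket::INET->new(
    LocalAddr => '127.0.0.1',
    LocalPort => 0,            # assumption: let the OS pick a port for the demo
    Proto     => 'tcp',
    Listen    => SOMAXCONN,
    ReuseAddr => 1,
) or die "Socket error: $!";

# Non-blocking, so a thread that loses the race for a new connection
# gets undef back from accept() instead of hanging inside it.
$main_sock->blocking(0);

sub listener {
    my ($sock) = @_;
    my $sel = IO::Select->new($sock);

    while ($running) {
        # The timeout makes can_read return even when nothing arrives,
        # so $running is re-checked at least every quarter second.
        for my $ready ( $sel->can_read(0.25) ) {
            my $client = $ready->accept or next;
            # ... read the request and send the reply here ...
            close $client;
        }
    }
    return;
}

my @threads = map { threads->create( \&listener, $main_sock ) } 1 .. 3;

# Stand-in for the signal handlers: ask the listeners to stop after a second.
sleep 1;
$running = 0;

# join blocks until each listener has noticed the flag and returned.
$_->join for @threads;
print "All ", scalar(@threads), " listener threads joined\n";
```

        With this shape the main thread never spins: it sits in join until the listeners have finished, and no thread waits longer than the select timeout before noticing the shutdown request.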

