in reply to Re^3: Trying to thread a daemon with threads and Thread::Queue
in thread Trying to thread a daemon with threads and Thread::Queue

BrowserUK, an older post of yours on the subject (from 2006) enlightened me, and I was able to find out what the problem was and make a little progress. Thank you so much.
Re^5: multithreaded tcp listener with IO::Socket

You mentioned: "The main problem with this approach is that you have to ensure that the socket returned by accept() does not go out of scope (causing the socket to be closed), before the thread has had chance to reopen it. That's easily achieved by pushing a copy of the socket handle into a (non-shared) array in the accept thread, as well as queueing it."
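For reference, a minimal sketch of what "reopening" a socket by file number looks like. It assumes a local socketpair as a stand-in for an accepted connection, and the variable names are illustrative rather than taken from threads::Server. It only shows that the '&=' form of open reuses the same descriptor number, while the '&' form allocates a new one:

```perl
use strict;
use warnings;
use Socket;

# A connected pair of local sockets stands in for an accepted connection.
socketpair( my $client, my $server, AF_UNIX, SOCK_STREAM, PF_UNSPEC )
    or die "socketpair: $!";

my $fno = fileno($server);

# '+<&=' wraps the existing descriptor (fdopen style): same number.
open( my $alias, "+<&=$fno" ) or die "fdopen-style open failed: $!";

# '+<&' duplicates the descriptor (dup style): a new number.
open( my $copy, "+<&$fno" ) or die "dup-style open failed: $!";

my $alias_fno = fileno($alias);
my $copy_fno  = fileno($copy);
print "original=$fno alias=$alias_fno copy=$copy_fno\n";
```

Because the '&=' form depends on descriptor number $fno still being open, the accept thread has to keep its own handle alive until the worker has reopened it, which is the point of the quoted advice.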

So I went ahead and took a look at your sample code (threads::Server) from the next reply on that same thread, here:
Re^7: multithreaded tcp listener with IO::Socket

I realized you're using an associative array (hash) in the main accept thread to store the main handle and keep the socket open, and also using a separate queue (Qclean) in another routine ( while($Qclean->pending) ) to close the socket once the thread is done with it, if I understand correctly.
The only problem is that the "close delete $hash{$fno}" line didn't work for me; it left the connection open on the client side. So instead I tried:
close $hash{ $fno }; delete $hash{ $fno };
And that worked, the request went through. But... The process just mysteriously died!!! No errors, no warnings in the log, nothing!
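The hash-plus-cleanup-queue arrangement described above can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the threads::Server code: a socketpair replaces accept(), there is a single worker, and an undef item is used as a hypothetical stop marker so the example terminates.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use Socket;

my $Q      = Thread::Queue->new;    # file numbers waiting for a worker
my $Qclean = Thread::Queue->new;    # file numbers the worker is done with

# Worker thread: reopen each queued descriptor, reply, then report it back.
my $worker = threads->create( sub {
    while ( defined( my $fno = $Q->dequeue ) ) {
        open( my $sock, "+<&=$fno" ) or die "reopen failed: $!";
        print {$sock} "hello\n";
        close $sock;
        $Qclean->enqueue($fno);
    }
} );

# Stand-in for accept(): a connected pair of local sockets.
socketpair( my $client, my $server, AF_UNIX, SOCK_STREAM, PF_UNSPEC )
    or die "socketpair: $!";

my %sockets;
my $fno = fileno $server;
$sockets{$fno} = $server;    # keep the handle alive in the accept thread
$Q->enqueue($fno);
$Q->enqueue(undef);          # illustrative stop marker for the worker
$worker->join;

# Cleanup pass: close and forget every handle the worker has finished with.
while ( $Qclean->pending ) {
    my $done = $Qclean->dequeue;
    close $sockets{$done};
    delete $sockets{$done};
}

my $line = <$client>;
print "client received: $line";
```

In the real daemon the cleanup pass only runs when accept() returns, so a finished socket stays in the hash until the next connection arrives.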

I tried taking out the line "shutdown $socket, 2;" from the end of the handleConnection() sub and reran it. Now the process doesn't die on each request, but the client connection still doesn't close :(

I also tried removing the line "shutdown $socket, 2;" from the sub and adding "shutdown $sockets{ $fileno }, 2;" right before the "close $sockets{ $fileno };" line in the main accept thread. With that change the process doesn't die after a request, but the client is still left open :| I'm puzzled...
Any ideas on this one?
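As background for the close/shutdown experiments above, here is a small sketch of how the two calls differ when more than one descriptor refers to the same socket. It is not a diagnosis of the problem in this post. It assumes a local socketpair and a dup'ed handle, with illustrative names: closing one descriptor leaves the connection up for the peer, while shutdown acts on the socket itself.

```perl
use strict;
use warnings;
use Socket;
use IO::Select;

socketpair( my $client, my $server, AF_UNIX, SOCK_STREAM, PF_UNSPEC )
    or die "socketpair: $!";

# A second descriptor for the same server-side socket.
open( my $copy, '+<&', $server ) or die "dup failed: $!";

my $sel = IO::Select->new($client);

# Closing only the copy: the socket is still open through $server,
# so the client sees neither data nor end-of-file.
close $copy;
my @ready_after_close = $sel->can_read(0);

# Shutting down the socket: the client becomes readable and reads EOF,
# even though $server has not been closed yet.
shutdown( $server, 2 ) or die "shutdown: $!";
my @ready_after_shutdown = $sel->can_read(1);
my $line = <$client>;

print "readable after close: ",    scalar(@ready_after_close),    "\n";
print "readable after shutdown: ", scalar(@ready_after_shutdown), "\n";
print "client read: ", ( defined $line ? $line : "EOF" ), "\n";
```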

Below is my full code (revised and cleaned of some minor bugs).
#!/usr/bin/perl -w
use strict;
use warnings;
use IO::Socket;
use threads;
use Thread::Queue;
use POSIX qw(setsid);
use Proc::PID::File;
use Log::Dispatch;
use Log::Dispatch::File;
use Date::Format;
use File::Spec;
use FindBin qw($Bin);

sub dienice ($);

###
### Change default configuration here if needed
###

# Our server port (Any number between 1024 and 65535)
my $port = 9999;
my $timeout = 60;

# Number of listener threads to spawn
# (2 or 3 threads are sufficient to handle 100 concurrent connections since our duty cycle is a few milliseconds)
my $listeners = 3;

# Want to log connections? 1 = yes, 0 = no
my $log_connections = 1;

# Want to log if script is executed when it's already running?
# (You might want to set it to 0 if you run periodically from cron - too redundant)
my $log_is_running = 1;

#
# Do not change the following unless you know what you're doing!!!
#
my $content = '<?xml version="1.0"?><cross-domain-policy><allow-access-from domain="*" to-ports="' . $port . '" /></cross-domain-policy>';
my $NULLBYTE = pack( 'c', 0 );

###
### Detect our base directory where we create the log and pid files
###
our $BASE_DIR = $Bin;

# Get script name, chop off any preceding path it was called with and chop off its extension (ex: 'some/dir/script.pl' becomes 'script')
our $ME = $0; $ME =~ s|.*/||; $ME =~ s|\..*||;
our $LOG_FILE = "$ME.log";

###
### Setup a logging agent
###
my $log = new Log::Dispatch(
    callbacks => sub { my %h=@_; return Date::Format::time2str('%B %e %T', time)." $ME\[$$]: ".$h{message}."\n"; }
);
$log->add( Log::Dispatch::File->new( name      => 'file1',
                                     min_level => 'warning',
                                     mode      => 'append',
                                     filename  => File::Spec->catfile($BASE_DIR, $LOG_FILE),
                                   )
);

###
### Fork and background daemon process
###
startDaemon();
$log->warning("Logging Started");

###
### Setup signal handlers to give us time to cleanup (and log) before shutting down
###
my $running = 1;
$SIG{HUP}  = sub { $log->warning("Caught SIGHUP: exiting gracefully"); $running = 0; };
$SIG{INT}  = sub { $log->warning("Caught SIGINT: exiting gracefully"); $running = 0; };
$SIG{QUIT} = sub { $log->warning("Caught SIGQUIT: exiting gracefully"); $running = 0; };
$SIG{TERM} = sub { $log->warning("Caught SIGTERM: exiting gracefully"); $running = 0; };

my $Q = Thread::Queue->new;
my $Qclean = Thread::Queue->new;

###
### As long as the daemon is running, listen for and handle received connections
###
while ($running) {
    ###
    ### Spawn our listener threads and detach them since we don't want return values and don't to wait for them to finish
    ### "detach" also allows automatic cleanup of the thread and recycles its memory
    ###
    for (1..$listeners) {
        threads->create(\&handleConnection)->detach;
    }

    ###
    ### BEGIN LISTENING
    ###
    my $sock = new IO::Socket::INET(
                    LocalPort => $port,
                    Proto     => 'tcp',
                    Listen    => SOMAXCONN,
                    ReuseAddr => 1);
    $sock or dienice("Socket error :$!");
    $log->warning("Listening on port $port");

    ###
    ### Handle connections
    ###
    my %sockets = ();
    while ( my ($new_sock, $clientAddr) = $sock->accept() ) {
        my ( $clientPort, $clientIp ) = sockaddr_in( $clientAddr );
        # put the connection on the queue for a reader thread
        my $fno = fileno($new_sock);
        $sockets{ $fno } = $new_sock;
        $Q->enqueue( "$fno\0$clientIp" );
        while( $Qclean->pending ) {
            my $fileno = $Qclean->dequeue();
            close $sockets{ $fileno };
            delete $sockets{ $fileno };
        }
    }
}

###
### Mark a clean exit in the log
###
$log->warning("Logging Stopped");

###
### startDaemon
###
sub startDaemon {
    # fork a child process and have the parent process exit to disassociate the process from controlling terminal or login shell
    defined(my $pid = fork) or dienice("Can't fork: $!");
    exit if $pid;

    # setsid turns the process into a session and group leader to ensure our process doesn't have a controlling terminal
    POSIX::setsid() or dienice("Can't start a new session: $!");

    # Get a PID file - or exit without error in case we're running periodically from cron
    # if ( Proc::PID::File->running(dir => "$BASE_DIR", name => "$ME", verify => "1") )
    # {
    #     $log->warning("Daemon Already Running!") if ($log_is_running);
    #     exit(0);
    # }
}

###
### handleConnection
###
sub handleConnection {
    my $tid = threads->tid();
    $log->warning("Thread ($tid) started");

    while ( my $work = $Q->dequeue() ) {
        my ( $fno, $clientIp ) = split ( chr(0), $work );
        my $clientIpStr = inet_ntoa( $clientIp );
        local $/ = $NULLBYTE;
        open my $socket, '+<&='.$fno or dienice("Thread ($tid) could not reopen socket: $!");

        my $request;
        if ( defined ( $request = <$socket> ) ) {
            chomp $request;
            if ( $request eq '<policy-file-request/>' ) {
                $log->warning("($tid) : XML Policy file request from: $clientIpStr") if ($log_connections);
                print $socket $content.$NULLBYTE;
            }
            elsif ( $request =~ /<request>getmyip<\/request>/ ) {
                $log->warning("($tid) : XML IP request from: $clientIpStr") if ($log_connections);
                print $socket "<data><ip>$clientIpStr</ip></data>".$NULLBYTE;
            }
            else {
                $log->warning("($tid) : Ignoring unrecognized request from: $clientIpStr") if ($log_connections);
            }
        }
        else {
            $log->warning("($tid) : Ignoring NULL connection from: $clientIpStr") if ($log_connections);
        }
        shutdown $socket, 2;
        close $socket;
        $Qclean->enqueue( $fno );
    }
    return 1;
}

###
### dienice
###
sub dienice ($) {
    my ($package, $filename, $line) = caller;
    # write die messages to the log before die'ing
    $log->critical("$_[0] at line $line in $filename");
    die $_[0];
}

Re^5: Trying to thread a daemon with threads and Thread::Queue
by BrowserUk (Patriarch) on Aug 29, 2008 at 04:25 UTC
    BrowserUK, an older post of yours on the subject (from 2006) enlightened me, and I was able to find out what the problem was and solved my issues. Thank you so much..

    Gah! You're too generous. I should have remembered and recognised that problem and saved you a couple of days. I'm glad you've achieved your goal.

    However, I still fail to see how spawning a new thread, or just transferring the socket to a thread pool, is going to operate more quickly than just printing one short line, closing the connection and going back to the accept. Whilst a single push to a queue is going to be faster than IO to a socket, given that you're now setting up the listener queue, the tcp stack should be able to buffer the connections long enough to make the difference moot.

    Did you do any soak testing yet?


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
      Bleh! I updated my post after I realized another problem, and found you already replied back :|
      I can't do any testing yet, due to the problem I described in my previous (updated) reply..

      I am not really concerned about the speed of the response of the threaded version of this script vs. the non-threaded version. The whole idea was to allow handling multiple simultaneous connections/requests; if one client is really lazy or slow, I don't want it to delay other clients/requests for several seconds.

      Still, I am curious to test it and see the difference, once I get it working :)