kornerr has asked for the wisdom of the Perl Monks concerning the following question:

Hello. I want to write a TCP server which dispatches command queues to worker threads. I tried Thread::Queue and Thread::Queue::Any, but I can't pass IO::Socket objects over them. After reading http://www.nntp.perl.org/group/perl.ithreads/2008/02/msg1238.html I started to think that my design is wrong, so I ask you for guidance. What is the best way to build a server where the main thread dispatches command queues to worker threads? So far I have this: http://sial.org/pbot/36818 Thanks.

Replies are listed 'Best First'.
Re: Multithreaded server with shared sockets?
by Corion (Patriarch) on May 26, 2009 at 09:17 UTC

    I can't comment much on your design as you didn't post it here. You might want to post (the relevant parts of) your code in addition to your text. Put it in between <code>...</code> tags so it renders and downloads nicely.

    As for your design, why are you passing sockets to your worker threads? Just read all the information necessary for a job from the socket, and then pass that information to the worker. That way, you don't need to pass sockets around. Personally, I prefer nonblocking IO for doing socket communications, and I'd have one single thread dedicated to doing the socket IO, which basically maintains a set of buffers for each socket connection and fetches/dispatches/returns the data from the clients to the workers and back.
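To illustrate the per-connection buffers mentioned above, here is a minimal sketch, not a definitive implementation. The helper name `extract_lines`, the `%buffers` hash and the sample connection ID are hypothetical; it assumes commands are newline-terminated, and shows how a chunk that ends in the middle of a command stays buffered until the rest arrives.

```perl
use strict;
use warnings;

# Hypothetical helper: append a freshly read chunk to the buffer kept for
# one connection and return every complete line the buffer now contains.
sub extract_lines {
    my ($buffers, $conn_id, $chunk) = @_;
    $buffers->{$conn_id} = '' unless defined $buffers->{$conn_id};
    $buffers->{$conn_id} .= $chunk;

    my @lines;
    # Strip complete lines from the front of the buffer;
    # a trailing partial line stays buffered for the next read.
    while ($buffers->{$conn_id} =~ s/\A([^\n]*)\n//) {
        (my $line = $1) =~ s/\r\z//;    # tolerate CRLF line endings
        push @lines, $line;
    }
    return @lines;
}

my %buffers;
# Simulate one command arriving split across two reads.
my @first  = extract_lines(\%buffers, 'client-1', "10.0.0.1 sta");
my @second = extract_lines(\%buffers, 'client-1', "tus\r\n10.0.0.2 reset\n");

print scalar(@first), " complete line(s) after the first chunk\n";
print "$_\n" for @second;
```

In a real server the chunk would come from sysread on a socket that select reported as readable, and each returned line would be handed to a worker as plain data, so no socket ever has to cross a thread boundary.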

Re: Multithreaded server with shared sockets?
by zentara (Cardinal) on May 26, 2009 at 13:18 UTC
      Thanks for the suggestions. I decided to do as Corion suggested.
      The main thread holds all client sockets and keeps an input and an output queue of type Thread::Queue for each worker thread. When input arrives, the main thread dispatches the commands to the corresponding input queue. Each worker thread constantly monitors its own input queue; when the queue is not empty, the worker processes the commands and puts the results on its output queue. The main thread monitors all output queues and, whenever one is not empty, sends the replies to the clients.

      Multiplexing TCP server with Thread::Queue:
      use strict;
      use warnings;
      use Config::General;
      use Config;

      BEGIN {
          $Config{useithreads}
              || die "Recompile Perl with threads support to run this program";
      }

      use threads;
      use threads::shared;
      use Thread::Queue;
      use IO::Socket;
      use IO::Select;
      use Time::HiRes qw(sleep);

      # Catch signals
      $SIG{$_} = \&quit for qw(HUP INT TERM);

      my $srvSock = new IO::Socket::INET(
          Proto     => 'tcp',
          LocalPort => 4554,
          Listen    => 20,
          ReuseAddr => 1) || die "Could not create server socket: $@";
      my $readSet = new IO::Select;
      $readSet->add($srvSock);

      # Main loop
      # SenderID => Socket correspondence
      my %socks;
      # Switches' queues
      my %inputQueues : shared;
      my %outputQueues : shared;

      while (1) {
          # Accept connections and / or incoming requests
          my ($rhSet) = IO::Select->select($readSet, undef, undef, 0.1);
          foreach my $rh (@$rhSet) {
              # New connection
              if ($rh == $srvSock) {
                  my $cliSock = $rh->accept();
                  $readSet->add($cliSock);
                  # Add entry to socks hash
                  my $senderID = $cliSock->peerhost() . ":" . $cliSock->peerport();
                  $socks{$senderID} = $cliSock;
                  print "$senderID connected\n";
              }
              else {
                  # Input from client
                  my $buf;
                  if (sysread($rh, $buf, 8192)) {
                      # Remove all 'carriage return' symbols
                      $buf =~ s/\r//g;
                      my $senderID = $rh->peerhost() . ":" . $rh->peerport();
                      print "Received from $senderID: [$buf]\n";
                      print $rh "You sent: [$buf]\n";
                      processInput($senderID, $buf);
                  }
                  # Client closed connection
                  else {
                      $readSet->remove($rh);
                      my $senderID = $rh->peerhost() . ":" . $rh->peerport();
                      # Remove entry from socks hash
                      delete $socks{$senderID};
                      close($rh);
                      print "$senderID disconnected\n";
                  }
              }
          }
          # Print outputQueues
          {
              lock(%outputQueues);
              for my $swIP (keys %outputQueues) {
                  while ($outputQueues{$swIP}->pending()) {
                      my $senderID  = $outputQueues{$swIP}->dequeue();
                      my $reply     = $outputQueues{$swIP}->dequeue();
                      my $cliSocket = $socks{$senderID};
                      # Skip replies for clients that have already disconnected
                      print $cliSocket "$reply\n" if $cliSocket;
                  }
              }
          }
      }

      # Process incoming buffer
      # \param Sender ID
      # \param Buffer
      sub processInput {
          my ($senderID, $buf) = @_;
          my @cmds = split(/\n/, $buf);
          # Dispatch commands
          foreach my $cmd (@cmds) {
              if ($cmd =~ /^(\d+\.\d+\.\d+\.\d+) ?(.*)/) {
                  my ($ip, $swcmd) = ($1, $2);
                  # If there's no thread for switch $ip, create it
                  {
                      lock(%inputQueues);
                      lock(%outputQueues);
                      if (!$inputQueues{$ip}) {
                          $inputQueues{$ip}  = new Thread::Queue;
                          $outputQueues{$ip} = new Thread::Queue;
                          # Create the thread
                          my ($thr) = threads->create(\&switchThread, $ip);
                          $thr->detach();
                      }
                  }
                  # Dispatch command
                  $inputQueues{$ip}->enqueue($senderID, $swcmd);
              }
              else {
                  print "'$cmd' command is unsupported\n";
              }
          }
      }

      # Quit application
      # \param UNIX signal name
      sub quit {
          my $signame = shift;
          print "Interrupted by $signame\n";
          exit;
      }

      # Switch thread
      # \param Switch IP
      sub switchThread {
          my $ip = shift;
          # Time when we received our last command
          my $lastActiveTime = time();
          #my $idle = $conf{idle_timeout} * 60;
          my $idle = 60;
          my ($quit, @cmds, $inputQueueRef, $outputQueueRef);
          {
              lock(%inputQueues);
              lock(%outputQueues);
              $inputQueueRef  = \$inputQueues{$ip};
              $outputQueueRef = \$outputQueues{$ip};
          }
          print "Thread $ip spawned\n";
          # Thread loop
          while (!$quit) {
              while ($$inputQueueRef->pending()) {
                  my $senderID = $$inputQueueRef->dequeue();
                  my $cmd      = $$inputQueueRef->dequeue();
                  # Reset the idle timer on every received command
                  $lastActiveTime = time();
                  print "Sender: $senderID Cmd: $cmd\n";
                  $$outputQueueRef->enqueue($senderID);
                  $$outputQueueRef->enqueue("Reply $cmd");
              }
              sleep(0.1);
              $quit = time() - $lastActiveTime > $idle;
          }
          # Delete thread's entry and its queue
          {
              lock(%inputQueues);
              lock(%outputQueues);
              delete $inputQueues{$ip};
              delete $outputQueues{$ip};
          }
          print "Thread $ip finished\n";
      }
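
The worker loop above polls pending() and sleeps between checks. As a possible alternative, here is a minimal sketch (not the code from this thread) of the same request/reply round trip using a blocking dequeue. It assumes a Perl built with ithreads and a Thread::Queue recent enough to accept array references. The sender IDs and commands are made-up sample data, and using undef as a stop marker is a convention chosen for this sketch.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $in  = Thread::Queue->new;    # main thread -> worker
my $out = Thread::Queue->new;    # worker -> main thread

# Worker: dequeue() blocks until an item is available, so no sleep/poll
# loop is needed. An undef item is this sketch's stop marker.
my $worker = threads->create(sub {
    while (defined(my $job = $in->dequeue)) {
        my ($sender_id, $cmd) = @$job;
        $out->enqueue([$sender_id, "Reply $cmd"]);
    }
});

# Each job carries the sender ID and the command in one item, so the
# pair cannot be split between two separate dequeue calls.
$in->enqueue(['127.0.0.1:5000', 'status']);
$in->enqueue(['127.0.0.1:5001', 'reset']);
$in->enqueue(undef);             # ask the worker to finish
$worker->join;

# Collect the replies without blocking; dequeue_nb returns undef when empty.
my @replies;
while (defined(my $item = $out->dequeue_nb)) {
    push @replies, "$item->[0] => $item->[1]";
}
print "$_\n" for @replies;
```

The sender ID travels with the reply, so the main thread can still look up the client socket in its own hash and no socket ever has to be shared with a worker.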