in reply to Re: Multithreaded server with shared sockets?
in thread Multithreaded server with shared sockets?

Thanks for the suggestions. I decided to do as Corion suggested.
The main thread holds all client sockets and keeps an input queue and an output queue (both of type Thread::Queue) for each worker thread. When input arrives, the main thread dispatches the commands to the corresponding input queue. Each worker thread constantly monitors its own input queue; whenever it is not empty, the worker processes the commands and puts the results on its output queue. The main thread monitors all output queues and, whenever one is not empty, sends the replies to the clients.

Multiplexing TCP server using Thread::Queue queues:
use strict;
use warnings;
use Config::General;
use Config;

BEGIN {
    $Config{useithreads}
        || die "Recompile Perl with threads support to run this program";
}

use threads;
use threads::shared;
use Thread::Queue;
use IO::Socket;
use IO::Select;
use Time::HiRes qw(sleep);

# Catch signals
$SIG{$_} = \&quit for qw(HUP INT TERM);

# A client may disconnect before its reply is written; without this the
# resulting SIGPIPE would terminate the whole server.
$SIG{PIPE} = 'IGNORE';

my $srvSock = IO::Socket::INET->new(
    Proto     => 'tcp',
    LocalPort => 4554,
    Listen    => 20,
    ReuseAddr => 1,
) || die "Could not create server socket: $@";

my $readSet = IO::Select->new();
$readSet->add($srvSock);

# SenderID ("host:port") => client socket
my %socks;

# fileno of client socket => SenderID. Lets the disconnect path find the
# sender without asking a dead socket for its peer address.
my %senderIDOf;

# Switches' queues, keyed by switch IP. Only the main thread touches the
# sockets; worker threads talk to it exclusively through these queues.
# Lock order everywhere is %inputQueues first, then %outputQueues.
my %inputQueues  : shared;
my %outputQueues : shared;

# Main loop
while (1) {

    # Accept connections and / or incoming requests. The 0.1 s timeout keeps
    # the loop coming back to flush the output queues even when no client
    # is sending anything.
    my ($rhSet) = IO::Select->select($readSet, undef, undef, 0.1);

    foreach my $rh (@{ $rhSet || [] }) {

        # New connection
        if ($rh == $srvSock) {
            my $cliSock = $rh->accept() or next;
            $readSet->add($cliSock);

            # Add entry to socks hash
            my $senderID = $cliSock->peerhost() . ":" . $cliSock->peerport();
            $socks{$senderID}              = $cliSock;
            $senderIDOf{ fileno($cliSock) } = $senderID;
            print "$senderID connected\n";
            next;
        }

        my $senderID = $senderIDOf{ fileno($rh) };

        # Input from client
        my $buf;
        if (sysread($rh, $buf, 8192)) {

            # Remove all 'carriage return' symbols
            $buf =~ s/\r//g;
            print "Received from $senderID: [$buf]\n";
            print {$rh} "You sent: [$buf]\n";

            # NOTE(review): a command split across two TCP reads is treated
            # as two separate commands; no per-client line buffering is done.
            processInput($senderID, $buf);
        }

        # Client closed connection (or the read failed)
        else {
            $readSet->remove($rh);

            # Remove entries from the lookup hashes
            delete $senderIDOf{ fileno($rh) };
            delete $socks{$senderID};
            close($rh);
            print "$senderID disconnected\n";
        }
    }

    # Print outputQueues
    {
        lock(%outputQueues);
        for my $swIP (keys %outputQueues) {
            my $queue = $outputQueues{$swIP};
            while ($queue->pending()) {
                my $senderID = $queue->dequeue();
                my $reply    = $queue->dequeue();

                # The client may have disconnected while its command was
                # being processed; drop the reply in that case.
                my $cliSocket = $socks{$senderID} or next;
                print {$cliSocket} "$reply\n";
            }
        }
    }
}

# Process incoming buffer
# Splits the buffer into lines and dispatches every line of the form
# "<IPv4-like address> <command>" to the worker thread of that switch,
# creating the worker (and its two queues) on first use.
# \param Sender ID
# \param Buffer
sub processInput {
    my ($senderID, $buf) = @_;

    # Dispatch commands
    foreach my $cmd (split(/\n/, $buf)) {
        if ($cmd =~ /^(\d+\.\d+\.\d+\.\d+) ?(.*)/) {
            my ($ip, $swcmd) = ($1, $2);

            lock(%inputQueues);
            lock(%outputQueues);

            # If there's no thread for switch $ip, create it
            if (!exists $inputQueues{$ip}) {
                $inputQueues{$ip}  = Thread::Queue->new();
                $outputQueues{$ip} = Thread::Queue->new();

                # Create the thread
                my ($thr) = threads->create(\&switchThread, $ip);
                $thr->detach();
            }

            # Dispatch command while still holding the locks, so an idle
            # worker cannot remove its queue between the check above and
            # the enqueue.
            $inputQueues{$ip}->enqueue($senderID, $swcmd);
        }
        else {
            print "'$cmd' command is unsupported\n";
        }
    }
    return;
}

# Quit application
# \param UNIX signal name
sub quit {
    my $signame = shift;
    print "Interrupted by $signame\n";
    exit;
}

# Switch thread
# Worker body: polls its input queue, answers each (sender, command) pair
# with a (sender, reply) pair on its output queue, and retires after $idle
# seconds without any command.
# \param Switch IP
sub switchThread {
    my $ip = shift;

    # Time when we received our last command
    my $lastActiveTime = time();

    #my $idle = $conf{idle_timeout} * 60;
    my $idle = 60;

    my ($inputQueue, $outputQueue);
    {
        lock(%inputQueues);
        lock(%outputQueues);
        $inputQueue  = $inputQueues{$ip};
        $outputQueue = $outputQueues{$ip};
    }
    print "Thread $ip spawned\n";

    # Thread loop
    while (1) {
        while ($inputQueue->pending()) {
            my $senderID = $inputQueue->dequeue();
            my $cmd      = $inputQueue->dequeue();
            print "Sender: $senderID Cmd: $cmd\n";

            # One enqueue call keeps the (sender, reply) pair together.
            $outputQueue->enqueue($senderID, "Reply $cmd");

            # Activity seen: restart the idle timer.
            $lastActiveTime = time();
        }
        sleep(0.1);
        next if time() - $lastActiveTime <= $idle;

        # Idle timeout reached. Retire only if nothing is waiting in either
        # direction; the locks (released when this iteration ends) stop
        # processInput from enqueueing while the entries are removed.
        lock(%inputQueues);
        lock(%outputQueues);
        next if $inputQueue->pending() || $outputQueue->pending();

        # Delete thread's entry and its queue
        delete $inputQueues{$ip};
        delete $outputQueues{$ip};
        last;
    }
    print "Thread $ip finished\n";
    return;
}