Thanks for suggestions. I decided to do as Corion suggested.
The main thread holds all client sockets, has input and output queue of type Thread::Queue for each worker thread. When input comes the main thread dispatches commands to corresponding input queue. Each of worker threads constantly monitors its own input queue. If it's not empty, it processes it and puts results to output queue. The main thread monitors all output queues, and when it's not empty it sends replies to clients.
Multiplexing TCP server with Thread::Queue's:
use strict;
use warnings;
use Config::General;
use Config;
BEGIN {
$Config{useithreads} ||
die "Recompile Perl with threads support to run this program";
}
use threads;
use Thread::Queue;
use IO::Socket;
use IO::Select;
use Time::HiRes qw(sleep);
# Catch signals
$SIG{$_} = \&quit for qw(HUP INT TERM);
my $srvSock = new IO::Socket::INET( Proto => 'tcp',
LocalPort => 4554,
Listen => 20,
ReuseAddr => 1) ||
die "Could not create server socket: $@";
my $readSet = new IO::Select;
$readSet->add($srvSock);
# Main loop
# SenderID => Socket correspondence
my %socks;
# Switches' queues
my %inputQueues : shared;
my %outputQueues : shared;
while (1) {
# Accept connections and / or incoming requests
my ($rhSet) = IO::Select->select($readSet, undef, undef, 0.1);
foreach my $rh (@$rhSet) {
# New connection
if ($rh == $srvSock) {
my $cliSock = $rh->accept();
$readSet->add($cliSock);
# Add entry to socks hash
my $senderID = $cliSock->peerhost() . ":" . $cliSock->peer
+port();
$socks{$senderID} = $cliSock;
print "$senderID connected\n";
}
else {
# Input from client
my $buf;
if (sysread($rh, $buf, 8192)) {
# Remove all 'carrige return' symbols
$buf =~ s/\r//g;
my $senderID = $rh->peerhost() . ":" . $rh->peerport()
+;
print "Received from $senderID: [$buf]\n";
print $rh "You sent: [$buf]\n";
processInput($senderID, $buf);
}
# Client closed connection
else {
$readSet->remove($rh);
my $senderID = $rh->peerhost() . ":" . $rh->peerport()
+;
# Remove entry from socks hash
delete $socks{$senderID};
close($rh);
print "$senderID disconnected\n";
}
}
}
# Print outputQueues
{
lock(%outputQueues);
for my $swIP (keys %outputQueues) {
while ($outputQueues{$swIP}->pending()) {
my $senderID = $outputQueues{$swIP}->dequeue();
my $reply = $outputQueues{$swIP}->dequeue();
my $cliSocket = $socks{$senderID};
print $cliSocket "$reply\n";
}
}
}
}
# Process incoming buffer
# \param Sender ID
# \param Buffer
sub processInput {
my ($senderID, $buf) = @_;
my @cmds = split(/\n/, $buf);
# Dispatch commands
foreach my $cmd (@cmds) {
if ($cmd =~ /^(\d+\.\d+\.\d+\.\d+) ?(.*)/) {
my ($ip, $swcmd) = ($1, $2);
# If there's no thread for switch $ip, create it
{
lock(%inputQueues);
lock(%outputQueues);
if (!$inputQueues{$ip}) {
$inputQueues{$ip} = new Thread::Queue;
$outputQueues{$ip} = new Thread::Queue;
# Create the thread
my ($thr) = threads->create(\&switchThread, $ip);
$thr->detach();
}
}
# Dispatch command
$inputQueues{$ip}->enqueue($senderID, $swcmd);
}
else {
print "'$cmd' command is unsupported\n";
}
}
}
# Quit application
# \param UNIX signal name
sub quit {
my $signame = shift;
print "Interrupted by $signame\n";
exit;
}
# Switch thread
# \param Switch IP
sub switchThread {
my $ip = shift;
# Time when we received our last command
my $lastActiveTime = time();
#my $idle = $conf{idle_timeout} * 60;
my $idle = 60;
my ($quit, @cmds, $inputQueueRef, $outputQueueRef);
{
lock(%inputQueues);
lock(%outputQueues);
$inputQueueRef = \$inputQueues{$ip};
$outputQueueRef = \$outputQueues{$ip};
}
print "Thread $ip spawned\n";
# Thread loop
while (!$quit) {
while ($$inputQueueRef->pending()) {
my $senderID = $$inputQueueRef->dequeue();
my $cmd = $$inputQueueRef->dequeue();
print "Sender: $senderID Cmd: $cmd\n";
$$outputQueueRef->enqueue($senderID);
$$outputQueueRef->enqueue("Reply $cmd");
}
sleep(0.1);
$quit = time() - $lastActiveTime > $idle;
}
# Delete thread's entry and its queue
{
lock(%inputQueues);
lock(%outputQueues);
delete $inputQueues{$ip};
delete $outputQueues{$ip};
}
print "Thread $ip finished\n";
}
|