use strict; use warnings; use Config::General; use Config; BEGIN { $Config{useithreads} || die "Recompile Perl with threads support to run this program"; } use threads; use Thread::Queue; use IO::Socket; use IO::Select; use Time::HiRes qw(sleep); # Catch signals $SIG{$_} = \&quit for qw(HUP INT TERM); my $srvSock = new IO::Socket::INET( Proto => 'tcp', LocalPort => 4554, Listen => 20, ReuseAddr => 1) || die "Could not create server socket: $@"; my $readSet = new IO::Select; $readSet->add($srvSock); # Main loop # SenderID => Socket correspondence my %socks; # Switches' queues my %inputQueues : shared; my %outputQueues : shared; while (1) { # Accept connections and / or incoming requests my ($rhSet) = IO::Select->select($readSet, undef, undef, 0.1); foreach my $rh (@$rhSet) { # New connection if ($rh == $srvSock) { my $cliSock = $rh->accept(); $readSet->add($cliSock); # Add entry to socks hash my $senderID = $cliSock->peerhost() . ":" . $cliSock->peer +port(); $socks{$senderID} = $cliSock; print "$senderID connected\n"; } else { # Input from client my $buf; if (sysread($rh, $buf, 8192)) { # Remove all 'carrige return' symbols $buf =~ s/\r//g; my $senderID = $rh->peerhost() . ":" . $rh->peerport() +; print "Received from $senderID: [$buf]\n"; print $rh "You sent: [$buf]\n"; processInput($senderID, $buf); } # Client closed connection else { $readSet->remove($rh); my $senderID = $rh->peerhost() . ":" . $rh->peerport() +; # Remove entry from socks hash delete $socks{$senderID}; close($rh); print "$senderID disconnected\n"; } } } # Print outputQueues { lock(%outputQueues); for my $swIP (keys %outputQueues) { while ($outputQueues{$swIP}->pending()) { my $senderID = $outputQueues{$swIP}->dequeue(); my $reply = $outputQueues{$swIP}->dequeue(); my $cliSocket = $socks{$senderID}; print $cliSocket "$reply\n"; } } } } # Process incoming buffer # \param Sender ID # \param Buffer sub processInput { my ($senderID, $buf) = @_; my @cmds = split(/\n/, $buf); # Dispatch commands foreach my $cmd (@cmds) { if ($cmd =~ /^(\d+\.\d+\.\d+\.\d+) ?(.*)/) { my ($ip, $swcmd) = ($1, $2); # If there's no thread for switch $ip, create it { lock(%inputQueues); lock(%outputQueues); if (!$inputQueues{$ip}) { $inputQueues{$ip} = new Thread::Queue; $outputQueues{$ip} = new Thread::Queue; # Create the thread my ($thr) = threads->create(\&switchThread, $ip); $thr->detach(); } } # Dispatch command $inputQueues{$ip}->enqueue($senderID, $swcmd); } else { print "'$cmd' command is unsupported\n"; } } } # Quit application # \param UNIX signal name sub quit { my $signame = shift; print "Interrupted by $signame\n"; exit; } # Switch thread # \param Switch IP sub switchThread { my $ip = shift; # Time when we received our last command my $lastActiveTime = time(); #my $idle = $conf{idle_timeout} * 60; my $idle = 60; my ($quit, @cmds, $inputQueueRef, $outputQueueRef); { lock(%inputQueues); lock(%outputQueues); $inputQueueRef = \$inputQueues{$ip}; $outputQueueRef = \$outputQueues{$ip}; } print "Thread $ip spawned\n"; # Thread loop while (!$quit) { while ($$inputQueueRef->pending()) { my $senderID = $$inputQueueRef->dequeue(); my $cmd = $$inputQueueRef->dequeue(); print "Sender: $senderID Cmd: $cmd\n"; $$outputQueueRef->enqueue($senderID); $$outputQueueRef->enqueue("Reply $cmd"); } sleep(0.1); $quit = time() - $lastActiveTime > $idle; } # Delete thread's entry and its queue { lock(%inputQueues); lock(%outputQueues); delete $inputQueues{$ip}; delete $outputQueues{$ip}; } print "Thread $ip finished\n"; }
In reply to Re^2: Multithreaded server with shared sockets?
by kornerr
in thread Multithreaded server with shared sockets?
by kornerr
| For: | Use: | ||
| & | & | ||
| < | < | ||
| > | > | ||
| [ | [ | ||
| ] | ] |