PhillyR has asked for the wisdom of the Perl Monks concerning the following question:

I've created a proxy script to receive 2 streams of data and forward them to multiple clients (each client picks which stream they want). I'm having a problem (most likely some blocking) when both streams are active at the same time. My proxy listens for connections from the server sending stream1. My proxy initiates a connection to a second server for stream2 when a client asks for stream2. Confusing? Excellent. On to the code:
#!/usr/bin/perl -w
use IO::Socket;
use IO::Select;

$stream1_prt = 5555;
$stream2_prt = 5556;
$client1_prt = 1100;
$client2_prt = 1101;
$my_ip = '192.168.1.12';

# Create Stream 1 listen
$stream1_lsn = new IO::Socket::INET (LocalAddr => $my_ip,
                                     LocalPort => $stream1_prt,
                                     Type      => SOCK_STREAM,
                                     Proto     => "tcp",
                                     Reuse     => 1,
                                     Blocking  => 0, #part of issue?
                                     Listen    => 5)
    or die "Stream 1 listen socket couldn't be created: $@\n";

# Create Client 1 listen
$client1_lsn = new IO::Socket::INET (LocalAddr => $my_ip,
                                     LocalPort => $client1_prt,
                                     Type      => SOCK_STREAM,
                                     Proto     => "tcp",
                                     Reuse     => 1,
                                     Blocking  => 0, #part of issue?
                                     Listen    => 5)
    or die "Client 1 listen socket couldn't be created: $@\n";

# Create Client 2 listen
$client2_lsn = new IO::Socket::INET (LocalAddr => $my_ip,
                                     LocalPort => $client2_prt,
                                     Type      => SOCK_STREAM,
                                     Proto     => "tcp",
                                     Reuse     => 1,
                                     Blocking  => 0, #part of issue?
                                     Listen    => 5)
    or die "Client 2 listen socket couldn't be created: $@\n";

# Add listen sockets to select
$sockets = new IO::Select();
$sockets->add($stream1_lsn);
$sockets->add($client1_lsn);
$sockets->add($client2_lsn);

#Go into infinite loop, handling connections - how do I end this?
while(@ready = $sockets->can_read) { #got data
    foreach $socket (@ready) { # find which socket sent data
        $sck_prt = $socket->sockport;
        $pr_prt = $socket->peerport; #used by sockets sending data

        #New Stream1
        if ($socket == $stream1_lsn) {
            $stream1_sock = $stream1_lsn->accept(); #accept stream 1
            $sockets->add($stream1_sock); #add socket to select
            $stream1_hst = $stream1_sock->peerhost;
            print "New stream1 from IP [$stream1_hst]\n";
        }
        #New client1
        elsif ($socket == $client1_lsn) {
            $client1_sock = $client1_lsn->accept(); #accept client 1
            $sockets->add($client1_sock); #add socket to select
            $client1_hst = $client1_sock->peerhost;
            print "New client for stream 1 @ IP [$client1_hst]\n";
        }
        #New client2
        elsif ($socket == $client2_lsn) {
            $client2_sock = $client2_lsn->accept(); #accept client 2
            $sockets->add($client2_sock); #add socket to select
            $client2_hst = $client2_sock->peerhost;
            print "New client for stream 2 @ IP [$client2_hst]\n";
            #Connection to second server running locally
            $stream2_sock = new IO::Socket::INET( PeerAddr => 'localhost',
                                                  PeerPort => $stream2_prt,
                                                  Type     => SOCK_STREAM,
                                                  Proto    => "tcp")
                or die "Stream2 socket couldn't be created: $@\n";
            $sockets->add($stream2_sock); #add socket to select
        }
        #Stream2 data to all client2's
        elsif(defined($stream2_sock)) { #may not be initiated yet
            if($socket == $stream2_sock) {
                # NEXT LINE IS BLOCKING ---------------------------
                if(defined($buf1=<$socket>)) { #put stream2 into buf1
                    @writers1 = $sockets->can_write;
                    foreach $writer1 (@writers1) {
                        $wrt_sp1 = $writer1->sockport;
                        if ($wrt_sp1 == $client2_prt) {
                            print $writer1 $buf1; #send stream2 to each client2
                        }
                    }
                }
            }
        }
        #Stream1 data to all Client1's
        elsif(defined($pr_prt)) { #used this port due to multiple $stream1_sock connections
            #THIS LINE DOESN'T EXECUTE IF STREAM2 IS ACTIVE -----------
            if(defined($buf=<$socket>)) { #put stream1 into buffer
                @writers = $sockets->can_write;
                foreach $writer (@writers) {
                    $wrt_sp = $writer->sockport;
                    if($sck_prt == $stream1_prt && $wrt_sp == $client1_prt) {
                        print $writer $buf; #send stream1 to all client1's
                    }
                }
            }
        }
        else {
            print "Closing socket\n"; #Needed if clients close connection
            $sockets->remove($socket);
            close($socket);
        }
    }
}
When I connect a client1 to the proxy it connects and waits for stream1. When the stream1 server initiates the connection, stream1 starts flowing to client1. Later when client2 connects, the proxy initiates the connection to the second server and stream2 starts. Stream2 goes to client2 but stream1 stops. I need stream1 to continue to flow. What am I doing wrong?

Replies are listed 'Best First'.
Re: Problem handling 2 simultaneous socket streams
by BrowserUk (Patriarch) on Sep 15, 2011 at 14:55 UTC
    # NEXT LINE IS BLOCKING ---------------------------
    if(defined($buf1=<$socket>)) { #put stream2 into buf1

    Using buffered IO with non-blocking sockets is a no-no. readline() (<>) won't return until it sees the current delimiter (typically "\n"); if the other end sends data without a delimiter, it will block; if the other end never gets around to sending a delimiter, then it will block forever.

    The solution is to use read, sysread or recv, but then you'll need to buffer and track the delimiters yourself, which gets awfully messy.
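
    A minimal sketch of the sysread route, assuming the streams are newline-delimited. The names %inbuf and read_lines are hypothetical and not taken from the posted script; the idea is to read only what select says is available and keep any partial line until its delimiter arrives.

```perl
use strict;
use warnings;

# Hypothetical per-socket input buffers, keyed by file descriptor.
my %inbuf;

# Call only after select/can_read reports $sock readable.
# Returns an array ref of complete lines (possibly empty),
# or undef on EOF or a real error so the caller can drop the socket.
sub read_lines {
    my ($sock) = @_;
    my $fd = fileno $sock;
    $inbuf{$fd} = '' unless defined $inbuf{$fd};

    # Append whatever is available right now; never wait for a delimiter.
    my $n = sysread($sock, $inbuf{$fd}, 4096, length $inbuf{$fd});
    if (!defined $n) {
        # Nothing to read after all on a non-blocking socket: not fatal.
        return [] if $!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR};
        delete $inbuf{$fd};
        return;
    }
    if ($n == 0) {                  # EOF: peer closed the connection
        delete $inbuf{$fd};
        return;
    }

    # Peel off complete lines; a trailing partial line stays buffered.
    my @lines;
    while ($inbuf{$fd} =~ s/\A(.*?\n)//s) {
        push @lines, $1;
    }
    return \@lines;
}
```

    In the select loop this would stand in for the <$socket> call: forward every line in the returned list, and remove the socket from the IO::Select object when undef comes back.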

    The whole thing would be far simpler using threads.


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
Re: Problem handling 2 simultaneous socket streams
by ikegami (Patriarch) on Sep 15, 2011 at 14:55 UTC

    I started writing a mini-tutorial explaining why readline (aka <$fh>) interacts badly with select.

    On that same line, your use of can_write is broken. 1) It can also block. 2) It might not return all sockets, in which case you'll suffer data loss. I can't go into details right now, so I'll push you in the right direction: You need to use an output buffer for each writer socket and use IO::Select::select($readers, $writers, undef) where you currently use can_read.
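
    A rough sketch of the per-writer output buffer idea, under the assumption that one IO::Select object tracks readers and a second tracks only the sockets that still have pending output. %outbuf, queue_output, flush_output and flush_ready are hypothetical names, not part of IO::Select or of the posted script.

```perl
use strict;
use warnings;
use IO::Select;

my %outbuf;                        # fileno => bytes not yet written
my $readers = IO::Select->new;     # sockets we want to read from
my $writers = IO::Select->new;     # sockets with pending output only

# Queue data for a client instead of printing to it directly.
sub queue_output {
    my ($sock, $data) = @_;
    $outbuf{ fileno $sock } .= $data;
    $writers->add($sock);          # adding the same handle twice is harmless
}

# Write as much as the socket accepts right now and keep the rest.
sub flush_output {
    my ($sock) = @_;
    my $fd = fileno $sock;
    return 0 unless defined $outbuf{$fd} && length $outbuf{$fd};
    my $n = syswrite($sock, $outbuf{$fd});
    return unless defined $n;      # error: caller decides what to do
    substr($outbuf{$fd}, 0, $n, '');
    unless (length $outbuf{$fd}) {
        delete $outbuf{$fd};
        $writers->remove($sock);   # nothing pending: stop watching it
    }
    return $n;
}

# One pass of the combined select. Returns the number of writable
# sockets serviced; handling of the readable set is left to the caller.
sub flush_ready {
    my ($timeout) = @_;
    my ($can_read, $can_write) =
        IO::Select::select($readers, $writers, undef, $timeout);
    return 0 unless $can_write;    # timeout or error: empty list returned
    flush_output($_) for @$can_write;
    return scalar @$can_write;
}
```

    Because a socket sits in $writers only while it has queued data, the select call does not spin on sockets that are always writable, and a slow client delays only its own buffer.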

    It's much easier to do using threads.

Re: Problem handling 2 simultaneous socket streams
by zentara (Cardinal) on Sep 15, 2011 at 15:26 UTC
    You might get some ideas from testing Simple threaded chat server. It has a simple method for making the chat echo to all clients, and you can probably modify it for your purposes manipulating the @clients list, or splitting @clients into 2, @clients1 and @clients2. It's not bulletproof code, but does demonstrate the flow thru threads.

    I'm not really a human, but I play one on earth.
    Old Perl Programmer Haiku ................... flash japh
      Thanks for the advice. I switched over to using a combination of select and threads for managing the separate incoming streams and it is working properly.
      How would I go about adding in reading STDIN to allow a user to type quit to end this program? I have unsuccessfully tried creating a new thread (prior to the while loop below) that monitored STDIN. Once again I was blocked.
      #!/usr/bin/perl -w
      # -----------------------------
      use warnings;
      use strict;
      use IO::Socket;
      use IO::Select;
      use threads;
      use threads::shared;
      # -----------------------------

      # -----------------------------
      our @stream1_clients : shared;
      our @stream2_clients : shared;
      my $stream1_prt = 5555;
      my $stream2_prt = 5556;
      my $client1_prt = 1100;
      my $client2_prt = 1101;
      my $ip = '192.168.1.12';
      # -----------------------------

      # Create Stream 1 listen ------
      my $stream1_lsn = new IO::Socket::INET (LocalAddr => $ip,
                                              LocalPort => $stream1_prt,
                                              Type      => SOCK_STREAM,
                                              Proto     => "tcp",
                                              Reuse     => 1,
                                              Listen    => 5)
          or die "Stream 1 listen socket couldn't be created: $@\n";
      # -----------------------------

      # Create Client 1 listen ------
      my $client1_lsn = new IO::Socket::INET (LocalAddr => $ip,
                                              LocalPort => $client1_prt,
                                              Type      => SOCK_STREAM,
                                              Proto     => "tcp",
                                              Reuse     => 1,
                                              Blocking  => 0, #part of issue?
                                              Listen    => 5)
          or die "Client 1 listen socket couldn't be created: $@\n";
      # -----------------------------

      # Create Client 2 listen ------
      my $client2_lsn = new IO::Socket::INET (LocalAddr => $ip,
                                              LocalPort => $client2_prt,
                                              Type      => SOCK_STREAM,
                                              Proto     => "tcp",
                                              Reuse     => 1,
                                              Blocking  => 0, #part of issue?
                                              Listen    => 5)
          or die "Client 2 listen socket couldn't be created: $@\n";
      # -----------------------------

      # Add listen sockets to select
      my $sockets = new IO::Select();
      $sockets->add($stream1_lsn);
      $sockets->add($client1_lsn);
      $sockets->add($client2_lsn);
      # -----------------------------

      #<< SPLIT OFF A SEPARATE THREAD FOR STDIN??

      #Go into infinite loop, handling connections - how do I end this?
      while(my @ready = $sockets->can_read) { #got data
          foreach my $socket (@ready) { # find which socket sent data
              #New Stream1---------------------
              if ($socket == $stream1_lsn) {
                  my $stream1_sock = $stream1_lsn->accept(); #accept stream 1
                  my $stream1_hst = $stream1_sock->peerhost;
                  print "New stream1 from IP [$stream1_hst]\n";
                  my $thr_stream1 = threads->new(\&stream1, $stream1_sock)->detach();
              }
              # -----------------------------
              #New client1 ------------------
              elsif ($socket == $client1_lsn) {
                  my $client1_sock = $client1_lsn->accept(); #accept client 1
                  $sockets->add($client1_sock); #add socket to select
                  my $fileno = fileno $client1_sock;
                  push (@stream1_clients, $fileno);
                  my $client1_hst = $client1_sock->peerhost;
                  print "New client for stream 1 @ IP [$client1_hst]\n";
              }
              # -----------------------------
              #New client2 ------------------
              elsif ($socket == $client2_lsn) {
                  my $client2_sock = $client2_lsn->accept(); #accept client 2
                  $sockets->add($client2_sock); #add socket to select
                  my $fileno2 = fileno $client2_sock;
                  push(@stream2_clients, $fileno2);
                  my $client2_hst = $client2_sock->peerhost;
                  print "New client for stream 2 @ IP [$client2_hst]\n";
                  #Connection to second server running locally
                  my $stream2_sock = new IO::Socket::INET( PeerAddr => 'localhost',
                                                           PeerPort => $stream2_prt,
                                                           Type     => SOCK_STREAM,
                                                           Proto    => "tcp")
                      or die "Stream2 socket couldn't be created: $@\n";
                  my $thr_stream2 = threads->new(\&stream2, $stream2_sock)->detach();
              }
              # -----------------------------
          }
      }

      # Routine called by thread to send stream 1
      sub stream1 {
          my ($lclient1) = @_;
          my $buf1;
          if ($lclient1->connected) {
              while(defined($buf1=<$lclient1>)) {
                  foreach my $fn1 (@stream1_clients) {
                      # reopen the client's descriptor for writing in this thread
                      open my $fh1, ">&=$fn1" or die "Can't open fd $fn1: $!\n";
                      binmode($fh1, ":raw");
                      print $fh1 $buf1;
                  }
              }
          }
          close ($lclient1);
      }
      # -----------------------------

      # Routine called by thread to send stream 2
      sub stream2 {
          my ($lclient2) = @_;
          my $buf2;
          if ($lclient2->connected) {
              while(defined($buf2=<$lclient2>)) {
                  foreach my $fn2 (@stream2_clients) {
                      # reopen the client's descriptor for writing in this thread
                      open my $fh2, ">&=$fn2" or die "Can't open fd $fn2: $!\n";
                      binmode($fh2, ":raw");
                      print $fh2 $buf2;
                  }
              }
          }
          close ($lclient2);
      }
      # -----------------------------

      # sub STDIN_stream ??
        How would I go about adding in reading STDIN to allow a user to type quit to end this program?

        See Re: Ask for STDIN but don't pause for it and the last example. This is untested in your code, but something like this will work. Just add \*STDIN to your Select object and test for it.

        # Add listen sockets to select
        my $sockets = new IO::Select();
        $sockets->add(\*STDIN );
        $sockets->add($stream1_lsn);
        $sockets->add($client1_lsn);
        $sockets->add($client2_lsn);
        .....
        #Go into infinite loop, handling connections - how do I end this?
        while(my @ready = $sockets->can_read) { #got data
            foreach my $socket (@ready) { # find which socket sent data
                if ($socket == \*STDIN) {
                    #read stdin and detect a q;
                    exit;
                }
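
        One way the "read stdin and detect a q" step might be filled in, sketched with sysread so that it does not reintroduce the readline-with-select blocking problem. $stdin_buf and quit_requested are hypothetical names. Note that select on STDIN is a Unix-style technique; Perl's select on Windows only supports sockets.

```perl
use strict;
use warnings;

my $stdin_buf = '';    # hypothetical: partial line typed so far

# Call when select reports the handle readable. Returns true when the
# user entered "q" or "quit" on a line of its own, or closed the input.
sub quit_requested {
    my ($fh) = @_;
    my $n = sysread($fh, $stdin_buf, 1024, length $stdin_buf);
    return 1 unless $n;             # EOF (e.g. Ctrl-D) or read error

    # Examine each complete line; keep any unfinished tail buffered.
    while ($stdin_buf =~ s/\A(.*?)\r?\n//) {
        return 1 if $1 =~ /\A\s*q(?:uit)?\s*\z/i;
    }
    return 0;
}
```

        Inside the loop the STDIN branch would then read roughly: exit if quit_requested($socket);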

        I'm not really a human, but I play one on earth.
        Old Perl Programmer Haiku ................... flash japh
Re: Problem handling 2 simultaneous socket streams
by osbosb (Monk) on Sep 15, 2011 at 17:48 UTC
    Don't forget to:
    use strict;
    in addition to your warnings switch
      Good catch. Absolutely use use strict;. Even more important than use warnings; or -w.
Re: Problem handling 2 simultaneous socket streams
by salva (Canon) on Sep 16, 2011 at 06:40 UTC