PhillyR has asked for the wisdom of the Perl Monks concerning the following question:

I have written a script that acts as a TCP forwarding server for clients that cannot open a listen socket for a particular data source. The server listens for two separate streams of data and for incoming clients. Each packet received is forwarded to every attached client. Everything has been working well in my environment. However, in recent testing with some external users, the outgoing sockets to the clients appear to be getting blocked: after 20 minutes of receiving no data, the entire file would arrive at once instead of streaming in real time. I had thought my code was pretty good, but now I am starting to question it. Can anyone review it for problems in the infinite loop or the stream1 function, or suggest other improvements? I'm still learning the underlying details of Perl.

The incoming data is in a known format. I parse the header to determine the size of a full packet so that newly connected clients don't start receiving in the middle of a packet. I then recreate the packet and send it to all clients.
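As a rough illustration of that framing step, here is a minimal sketch that reads one complete packet from a handle. It assumes the layout the script uses: a 2-byte version field followed by a 4-byte big-endian size that counts the whole packet, header included. The helper names read_exact and read_packet are made up for this example and are not part of the script.

```perl
use strict;
use warnings;

use constant VERSION_LEN => 2;    # bytes in the version field
use constant SIZE_LEN    => 4;    # bytes in the size field, unpacked with "N"

# Hypothetical helper: read exactly $want bytes, or return nothing on EOF/error.
sub read_exact {
    my ($fh, $want) = @_;
    my $buf = '';
    while (length($buf) < $want) {
        # Append to $buf at the current offset until $want bytes have arrived.
        my $got = read($fh, $buf, $want - length($buf), length($buf));
        return unless $got;       # undef means error, 0 means end of stream
    }
    return $buf;
}

# Hypothetical helper: return one whole packet (header included), or undef.
sub read_packet {
    my ($fh) = @_;

    my $version = read_exact($fh, VERSION_LEN);
    return unless defined $version;

    my $size_raw = read_exact($fh, SIZE_LEN);
    return unless defined $size_raw;

    # The size field is assumed to include the six header bytes already read.
    my $body_len = unpack('N', $size_raw) - VERSION_LEN - SIZE_LEN;
    return if $body_len < 0;      # malformed size field

    my $body = $body_len ? read_exact($fh, $body_len) : '';
    return unless defined $body;

    return $version . $size_raw . $body;
}
```

Because read_packet returns undef for a truncated or malformed packet, a caller can stop forwarding instead of sending a partial packet to the clients.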

NOTE: stream2/client2 were not used in the tests
#!/usr/bin/perl -w
#v1.0
#Declarations -------------------------------------------------------------
use warnings;
use strict;
use IO::Socket;
use IO::Select;
use Getopt::Long;
use Term::ReadKey;
use threads;
use threads::shared;
# -------------------------------------------------------------------------

# VARIABLES -----------------------------------------------------------
our @stream1_clients : shared;
our @stream2_clients : shared;
@stream1_clients = ();
@stream2_clients = ();
my $stream1_prt;
my $stream2_prt;
my $s1client_prt;
my $s2client_prt;
my $s1TCPIP;
my $s2TCPIP;
my $s1TCP;
my $s2TCP;
my $s1Log;
my $s2Log;
my $f1_out;
my $f2_out;
my $help;
# -------------------------------------------------------------------------

# Data offsets and sizes ---------------------------------------------
sub VERSION_FIELD_OFF () { 0 }
sub VERSION_FIELD_LEN () { 2 }
sub PACKET_SIZE_FIELD_OFF () { VERSION_FIELD_OFF() + VERSION_FIELD_LEN() }
sub PACKET_SIZE_FIELD_LEN () { 4 }
# -------------------------------------------------------------------------

# HELP AND USAGE ---------------------------------------------------
sub usage {
  print "Unknown option @_\n" if (@_);
  print "Proxy acts as a listen server to receive data from an external source\n";
  print " and forward to clients lacking the ability to receive natively\n";
  print " or share a connection to PCs behind a firewall \n\n";
  print "usage: Proxy.pl [-s1prt PORT] [-s2prt PORT] [-c1prt PORT] [-c2prt PORT]\n";
  print " [-s1tcp] [-s2tcp] [-s1ip IP] [-s2ip IP] [-s1log LOGFILE] [-s2log LOGFILE]\n";
  print " [-help | -?]\n\n";
  print " -s1prt: PORT for incoming stream 1. Default is 5555.\n";
  print " -s2prt: PORT for incoming stream 2. Default is 6666.\n";
  print " -c1prt: PORT for outgoing stream 1. Default is 1100.\n";
  print " -c2prt: PORT for outgoing stream 2. Default is 1101.\n";
  print " -s1tcp: Switch from listen server to client mode for stream 1.\n";
  print " -s2tcp: Switch from listen server to client mode for stream 2.\n";
  print " -s1ip: IP address of stream 1 when in client mode (-s1tcp).\n";
  print " -s2ip: IP address of stream 2 when in client mode (-s2tcp).\n";
  print " -s1log: Writes incoming stream 1 data directly to LOGFILE.\n";
  print " -s2log: Writes incoming stream 2 data directly to LOGFILE.\n";
  exit(0);
}
# -------------------------------------------------------------------------

# Parse Command line arguments ------------------------------------------
sub check_args{
  usage() if ! GetOptions(
    "s1prt=s" => \$stream1_prt,
    "s2prt=s" => \$stream2_prt,
    "c1prt=s" => \$s1client_prt,
    "c2prt=s" => \$s2client_prt,
    "s1tcp" => \$s1TCP,
    "s2tcp" => \$s2TCP,
    "s1ip=s" => \$s1TCPIP,
    "s2ip=s" => \$s2TCPIP,
    "s1log=s" => \$s1Log,
    "s2log=s" => \$s2Log,
    "help|?" => \$help
  );
  if(defined($help)) { usage(); }

  # Setup Default or User Defined Parameters ---------------------
  $stream1_prt = 5555 unless defined $stream1_prt;
  $stream2_prt = 6666 unless defined $stream2_prt;
  $s1client_prt = 1100 unless defined $s1client_prt;
  $s2client_prt = 1101 unless defined $s2client_prt;
  $s1TCP = 0 unless defined $s1TCP;
  $s2TCP = 0 unless defined $s2TCP;
  $s1TCPIP = 'localhost' unless defined $s1TCPIP;
  $s2TCPIP = 'localhost' unless defined $s2TCPIP;
  # ----------------------------------------------------------------
}

check_args(); #START OF PROGRAM, parse command line
$|++; #clear IO buffers
# -------------------------------------------------------------------------

# WELCOME MESSAGE ----------------------------------------------
my $iaddr = gethostbyname("");
my $ip = inet_ntoa($iaddr);
if(defined($iaddr)) {
  print "####################################################################\n";
  print " WELCOME TO THE PROXY FOR STREAMING DATA\n\n";
  print " Configure clients to receive data from IP address [$ip]\n";
  if ($s1TCP == 0) { print " Stream 1"; }
  else { print " TCP connection [$s1TCPIP]"; }
  print " incoming port [$stream1_prt], outgoing port [$s1client_prt]\n";
  if ($s2TCP == 0) { print " Stream 2"; }
  else { print " TCP connection [$s2TCPIP]"; }
  print " incoming port [$stream2_prt], outgoing port [$s2client_prt]\n\n";
  print " PRESS 'Q' TO EXIT\n";
  print "####################################################################\n";
}
else { die ("ipaddr failed to get IP address\n"); }
# -------------------------------------------------------------------------

# Create Stream 1 Listen Socket ----------------
my $stream1_lsn = new IO::Socket::INET (
    LocalAddr => $ip,
    LocalPort => $stream1_prt,
    Type => SOCK_STREAM,
    Proto => "tcp",
    Reuse => 1,
    Listen => 5)
  or die "Stream 1 listen socket [$ip]:[$stream1_prt] couldn't be created: $@\n";
# -------------------------------------------------------------------------

# Create Stream 2 Listen Socket ----------------
my $stream2_lsn = new IO::Socket::INET (
    LocalAddr => $ip,
    LocalPort => $stream2_prt,
    Type => SOCK_STREAM,
    Proto => "tcp",
    Reuse => 1,
    Listen => 5)
  or die "Stream 2 listen socket [$ip]:[$stream2_prt] couldn't be created: $@\n";
# -------------------------------------------------------------------------

# Create Listener for clients for stream 1 ----------------------------
my $client1_lsn = new IO::Socket::INET (
    LocalAddr => $ip,
    LocalPort => $s1client_prt,
    Type => SOCK_STREAM,
    Proto => "tcp",
    Reuse => 1,
    Listen => 5)
  or die "Stream 1 client listen socket [$ip]:[$s1client_prt] couldn't be created: $@\n";
# -------------------------------------------------------------------------

# Create Listener for clients for stream 2 ----------------------------
my $client2_lsn = new IO::Socket::INET (
    LocalAddr => $ip,
    LocalPort => $s2client_prt,
    Type => SOCK_STREAM,
    Proto => "tcp",
    Reuse => 1,
    Listen => 5)
  or die "Stream 2 client listen socket [$ip]:[$s2client_prt] couldn't be created: $@\n";
# -------------------------------------------------------------------------

# Add listen sockets to an array of sockets for monitoring -----------------
my $sockets = new IO::Select();
$sockets->add($stream1_lsn);
$sockets->add($stream2_lsn);
$sockets->add($client1_lsn);
$sockets->add($client2_lsn);
# -------------------------------------------------------------------------

# Create a separate thread to monitor keyboard input
my $thr_input = threads->new(\&read_in)->detach;

#Go into an infinite loop listening for incoming connections
close STDIN; #Required to prevent loop blocking
while (my @ready = $sockets->can_read) { #Connection heard on one of the listen sockets
  foreach my $socket (@ready) { #Find incoming connection
    #>If the socket is the MSM s1 listen then complete the TCP connection to MSM
    if ($socket == $stream1_lsn) {
      my $stream1_sock = $stream1_lsn->accept(); #accept incoming connection
      my $stream1_hst = $stream1_sock->peerhost(); #IP of the connection
      print "\n-> NEW STREAM from IP [$stream1_hst] as [STREAM 1]\n";
      my $thr_stream1 = threads->new(\&stream1, $stream1_sock)->detach(); #create thread to handle stream 1 data
    }
    #>If the socket is the MSM s2 listen then complete the TCP connection to MSM
    elsif ($socket == $stream2_lsn) {
      my $stream2_sock = $stream2_lsn->accept(); #accept incoming connection
      my $stream2_hst = $stream2_sock->peerhost(); #IP of the connection
      print "\n-> NEW STREAM from IP [$stream2_hst] as [STREAM 2]\n";
      my $thr_stream2 = threads->new(\&stream2, $stream2_sock)->detach(); #create thread to handle stream 1 data
    }
    #>If socket is stream 1 listener, then a new client is requesting stream 1
    elsif($socket == $client1_lsn) {
      my $client1_sock = $client1_lsn->accept(); #accept connection
      $sockets->add($client1_sock); #add socket to listen array to keep connection alive
      my $fileno = fileno $client1_sock;
      push (@stream1_clients, $fileno); #add descriptor to shared array to open clients in separate thread
      my $client1_hst = $client1_sock->peerhost;
      print "\n new client at IP [$client1_hst] for [STREAM 1]\n";
      #If requested via the command line initiate a TCP connection to a server for stream 1
      if($s1TCP == 1) {
        my $stream1TCP = new IO::Socket::INET (PeerAddr => $s1TCPIP,
            PeerPort => $stream1_prt,
            Type => SOCK_STREAM,
            Proto => "tcp")
          or die "TCP socket [$s1TCPIP]:[$stream1_prt] couldn't be created: $@\n";
        $s1TCP = 0;
        print "\n Created TCP connection to [$s1TCPIP]:[$stream1_prt]\n";
        my $thr_s1TCP = threads->new(\&stream1, $stream1TCP)->detach();
      }
    }
    #>If socket is stream 2 listener, then a new client is requesting stream 2
    elsif($socket == $client2_lsn) {
      my $client2_sock = $client2_lsn->accept(); #accept connection
      $sockets->add($client2_sock); #add socket to listen array to keep connection alive
      my $fileno2 = fileno $client2_sock;
      push (@stream2_clients, $fileno2); #add descriptor to shared array to open clients in separate thread
      my $client2_hst = $client2_sock->peerhost;
      print "\n new client at IP [$client2_hst] for [STREAM 2]\n";
      #If requested via the command line initiate a TCP connection to a server for stream 2
      if($s2TCP == 1) {
        my $stream2TCP = new IO::Socket::INET (PeerAddr => $s2TCPIP,
            PeerPort => $stream2_prt,
            Type => SOCK_STREAM,
            Proto => "tcp")
          or die "TCP socket [$s2TCPIP]:[$stream2_prt] couldn't be created: $@\n";
        $s2TCP = 0;
        print "\n Created TCP connection to [$s2TCPIP]:[$stream2_prt]\n";
        my $thr_s2TCP = threads->new(\&stream2, $stream2TCP)->detach();
      }
    }
  }
}
# -------------------------------------------------------------------------

# Handle sending data from stream 1 to all connected stream 1 clients -----
sub stream1{
  my ($lstream1) = @_; #socket for stream 1 data
  binmode($lstream1, ":raw"); #set mode for binary

  #create a log for incoming data if selected at the command prompt
  if(defined($s1Log)) {
    open($f1_out, ">>", $s1Log) || die "unable to open file to write";
    binmode($f1_out, ":raw");
  }
  if($lstream1->connected) {
    MONITOR1:
    while(1) {
      #read data version
      my $numBytesRead = read($lstream1, my $version, VERSION_FIELD_LEN() );
      if ($numBytesRead == 0) { last MONITOR1; }
      #read packetsize
      $numBytesRead = read($lstream1, my $packets, PACKET_SIZE_FIELD_LEN() );
      #read rest of packet
      my ($packetSize) = unpack("N", $packets); #convert binary to integer
      $packetSize -= VERSION_FIELD_LEN() + PACKET_SIZE_FIELD_LEN(); #already read version and packetsize
      $numBytesRead = read($lstream1, my $data, $packetSize);
      print ".";
      #create packet for distribution. This ensures clients start receiving full data packets to prevent crashing
      my $payload = $version . $packets . $data;
      #send to all connected clients
      foreach my $fn1 (@stream1_clients) {
        open my $fh1, ">&$fn1" or warn $! and die; #open a filehandle to the descriptor of each client socket
        binmode($fh1, ":raw"); #set the mode of the filehandle to receive raw binary data
        print $fh1 $payload; #send the packet to each client
      }
      #write data to logfile
      if(defined($f1_out)) { print $f1_out $payload; }
    }
  }
  my $stream_ip = $lstream1->peerhost;
  print "\n* Stream 1 [$stream_ip:$stream1_prt] disconnected\n";
  close($lstream1);
  if(defined($f1_out)) { close($f1_out);}
}
# -------------------------------------------------------------------------

# Handle sending data from stream 2 to all connected stream 2 clients -----
sub stream2{
  my ($lstream2) = @_; #socket for stream 1 data
  binmode($lstream2, ":raw"); #set mode for binary

  #create a log for incoming data if selected at the command prompt
  if(defined($s2Log)) {
    open($f2_out, ">>", $s2Log) || die "unable to open file to write";
    binmode($f2_out, ":raw");
  }
  if($lstream2->connected) {
    MONITOR2:
    while(1) {
      #read data version
      my $numBytesRead2 = read($lstream2, my $version2, VERSION_FIELD_LEN() );
      if ($numBytesRead2 == 0) { last MONITOR2; }
      #read packetsize
      $numBytesRead2 = read($lstream2, my $packets2, PACKET_SIZE_FIELD_LEN() );
      #read rest of packet
      my ($packetSize2) = unpack("N", $packets2); #convert binary to integer
      $packetSize2 -= VERSION_FIELD_LEN() + PACKET_SIZE_FIELD_LEN(); #already read version and packetsize
      $numBytesRead2 = read($lstream2, my $data2, $packetSize2);
      print ".";
      #create packet for distribution. This ensures clients start receiving full data packets to prevent crashing
      my $payload2 = $version2 . $packets2 . $data2;
      #send to all connected clients
      foreach my $fn2 (@stream2_clients) {
        open my $fh2, ">&$fn2" or warn $! and die; #open a filehandle to the descriptor of each client socket
        binmode($fh2, ":raw"); #set the mode of the filehandle to receive raw binary data
        print $fh2 $payload2; #send the packet to each client
      }
      #write data to logfile
      if(defined($f2_out)) { print $f2_out $payload2; }
    }
  }
  my $stream_ip = $lstream2->peerhost;
  print "\n* Stream 2 [$stream_ip:$stream2_prt] disconnected\n";
  close($lstream2);
  if(defined($f2_out)) { close($f2_out);}
}
# -------------------------------------------------------------------------

# Monitor STDIN. Exit upon 'q' -----------------------------------------
sub read_in{
  my $char;
  while(1) {
    if(defined($char = ReadKey(0)) ) {
      if ($char eq 'q') { # exit proxy
        #Close all open connections
        print "Closing...\n";
        my @connections = $sockets->handles;
        foreach my $connection (@connections) { close($connection); }
        #End all threads
        my @threadList = threads->list();
        foreach my $thr (@threadList) { $thr->kill(); }
        exit;
      }
    }
  }
}
# -------------------------------------------------------------------------
__END__

Re: TCP Forwarding Server
by Eliya (Vicar) on Mar 21, 2012 at 19:07 UTC
    After 20 minutes of not receiving any data the entire file would appear, ...

    I must admit I haven't scrutinized the code (a tad too much for my taste... :), but that problem description, together with no mention of autoflush (or flush) in the entire code, makes me guess you have a buffering issue...

    Note that the default buffering mode for most file handles / sockets is block-buffered, which means that output accumulates in a buffer until an entire block (e.g. 4K) has been written, and only then is it flushed to its actual destination.
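    To see block buffering in action, here is a small sketch that writes a few bytes to a scratch file and checks the file size before and after an explicit flush. The file name is made up for the demo. On a typical Perl build with default buffering, I would expect the first size to be 0 and the second to be 3.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);
use IO::Handle;

# Scratch file inside a temporary directory (hypothetical name, demo only).
my $dir  = tempdir(CLEANUP => 1);
my $path = "$dir/buffer-demo.bin";

open my $out, '>:raw', $path or die "Cannot open $path: $!";

print {$out} "abc";                       # 3 bytes, far less than one block
my $size_buffered = (stat $path)[7];      # file size while data sits in the buffer

$out->flush;                              # push the buffered bytes out explicitly
my $size_flushed = (stat $path)[7];       # file size after the flush

printf "before flush: %d bytes, after flush: %d bytes\n",
    $size_buffered, $size_flushed;

close $out or die "Cannot close $path: $!";
```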

    Also note that $|++ only has an effect on the "currently selected handle", i.e. STDOUT (the default) in your case.  It does not automagically turn on autoflushing for every handle used in the program.
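    The sketch below illustrates the "currently selected handle" point. It sets $| while STDOUT is selected, shows that a second handle is still buffered, and then uses select() to apply the flag to that handle. The scratch file and its name are assumptions made for the demo.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

# Scratch file inside a temporary directory (hypothetical name, demo only).
my $dir  = tempdir(CLEANUP => 1);
my $path = "$dir/select-demo.bin";
open my $out, '>:raw', $path or die "Cannot open $path: $!";

$| = 1;                                   # STDOUT is selected, so only STDOUT changes
print {$out} "abc";
my $size_stdout_only = (stat $path)[7];   # $out is still holding "abc" in its buffer

my $previous = select($out);              # make $out the currently selected handle
$| = 1;                                   # the flag now applies to $out and flushes it
select($previous);                        # restore the previous selection

print {$out} "def";
my $size_after = (stat $path)[7];         # both writes have reached the file

printf "with \$| on STDOUT only: %d bytes, after select: %d bytes\n",
    $size_stdout_only, $size_after;

close $out or die "Cannot close $path: $!";
```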

    To turn on autoflushing for a specific handle, use

    use IO::Handle;
    ...
    $fh->autoflush();

    See also IO::Handle.
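
    Applied to a handle duplicated from a socket descriptor, the way the posted script does with ">&$fn1", that might look like the sketch below. It is only an illustration under assumptions: a socketpair stands in for a real client connection, the packet contents are made up, and it presumes a Unix-like system where socketpair with AF_UNIX is available.

```perl
use strict;
use warnings;
use Socket;
use IO::Handle;

# A connected socket pair stands in for the proxy-to-client link (demo only).
socketpair(my $proxy_side, my $client_side, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair failed: $!";

# Duplicate the descriptor into a fresh, ordinary buffered filehandle.
my $fd = fileno $proxy_side;
open my $out, '>&', $fd or die "Cannot dup descriptor $fd: $!";
binmode $out, ':raw';
$out->autoflush(1);                       # flush after every print on $out

# Made-up packet: 2-byte version, 4-byte total size, 5-byte body.
my $payload = pack('n N a*', 1, 11, 'hello');
print {$out} $payload or die "print failed: $!";

# The peer can read the packet right away, without waiting for a full buffer.
my $received = '';
defined sysread($client_side, $received, length $payload)
    or die "sysread failed: $!";

printf "received %d of %d bytes\n", length $received, length $payload;
```

    Calling autoflush once, right after the handle is opened, is enough; it stays in effect for the life of that handle.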