
Unfortunately you mix stdio with select here, which makes it rather easy to hang this code. If a client ever sends an incomplete line and then waits, select will report that client's socket as readable, and the <$fh> will then block until the client gets around to sending a newline. This is easy to demonstrate by putting the telnet clients you test with into character mode.

The write-back can block too, so it can also hang.

There is also the issue of clients that don't send a proper "exit" but just close the connection (maybe a ^C or a crash). The write back you do for them will cause a SIGPIPE and the program will exit.
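
To make the SIGPIPE point concrete, here is a minimal sketch of one common way around it: ignore the signal and check what syswrite returns. The helper name write_or_drop is made up for this illustration, and the socketpair only stands in for a real client connection.

```perl
use strict;
use warnings;
use Socket;

# Ignore SIGPIPE so a write to a closed connection returns an error
# instead of killing the whole process.
$SIG{PIPE} = 'IGNORE';

# Hypothetical helper: true if the write went through (possibly only
# partially), false if the peer is gone.
sub write_or_drop {
    my ($fh, $data) = @_;
    my $written = syswrite($fh, $data);
    return 1 if defined $written;
    return 0 if $!{EPIPE} || $!{ECONNRESET};
    die "unexpected write error: $!";
}

# Demo: a socketpair stands in for a client connection.
socketpair(my $ours, my $theirs, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";
close $theirs;                       # the "client" goes away
my $alive = write_or_drop($ours, "hello\n");
print $alive ? "still connected\n" : "client gone, dropping it\n";
```

With the signal ignored, the failed write shows up as an ordinary error and the server can simply forget that client.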

I think that the easiest way to write this kind of proxy without all these blocking issues is by using POE. That already takes care of all these issues.

Re: Re: Re: Single user to multi user socket server
by tachyon (Chancellor) on Jun 02, 2004 at 06:42 UTC
    All valid points. I have updated the code to avoid STDIO and added some error handling. For my own learning I would be interested in what you would now change. The main issue now is that if a client hangs while writing to the proxy, the proxy will block. What is the best approach to this sort of issue? Can you point me towards the best docs?

    cheers

    tachyon

      Unfortunately the changes make it worse.

      $data = ''; $data .= $buffer while sysread( $fh, $buffer, 1 ) and $buffer ne "\n";
      This does just what <$fh> would do, only less efficiently. The basic problem remains: a readability select guarantees only that one sysread will not block; every later one can block indefinitely.

      die "Can't write to server!\n" unless $server->can_write;
      syswrite($sock, "$data\n" );
      A server that is a bit slow for a moment shouldn't be fatal, I think. Anyway, it's reasonable that if the real server pauses for some reason, all clients pause too (though you could queue pending writes to clients and buffer their input during that time). So a stdio read and write to the real server isn't too bad at this point.

      But worse is the fact that a syswrite can succeed partially, so the server will not see the \n and probably never answer. Then your emulated <$sock> just after it will wait forever.

      For the client writeback you do

      if ( $client->can_write ) { syswrite( $fh, "$data\n" ); }
      which does solve the blocking problem, but can again lose data (because the client happens not to be writable at that moment, or because the syswrite only does a partial write).

      The read parts you normally solve by having an input buffer per connection, and on readability do one sysread, and append the result to the per socket buffer. Then you check if you have complete lines in this buffer, and if so, process and remove them.
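
A minimal sketch of that read side, assuming the buffers live in a hash keyed by fileno (the names %inbuf and read_lines are invented for this example, and a socketpair stands in for a client). It does exactly one sysread per call and hands back only complete lines:

```perl
use strict;
use warnings;
use Socket;

my %inbuf;    # per-connection input buffers, keyed by fileno (assumed layout)

# Call once per readability event: one sysread, then return an EOF flag
# followed by whatever complete lines are now available.
sub read_lines {
    my ($fh) = @_;
    my $key = fileno $fh;
    $inbuf{$key} = '' unless defined $inbuf{$key};
    # Append at the end of whatever is already buffered.
    my $n = sysread($fh, $inbuf{$key}, 4096, length $inbuf{$key});
    die "read error: $!" unless defined $n;
    my @lines;
    push @lines, $1 while $inbuf{$key} =~ s/\A([^\n]*)\n//;
    delete $inbuf{$key} if $n == 0;    # EOF: forget this connection
    return ($n == 0, @lines);
}

# Demo: the peer sends one and a half lines, then the rest.
socketpair(my $ours, my $theirs, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";
syswrite($theirs, "first\nsec");
my ($eof1, @got1) = read_lines($ours);
syswrite($theirs, "ond\n");
my ($eof2, @got2) = read_lines($ours);
print "round 1: @got1\n";
print "round 2: @got2\n";
```

The half line "sec" just sits in the buffer after the first call, so nothing ever waits for a newline that has not arrived yet.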

      For writes something similar applies. You keep a per-socket output buffer and select for writability as long as it is not empty. On writability you do one syswrite, check how many bytes were successfully written, and remove that many from the buffer. As you can see, this implies that you can have both readability and writability selects pending in your main wait, so you'll need the full 4-argument select.
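
A rough sketch of the write side under the same assumptions (a hash of buffers keyed by fileno; queue_output and flush_some are names invented here). It uses the IO::Select->select class method as the 4-argument select, and a single process plays both ends of a socketpair so that readability and writability are waited for in the same call:

```perl
use strict;
use warnings;
use Socket;
use IO::Handle;
use IO::Select;

my %outbuf;   # per-connection output buffers, keyed by fileno (assumed layout)

# Queue data; nothing is written until select reports writability.
sub queue_output {
    my ($fh, $data) = @_;
    $outbuf{fileno $fh} .= $data;
}

# Call once per writability event: one syswrite, then drop what was sent.
sub flush_some {
    my ($fh) = @_;
    my $key = fileno $fh;
    my $written = syswrite($fh, $outbuf{$key});
    if (!defined $written) {
        return if $!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR};
        die "write error: $!";
    }
    substr($outbuf{$key}, 0, $written, '');   # keep only the unsent tail
}

socketpair(my $ours, my $theirs, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";
$ours->blocking(0);

my $total = 1_000_000;                # far more than one write will take
queue_output($ours, 'x' x $total);
my $received = 0;
my $readers  = IO::Select->new($theirs);
while ($received < $total) {
    # Only ask for writability while there is something left to send.
    my $writers = length $outbuf{fileno $ours} ? IO::Select->new($ours) : undef;
    my ($can_read, $can_write) = IO::Select->select($readers, $writers, undef, 5)
        or die "select timed out";
    flush_some($_) for @$can_write;
    for my $fh (@$can_read) {
        my $n = sysread($fh, my $chunk, 65536);
        die "read error: $!" unless defined $n;
        $received += $n;
    }
}
print "delivered $received bytes\n";
```

Each syswrite here typically moves only part of the buffer; the rest goes out on later writability events, so nothing is lost and nothing blocks.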

      Where is this style of programming documented? Unfortunately I don't know any good references, and I'd love to know some, because I often see people doing this kind of stuff incorrectly and would like to point them to a good explanation. I myself picked it up by just doing it and lots of debugging.

      All of that gets pretty tedious very quickly. Fortunately it's all already built into POE. Here is a demo proxy in POE:

          #! /usr/bin/perl -w
          use strict;
          use IO::Socket::INET;
          use POE qw(Wheel::ListenAccept Wheel::ReadWrite);

          my $listening_port = 8019;
          my $rtk_port = 4004;
          # my $rtk_port = "echo";
          my $rtk_ip = "localhost";

          my $listener = IO::Socket::INET->new(LocalPort => $listening_port,
                                               Reuse => 1,
                                               Listen => 512) ||
              die "Could not create listening socket on port $listening_port: $!\n";
          # I don't bother with a POE::Wheel::SocketFactory here since
          # I want to know immediately if the server can't be reached.
          my $server = IO::Socket::INET->new(PeerAddr => $rtk_ip,
                                             PeerPort => $rtk_port) ||
              die "Could not create rtk socket on IP $rtk_ip and port $rtk_port: $!\n";

          POE::Session->create(inline_states => {
              _start => \&start,
              "accept" => \&do_accept,
              "accept_error" => sub { warn "$_[ARG0] error: $_[ARG2]\n" },
              "client_input" => \&client_input,
              "client_end" => \&client_end,
              "client_flushed" => sub { delete $_[HEAP]->{clients}{$_[ARG0]} },
              "server_input" => \&server_input,
              "server_end" => \&server_end,
              _default => sub {
                  die("calling nonexistent event ", $_[ARG0]) unless
                      substr($_[ARG0], 0, 1) eq "_";
                  return;
              }
          });

          sub start {
              my $heap = $_[HEAP];
              $heap->{listener} = POE::Wheel::ListenAccept->new
                  (Handle => $listener,
                   AcceptEvent => "accept",
                   ErrorEvent => "accept_error");
              $heap->{server} = POE::Wheel::ReadWrite->new
                  (Handle => $server,
                   InputEvent => "server_input",
                   ErrorEvent => "server_end");
              $heap->{id_queue} = [];
              $heap->{clients} = {};
          }

          sub do_accept {
              my ($heap, $new_sock) = @_[HEAP, ARG0];
              my $client = POE::Wheel::ReadWrite->new
                  (Handle => $new_sock,
                   InputEvent => "client_input",
                   ErrorEvent => "client_end");
              $heap->{clients}{$client->ID} = $client;
          }

          sub client_input {
              my ($heap, $line, $client_id) = @_[HEAP, ARG0, ARG1];
              if ($heap->{server}) {
                  if ($line =~ /exit/) {
                      close_on_empty($heap, $client_id);
                  } else {
                      # Remember for which client id we queue this line
                      push @{$heap->{id_queue}}, $client_id;
                      $heap->{server}->put($line);
                  }
              }
          }

          sub client_end {
              my ($heap, $operation, $errnum, $errstr, $client_id) = @_[HEAP, ARG0..ARG3];
              warn "$operation: $errstr\n" if $errnum;
              delete $heap->{clients}->{$client_id};
          }

          sub server_input {
              my $heap = $_[HEAP];
              my $id = shift @{$heap->{id_queue}} || die "multiline echos from server ?";
              $heap->{clients}{$id}->put($_[ARG0]) if $heap->{clients}{$id};
          }

          sub server_end {
              my ($heap, $operation, $errnum, $errstr) = @_[HEAP, ARG0..ARG2];
              warn "$operation: $errstr\n" if $errnum;
              warn "Server shutdown\n";
              delete $heap->{server};
              delete $heap->{listener};
              close $listener;
              close_on_empty($heap, $_) for keys %{$heap->{clients}};
          }

          sub close_on_empty {
              my ($heap, $client_id) = @_;
              my $client = $heap->{clients}{$client_id};
              if ($client->get_driver_out_octets) {
                  $client->event("FlushedEvent", "client_flushed");
              } else {
                  delete $heap->{clients}{$client_id};
              }
          }

          $poe_kernel->run();

      The main thing missing from that code is flow control. The system can be driven out of memory if the clients submit faster than the server can process. It should probably also enforce a sane maximum number of connected clients.
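
One simple form of flow control is a pair of watermarks on the data still queued for the server: stop reading from clients once the backlog passes a high mark, and resume when it has drained below a low mark. The sketch below is plain Perl rather than POE, and the thresholds and the function name accept_client_input are assumptions made up for illustration.

```perl
use strict;
use warnings;

# Assumed thresholds; tune for the real workload.
use constant HIGH_MARK => 64 * 1024;   # stop reading clients at or above this
use constant LOW_MARK  =>  8 * 1024;   # resume once the backlog drains to this

my $paused = 0;

# Decide whether client sockets should be read this round, given how
# many bytes are still queued for the server.
sub accept_client_input {
    my ($pending_bytes) = @_;
    if (!$paused && $pending_bytes >= HIGH_MARK) {
        $paused = 1;
    } elsif ($paused && $pending_bytes <= LOW_MARK) {
        $paused = 0;
    }
    return !$paused;
}

my @decisions;
for my $pending (0, 70_000, 30_000, 4_000) {
    my $ok = accept_client_input($pending);
    push @decisions, $ok ? "read" : "pause";
    printf "%6d bytes pending: %s clients\n", $pending, $decisions[-1];
}
```

The gap between the two marks keeps the proxy from flapping between paused and unpaused on every small change in the backlog: at 30,000 pending bytes it stays paused because it has not yet dropped to the low mark.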
        Thanks for all your help. I have tried the proxy method, but I guess I should have explained this part better. The data from the server I am connecting to streams out every second, non-stop. The client only needs to receive the data, not publish back to the server. I got the code below to work for a browser, but it hangs on a telnet session unless you hit a key; then it comes screaming across. The data that comes out has no \n, \r, ETX or STX; it is just there. I have the code to package and build it and then ship it to the client, but getting it to the client is the painful part of the code.

            #!/usr/local/bin/perl -w
            use strict;
            use IO::Socket;
            use IO::Select;

            # create a socket to listen to a port
            my $listen = IO::Socket::INET->new(Proto => 'tcp',
                                               LocalPort => 2323,
                                               Listen => 1,
                                               Reuse => 1) or die $!;
            my $sk = IO::Socket::INET->new('PeerAddr' => '192.168.3.100',
                                           'PeerPort' => '4004',
                                           'Proto' => 'tcp') || die "Can't connect";

            # to start with, $select contains only the socket we're listening on
            my $select = IO::Select->new($listen);
            my @ready;

            # wait until there's something to do
            while (1) {
                if (@ready = $select->can_read(0)) {
                    my $socket;
                    # handle each socket that's ready
                    for $socket (@ready) {
                        # if the listening socket is ready, accept a new connection
                        if ($socket == $listen) {
                            my $new = $listen->accept;
                            $select->add($new);
                            print $new->fileno . ": connected\n";
                        } else {
                            # otherwise, read a line of text and send it back again
                            my $line;
                            my $lineB;
                            $sk->recv($line, 1024);
                            $socket->recv($lineB, 1);
                            $lineB = length($lineB);
                            print "length ", length($lineB), "\n";
                            print "Ord ", ord($lineB), "\n";
                            print "Revc ", $lineB, "\n";
                            if ($lineB > 0) {
                                print "Hello\n";
                                $socket->send($line);
                            }
                        }
                    }
                }
            }

        Many thanks for your time in putting that together

        cheers

        tachyon