in reply to Single user to multi user socket server

It's a little unclear what you need to do. If the issue is that you need lots of clients to talk to a server that will only accept a single connection, then the answer is that you need a proxy.

Client --> P
Client --> R
Client --> O ------> Single Conn Server
Client --> X
Client --> Y

Here is some simple code for a server that will handle multiple connections from clients.

Simple proxy code... updated in line with comments by thospel

use IO::Select;
use IO::Socket;

my $listen_port = 8080; # we proxy for many clients on this port
my $scs_port    = 8181; # and talk to a single connection on this port
my $DEBUG = 1;

my $lsn = new IO::Socket::INET( Listen => 1, LocalPort => $listen_port ) or die $!;

# make a single connection to some server
my $sock = IO::Socket::INET->new( PeerAddr => 'localhost',
                                  PeerPort => $scs_port,
                                  Proto    => 'tcp' ) or die $!;

my $client = new IO::Select( $lsn );
my $server = new IO::Select( $sock );
my ($count, $data, $buffer);

while( my @ready = $client->can_read) {
    for my $fh (@ready) {
        if($fh == $lsn) {
            # Create a new socket to handle more conns
            $count++;
            $DEBUG && warn "Accepted new socket $count\n";
            my $new = $lsn->accept;
            $client->add($new);  # register it with IO::Select
        }
        else {
            # Process socket
            $DEBUG && warn "Proxy getting data\n";
            $data = '';
            $data .= $buffer while sysread( $fh, $buffer, 1 ) and $buffer ne "\n";
            $DEBUG && warn "Proxy got: $data\n";
            if ( ! $data or $data =~ m/exit/i ) {
                remove_handle( $client, $fh );
            }
            else {
                # act as a proxy for this client to the SCS
                $DEBUG && warn "Writing to server\n";
                die "Can't write to server!\n" unless $server->can_write;
                syswrite( $sock, "$data\n" );
                $DEBUG && warn "Reading from server\n";
                die "Can't read from server!\n" unless $server->can_read;
                $data = '';
                $data .= $buffer while sysread( $sock, $buffer, 1 ) and $buffer ne "\n";
                $DEBUG && warn "Writing back to client\n";
                if ( $client->can_write ) {
                    syswrite( $fh, "$data\n" );
                }
                else {
                    $DEBUG && warn "Can't write data back to client\n";
                    remove_handle( $client, $fh );
                }
                #syswrite( $fh, "Hello $data\n" );  # is we want a simple server
            }
        }
    }
}

sub remove_handle {
    my ( $client, $fh ) = @_;
    $DEBUG && warn "Removing client!\n";
    syswrite( $fh, "Sayonara!\n" ) if $client->can_write;
    $client->remove($fh);
    $fh->close;
    $count--;
}

cheers

tachyon

Re: Re: Single user to multi user socket server
by thospel (Hermit) on Jun 02, 2004 at 03:33 UTC
    Unfortunately you mix stdio with select here, which makes it rather easy to hang this code. If a client ever sends an incomplete line and then waits, the server will get a readability select for the socket to that client, and then the <$fh> will hang until the client bothers to send a newline (easily demonstrated by setting the telnets you test with to character mode).

    The write back is also potentially blocking, so can also hang.

    There is also the issue of clients that don't send a proper "exit" but just close the connection (maybe a ^C or a crash). The write back you do for them will cause a SIGPIPE and the program will exit.
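
A minimal sketch of one common way to survive that case, assuming it is acceptable to ignore SIGPIPE for the whole process and check the result of each write instead. The helper name write_or_drop is hypothetical and not part of the posted code; a socketpair stands in for a client connection.

```perl
use strict;
use warnings;
use Socket;

# With SIGPIPE ignored, a write to a vanished peer fails with EPIPE
# instead of killing the process.
$SIG{PIPE} = 'IGNORE';

# Hypothetical helper: returns the number of bytes written, or undef
# when the peer has gone away. Dies on any other write error.
sub write_or_drop {
    my ($fh, $data) = @_;
    my $n = syswrite($fh, $data);
    return $n if defined $n;
    return if $!{EPIPE} || $!{ECONNRESET};
    die "write failed: $!\n";
}

# Demo: the "client" end closes without saying "exit".
socketpair(my $proxy_end, my $client_end, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";
close $client_end;

if (defined write_or_drop($proxy_end, "reply\n")) {
    print "reply sent\n";
}
else {
    print "client went away, dropping it\n";
}
```

The caller would then remove the handle from its select set and close it, rather than attempting a farewell write.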

    I think that the easiest way to write this kind of proxy without all these blocking issues is by using POE. That already takes care of all these issues.

      All valid points. I have updated the code to avoid STDIO and added some error handling. For my own learning I would be interested in what you would now change. The main issue now is that if a client hangs while writing to the proxy, the proxy will block. What is the best approach to this sort of issue? Can you point me towards the best docs?

      cheers

      tachyon

        Unfortunately the changes make it worse.

        $data = '';
        $data .= $buffer while sysread( $fh, $buffer, 1 ) and $buffer ne "\n";
        This does just what <$fh> would do, only less efficiently. The basic problem remains: a readability select guarantees only one non-blocking sysread; every later one can block indefinitely.

        die "Can't write to server!\n" unless $server->can_write;
        syswrite( $sock, "$data\n" );
        A server that is a bit slow for a moment shouldn't be fatal, I think. Anyway, it's somewhat reasonable that if the real server pauses for some reason, all clients pause too (though you could do pending writes to clients and buffer input during that time). So a stdio read and write to the real server isn't too bad at this point.

        But worse is the fact that a syswrite can succeed partially, so the server will not see the \n and probably never answer. Then your emulated <$sock> just after it will wait forever.

        For the client writeback you do

        if ( $client->can_write ) {
            syswrite( $fh, "$data\n" );
        }
        which indeed solves the blocking problem, but can again lose data (because the client happens not to be writable at that moment, or because the syswrite only does a partial write).

        The read parts you normally solve by having an input buffer per connection, and on readability do one sysread, and append the result to the per socket buffer. Then you check if you have complete lines in this buffer, and if so, process and remove them.
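
A minimal sketch of the per-connection input buffer described above. The helper name extract_lines and the %inbuf hash are hypothetical; in a real select loop the chunk would come from a single sysread on the socket that select reported as readable.

```perl
use strict;
use warnings;

# Hypothetical helper: append one freshly read chunk to a connection's
# buffer, then remove and return every complete line it now holds.
# Any trailing partial line stays in the buffer for the next read.
sub extract_lines {
    my ($buf_ref, $chunk) = @_;
    $$buf_ref .= $chunk;
    my @lines;
    while ($$buf_ref =~ s/\A([^\n]*)\n//) {
        push @lines, $1;
    }
    return @lines;
}

# One buffer per connection, keyed here by a made-up connection id.
my %inbuf = (client1 => '');

# First chunk holds no newline, so nothing is ready yet.
extract_lines(\$inbuf{client1}, "hel");

# Second chunk completes one line and starts another.
my @ready = extract_lines(\$inbuf{client1}, "lo\nwor");
print "complete: @ready, pending: $inbuf{client1}\n";
# prints "complete: hello, pending: wor"
```

Because each readability event triggers exactly one sysread, a client that sends half a line and then stalls only leaves data sitting in its own buffer and never blocks the loop.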

        For writes something similar applies. You have a per-socket output buffer, and select for writability as long as it's not empty. Then on writability you do a syswrite, check how many chars you successfully wrote and remove that many from your buffer. But as you can see, this implies that you can have both readability and writability selects going on in your main wait, so you'll need the full 4-argument select.
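
A minimal sketch of the output side, assuming a non-blocking socket and using IO::Select->select, the class-method wrapper around the 4-argument select. The helper name flush_some is hypothetical, and a socketpair stands in for a real client connection.

```perl
use strict;
use warnings;
use Errno;
use IO::Handle;
use IO::Select;
use Socket;

# Hypothetical helper: do one syswrite from the front of the output
# buffer and remove only the bytes that were actually written.
# Returns false on a fatal write error, true otherwise.
sub flush_some {
    my ($fh, $buf_ref) = @_;
    my $n = syswrite($fh, $$buf_ref);
    if (defined $n) {
        substr($$buf_ref, 0, $n, '');   # a partial write keeps the rest
        return 1;
    }
    return 1 if $!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR};
    return 0;
}

socketpair(my $left, my $right, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";
$left->blocking(0);

my $out     = "line for the peer\n";
my $writers = IO::Select->new($left);

# Wait for writability only while there is something left to send.
while (length $out) {
    my @ready = IO::Select->select(undef, $writers, undef, 5)
        or die "timed out waiting for writability\n";
    for my $fh (@{ $ready[1] }) {
        flush_some($fh, \$out) or die "write failed: $!\n";
    }
}

sysread($right, my $got, 4096);
print "peer received: $got";
```

In a full proxy the first argument to select would be the set of sockets being read, so reads and pending writes are waited on in the same call.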

        Where is this style of programming documented? Unfortunately I don't know of any good references, and I'd love to find some, because I often see people doing this kind of thing incorrectly and would like to point them to a good explanation. I picked it up myself by just doing it, with lots of debugging.

        All of that gets pretty tedious very quickly. Fortunately it's all already built into POE. Here is a demo proxy in POE:

        #! /usr/bin/perl -w
        use strict;
        use IO::Socket::INET;
        use POE qw(Wheel::ListenAccept Wheel::ReadWrite);

        my $listening_port=8019;
        my $rtk_port = 4004;
        # my $rtk_port = "echo";
        my $rtk_ip   = "localhost";

        my $listener = IO::Socket::INET->new(LocalPort => $listening_port,
                                             Reuse => 1,
                                             Listen =>512) ||
            die "Could not create listening socket on port $listening_port: $!\n";
        # I don't bother with a POE::Wheel::SocketFactory here since
        # I want to know immediately if the server can't be reached.
        my $server = IO::Socket::INET->new(PeerAddr => $rtk_ip,
                                           PeerPort => $rtk_port) ||
            die "Could not create rtk socket on IP $rtk_ip and port $rtk_port: $!\n";

        POE::Session->create(inline_states => {
            _start => \&start,
            "accept"         => \&do_accept,
            "accept_error"   => sub { warn "$_[ARG0] error: $_[ARG2]\n" },
            "client_input"   => \&client_input,
            "client_end"     => \&client_end,
            "client_flushed" => sub { delete $_[HEAP]->{clients}{$_[ARG0]} },
            "server_input"   => \&server_input,
            "server_end"     => \&server_end,
            _default => sub {
                die("calling nonexistent event ", $_[ARG0]) unless
                    substr($_[ARG0], 0, 1) eq "_";
                return;
            }
        });

        sub start {
            my $heap = $_[HEAP];
            $heap->{listener} = POE::Wheel::ListenAccept->new
                (Handle      => $listener,
                 AcceptEvent => "accept",
                 ErrorEvent  => "accept_error");
            $heap->{server} = POE::Wheel::ReadWrite->new
                (Handle     => $server,
                 InputEvent => "server_input",
                 ErrorEvent => "server_end");
            $heap->{id_queue} = [];
            $heap->{clients}  = {};
        }

        sub do_accept {
            my ($heap, $new_sock) = @_[HEAP, ARG0];
            my $client = POE::Wheel::ReadWrite->new
                (Handle     => $new_sock,
                 InputEvent => "client_input",
                 ErrorEvent => "client_end");
            $heap->{clients}{$client->ID} = $client;
        }

        sub client_input {
            my ($heap, $line, $client_id) = @_[HEAP, ARG0, ARG1];
            if ($heap->{server}) {
                if ($line =~ /exit/) {
                    close_on_empty($heap, $client_id);
                } else {
                    # Remember for which client id we queue this line
                    push @{$heap->{id_queue}}, $client_id;
                    $heap->{server}->put($line);
                }
            }
        }

        sub client_end {
            my ($heap, $operation, $errnum, $errstr, $client_id) = @_[HEAP,ARG0..ARG3];
            warn "$operation: $errstr\n" if $errnum;
            delete $heap->{clients}->{$client_id};
        }

        sub server_input {
            my $heap = $_[HEAP];
            my $id = shift @{$heap->{id_queue}} ||
                die "multiline echos from server ?";
            $heap->{clients}{$id}->put($_[ARG0]) if $heap->{clients}{$id};
        }

        sub server_end {
            my ($heap, $operation, $errnum, $errstr) = @_[HEAP,ARG0..ARG2];
            warn "$operation: $errstr\n" if $errnum;
            warn "Server shutdown\n";
            delete $heap->{server};
            delete $heap->{listener};
            close $listener;
            close_on_empty($heap, $_) for keys %{$heap->{clients}};
        }

        sub close_on_empty {
            my ($heap, $client_id) = @_;
            my $client = $heap->{clients}{$client_id};
            if ($client->get_driver_out_octets) {
                $client->event("FlushedEvent", "client_flushed");
            } else {
                delete $heap->{clients}{$client_id};
            }
        }

        $poe_kernel->run();

        The main thing missing from that code is flow control. The system can be driven out of memory if the clients submit faster than the server can process. It should probably also enforce a sane maximum number of connected clients.
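
A rough sketch of the two checks mentioned above, written as plain Perl so it does not depend on POE. The limits and the helper names reads_paused and may_accept are hypothetical. POE::Wheel::ReadWrite does document HighMark and LowMark parameters and pause_input and resume_input methods that a check like this could be wired to, but that wiring is not shown here.

```perl
use strict;
use warnings;

# Hypothetical limits; real values depend on the workload.
my $HIGH_MARK   = 64 * 1024;  # pause client reads above this many queued bytes
my $LOW_MARK    = 16 * 1024;  # resume once the queue drains to this level
my $MAX_CLIENTS = 100;        # refuse new connections beyond this count

# Decide whether reading from clients should be paused. Using two
# thresholds keeps the proxy from flapping around a single limit.
sub reads_paused {
    my ($currently_paused, $queued_bytes) = @_;
    return 1 if $queued_bytes >= $HIGH_MARK;
    return 0 if $queued_bytes <= $LOW_MARK;
    return $currently_paused;
}

# Decide whether another client may be accepted.
sub may_accept {
    my ($client_count) = @_;
    return $client_count < $MAX_CLIENTS ? 1 : 0;
}

my $paused = 0;
for my $queued (10_000, 70_000, 40_000, 8_000) {
    $paused = reads_paused($paused, $queued);
    printf "%6d bytes queued: %s\n", $queued, $paused ? "paused" : "reading";
}
```

The queued byte count here would be the total of lines forwarded to the single server that have not yet been answered.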