in reply to Re: Re: Re: Single user to multi user socket server
in thread Single user to multi user socket server

    $data = '';
    $data .= $buffer while sysread( $fh, $buffer, 1 ) and $buffer ne "\n";

This does just what <$fh> would do, only less efficiently. The basic problem remains: a readability select only guarantees you one sysread that won't block; every later one can block indefinitely.

    die "Can't write to server!\n" unless $server->can_write;
    syswrite( $sock, "$data\n" );

Just because the server is a bit slow for a moment shouldn't be fatal, I think. Anyway, it's fairly reasonable that if the real server pauses for some reason, all clients pause too (though you could queue pending writes to clients and buffer their input in the meantime). So a stdio read and write to the real server isn't too bad at this point.
But worse is the fact that a syswrite can succeed only partially, in which case the server will not see the \n and will probably never answer. Your emulated <$sock> right after it will then wait forever.
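To illustrate the partial-write point: a minimal sketch of a loop that keeps calling syswrite until the whole string has gone out. The helper name `write_all` is made up for this example. It assumes a blocking handle, so it still stalls while the server is slow; it only fixes the lost-tail problem.

```perl
use strict;
use warnings;
use Errno qw(EINTR);

# Hypothetical helper: call syswrite repeatedly until all of $data is written.
# Returns true on success, false on a real error (details in $!).
sub write_all {
    my ($fh, $data) = @_;
    my $offset = 0;
    my $length = length $data;
    while ($offset < $length) {
        # syswrite may accept fewer bytes than requested
        my $written = syswrite($fh, $data, $length - $offset, $offset);
        if (!defined $written) {
            next if $!{EINTR};    # interrupted by a signal: just retry
            return 0;
        }
        $offset += $written;      # advance past what was actually written
    }
    return 1;
}
```

You would call it as `write_all($sock, "$data\n") or die "write failed: $!";` in place of the bare syswrite.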
For the client writeback you do

    if ( $client->can_write ) {
        syswrite( $fh, "$data\n" );
    }

which indeed solves the blocking problem, but can again lose data (because the client happens not to be writable at that moment, or because the syswrite only does a partial write).
The read side you normally solve by keeping an input buffer per connection: on readability you do exactly one sysread and append the result to that connection's buffer. Then you check whether the buffer contains complete lines, and if so, process and remove them.
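A minimal sketch of that read side, under some assumptions: the `%inbuf` hash and the `read_lines` name are invented for illustration, lines are "\n"-terminated, and the caller invokes it once per readability notification from select.

```perl
use strict;
use warnings;

my %inbuf;    # hypothetical: one input buffer per connection, keyed by fileno

# Call once each time select reports $fh readable.
# Returns an array ref of complete lines (without "\n"),
# or undef on EOF or error, in which case the caller should close $fh.
sub read_lines {
    my ($fh) = @_;
    my $key = fileno($fh);
    $inbuf{$key} = '' unless defined $inbuf{$key};

    # Exactly one sysread, appended at the end of the existing buffer
    my $n = sysread($fh, $inbuf{$key}, 4096, length $inbuf{$key});
    unless ($n) {                 # undef = error, 0 = EOF
        delete $inbuf{$key};
        return;
    }

    my @lines;
    # Strip each complete line off the front; a partial tail stays buffered
    push @lines, $1 while $inbuf{$key} =~ s/\A([^\n]*)\n//;
    return \@lines;
}
```

An empty array ref just means no full line has arrived yet, so you go back to select.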
For writes something similar goes. You have a per-socket output buffer, and select for writability as long as that's not empty. Then on writability you do one syswrite, check how many bytes you successfully wrote and remove that many from your buffer. But as you can see, this implies that you can have both readability and writability selects going on in your main wait, so you'll need the full 4-argument select.
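The write side could be sketched like this. Again the names (`%outbuf`, `queue_output`, `flush_output`, `select_vectors`) are hypothetical, and this is only one way to arrange it: data is queued rather than written directly, and a handle is added to the write vector only while it has pending output.

```perl
use strict;
use warnings;

my %outbuf;    # hypothetical: pending output per connection, keyed by fileno

# Queue data instead of writing it straight to the socket.
sub queue_output {
    my ($fh, $data) = @_;
    $outbuf{fileno($fh)} .= $data;
}

# Call once each time select reports $fh writable.
# Returns the number of bytes written, or undef on error (including EAGAIN).
sub flush_output {
    my ($fh) = @_;
    my $key = fileno($fh);
    my $written = syswrite($fh, $outbuf{$key});
    return unless defined $written;
    substr($outbuf{$key}, 0, $written, '');       # drop only what was accepted
    delete $outbuf{$key} if $outbuf{$key} eq '';  # nothing pending any more
    return $written;
}

# Build the read and write bit vectors for 4-argument select.
sub select_vectors {
    my (@handles) = @_;
    my ($rin, $win) = ('', '');
    for my $fh (@handles) {
        vec($rin, fileno($fh), 1) = 1;
        # Only ask for writability while there is something to send
        vec($win, fileno($fh), 1) = 1 if exists $outbuf{fileno($fh)};
    }
    return ($rin, $win);
}
```

In the main loop the vectors would go into something like `select(my $rout = $rin, my $wout = $win, undef, undef)`, after which you test each handle's bit in `$rout` and `$wout` with vec.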
Where is this style of programming documented? Unfortunately I don't know any good references, and I'd love to know some, because I often see people doing this kind of stuff incorrectly and would like to point them to a good explanation. I myself picked it up by just doing it, with lots of debugging.
All of that gets pretty tedious very quickly. Fortunately it's all already built into POE. Here is a demo proxy in POE:

    #!/usr/bin/perl -w
    use strict;
    use IO::Socket::INET;
    use POE qw(Wheel::ListenAccept Wheel::ReadWrite);

    my $listening_port = 8019;
    my $rtk_port = 4004;
    # my $rtk_port = "echo";
    my $rtk_ip = "localhost";

    my $listener = IO::Socket::INET->new(LocalPort => $listening_port,
                                         Reuse     => 1,
                                         Listen    => 512) ||
        die "Could not create listening socket on port $listening_port: $!\n";
    # I don't bother with a POE::Wheel::SocketFactory here since
    # I want to know immediately if the server can't be reached.
    my $server = IO::Socket::INET->new(PeerAddr => $rtk_ip,
                                       PeerPort => $rtk_port) ||
        die "Could not create rtk socket on IP $rtk_ip and port $rtk_port: $!\n";

    POE::Session->create(inline_states => {
        _start           => \&start,
        "accept"         => \&do_accept,
        "accept_error"   => sub { warn "$_[ARG0] error: $_[ARG2]\n" },
        "client_input"   => \&client_input,
        "client_end"     => \&client_end,
        # Output drained: now it's safe to drop the client wheel
        "client_flushed" => sub { delete $_[HEAP]->{clients}{$_[ARG0]} },
        "server_input"   => \&server_input,
        "server_end"     => \&server_end,
        _default => sub {
            die("calling nonexistent event ", $_[ARG0]) unless
                substr($_[ARG0], 0, 1) eq "_";
            return;
        }
    });

    sub start {
        my $heap = $_[HEAP];
        $heap->{listener} = POE::Wheel::ListenAccept->new
            (Handle      => $listener,
             AcceptEvent => "accept",
             ErrorEvent  => "accept_error");
        $heap->{server} = POE::Wheel::ReadWrite->new
            (Handle     => $server,
             InputEvent => "server_input",
             ErrorEvent => "server_end");
        $heap->{id_queue} = [];
        $heap->{clients}  = {};
    }

    sub do_accept {
        my ($heap, $new_sock) = @_[HEAP, ARG0];
        my $client = POE::Wheel::ReadWrite->new
            (Handle     => $new_sock,
             InputEvent => "client_input",
             ErrorEvent => "client_end");
        $heap->{clients}{$client->ID} = $client;
    }

    sub client_input {
        my ($heap, $line, $client_id) = @_[HEAP, ARG0, ARG1];
        if ($heap->{server}) {
            if ($line =~ /exit/) {
                close_on_empty($heap, $client_id);
            } else {
                # Remember for which client id we queue this line
                push @{$heap->{id_queue}}, $client_id;
                $heap->{server}->put($line);
            }
        }
    }

    sub client_end {
        my ($heap, $operation, $errnum, $errstr, $client_id) =
            @_[HEAP, ARG0..ARG3];
        warn "$operation: $errstr\n" if $errnum;
        delete $heap->{clients}->{$client_id};
    }

    sub server_input {
        my $heap = $_[HEAP];
        # Replies come back in request order, one line per request
        my $id = shift @{$heap->{id_queue}} ||
            die "multiline echos from server ?";
        $heap->{clients}{$id}->put($_[ARG0]) if $heap->{clients}{$id};
    }

    sub server_end {
        my ($heap, $operation, $errnum, $errstr) = @_[HEAP, ARG0..ARG2];
        warn "$operation: $errstr\n" if $errnum;
        warn "Server shutdown\n";
        delete $heap->{server};
        delete $heap->{listener};
        close $listener;
        close_on_empty($heap, $_) for keys %{$heap->{clients}};
    }

    # Drop a client, but only after its pending output has been flushed
    sub close_on_empty {
        my ($heap, $client_id) = @_;
        my $client = $heap->{clients}{$client_id};
        if ($client->get_driver_out_octets) {
            $client->event("FlushedEvent", "client_flushed");
        } else {
            delete $heap->{clients}{$client_id};
        }
    }

    $poe_kernel->run();

The main thing missing from that code is flow control. The system can be driven out of memory if the clients submit faster than the server can process. It probably should also introduce a sane maximum number of connected clients.