Unfortunately the changes make it worse.
$data = '';
$data .= $buffer while sysread( $fh, $buffer, 1 ) and $buffer ne "\n";
This does just what <$fh> would do, only less efficiently. The basic problem remains: a readability select only guarantees you one non-blocking sysread; every later one can block indefinitely.
die "Can't write to server!\n" unless $server->can_write;
syswrite($sock, "$data\n");
Just because the server is a bit slow for a moment shouldn't be fatal, I think. Anyway, it's somewhat reasonable that if the real server pauses for some reason, all clients pause too (though you could keep doing pending writes to clients and buffering their input in that time). So a stdio read and write to the real server isn't too bad at this point.
But worse is the fact that a syswrite can succeed partially, in which case the server will never see the \n and probably never answer. The emulated <$sock> just after it will then wait forever.
For the client writeback you do
if ( $client->can_write ) {
syswrite( $fh, "$data\n" );
}
which indeed solves the blocking problem, but can again lose data (because the client happens not to be writable after all, or the syswrite only does a partial write).
The read side you normally solve by keeping an input buffer per connection: on readability you do one sysread and append the result to that socket's buffer. Then you check whether the buffer contains complete lines, and if so, process and remove them.
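A minimal sketch of that read side, assuming line-oriented input; the names (%inbuf, read_lines) are mine, not from any module:

```perl
#!/usr/bin/perl
use strict;
use warnings;

my %inbuf;    # connection id => input seen so far without a trailing newline

# One readability event => exactly one sysread.  Append whatever arrived
# to that connection's buffer and return any complete lines now available.
# A partial line simply stays in the buffer until more data comes in.
sub read_lines {
    my ($id, $fh) = @_;
    my $rc = sysread($fh, my $chunk, 4096);
    die "read error: $!" unless defined $rc;
    return () if $rc == 0;    # EOF: the caller should drop this connection
    $inbuf{$id} .= $chunk;
    my @lines;
    push @lines, $1 while $inbuf{$id} =~ s/^(.*?)\n//;
    return @lines;
}
```

Note that read_lines can return zero, one, or several lines per call, so the caller must loop over the result rather than assume one line per readability event.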
For writes something similar goes. You keep a per-socket output buffer and select for writability as long as it's not empty. On writability you do one syswrite, check how many characters you successfully wrote, and remove that many from the buffer. As you can see, this implies that your main wait can have both readability and writability selects going on, so you'll need the full 4-argument select.
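The write side can be sketched the same way; again the names (%outbuf, queue_output, flush_once) are my own invention:

```perl
#!/usr/bin/perl
use strict;
use warnings;
use Errno qw(EAGAIN EWOULDBLOCK);

my %outbuf;    # connection id => bytes queued but not yet written

# Never syswrite directly: queue the data, and have the main loop select
# the handle for writability for as long as this buffer is non-empty.
sub queue_output {
    my ($id, $data) = @_;
    $outbuf{$id} .= $data;
}

# One writability event => one syswrite.  Drop however many bytes were
# actually written; a partial write just leaves the tail queued for the
# next writability event.
sub flush_once {
    my ($id, $fh) = @_;
    my $rc = syswrite($fh, $outbuf{$id});
    if (!defined $rc) {
        return if $! == EAGAIN || $! == EWOULDBLOCK;    # spurious wakeup
        die "write error: $!";
    }
    substr($outbuf{$id}, 0, $rc, '');    # keep only the unwritten part
}
```

Once $outbuf{$id} goes empty you clear that handle's bit in the write mask again, otherwise select will keep waking you up for nothing.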
Where is this style of programming documented? Unfortunately I don't know any good references, and I'd love to know some, because I often see people doing this kind of stuff incorrectly and I'd love to point them to a good explanation. I picked it up myself by just doing it and lots of debugging.
All of that gets pretty tedious very quickly. Fortunately it's all built into POE already. Here is a demo proxy in POE:
#! /usr/bin/perl -w
use strict;
use IO::Socket::INET;
use POE qw(Wheel::ListenAccept Wheel::ReadWrite);
my $listening_port = 8019;
my $rtk_port = 4004;
# my $rtk_port = "echo";
my $rtk_ip = "localhost";
my $listener = IO::Socket::INET->new(LocalPort => $listening_port,
                                     Reuse     => 1,
                                     Listen    => 512) ||
    die "Couldn't create listening socket on port $listening_port: $!\n";
# I don't bother with a POE::Wheel::SocketFactory here since
# I want to know immediately if the server can't be reached.
my $server = IO::Socket::INET->new(PeerAddr => $rtk_ip,
                                   PeerPort => $rtk_port) ||
    die "Couldn't create rtk socket on IP $rtk_ip and port $rtk_port: $!\n";
POE::Session->create(inline_states => {
    _start           => \&start,
    "accept"         => \&do_accept,
    "accept_error"   => sub { warn "$_[ARG0] error: $_[ARG2]\n" },
    "client_input"   => \&client_input,
    "client_end"     => \&client_end,
    "client_flushed" => sub { delete $_[HEAP]->{clients}{$_[ARG0]} },
    "server_input"   => \&server_input,
    "server_end"     => \&server_end,
    _default         => sub {
        die("calling nonexistent event ", $_[ARG0])
            unless substr($_[ARG0], 0, 1) eq "_";
        return;
    },
});
sub start {
    my $heap = $_[HEAP];
    $heap->{listener} = POE::Wheel::ListenAccept->new
        (Handle      => $listener,
         AcceptEvent => "accept",
         ErrorEvent  => "accept_error");
    $heap->{server} = POE::Wheel::ReadWrite->new
        (Handle     => $server,
         InputEvent => "server_input",
         ErrorEvent => "server_end");
    $heap->{id_queue} = [];
    $heap->{clients}  = {};
}
sub do_accept {
    my ($heap, $new_sock) = @_[HEAP, ARG0];
    my $client = POE::Wheel::ReadWrite->new
        (Handle     => $new_sock,
         InputEvent => "client_input",
         ErrorEvent => "client_end");
    $heap->{clients}{$client->ID} = $client;
}
sub client_input {
    my ($heap, $line, $client_id) = @_[HEAP, ARG0, ARG1];
    if ($heap->{server}) {
        if ($line =~ /exit/) {
            close_on_empty($heap, $client_id);
        } else {
            # Remember for which client id we queue this line
            push @{$heap->{id_queue}}, $client_id;
            $heap->{server}->put($line);
        }
    }
}
sub client_end {
    my ($heap, $operation, $errnum, $errstr, $client_id) = @_[HEAP, ARG0..ARG3];
    warn "$operation: $errstr\n" if $errnum;
    delete $heap->{clients}{$client_id};
}
sub server_input {
    my $heap = $_[HEAP];
    my $id = shift @{$heap->{id_queue}} ||
        die "multiline echoes from server?";
    $heap->{clients}{$id}->put($_[ARG0]) if $heap->{clients}{$id};
}
sub server_end {
    my ($heap, $operation, $errnum, $errstr) = @_[HEAP, ARG0..ARG2];
    warn "$operation: $errstr\n" if $errnum;
    warn "Server shutdown\n";
    delete $heap->{server};
    delete $heap->{listener};
    close $listener;
    close_on_empty($heap, $_) for keys %{$heap->{clients}};
}
sub close_on_empty {
    my ($heap, $client_id) = @_;
    my $client = $heap->{clients}{$client_id};
    if ($client->get_driver_out_octets) {
        $client->event("FlushedEvent", "client_flushed");
    } else {
        delete $heap->{clients}{$client_id};
    }
}
$poe_kernel->run();
The main thing missing from that code is flow control: the system can be driven out of memory if the clients submit faster than the server can process. It should probably also enforce a sane maximum number of connected clients.