dash2 has asked for the wisdom of the Perl Monks concerning the following question:

Here's some simple code to pass messages between peers:

The sender:

    sub transport_message {
        my $self           = shift;
        my $message_string = shift;
        my $peer           = shift;

        my $headers = new HTTP::Headers;
        $headers->date(time);
        $headers->content_type('text/plain');
        $headers->server('Net::Distributed');

        my $request = new HTTP::Request (
            'POST',
            "http://" . $peer->address,
            $headers,
            $message_string
        );

        unless ($self->{UserAgent}) {
            $self->{UserAgent} = new LWP::UserAgent;
            $self->{UserAgent}->agent("Net::Distributed::Transport/$VERSION");
            $self->{UserAgent}->from($self->{email});
            $self->{UserAgent}->parse_head(0);
            $self->{UserAgent}->timeout($self->{send_timeout}); # send_timeout normally 5 secs
        }

        my $ua       = $self->{UserAgent};
        my $response = $ua->request($request);

        if ($response->is_success) {
            return 1; # of course, there may be problems further down the line
        } else {
            $self->_debug("Got " . $response->status_line . " from " . $peer->address, 0);
            return 0;
        }
    }

The receiver:

    sub accept_messages {
        my $self = shift;

        unless ($self->{daemon}) {
            my ($localaddr, $localport) = split /:/, $self->{address};
            $self->{daemon} = new HTTP::Daemon (
                LocalAddr => $localaddr,
                LocalPort => $localport
            );
            ref $self->{daemon}
                or $self->_debug("Could not create HTTP::Daemon $! $@", 3);
            $self->{daemon}->timeout($self->{timeout}) if $self->{timeout}; # normally timeout is 20
        }

        local $SIG{PIPE} = sub {
            warn 'PIPE WHILE READING';
            undef $self->{daemon};
        };

        CONNECTION: while (1) {
            my $conn = $self->{daemon}->accept; # blocks
            return undef unless $conn;

            my $req = $conn->get_request;
            unless ($req) {
                $self->_debug(
                    'Problem reading from connection: ' . $conn->reason
                );
                next CONNECTION;
            }

            if ($req->method ne 'POST') {
                my $r = new HTTP::Response (405, 'POST method only');
                $conn->send_response($r);
                $conn->close;
                next CONNECTION;
            } elsif (my $content = $req->content) {
                my $r = new HTTP::Response(200, 'OK');
                $conn->send_response($r);
                $conn->close;
                return $content;
            }
        }
    }

A lot of messages are flying about, and it gets pretty busy. I keep getting 'broken pipe' signals. Of course, I can ignore them with $SIG{PIPE} = 'IGNORE', but my attempts to send and receive messages still fail with timeouts.

What am I doing wrong? I've read perlipc and understand that I am trying to write to a broken pipe, but how can I stop this happening? Ideally, I need to make sure that the server queues messages and handles them one by one.

TVM

dave hj~

Replies are listed 'Best First'.
Re: HTTP::Daemon and broken pipes
by Dog and Pony (Priest) on Feb 16, 2002 at 12:03 UTC
    I haven't written one of these in Perl, but I have done lots of the same stuff in Java, and the preferred way to do things there is to use a Thread to take care of the response. Something like:
        while (1) {
            my $conn = $self->{daemon}->accept; # blocks
            return undef unless $conn;
            my $thread = new Thread \&handle_request, $conn;
            # Thread away, next please!
        }

        sub handle_request {
            my $conn = shift;
            my $req  = $conn->get_request;
            # The rest of your code here
        }
    The loop with the accept should loop infinitely, with no timeout. This will cause your program to listen on the port, and every time a connection is received, it will be handled by a new thread. The loop immediately goes back to listening for subsequent requests.

    Since I do not know the rest of your code, I will take some wild shots here. It seems like you want to call accept_messages for each message you want to receive, and get some answer back to do something with. Well, with my kind of approach, you want to call this method once and then let it run, doing the processing of the message in the thread you spawn. That way you can handle a lot of messages. But this will probably trigger quite a rewrite, depending on what you do with the message ($content) you previously got back from accept_messages.

    You will also need to implement some way of shutting down the server. I usually have a command I can send to it via the port, if it isn't public. Or you could just trap CTRL-C and do some cleaning up (letting the threads finish first via join, perhaps, if you store them in an array) before a graceful shutdown.

    Anyhow, this is, more or less, how most servers do operate. Sorry if I am being unclear on something here. And do remember that this is more like pseudo-code than anything you can use and trust off the bat. Also you will want to read up on all the quirks with threading, so you know what you are doing. Done right, they are indeed very powerful tools. :)
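
The "trap CTRL-C" idea above can be sketched roughly as follows. This is only an illustration under assumptions: run_until_interrupted and the $shutdown flag are hypothetical names, the work callback stands in for "accept and handle one connection", and the interrupt is simulated by the process signalling itself. The flag is only checked between iterations, so the cleanup happens after the current piece of work finishes.

```perl
use strict;
use warnings;

# Flag set by the signal handler; the loop checks it between iterations.
my $shutdown = 0;
$SIG{INT} = sub { $shutdown = 1 };

# Hypothetical helper: call $work repeatedly until SIGINT arrives
# (or until $max_iterations, so the sketch always terminates).
sub run_until_interrupted {
    my ($work, $max_iterations) = @_;
    my $done = 0;
    while (!$shutdown && $done < $max_iterations) {
        $work->($done);   # stands in for "accept and handle one connection"
        $done++;
    }
    # Cleanup (closing sockets, joining workers) would go here.
    return $done;
}

# Simulate CTRL-C by sending SIGINT to ourselves during the third iteration.
my $handled = run_until_interrupted(
    sub { my ($i) = @_; kill 'INT', $$ if $i == 2 },
    100,
);
print "handled $handled iterations before shutting down\n";
```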

      It's a nice thought, but this code is to distribute - it's not a client server model - so I can't assume that people will have a Perl built for threading. I think I need something like IO::Select, but preferably with the http stuff built in. For the context of the code, btw, see Net::Distributed.

      dave hj~
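
For readers who want to see what the IO::Select route looks like without threads, here is a minimal sketch using only core modules. It is not the Net::Distributed code and it does no HTTP parsing: poll_messages is a hypothetical helper, and a "message" here is simply everything a peer sends before it closes the connection. As far as I know HTTP::Daemon is itself an IO::Socket subclass, so a daemon object can be added to an IO::Select set in the same way, but treat that as an assumption to verify.

```perl
use strict;
use warnings;
use IO::Socket::INET;
use IO::Select;

# Hypothetical helper: wait up to $timeout seconds for readable sockets,
# accept new connections, and return the messages of peers that have closed.
sub poll_messages {
    my ($listener, $select, $buffers, $timeout) = @_;
    my @messages;
    for my $sock ($select->can_read($timeout)) {
        if ($sock == $listener) {
            # New connection: watch it, but do not block reading from it.
            my $conn = $listener->accept or next;
            $select->add($conn);
            $buffers->{ fileno($conn) } = '';
            next;
        }
        my $n = sysread($sock, my $chunk, 4096);
        if ($n) {
            $buffers->{ fileno($sock) } .= $chunk;
        } else {
            # EOF (0) or read error (undef): the message is complete.
            push @messages, delete $buffers->{ fileno($sock) };
            $select->remove($sock);
            close $sock;
        }
    }
    return @messages;
}

# Demo on the loopback interface; port 0 lets the OS pick a free port.
my $listener = IO::Socket::INET->new(
    LocalAddr => '127.0.0.1',
    LocalPort => 0,
    Proto     => 'tcp',
    Listen    => 5,
) or die "Could not listen: $!";
my $select = IO::Select->new($listener);
my %buffers;

my $client = IO::Socket::INET->new(
    PeerAddr => '127.0.0.1',
    PeerPort => $listener->sockport,
    Proto    => 'tcp',
) or die "Could not connect: $!";
print {$client} "hello peer";
close $client;

my @received;
for (1 .. 10) {    # bounded so the sketch cannot hang
    push @received, poll_messages($listener, $select, \%buffers, 1);
    last if @received;
}
print "received: @received\n";
```

Because each call handles whatever is ready and returns, one slow or vanished peer does not stop the process from accepting the next connection.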

Re: HTTP::Daemon and broken pipes
by merlyn (Sage) on Feb 19, 2002 at 14:13 UTC
      Hi Randal

      I did consider it, and XML-RPC also. The decision against was basically that I didn't want to return more than a '200 OK' on messages, whereas XML-RPC (and SOAP IIRC) are basically 2 way messaging systems, where you send a request and get a meaningful response. I wanted to be able to use a one-way system, where nodes could send responses async... ascynchr... later on if they wanted to.

      Maybe I should have, and there's no reason that the transport couldn't be subclassed to use SOAP (or indeed JXTA, Freenet etc. etc.)... but still, right now, I have loads of cool code that is stuck because I can't figure out how to make HTTP::Daemon recover correctly from a SIGPIPE...

      dave hj~

(tye)Re: HTTP::Daemon and broken pipes
by tye (Sage) on Feb 18, 2002 at 18:31 UTC

    An HTTP server needs to ignore SIGPIPE. Otherwise the server might die just because a client got closed before it finished reading the response from the server (though I thought that TCP/IP sockets didn't signal SIGPIPE -- but it probably varies by operating system).

    If a client goes away, it could take a while for the server's attempt to send data to it to eventually fail. It depends how the client goes away. If the client process dies, then the operating system will usually ensure that the send attempts quickly fail. If the client hangs, the computer or network fails, or many other failure cases, then the server is likely to be tied up for about 2 minutes trying to decide whether to give up resending data.

            - tye (but my friends call me "Tye")
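
To illustrate the point about ignoring SIGPIPE, here is a small sketch assuming a Unix-like system. It uses a local socket pair rather than a real HTTP connection, and write_to_closed_peer is a hypothetical name. With the signal ignored, writing to a peer that has gone away does not kill the process; the write simply fails and sets $! (EPIPE on the systems I am aware of), which the caller can check.

```perl
use strict;
use warnings;
use Socket;
use Errno qw(EPIPE);

# Hypothetical helper: write to a socket whose peer has already closed,
# with SIGPIPE ignored, and report what the write returned.
sub write_to_closed_peer {
    local $SIG{PIPE} = 'IGNORE';

    socketpair(my $server, my $client, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
        or die "socketpair failed: $!";
    close $client;    # the "client" goes away before the response is sent

    my $written = syswrite($server, "HTTP/1.1 200 OK\r\n\r\n");
    my $errno   = defined $written ? 0 : $! + 0;   # numeric errno on failure
    close $server;
    return ($written, $errno);
}

my ($written, $errno) = write_to_closed_peer();
if (defined $written) {
    print "wrote $written bytes\n";
} else {
    print "write failed, errno ", ($errno == EPIPE ? 'EPIPE' : $errno), "\n";
}
```

Without the 'IGNORE' line, the default action for SIGPIPE would terminate the process at the syswrite.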

      Thanks Tye.

      Actually, I am testing over the local loopback, so this may explain the SIGPIPEs. But assuming I do ignore SIGPIPE, I still wonder how to stop getting continual "could not connect" errors. I remember IO::Socket has a listen method... can I call this on HTTP::Daemon to allow it to queue connections? I'll try it.

      dave hj~
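
On the queueing question, a listening socket already has a backlog of pending connections, set by the Listen option when the socket is created. The sketch below uses plain IO::Socket::INET from the core distribution rather than HTTP::Daemon, and the backlog of 15 is just an example value. It shows three clients connecting before the server calls accept at all; the connections wait in the backlog and are then handled one by one. It illustrates the queue only and is not offered as a fix for the stalls described in this thread.

```perl
use strict;
use warnings;
use IO::Socket::INET;

# Listening socket with a backlog of 15; port 0 lets the OS pick a free port.
my $listener = IO::Socket::INET->new(
    LocalAddr => '127.0.0.1',
    LocalPort => 0,
    Proto     => 'tcp',
    Listen    => 15,
) or die "Could not listen: $!";

# Three clients connect and send a line before the server accepts anything.
my @clients;
for my $id (1 .. 3) {
    my $client = IO::Socket::INET->new(
        PeerAddr => '127.0.0.1',
        PeerPort => $listener->sockport,
        Proto    => 'tcp',
    ) or die "Could not connect: $!";
    print {$client} "message $id\n";
    push @clients, $client;
}

# The server now drains the queued connections one at a time.
my @received;
for (1 .. 3) {
    my $conn = $listener->accept or die "accept failed: $!";
    my $line = <$conn>;
    chomp $line;
    push @received, $line;
    close $conn;
}
close $_ for @clients;

print "$_\n" for @received;
```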

      My idea of setting listen() didn't work. I have experimented some more. Essentially, the system works fine for a bit, but then starts receiving SIGPIPEs. After that, nothing gets through... for some reason the message receiver never manages to accept another message. Here is my current code - can anyone see what I am doing wrong?

          sub accept_messages {
              my $self = shift;

              unless ($self->{daemon}) {
                  my ($localaddr, $localport) = split /:/, $self->{address};
                  $self->{daemon} = new HTTP::Daemon (
                      LocalAddr => $localaddr,
                      LocalPort => $localport
                  );
                  ref $self->{daemon}
                      or $self->_debug("Could not create HTTP::Daemon $! $@", 3);
                  $self->{daemon}->timeout($self->{timeout}) if $self->{timeout};
                  $self->{daemon}->listen($self->{queue}) if $self->{queue}; # tried setting this to 15. no joy.
              }

              local $SIG{PIPE} = sub {
                  $self->_debug('PIPE WHILE ACCEPTING MESSAGE');
              };

              CONNECTION: while (1) {
                  my $conn = $self->{daemon}->accept; # blocks
                  return undef unless $conn;

                  my $req = $conn->get_request;
                  unless ($req) {
                      $self->_debug(
                          'Problem reading from connection: ' . $conn->reason
                      );
                      next CONNECTION;
                  }

                  if ($req->method ne 'POST') {
                      my $r = new HTTP::Response (405, 'POST method only');
                      $conn->send_response($r);
                      $conn->close;
                      next CONNECTION;
                  } elsif (my $content = $req->content) {
                      my $r = new HTTP::Response(200, 'OK');
                      $conn->send_response($r);
                      $conn->close;
                      return $content;
                  }
              }
          }

          sub transport_message {
              my $self           = shift;
              my $message_string = shift;
              my $peer           = shift;

              my $headers = new HTTP::Headers;
              $headers->date(time);
              $headers->content_type('text/plain');
              $headers->server('Net::Distributed');

              my $request = new HTTP::Request (
                  'POST',
                  "http://" . $peer->address,
                  $headers,
                  $message_string
              );

              unless ($self->{UserAgent}) {
                  $self->{UserAgent} = new LWP::UserAgent;
                  $self->{UserAgent}->agent("Net::Distributed::Transport/$VERSION");
                  $self->{UserAgent}->from($self->{email});
                  $self->{UserAgent}->parse_head(0);
                  $self->{UserAgent}->timeout($self->{send_timeout});
              }

              my $ua = $self->{UserAgent};

              local $SIG{PIPE} = sub {
                  $self->_debug('PIPE WHILE SENDING MESSAGE');
              };

              my $response = $ua->request($request);

              if ($response->is_success) {
                  return 1; # of course, there may be problems further down the line
              } else {
                  $self->_debug("Got " . $response->status_line . " from " . $peer->address, 0);
                  return 0;
              }
          }

      dave hj~