
Re^7: multithreaded tcp listener with IO::Socket

by BrowserUk (Patriarch)
on May 18, 2006 at 13:31 UTC ( [id://550235]=note )


in reply to Re^6: multithreaded tcp listener with IO::Socket
in thread multithreaded tcp listener with IO::Socket

You need a dot ('.'), not a comma (',') (and don't forget some error handling :).

    open my $socket, '+<&=' . $fileno or die $!;
    ........................^

For an explanation of the syntax and what it is doing, see perlopentut and the section entitled "Re-Opening Files (dups)".
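As a minimal sketch of what that open mode does (not part of the module below): the '&=' form asks Perl for a new handle on an existing descriptor number, fdopen-style, rather than a dup that allocates a new descriptor. The socketpair here is only a stand-in for an accepted client connection, and the names $near, $far and $alias are made up for the illustration.

```perl
use strict;
use warnings;
use Socket;
use IO::Handle;

# A connected pair of sockets stands in for a client/server connection.
socketpair( my $near, my $far, AF_UNIX, SOCK_STREAM, PF_UNSPEC )
    or die "socketpair failed: $!";

my $fileno = fileno $near;

# '&=' gives a second Perl handle on the SAME descriptor number;
# the '.' builds the mode string "+<&=N" that two-argument open expects.
open my $alias, '+<&=' . $fileno
    or die "Cannot open handle on fd $fileno: $!";
$alias->autoflush( 1 );

my $same_fd = fileno( $alias ) == $fileno;

# Anything written through the alias arrives at the other end of the pair.
print {$alias} "hello\n";
my $line = <$far>;

print "same descriptor: ", ( $same_fd ? 'yes' : 'no' ), "\n";
print "received: $line";
```

Only the descriptor number has to travel between threads; each thread then builds its own handle from it.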

Sorry for not posting the code I promised. I've been trying to work it into a proper module. It works, but needs substantial extra testing and documentation. I'll post it below in its current state.

The usage will be something like this:

    my $server = threads::Server->new(
        LocalPort => 9000,
        Pool      => 5,
        Accept    => sub {
            printf "Accepted connection from %s:%d on port:%d\n",
                $_->peerhost, $_->peerport, $_->sockport;
            return;
        },
        Thread => sub {
            my( $client, $Qout, @args ) = @_;
            while( <$client> ) {
                chomp;
                ##1 warnf "Got '%s'\n", $_;
                $Qout->enqueue( $_ );
                print $client 'Ack';
            }
        },
        Common => sub {
            my $Q = shift;
            while( $Q->dequeue() ) {
                #1 warnf "Processing '$_'";
            }
        },
    );
    $server->Run;

That needs explanation, documentation, and a lot of work, and it relies on another of my own modules (a ripoff of theDamian's Smart::Comments), but it might provide some ideas for you.

    package threads::Server;
    use Smart::Comments::Lite '4';

    use 5.008006;
    use strict;
    use warnings;

    use threads;
    use Carp qw[ cluck ];
    use Thread::Queue;
    use IO::Socket;

    our $VERSION = '1.0.0';

    my %serverDefaults = (
        Pool   => 3,
        Accept => sub{ undef },
        Thread => sub{ warn },
        Common => sub{ warn },
    );

    my %socketDefaults = (
        LocalHost => 'localhost',
        Listen    => 5,
        Reuse     => 1,
    );

    sub new {
        my( $class, %args ) = @_;
        ##4 warnf "1:%s\n", join ' ', %args;
        my %self = %serverDefaults;
        @self{ keys %serverDefaults } = delete @args{ keys %serverDefaults };
        ##4 warnf "2:%s\n", join' ',%args;

        @self{ qw[ Qwork Qin Qclean ] } = map{ new Thread::Queue } 1 .. 3;

        @{ $self{ Threads } } = map{
            threads->create( \&_thread, @self{ qw[ Thread Qwork Qin Qclean ] } )
        } 1 .. $self{ Pool };

        threads->create( @self{ qw[ Common Qin ] } );

        %args = ( %socketDefaults, %args );
        ##4 warnf "3:%s\n", join ' ', %args;

        $self{ Socket } = IO::Socket::INET->new( %args )
            or cluck __PACKAGE__ . ": failed to create listening socket: $!\n"
            and return __PACKAGE__->DESTROY( \%self );

        return bless \%self, $class;
    }

    sub Run {
        my $self = $_[0];
        my( $server, $Qwork, $Qclean, $accept )
            = @{ $self }{ qw[ Socket Qwork Qclean Accept ] };

        while( local $_ = $server->accept ) {
            my $fileno = fileno $_;
            $self->{ Clients }{ $fileno } = $_;
            my $clientArgs = eval{ join chr(0), $accept->() } || '';
            $Qwork->enqueue( "$fileno\0$clientArgs" );

            while( $Qclean->pending ) {
                ##1 warnf "Cleanup of %d\n",
                my $fno = $Qclean->dequeue();
                close delete $self->{ Clients }{ $fno };
            }
        }
    }

    sub _thread {
        ##1 warnf "Starting thread %d\n", threads->tid;
        my( $userCode, $Qwork, $Qin, $Qclean ) = @_;

        while( my $work = $Qwork->dequeue() ) {
            my( $fileno, @args ) = split chr(0), $work;
            open my $client, '+<&=' . $fileno
                or cluck "Failed to dup $fileno in ${ \threads->tid } : $!\n"
                and next;
            $userCode->( $client, $Qin, @args );
            close $client;
            $Qclean->enqueue( $fileno );
        }
    }

    sub DESTROY {
    }

    return 1 if caller;

    local $\ = "\n";

    my $server = threads::Server->new(
        LocalPort => 9000,
        Pool      => 5,
        Accept    => sub {
            printf "Accepted connection from %s:%d on port:%d\n",
                $_->peerhost, $_->peerport, $_->sockport;
            return;
        },
        Thread => sub {
            my( $client, $Qout, @args ) = @_;
            while( <$client> ) {
                chomp;
                ##1 warnf "Got '%s'\n", $_;
                $Qout->enqueue( $_ );
                print $client 'Ack';
            }
        },
        Common => sub {
            my $Q = shift;
            while( $Q->dequeue() ) {
                #1 warnf "Processing '$_'";
            }
        },
    );
    $server->Run;

    __END__

    =head1 NAME

    threads::Server - Perl extension for blah blah blah

    =head1 SYNOPSIS

      use threads::Server;
      blah blah blah

    =head1 DESCRIPTION

    Stub documentation for threads::Server, created by h2xs. It looks like the
    author of the extension was negligent enough to leave the stub
    unedited.

    Blah blah blah.

    =head2 EXPORT

    None by default.

    =head1 SEE ALSO

    Mention other useful documentation such as the documentation of
    related modules or operating system documentation (such as man pages
    in UNIX), or any relevant external documentation such as RFCs or
    standards.

    If you have a mailing list set up for your module, mention it here.

    If you have a web site set up for your module, mention it here.

    =head1 AUTHOR

    A. U. Thor, E<lt>a.u.thor@a.galaxy.far.far.awayE<gt>

    =head1 COPYRIGHT AND LICENSE

    Copyright (C) 2006 by A. U. Thor

    This library is free software; you can redistribute it and/or modify
    it under the same terms as Perl itself, either Perl version 5.8.6 or,
    at your option, any later version of Perl 5 you may have available.

    =cut

Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
Lingua non convalesco, consenesco et abolesco. -- Rule 1 has a caveat! -- Who broke the cabal?
"Science is about questioning the status quo. Questioning authority".
In the absence of evidence, opinion is indistinguishable from prejudice.

Replies are listed 'Best First'.
Re^8: multithreaded tcp listener with IO::Socket
by txemy (Initiate) on Nov 15, 2010 at 10:48 UTC

    Hello. First of all, thanks a lot for posting that code; it has been a great help to me.

    Now, I found I needed to make some changes for it to work. I'm starting a process to serve HTTP requests on a port. The client is basically doing this:

        use LWP::UserAgent;
        use HTTP::Request;

        my $ua  = LWP::UserAgent->new;
        my $req = HTTP::Request->new( GET => "http://localhost:8998/foo" );
        $req->authorization_basic( 'fo', 'fi' );
        my $res = $ua->request( $req );

    It looks like I really need to close the socket in sub Run after sub _thread has opened the dequeued fileno. In other words, instead of:

        while( $Qclean->pending ) {
            my $fno = $Qclean->dequeue();
            close delete $self->{ Clients }{ $fno };
        }

    I need to do:

        my $fno = $Qclean->dequeue();
        close delete $self->{ Clients }{ $fno };

    before letting it get back to the "local $_ = $server->accept" again.

    If I don't do that, it looks like the "close" of the fd I get in sub _thread does not really close the socket. Unless I actually close the socket in sub Run, "$ua->request($req)" at the client does not return until I kill the server process.

      This is untested as I have a bunch of guys with chainsaws and an industrial-sized wood-chipper making enough noise just outside my window to cause the dead to wake and migrate.

      Try substituting these replacements:

          sub Run {
              my $self = $_[0];
              my( $server, $Qwork, $Qclean, $accept )
                  = @{ $self }{ qw[ Socket Qwork Qclean Accept ] };

              while( local $_ = $server->accept ) {
                  my $fileno = fileno $_;
                  $self->{ Clients }{ $fileno } = $_;
                  my $clientArgs = eval{ join chr(0), $accept->() } || '';
                  $Qwork->enqueue( "$fileno\0$clientArgs" );

                  # Block until a worker has duped a socket, then close our copy.
                  # One dequeue per accept: looping on a blocking dequeue here
                  # would stall the accept loop once the queue is empty.
                  my $fno = $Qclean->dequeue();
                  #warnf "Cleanup of %d\n", $fno;
                  close delete $self->{ Clients }{ $fno };
              }
          }

          sub _thread {
              #warnf "Starting thread %d\n", threads->tid;
              my( $userCode, $Qwork, $Qin, $Qclean ) = @_;

              while( my $work = $Qwork->dequeue() ) {
                  my( $fileno, @args ) = split chr(0), $work;
                  open my $client, '+<&=' . $fileno
                      or cluck "Failed to dup $fileno in ${ \threads->tid } : $!\n"
                      and next;
                  $Qclean->enqueue( $fileno );
                  $userCode->( $client, $Qin, @args );
                  close $client;
              }
          }

      The basic problem is that your client won't attempt to move on to making the next connection until both copies of the socket are closed at the server; but the accept loop's copy of the socket won't get closed until someone (your client or another client) makes another connection. In a reasonably active system, another client connecting causes the accept loop to cycle and the cleanup occurs; but on a system with only a single client that obviously can't happen.
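The effect can be reproduced without threads. This is a rough single-process sketch, assuming (as the behaviour described in this thread suggests) that Perl only closes the underlying descriptor once every handle opened on it has been closed. The socketpair stands in for an accepted connection; $server_side, $worker_copy and $client_side are illustrative names, not part of the module.

```perl
use strict;
use warnings;
use Socket;
use IO::Select;

socketpair( my $server_side, my $client_side, AF_UNIX, SOCK_STREAM, PF_UNSPEC )
    or die "socketpair failed: $!";

# Second handle on the same descriptor, as the worker thread creates.
my $fileno = fileno $server_side;
open my $worker_copy, '+<&=' . $fileno
    or die "Cannot open handle on fd $fileno: $!";

my $watch = IO::Select->new( $client_side );

# The "worker" finishes and closes its handle; the accept loop's handle
# is still open, so the client should see nothing yet.
close $worker_copy;
my @ready = $watch->can_read( 0.2 );
my $eof_after_first_close = @ready ? 1 : 0;

# Now the "accept loop" closes its handle too; the client should see EOF.
close $server_side;
@ready = $watch->can_read( 1 );
my $bytes = @ready ? sysread( $client_side, my $buf, 1 ) : undef;
my $eof_after_second_close = ( defined $bytes && $bytes == 0 ) ? 1 : 0;

print "EOF after first close:  $eof_after_first_close\n";
print "EOF after second close: $eof_after_second_close\n";
```

A client that waits for end-of-file before returning would hang in the gap between the two closes, which matches the stalled request described above.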

      The solution above avoids that by having the accept loop wait for the client thread to queue the fileno back for cleanup, and then closing its copy of the socket immediately. I've also made the client thread queue the fileno back as soon as it has duped it, to minimise the impact upon the accept loop. It's not a perfect solution for performance, but then using Perl for a server is never going to be the ultimate high performance solution.

      I'll try to test this locally, and maybe come up with something better, once the ambient noise level here drops below 100dB; but please do feed back your findings.



        Yes, this makes it work. In the previous post I forgot to say that I also had the client thread enqueue the "you can close this fd" message on Qclean as soon as it opened its copy.

        Thanks a lot again. (And try to gather some wood chips when they finish: small ones are quite good as the first step in making a good fire! Enjoy the Autumn :-)
