Personally, I think you are reaching for the wrong solution.
I recently had a somewhat similar requirement and this is a cut-down version of my solution. It runs an asynchronous (non-blocking) select server in a thread to handle the connects, reads and disconnects. It also takes care of buffering the reads, accumulating data until a complete packet of the correct length has been received, at which point it queues the packet back to the main thread.
The main thread's responsibilities are then simply to read the complete packets from the queue and process them:
    #! perl -slw
    use strict;
    use threads;
    use Thread::Queue;
    use IO::Socket;
    use IO::Select;

    use constant PACKETLEN => 30;

    my $Q = new Thread::Queue;

    async {
        my $noBlock = 1;
        my $lsn = IO::Socket::INET->new(
            Proto     => 'tcp',
            LocalPort => 12345,
            Listen    => SOMAXCONN,
            Reuse     => 1,
            TIMEOUT   => 3,
        ) or die "Server failed to create listener: $^E";

        binmode $lsn;
        ioctl( $lsn, 0x8004667e, \$noBlock );    ## FIONBIO on Win32: non-blocking

        my $sel = IO::Select->new( $lsn );
        my %inbufs;    ## one accumulation buffer per client

        while( 1 ) {
            my @ready = $sel->can_read(1);
            for my $ready ( @ready ) {
                if( $ready == $lsn ) {
                    ## New connection
                    my $client = $lsn->accept or next;
                    binmode $client;
                    ioctl( $client, 0x8004667e, \$noBlock );
                    $sel->add( $client );
                    $inbufs{ $client } = '';
                }
                else {
                    ## Append only what is still missing from the current packet
                    if( read( $ready, $inbufs{ $ready },
                            PACKETLEN - length( $inbufs{ $ready } ),
                            length( $inbufs{ $ready } )
                        ) == 0
                    ) {
                        ## Client disconnected
                        $ready->shutdown( 2 );
                        $sel->remove( $ready );
                        $ready->close;
                        delete $inbufs{ $ready };
                        next;
                    }
                    length( $inbufs{ $ready } ) == PACKETLEN or next;
                    $Q->enqueue( $inbufs{ $ready } );
                    $inbufs{ $ready } = '';
                }
            }
        }
    }->detach;

    my $c = 0;
    while( my $packet = $Q->dequeue ) {
        ## Do something with the packet
        printf "\r%u: $packet", ++$c;
    }
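To see the accumulate-until-complete idea in isolation, here is a minimal sketch that runs without any sockets. It is not the code above: `feed` is a hypothetical helper, the 7-byte chunking is an arbitrary stand-in for uneven TCP reads, and it uses a slightly different technique (append whatever arrives, then slice off whole packets with `substr`) rather than asking `read` for only the missing bytes.

```perl
#! perl -w
use strict;

use constant PACKETLEN => 30;

## Hypothetical helper: append a chunk to a client's buffer and
## return every complete packet that is now available.
sub feed {
    my( $bufref, $chunk ) = @_;
    $$bufref .= $chunk;
    my @packets;
    ## 4-arg substr removes the first PACKETLEN bytes and returns them
    push @packets, substr( $$bufref, 0, PACKETLEN, '' )
        while length( $$bufref ) >= PACKETLEN;
    return @packets;
}

## Three fixed-length packets back to back: 90 bytes in total
my $stream = join '', map( sprintf( '%-30s', "packet $_" ), 1 .. 3 );

my $buf = '';
my @got;
## Deliver the stream in uneven 7-byte pieces, much as TCP might
for my $chunk ( unpack '(a7)*', $stream ) {
    push @got, feed( \$buf, $chunk );
}

print scalar( @got ), " packets, ", length( $buf ), " bytes left over\n";
```

Either way the point is the same: a packet is only handed on once all PACKETLEN bytes are in the buffer, however the bytes happened to be split on the wire.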
By running the IO handling and the packet processing in separate threads, you decouple the two and so get the best of both worlds. The IO thread is freed from the burden of processing the incoming data, ensuring that it is able to deal with incoming connects, packets and disconnects in a timely manner. Meanwhile, the main thread can process each packet in a strictly linear fashion, avoiding the need for elaborate state machines and time-critical event handlers, which greatly simplifies things.
And if a single thread is unable to keep up with the inbound data rate, you can easily start a second thread reading from the same queue to handle it.
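A rough sketch of what that could look like, assuming a threads-enabled perl. The names `WORKERS`, `process_packet`, `worker` and the `$results` queue are all made up for illustration, and the packets are enqueued by hand here instead of by the IO thread:

```perl
#! perl -w
use strict;
use threads;
use Thread::Queue;

use constant WORKERS => 2;    ## assumed worker count; tune to taste

my $Q       = Thread::Queue->new;    ## complete packets in
my $results = Thread::Queue->new;    ## hypothetical: processed output

## Hypothetical stand-in for the real per-packet work
sub process_packet {
    my( $packet ) = @_;
    return uc $packet;
}

## Each worker blocks in dequeue until a packet arrives;
## an undef item is used as the stop marker.
sub worker {
    while( defined( my $packet = $Q->dequeue ) ) {
        $results->enqueue( process_packet( $packet ) );
    }
}

my @workers;
push @workers, threads->create( \&worker ) for 1 .. WORKERS;

## Stand-in for the IO thread: queue ten packets
$Q->enqueue( sprintf 'packet-%02d', $_ ) for 1 .. 10;

## One stop marker per worker, then wait for them to finish
$Q->enqueue( undef ) for 1 .. WORKERS;
$_->join for @workers;

my @done = sort map( $results->dequeue, 1 .. 10 );
print "$_\n" for @done;
```

Each packet is dequeued by exactly one worker, so no extra locking is needed around the queue. The order in which packets finish is no longer guaranteed, though, which is why the sketch sorts the results before printing.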
The posted code is light on error checking and handling, but it might form the basis of something to get you going. Even as is, it has proven remarkably robust, capable of handling tens of clients and high volumes of data with aplomb.
In reply to Re: I search for an async IPC module (...)
by BrowserUk
in thread I search for an async IPC module that transfers arbitrary-length strings (preferably binary, but ASCII is fine too) via full-duplex, keep-alive TCP connection
by anykeyman