in reply to I search for an async IPC module that transfers arbitrary-length strings (preferably binary, but ASCII is fine too) via full-duplex, keep-alive TCP connection

Personally, I think you are reaching for the wrong solution.

I recently had a somewhat similar requirement and this is a cut down version of my solution. It runs an asynchronous (non-blocking) select server in a thread to handle the connects, reads and disconnects. It also takes care of buffering the reads and accumulating until the correct lengths have been received. At which point it queues them back to the main thread.

The main threads responsibilities are then simply to read the complete packets from the queue and process them:

#! perl -slw use strict; use threads; use Thread::Queue; use IO::Socket; use IO::Select; use constant PACKETLEN => 30; my $Q = new Thread::Queue; async { my $noBlock = 1; my $lsn = IO::Socket::INET->new( Proto => 'tcp', LocalPort => 12345, Listen => SOMAXCONN, Reuse => 1, TIMEOUT => 3, ) or die "Server failed to create listener: $^E"; binmode $lsn; ioctl( $lsn, 0x8004667e, \$noBlock ); my $sel = IO::Select->new( $lsn ); my %inbufs; while( 1 ) { my @ready = $sel->can_read(1); for my $ready ( @ready ) { if( $ready == $lsn ) { my $client = $lsn->accept or next; binmode $client; ioctl( $client, 0x8004667e, \$noBlock ); $sel->add( $client ); $inbufs{ $client } = ''; } else { if( read( $ready, $inbufs{ $ready }, PACKETLEN - length( $inbufs{ $ready } ), length( $inbufs{ $ready } ) ) == 0 ) { $ready->shutdown( 2 ); $sel->remove( $ready ); $ready->close; delete $inbufs{ $ready }; next; } length( $inbufs{ $ready } ) == PACKETLEN or next; $Q->enqueue( $inbufs{ $ready } ); $inbufs{ $ready } = ''; } } } }->detach; my $c = 0; while( my $packet = $Q->dequeue ) { ## Do something with the packet printf "\r%u: $packet", ++$c; }

By moving the packet processing into a separate thread you decouple the IO handling and the data processing and so get the best of both worlds. The IO thread is freed from the burden of processing the incoming data, ensuring that it is able to deal with incoming connects, packets and disconnects in a timely manner. Meanwhile the main thread can process each packet in a strictly linear fashion avoiding the need for elaborate state machines and time-critical event handlers, greatly simplifying things.

And if a single thread is unable to keep up with the inbound data rate, you can easily start a second thread reading from the same queue to handle it.

The posted code is light on error checking and handling, but it might form the basis of something to get you going, but even as is, it has proven remarkably robust, capable of handling 10s of clients and high volumes of data with aplomb.


With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
"Science is about questioning the status quo. Questioning authority".
In the absence of evidence, opinion is indistinguishable from prejudice.

Replies are listed 'Best First'.
Re^2: I search for an async IPC module (...)
by anykeyman (Novice) on May 28, 2013 at 07:40 UTC

    What wrong solution do you think I am reaching? Running program in event loop?

    Well, I can move event loop and costly data processing in separate threads for simplicity of code. If multi-core optimization is required, I can push event processing to pool of threads using Thread::Queue, right?

    BTW, thanks for this piece of code, it will help me a bit with sockets. Though, your post didn't answer my original question.

      What wrong solution do you think I am reaching?

      Vis: I search for an async IPC module ...

      For practical (rather than philosophical) reasons, I believe that looking for an async/event-driven IO module -- which by its very nature means a complete dynasty of framework-type modules -- is the wrong approach to solving the problem you describe.

      Frameworks -- modules which call you back when they decide you should do something; as opposed to libraries which you call when you want something done -- are inevitably heavyweight families of modules requiring an(other) framework-author-integrated module for every new type of event or IO (eg. POE::* and Event::*). And even then, you have to be very careful to tune your callbacks so that they don't run for too long, otherwise the event processing becomes unresponsive.

      The problems arise when your data processing of a single unit of data requires more time than is compatible with the response-time requirements of the IO code. You are then forced to try and break up that single unit of code into subunits handled by separate (artificial) events & callbacks; and that forces the saving and storing of intermediates states of the data between those callbacks. It all just gets complicated, messy and fragile.

      If multi-core optimization is required, I can push event processing to pool of threads using Thread::Queue, right?

      Arranging for multiple threads to have access to all the (internal) state used by such async-IO frameworks so that any of the event threads can respond to any of the events is nigh impossible.

      Equally, segregating your possible events into two (or more) subsets so that two or more event loop threads can process them independently is also a loosing game. The problem here is balancing those subsets so that no one set becomes the bottleneck to your processing.

      Though, your post didn't answer my original question.

      It would be hard to answer with "look at such-and-such asyncIO module"; when the premise of my post is that you should not be considering such a module for your application.

      But there is nothing to argue about here. It is just my opinion and if that doesn't fit with your philosophy, you are of course completely free to simply ignore it.


      With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
      Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
      "Science is about questioning the status quo. Questioning authority".
      In the absence of evidence, opinion is indistinguishable from prejudice.