http://qs1969.pair.com?node_id=623539

mattk has asked for the wisdom of the Perl Monks concerning the following question:

I'm looking into a fun problem at my new job at an internet registrar. We have data connections with each registry (for domain creation, transfer notification, etc.) that take a while to initiate; sometimes querying a domain name can take upwards of 30 seconds to cycle through all the registries.

Ideally we'd like to drop an RPC server in between the calling scripts and the registry modules. It would accept incoming requests from clients, use an existing connection handle for the current transaction, and then pass the reply back. My first stop was ResourcePool with Frontier::Daemon::Forking, until I realised that when a child process exits, its registry objects are DESTROYed and all their connections are shut down. The next time a client connects, each registry connection has to be re-initialised, nicely eating up any speed increase already gained. My next thought was to do the registry side with non-blocking IO, but the problem there is that I can't handle those sockets directly: I need to call a method in one of the existing classes and let it parse the data from the socket.
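A minimal sketch of the "keep the handle alive" idea, assuming a single long-running process: each registry handle is created on first use and then cached for every later request. `Mock::RegistryConnection` and `get_handle` are hypothetical stand-ins for the real registry classes; the mock only counts connections rather than making them.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a real registry class. Connecting is the slow
# part, so the mock just counts how many times it happens.
package Mock::RegistryConnection;
my $connects = 0;
sub new           { my ( $class, $name ) = @_; $connects++; return bless { name => $name }, $class }
sub query         { my ( $self, $domain ) = @_; return "$self->{name}: $domain checked" }
sub connect_count { return $connects }

package main;

# One handle per registry, created on first use and kept for the life of
# this process, so later requests skip the connection setup.
my %handles;
sub get_handle {
    my ($registry) = @_;
    return $handles{$registry} ||= Mock::RegistryConnection->new($registry);
}

print get_handle($_)->query('example.com'), "\n" for qw(A B A A B);
print "connections made: ", Mock::RegistryConnection->connect_count, "\n";
```

Five queries cost only two connections here. In a forking server, though, each child gets its own copy of `%handles`, which goes away when the child exits, so the cache has to live in a process (or thread) that outlives individual requests.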

POE, specifically POE::Component::Generic::Object and its yield method, looks like it might be the right way to go. Event might also be worth checking out. Ideas? This seems like a fairly solvable problem, but I'm not sure which path is the right one to take.

Re: Resource pools and concurrency
by BrowserUk (Patriarch) on Jun 27, 2007 at 12:03 UTC

    Dare I suggest threads? The following demos a simple architecture that might work for you.

    It starts one thread per registry that makes a permanent, blocking connection to that registry. Each thread then enters a blocking loop reading from a dedicated (work) queue of requests.

    Once the agent threads are running, the main thread creates a local, non-blocking, listening socket and loops over that and another (replies) queue.

    When an inbound connection is made, it reads the request and enqueues it to the appropriate registry agent thread via that thread's dedicated work queue.

    When a reply is received from the replies queue, it is sent back to the requestor and the connection is dropped.

    The only shared state and locking are inside the queues, and these are unidirectional, so there is no possibility of deadlocks, priority inversions or other nasties. Each registry agent object and its connection are used only within a single thread.
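    Each queue message carries a tag (in the code below, the client's fileno) so the main thread can route a reply back to the right connection. A small sketch of that framing with `pack`/`unpack`: `N/A*` writes a 32-bit big-endian length followed by the string. One detail worth knowing if real payloads can end in spaces: on unpack, `A` strips trailing spaces and NULs, while `a` returns the bytes untouched. The payload string here is made up for illustration.

```perl
use strict;
use warnings;

my $payload = "reply text  ";    # illustrative payload with trailing spaces

# Round-trip a (tag, payload) pair through both framings.
my ( $tag_a, $text_a ) = unpack 'N/A* N/A*', pack( 'N/A* N/A*', 7, $payload );
my ( $tag_b, $text_b ) = unpack 'N/a* N/a*', pack( 'N/a* N/a*', 7, $payload );

printf "A*: tag=%s text=[%s]\n", $tag_a, $text_a;    # trailing spaces stripped
printf "a*: tag=%s text=[%s]\n", $tag_b, $text_b;    # payload preserved exactly
```

    For the stock answers in the demo the difference doesn't matter; `N/a*` is simply the safer choice when the payload must arrive byte-for-byte.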

    There is one thread per registry, each using ~1.5 MB (less if you tailor the stack size). The threads run for the life of the connection and block while idle, so they consume no great resources. In my tests, memory came to around 1 MB per agent, and CPU usage never got off zero no matter how hard I drove it.

    Some sample code with dummied up agents that just sleep a bit and give back a stock answer:

    #! perl -slw
    use strict;

    ## Mocked-up registry agents: each just sleeps a bit and returns a stock answer.
    package Registry::Agent;

    sub new {
        my( $class, %config ) = @_;
        return bless \%config, $class;
    }

    sub makeRequest {
        die "makeRequest for class @{[ ref $_[0] ]} not reified";
    }

    package Registry::Agent::A;
    use base 'Registry::Agent';
    sub makeRequest { sleep rand 30; return "The Answer from A"; }

    package Registry::Agent::B;
    use base 'Registry::Agent';
    sub makeRequest { sleep rand 30; return "The Answer from B"; }

    package Registry::Agent::C;
    use base 'Registry::Agent';
    sub makeRequest { sleep rand 30; return "The Answer from C"; }

    package main;

    use threads;
    use Thread::Queue;

    ## Runs in each agent thread: owns one registry object (and its
    ## connection) and services its work queue until it dequeues undef.
    sub registryAgent {
        my( $class, $Qin, $Qout, %otherConfig ) = @_;
        #require $class; ## Disabled for mocked up agents.
        my $agent = $class->new( %otherConfig )
            or die "Couldn't create '$class' object\n";

        while( my $workItem = $Qin->dequeue ) {
            my( $tag, $request ) = unpack 'N/A* N/A*', $workItem;
            warn "$class: Got request for $tag:$request\n";
            my $result = $agent->makeRequest( $request );
            warn "$class: returning $result for $tag\n";
            $Qout->enqueue( pack 'N/A* N/A*', $tag, $result );
        }
    }

    my $Qresults = Thread::Queue->new;    ## replies from all agents
    my %workQs;                           ## agent class => its work queue

    ## Agents start before the listening socket exists, so they don't inherit a copy of it.
    my @agents = map {
        my $Qwork = Thread::Queue->new;
        $workQs{ $_ } = $Qwork;
        threads->create( \&registryAgent, $_, $Qwork, $Qresults, configA => 1, configB => 2 );
    } qw[ Registry::Agent::A Registry::Agent::B Registry::Agent::C ];

    require IO::Socket::INET;
    my $listener = IO::Socket::INET->new(
        LocalAddr => 'localhost:33333',
        Listen    => 100,
        Blocking  => 0,
    ) or die "Couldn't listen on localhost:33333; $!";
    ioctl( $listener, 0x8004667e, \1 ); ## non-blocking on Win32

    my $done = 0;
    $SIG{ INT } = sub {
        shutdown( $listener, 0 );                 ## Stop listening
        $_->enqueue( undef ) for values %workQs;  ## Signal termination to threads
        $_->join for @agents;                     ## Perform last rites.
        $done = 1;                                ## Tell main thread we're done.
    };

    my %clients;    ## fileno => client socket awaiting a reply
    while( !$done ) {
        if( my $client = $listener->accept ) {
            warn "main: Client:$client connected\n";
            my $fno = fileno $client;
            $clients{ $fno } = $client;
            my $request = <$client>;
            unless( defined $request ) {
                warn "Read failed: $!\n";
                close delete $clients{ $fno };    ## don't leak the failed client
                next;
            }
            my( $agent, $rest ) = split ':', $request, 2;
            warn "main: Client: $client requests Agent:$agent Request:$rest\n";
            my $Qwork = $workQs{ "Registry::Agent::$agent" } or do {
                warn "main: No such agent '$agent'\n";
                close delete $clients{ $fno };
                next;
            };
            $Qwork->enqueue( pack "N/A* N/A*", $fno, $rest );
        }
        elsif( my $result = $Qresults->dequeue_nb ) {
            my( $fno, $reply ) = unpack 'N/A* N/A*', $result;
            warn "Got reply for request tag:$fno; sending...\n";
            print { $clients{ $fno } } $reply;
            close delete $clients{ $fno };
        }
        else {
            sleep 1;
        }
    }
    close $listener;
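    To drive the demo server, a client only has to send one line of the form `Agent:request` (Agent being A, B or C, the suffix of the Registry::Agent::* class) and read until the server closes the connection. A minimal sketch, assuming the server above is running on localhost:33333; `request_line` and `ask` are hypothetical helper names.

```perl
use strict;
use warnings;
use IO::Socket::INET;

# Request format the demo server expects: "<Agent>:<request>\n".
sub request_line {
    my ( $agent, $request ) = @_;
    return "$agent:$request\n";
}

# Send one request and wait for the reply; the server closes the socket
# after answering, so reading to EOF collects the whole reply.
sub ask {
    my ( $agent, $request ) = @_;
    my $sock = IO::Socket::INET->new( PeerAddr => 'localhost:33333' )
        or die "Can't connect to localhost:33333: $!\n";
    print {$sock} request_line( $agent, $request );
    local $/;    # slurp mode
    my $reply = <$sock>;
    close $sock;
    return $reply;
}

# Only contact the server when given two arguments: agent and request.
print ask(@ARGV) if @ARGV == 2;
```

    For example, `perl client.pl A example.com` should print "The Answer from A" after up to 30 seconds, since the mocked agents sleep for a random time before answering.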

    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
      This is exactly what I needed :) After hacking on it for most of the day I've got a thread pool for interacting with the clients via RPC, and another for interacting with the registries. Works beautifully!
Re: Resource pools and concurrency
by Moron (Curate) on Jun 27, 2007 at 09:59 UTC
    Can you say what the existing interface to the registry modules is like - is it persistent? is it blocking? And what platform is it running on?
    __________________________________________________________________________________

    ^M Free your mind!

      At the moment, we have a few background processes that continually poll the registries to deal with anything that needs (near) real-time processing. There are also customer facing tools (APIs, web pages) that use the same modules and these would mainly benefit from the speed increase. The daemons would just benefit from a slightly more abstracted design. Since it takes 4 or 5 seconds to set up a connection, and only 0.5s to make a query, it would be ideal to keep the handles alive and share them when needed. And they're blocking right now... It would be a pretty big timesink to move all the existing modules onto non-blocking IO. The box is running a 2.4.22 kernel, Perl 5.8.1.
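      Those timings make the case for persistent handles concrete. A back-of-the-envelope model, assuming a 4.5 s setup (the midpoint of "4 or 5 seconds"), a 0.5 s query, and a hypothetical 6 registries each queried once; the parallel case further assumes the registries answer independently, as they would with one agent thread per registry.

```perl
use strict;
use warnings;

# Rough cost model for one lookup that touches every registry once.
# Assumption: setup and query times are identical for every registry.
sub lookup_cost {
    my ( $registries, $setup, $query, %opt ) = @_;
    # A kept-alive handle skips the connection setup entirely.
    my $per_registry = $query + ( $opt{persistent} ? 0 : $setup );
    # In parallel the lookup waits only for the slowest registry, which
    # with equal times is a single registry's cost.
    return $opt{parallel} ? $per_registry : $registries * $per_registry;
}

my $registries = 6;    # hypothetical registry count
my @cases = (
    [ 'reconnect, sequential'  => {} ],
    [ 'persistent, sequential' => { persistent => 1 } ],
    [ 'persistent, parallel'   => { persistent => 1, parallel => 1 } ],
);
for my $case (@cases) {
    my ( $label, $opt ) = @$case;
    printf "%-24s %5.1f s\n", $label, lookup_cost( $registries, 4.5, 0.5, %$opt );
}
```

      Under these assumptions the three cases come out at 30 s, 3 s and 0.5 s: keeping handles alive removes most of the cost, and querying registries concurrently removes most of what is left.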
        My first idea is that I would reject sharing handles within a single daemon. Why not clone the daemons instead, and have a transaction-monitoring layer that sorts out which daemons are busy and which are not? Keep a stock of so many daemons of a particular kind (say 8) that have completed initiation but are not yet servicing requests. If two become busy handling requests, a ninth and tenth start initiating, so that the stock of 8 identical ready daemons (for example) is always available for requests not yet received. When more than the required stock of clones is ready for new requests, kill off the excess to control their number. The transaction-monitoring layer needs to identify requesters and keep a table of which cloned processes are allocated to which requests and which are free.
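        The stock-control logic can be modelled independently of whether the clones end up as processes, forks or threads. A minimal sketch, assuming a target of 8 ready workers and treating initiation as instant: `WorkerStock` is a hypothetical class that only tracks worker state and the request-to-worker table, and its comments mark where real code would start, initiate or kill a clone.

```perl
use strict;
use warnings;

# Hypothetical model of the transaction-monitoring layer. It only tracks
# state; real code would spawn clones, initiate connections and kill them.
package WorkerStock;

sub new {
    my ( $class, %args ) = @_;
    return bless {
        target  => exists $args{target} ? $args{target} : 8,    # ready workers to keep
        next_id => 1,
        workers => {},    # id => { state => 'idle' | 'busy', request => ... }
    }, $class;
}

# Ids of ready (idle) workers, oldest first.
sub idle_ids {
    my ($self) = @_;
    my $w = $self->{workers};
    return sort { $a <=> $b } grep { $w->{$_}{state} eq 'idle' } keys %$w;
}

# Top the stock back up to the target, or retire any surplus.
sub rebalance {
    my ($self) = @_;
    my @idle = $self->idle_ids;
    my ( @spawned, @retired );
    for ( scalar(@idle) + 1 .. $self->{target} ) {
        my $id = $self->{next_id}++;
        $self->{workers}{$id} = { state => 'idle' };    # real code: start and initiate a clone
        push @spawned, $id;
    }
    while ( @idle > $self->{target} ) {
        my $id = pop @idle;                             # newest idle worker goes first
        delete $self->{workers}{$id};                   # real code: kill the excess clone
        push @retired, $id;
    }
    return ( \@spawned, \@retired );
}

# Hand a request to a ready worker and record which request it serves.
sub allocate {
    my ( $self, $request ) = @_;
    my ($id) = $self->idle_ids or return;    # nothing ready: caller must wait
    $self->{workers}{$id} = { state => 'busy', request => $request };
    $self->rebalance;                         # replacements start straight away
    return $id;
}

# Request finished: the worker rejoins the stock and may be retired.
sub release {
    my ( $self, $id ) = @_;
    $self->{workers}{$id} = { state => 'idle' };
    $self->rebalance;
    return;
}

sub count_idle  { my ($self) = @_; my @ids = $self->idle_ids; return scalar @ids }
sub count_total { my ($self) = @_; return scalar keys %{ $self->{workers} } }

package main;

my $stock = WorkerStock->new( target => 8 );
$stock->rebalance;    # initial stock of ready workers
my $w1 = $stock->allocate('client-1: check example.com');
my $w2 = $stock->allocate('client-2: transfer example.org');
printf "after 2 requests: %d idle, %d total\n", $stock->count_idle, $stock->count_total;
$stock->release($w1);
printf "after 1 release:  %d idle, %d total\n", $stock->count_idle, $stock->count_total;
```

        Allocating two requests leaves 8 ready plus 2 busy workers (10 in total); releasing one pushes the ready count to 9, so the newest ready worker is retired and the total drops to 9.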

        Update: I have wound my thinking back to the functional design stage, which I like to have in a feasible state before I feel safe suggesting whether such clones should be independent processes, forks, threads or POE.
