Dare I suggest threads? The following demos a simple architecture that might work for you.
It starts one thread per registry that makes a permanent, blocking connection to that registry. Each thread then enters a blocking loop reading from a dedicated (work) queue of requests.
Once the agent threads are running, the main thread creates a local, non-blocking, listening thread and loops over that and another (replies) queue.
When an inbound connection is made, it reads the request and enqueues it to the appropriate registry agent thread via its dedicate work queue.
When a reply is received from the replies queue, it is sent back to the requestor and the connection is dropped.
The only shared state or locking is inside the queues and these are unidirectional so there is no possibility of deadlocking, priority inversions or other nasties. Each registry agent object and connection are used only with a single thread.
There is one thread per registry, uses ~1.5 MB (less if you tailor the stack requirements), they run for the life of the connection and are blocking so no great resource is consumed. In my tests, around 1MB/agent and cpu usage never gets off zero no matter how hard I drive it.
Some sample code with dummied up agents that just sleep a bit and give back a stock answer:
#! perl -slw
use strict;
package Registry::Agent;
sub new {
my( $class, %config ) = @_;
return bless \%config, $class;
}
sub makeRequest{
die "makeRequest for class ${ ref $_[0] } not reified";
}
package Registry::Agent::A;
use base 'Registry::Agent';
sub makeRequest{ sleep rand 30; return "The Answer from A"; }
package Registry::Agent::B;
use base 'Registry::Agent';
sub makeRequest{ sleep rand 30; return "The Answer from B"; }
package Registry::Agent::C;
use base 'Registry::Agent';
sub makeRequest{ sleep rand 30; return "The Answer from C"; }
package main;
use threads;
use Thread::Queue;
sub registryAgent {
my( $class, $Qin, $Qout, %otherConfig ) = @_;
#require $class; ## Disabled for mocked up agents.
my $agent = $class->new( %otherConfig )
or die "Couldn't create '$class' object\n";
while( my $workItem = $Qin->dequeue ) {
my( $tag, $request ) = unpack 'N/A* N/A*', $workItem;
warn "$class: Got request for $tag:$request\n";
my $result = $agent->makeRequest( $request );
warn "$class: returning $result for $tag\n";
$Qout->enqueue( pack 'N/A* N/A*', $tag, $result );
}
}
my $Qresults = new Thread::Queue;
my %workQs;
my @agents = map{
my $Qwork = new Thread::Queue;
$workQs{ $_ } = $Qwork;
threads->create(
\®istryAgent, $_, $Qwork, $Qresults,
configA => 1, configB => 2
);
} qw[ Registry::Agent::A Registry::Agent::B Registry::Agent::C ];
require IO::Socket::INET;
my $listener = IO::Socket::INET->new(
LocalAddr => 'localhost:33333',
Listen => 100,
Blocking => 0,
) or die "Couldn't listen on localhost:33333; $!";
ioctl( $listener, 0x8004667e, \1 ); ## non-blocking on Win32
my $done = 0;
$SIG{ INT } = sub{
shutdown( $listener, 0 ); ## Stop listening
$_->enqueue( undef ) for values %workQs; ## Signal termination
+to threads
$_->join for @agents; ## Perform last rites.
$done = 1; ## Tell main thread we
+'re done.
};
my %clients;
while( !$done ) {
if( my $client = $listener->accept ) {
warn "main: Client:$client connected\n";
my $fno = fileno $client;
$clients{ $fno } = $client;
defined( my $request = <$client> )
or warn "Read failed: $!\n" and next;
my( $agent, $rest ) = split ':', $request, 2;
warn "main: Client: $client requests Agent:$agent Request:$res
+t\n";
$workQs{ "Registry::Agent::$agent" }->enqueue(
pack "N/A* N/A*", $fno, $rest
);
}
elsif( my $result = $Qresults->dequeue_nb ) {
my( $fno, $reply ) = unpack 'N/A* N/A*', $result;
warn "Got reply for request tag:$fno; sending...\n";
print { $clients{ $fno } } $reply;
close delete $clients{ $fno };
}
else {
sleep 1;
}
}
close $listener;
Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
"Science is about questioning the status quo. Questioning authority".
In the absence of evidence, opinion is indistinguishable from prejudice.
|