in reply to Resource pools and concurrency
Dare I suggest threads? The following demos a simple architecture that might work for you.
It starts one thread per registry that makes a permanent, blocking connection to that registry. Each thread then enters a blocking loop reading from a dedicated (work) queue of requests.
Once the agent threads are running, the main thread creates a local, non-blocking, listening socket and loops, polling that socket and a second (replies) queue.
When an inbound connection is made, it reads the request and enqueues it to the appropriate registry agent thread via its dedicated work queue.
When a reply is received from the replies queue, it is sent back to the requestor and the connection is dropped.
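To show how a reply finds its way back to the right connection: the sample code further down tags each work item with the client's file number, packing tag and payload as two length-prefixed strings ('N/A* N/A*'). A rough sketch of just that step; frame and unframe are hypothetical helper names (the sample code calls pack/unpack inline), and the tag 7 and the request text are made-up values.

```perl
use strict;
use warnings;

# Hypothetical helpers wrapping the pack/unpack calls used in the sample code.
sub frame   { my( $tag, $payload ) = @_; return pack 'N/A* N/A*', $tag, $payload }
sub unframe { my( $item ) = @_;          return unpack 'N/A* N/A*', $item }

# Made-up tag (a file number) and request text.
my $item = frame( 7, "some request\n" );
my( $tag, $payload ) = unframe( $item );

# 'A' strips trailing whitespace on unpack, so the newline is gone.
print "tag=$tag payload=[$payload]\n";
```

Because the tag travels with the request and comes back with the reply, the agent threads never need to see the socket itself.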
The only shared state or locking is inside the queues and these are unidirectional so there is no possibility of deadlocking, priority inversions or other nasties. Each registry agent object and connection are used only with a single thread.
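Stripped of the sockets, the one-way queue arrangement looks roughly like this. It is a minimal sketch, assuming a single worker and plain strings as work items; the "done: ..." reply text is invented for illustration.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $Qwork    = Thread::Queue->new;   # main -> worker only
my $Qreplies = Thread::Queue->new;   # worker -> main only

# The worker blocks in dequeue() until work arrives; undef ends the loop.
my $worker = threads->create( sub {
    while( defined( my $item = $Qwork->dequeue ) ) {
        $Qreplies->enqueue( "done: $item" );
    }
} );

$Qwork->enqueue( $_ ) for qw[ one two three ];
$Qwork->enqueue( undef );            # signal termination
$worker->join;

# The worker has finished, so every reply is already queued;
# a non-blocking read drains them without waiting.
my @replies;
while( defined( my $reply = $Qreplies->dequeue_nb ) ) {
    push @replies, $reply;
}
print "$_\n" for @replies;
```

Each queue has exactly one writer side and one reader side, so neither thread ever waits on the other while holding something the other needs.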
There is one thread per registry, each using ~1.5 MB (less if you tailor the stack requirements). They run for the life of the connection and spend most of it blocked, so no great resource is consumed. In my tests, memory came to around 1MB/agent and CPU usage never gets off zero no matter how hard I drive it.
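If you do want to tailor the stack, the threads module accepts a stack_size option at creation time. A sketch only: the 64 * 4096 figure is an assumption, not a recommendation, and the right value depends on your platform and on how deep the agent code recurses.

```perl
use strict;
use warnings;
use threads;

# Assumed figure: 64 pages of 4096 bytes. Tune for your platform and code.
my $stack = 64 * 4096;

my $thr = threads->create(
    { stack_size => $stack },
    sub { return "agent running with a smaller stack" },
);

# The system may round the request up, so report what was actually granted.
printf "requested %d bytes, granted %d bytes\n", $stack, $thr->get_stack_size;

my $result = $thr->join;
print "$result\n";
```

The same setting can be applied to every thread with threads->set_stack_size( $stack ) before any are created.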
Some sample code with dummied up agents that just sleep a bit and give back a stock answer:
    #! perl -slw
    use strict;

    package Registry::Agent;

    sub new {
        my( $class, %config ) = @_;
        return bless \%config, $class;
    }

    ## Abstract method: each concrete agent class must override this.
    sub makeRequest {
        die "makeRequest for class @{[ ref $_[0] ]} not reified";
    }

    package Registry::Agent::A;
    use base 'Registry::Agent';

    sub makeRequest { sleep rand 30; return "The Answer from A"; }

    package Registry::Agent::B;
    use base 'Registry::Agent';

    sub makeRequest { sleep rand 30; return "The Answer from B"; }

    package Registry::Agent::C;
    use base 'Registry::Agent';

    sub makeRequest { sleep rand 30; return "The Answer from C"; }

    package main;
    use threads;
    use Thread::Queue;

    ## Thread body: one per registry. Blocks on its own work queue and
    ## posts each result, tagged, to the shared replies queue.
    sub registryAgent {
        my( $class, $Qin, $Qout, %otherConfig ) = @_;

        #require $class; ## Disabled for mocked up agents.
        my $agent = $class->new( %otherConfig )
            or die "Couldn't create '$class' object\n";

        while( my $workItem = $Qin->dequeue ) {
            my( $tag, $request ) = unpack 'N/A* N/A*', $workItem;
            warn "$class: Got request for $tag:$request\n";

            my $result = $agent->makeRequest( $request );
            warn "$class: returning $result for $tag\n";

            $Qout->enqueue( pack 'N/A* N/A*', $tag, $result );
        }
    }

    my $Qresults = Thread::Queue->new;
    my %workQs;

    ## Start one agent thread per registry, each with a dedicated work queue.
    my @agents = map {
        my $Qwork = Thread::Queue->new;
        $workQs{ $_ } = $Qwork;
        threads->create(
            \&registryAgent, $_, $Qwork, $Qresults,
            configA => 1, configB => 2
        );
    } qw[ Registry::Agent::A Registry::Agent::B Registry::Agent::C ];

    require IO::Socket::INET;
    my $listener = IO::Socket::INET->new(
        LocalAddr => 'localhost:33333',
        Listen    => 100,
        Blocking  => 0,
    ) or die "Couldn't listen on localhost:33333; $!";

    ioctl( $listener, 0x8004667e, \1 ); ## non-blocking on Win32

    my $done = 0;
    $SIG{ INT } = sub {
        shutdown( $listener, 0 );              ## Stop listening
        $_->enqueue( undef ) for values %workQs; ## Signal termination to threads
        $_->join for @agents;                  ## Perform last rites.
        $done = 1;                             ## Tell main thread we're done.
    };

    my %clients;    ## Open client sockets, keyed by file number (the request tag).
    while( !$done ) {
        if( my $client = $listener->accept ) {
            warn "main: Client:$client connected\n";
            my $fno = fileno $client;
            $clients{ $fno } = $client;

            defined( my $request = <$client> )
                or warn "Read failed: $!\n" and next;

            ## Requests have the form "Agent:rest of request".
            my( $agent, $rest ) = split ':', $request, 2;
            warn "main: Client: $client requests Agent:$agent Request:$rest\n";

            $workQs{ "Registry::Agent::$agent" }->enqueue(
                pack "N/A* N/A*", $fno, $rest
            );
        }
        elsif( my $result = $Qresults->dequeue_nb ) {
            my( $fno, $reply ) = unpack 'N/A* N/A*', $result;
            warn "Got reply for request tag:$fno; sending...\n";
            print { $clients{ $fno } } $reply;
            close delete $clients{ $fno };
        }
        else {
            sleep 1;    ## Nothing to do; avoid spinning.
        }
    }
    close $listener;
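To try the server out, something like the following client should do. It is only a sketch: build_request and ask_registry are hypothetical helper names, and it assumes the server above is already listening on localhost:33333 and expects a single "Agent:request" line.

```perl
use strict;
use warnings;
use IO::Socket::INET;

# Hypothetical helper: the server expects one line of the form "Agent:request".
sub build_request {
    my( $agent, $text ) = @_;
    return "$agent:$text\n";
}

# Hypothetical helper: send one request and return whatever comes back
# before the server drops the connection.
sub ask_registry {
    my( $agent, $text ) = @_;
    my $sock = IO::Socket::INET->new( PeerAddr => 'localhost:33333' )
        or die "Couldn't connect to localhost:33333; $!";
    print {$sock} build_request( $agent, $text );
    local $/;                       # slurp until EOF
    my $reply = <$sock>;
    close $sock;
    return defined $reply ? $reply : '';
}

# Only touch the network when run as: perl client.pl A "some request"
print ask_registry( @ARGV ) if @ARGV == 2;
```

With the dummied up agents, expect to wait up to 30 seconds for the stock answer, since each one sleeps for a random interval before replying.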
Re^2: Resource pools and concurrency
by mattk (Pilgrim) on Jun 28, 2007 at 06:55 UTC