Simple solution (assuming you're just passing scalar values
to your worker threads):
use threads;
use Thread::Queue;
# in your "job shutdown" dispatcher:
#
# start the worker threads, passing the $q
#
my @workers = ();
my %queues = ();
foreach (1..$threadcount) {
    my $q = Thread::Queue->new();
    push(@workers, threads->new(\&run, $q));
    $queues{$workers[-1]->tid()} = $q;
}
#
# wait for them all to tell you they're ready
#
$_->dequeue() foreach (values %queues);
#
# now do yer thang...when you get a "shutdown event":
#
$queues{$killtid}->enqueue('Kill job');
And a stripped-down worker thread:
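sub run {
    my $q = shift;
    $q->enqueue('ready');
    #
    # the same queue carries traffic both ways, so you need to
    # sleep or otherwise wait here until the spawner has read
    # 'ready'; otherwise the dequeue() below can read it right back
    #
    while (1) {
        my $cmd = $q->dequeue();
        #
        # process the command; dequeue() blocks, so if the job
        # runs long you'll need a timer to come back and poll
        # the queue (e.g., with dequeue_nb()) for a kill request
        #
    }
}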
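The single shared queue above is what forces the "wait for the spawner" step. One way to sidestep it, sketched here with core modules only, is to give each worker its own command queue and have every worker report on one common reply queue. The names $replies and $cmd_q, the 'stopped' message, and the thread count of 3 are made up for illustration, and the short sleep just stands in for a slice of real work.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $threadcount = 3;                        # assumed value for the demo
my $replies     = Thread::Queue->new();     # workers -> dispatcher
my (%queues, @workers);

sub run {
    my ($cmd_q, $replies) = @_;
    my $tid = threads->tid();
    $replies->enqueue("ready $tid");
    while (1) {
        # dequeue_nb() returns undef right away if nothing is queued
        my $cmd = $cmd_q->dequeue_nb();
        if (defined $cmd && $cmd eq 'Kill job') {
            $replies->enqueue("stopped $tid");
            return;
        }
        # stand-in for one slice of real work (10 ms nap)
        select(undef, undef, undef, 0.01);
    }
}

for (1 .. $threadcount) {
    my $cmd_q = Thread::Queue->new();       # dispatcher -> this worker
    my $thr   = threads->new(\&run, $cmd_q, $replies);
    push @workers, $thr;
    $queues{ $thr->tid() } = $cmd_q;
}

# wait until every worker has checked in
my @ready = map { $replies->dequeue() } 1 .. $threadcount;

# the "shutdown event": here we simply stop everybody
$_->enqueue('Kill job') foreach values %queues;
my @stopped = map { $replies->dequeue() } 1 .. $threadcount;
$_->join() foreach @workers;
```

Since a worker never reads from the queue it writes to, it cannot swallow its own 'ready', and the dispatcher needs only one blocking dequeue() loop no matter how many workers there are.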
All that being said, if you need to pass more complex
resource objects between threads, you might consider using
Thread::Queue::Duplex and making your object threads::shared, e.g.,
package WorkerResource;
use threads;
use threads::shared;
use Thread::Queue::Queueable;
use base qw(Thread::Queue::Queueable);
sub new {
    my $class = shift;
    my %obj : shared = ();
    #
    # ...do some object init...
    #
    return bless \%obj, $class;
}
#
# overload Thread::Queue::Queueable marshal methods
#
sub curse {
    my $self = shift;
    #
    # marshal any complex object members into something
    # that can be passed over a Thread::Queue::Duplex,
    # i.e., either a scalar or a threads::shared ref.
    # note that, if your object is threads::shared,
    # it implies all its members are either scalar or
    # threads::shared, so you can use the default TQQ::curse()
    #
    return $self->SUPER::curse();
}
sub redeem {
    my ($class, $obj) = @_;
    #
    # unmarshal any complex object members and rebless the
    # object back into its class.
    # note that, if your object is threads::shared,
    # it implies all its members are either scalar or
    # threads::shared, so you can use the default TQQ::redeem(),
    # which just blesses
    #
    return $class->SUPER::redeem($obj);
}
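package main;
use threads;
use Thread::Queue::Duplex;
my $q = Thread::Queue::Duplex->new();
my $thread = threads->new(\&run, $q);
my $resource = WorkerResource->new();
#
# enqueue() hands back a message id; wait() blocks
# until the worker has responded to that id
#
my $msgid = $q->enqueue($resource);
my $result = $q->wait($msgid);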
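The rule mentioned in the comments above, that a shared object can only hold scalars or other shared refs, is easy to trip over. Here is a small sketch using only core threads::shared; the field names are invented for the demo, and shared_clone() is shown as one possible way to build a nested shared member.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

my %obj : shared = (name => 'job-1');

# storing a plain (unshared) reference in a shared hash dies
my $ok  = eval { $obj{limits} = { cpu => 2 }; 1 };
my $err = $ok ? '' : $@;

# shared_clone() makes a deep shared copy that can be stored
$obj{limits} = shared_clone({ cpu => 2 });

# another thread sees the nested member without any marshalling
my $thr  = threads->create(sub { return $obj{limits}{cpu} });
my $seen = $thr->join();
```

If every member can be built this way, the object needs no special handling to cross a queue; custom marshalling only becomes necessary for members that cannot be shared.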
If the resource you're passing between threads needs
read/write locking, check Thread::Resource::RWLock.
However, be careful about passing around closures and filehandles.
The former just won't work; the latter need some magic to
convert them to filenos in curse() and back to handles in redeem().
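The filehandle "magic" can be sketched with plain Perl, leaving the queue modules out of it. The helpers handle_to_fd() and fd_to_handle() are hypothetical stand-ins for what your curse() and redeem() would do; they rely on threads sharing the process's descriptor table, and on the original handle staying open until the other side has reopened it.

```perl
use strict;
use warnings;
use File::Temp qw(tempfile);

# what curse() would do: reduce the handle to a plain integer
sub handle_to_fd {
    my ($fh) = @_;
    return fileno($fh);
}

# what redeem() would do: wrap the descriptor in a new handle
sub fd_to_handle {
    my ($fd, $mode) = @_;
    # '>&=' (or '<&=') is an fdopen-style alias of an existing descriptor
    open(my $fh, "$mode&=", $fd) or die "can't fdopen $fd: $!";
    return $fh;
}

my ($out, $path) = tempfile(UNLINK => 1);
my $fd    = handle_to_fd($out);          # a scalar, safe to enqueue
my $again = fd_to_handle($fd, '>');      # rebuilt on the "other side"

print {$again} "hello\n";
close($again) or die "close failed: $!"; # also closes the shared descriptor

open(my $in, '<', $path) or die "can't read $path: $!";
my $line = <$in>;
close($in);
```

Note that the alias and the original handle share one descriptor, so closing either one closes it for both; decide up front which thread owns the close.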