# Hand job ids 0 through 50 to the queue, one request per id.
queueProcessRequest($_) for 0 .. 50;
####
while (my $job = $dispatcher->dequeue()) {
####
#!/usr/bin/perl -lw
use strict;
use threads;
use threads::shared;
use Thread::Queue;
use Thread::Semaphore;
use Time::HiRes ('usleep');
# ---- State visible to every thread ----------------------------------------
my (
    $screenaccess,    # Lock for printing to screen (not referenced in the code shown)
    $multi,           # Number of simultaneous worker threads
    @log,             # For debugging, in case a thread errors out
    $dispatcher,      # Job queue
    $throttle,        # Queue semaphore
    $semStdout,       # Locked by tprint() so only one thread prints at a time
) :shared;

# ---- State private to the thread that declares it -------------------------
my (
    $logline,         # Log line content
    $logsize,         # Log line size (not counting attributes)
    $queuelen,        # Max queue length
    @thread,          # Worker thread objects, filled in by initThreadedOperation()
);

# Turn on autoflush for the currently selected output handle.
$| = 1;
# Print one message prefixed with the calling thread's id in brackets.
# $semStdout is locked for the duration of the print so that output from
# different threads is not interleaved. No newline is added here; the
# script's -l switch is presumably what terminates each line.
sub tprint {
    my ($text) = @_;
    my $prefix = '[' . threads->tid() . '] ';

    lock($semStdout);    # released automatically when the sub returns
    print $prefix . $text;
}
# Set up the worker pool.
#
# Takes the number of worker threads to start. Records it in $multi, sizes
# the throttle semaphore to the same number (via $queuelen), creates the job
# queue, and spawns the workers, each running processrefsthread() with its
# 1-based index as argument. Reports the worker count on STDERR.
sub initThreadedOperation {
    $multi    = $_[0];
    $queuelen = $multi * 1;

    $dispatcher = Thread::Queue->new();
    $throttle   = Thread::Semaphore->new($queuelen);

    push @thread, threads->create( \&processrefsthread, $_ ) for 1 .. $multi;

    print STDERR scalar(@thread), ' threads created', "\n";
}
# Shut the worker pool down: enqueue one undef per worker thread (the value
# that ends a worker's dequeue loop), then wait for every worker to finish.
sub endThreadedOperation {
    my @sentinels = (undef) x @thread;
    $dispatcher->enqueue(@sentinels);

    for my $worker (@thread) {
        $worker->join();
    }
}
# Worker thread body.
#
# Repeatedly takes a job from the shared $dispatcher queue, releases one
# slot on the $throttle semaphore, and then burns some CPU as a stand-in
# for real work. The loop ends when an undef is dequeued, which is the
# shutdown marker enqueued by endThreadedOperation().
#
# Any arguments passed by threads->create() are ignored.
sub processrefsthread {
    tprint('Starting');

    # Test for definedness, not truth: a job whose value is 0 or '' is a
    # legitimate job and must not be mistaken for the undef shutdown marker.
    while ( defined( my $job = $dispatcher->dequeue() ) ) {
        tprint("processing job $job");
        $throttle->up();    # Job removed from queue

        # Simulated workload: a random amount of throw-away arithmetic.
        my $d = int( rand(5) );
        for my $i ( 0 .. $d * 500000 ) {
            my $j = $i * $d;
            $j = int( rand($j) );
        }
    }

    tprint('Ending');
}
# Submit one job to the worker pool, blocking while the queue is full.
#
# $job - the value to enqueue. It should be defined: the workers treat an
#        undef coming off the queue as their signal to stop.
#
# Always returns undef.
sub queueProcessRequest {
    my ($job) = @_;

    # Claim a queue slot BEFORE enqueueing. Taking the slot afterwards lets
    # the queue grow to one item more than the semaphore's initial count,
    # because the extra job is already queued by the time the caller blocks.
    $throttle->down();
    $dispatcher->enqueue($job);

    return undef;    # kept as-is so existing callers see the same value
}
# Driver: start four workers, feed them jobs 1 through 50 (announcing each
# one as it is queued), then shut the pool down and wait for the workers.
initThreadedOperation(4);

foreach my $jobId ( 1 .. 50 ) {
    tprint("Qing job $jobId");
    queueProcessRequest($jobId);
}

endThreadedOperation();
####
#!/usr/bin/perl -lw
use strict;
use threads;
use threads::shared;
use threads::Q;
# Shared variable that tprint() locks so only one thread prints at a time.
my ($semStdout) :shared;

# Turn on autoflush for the currently selected output handle.
$| = 1;
# Emit a message tagged with the id of the calling thread, e.g. "[3] text".
# The print happens while $semStdout is locked, so concurrent callers take
# turns. No newline is added here; presumably the -l switch supplies it.
sub tprint {
    my ($message) = @_;
    my $line = sprintf '[%s] %s', threads->tid(), $message;

    lock($semStdout);    # held until the sub returns
    print $line;
}
# Worker thread body.
#
# $Q - the queue object to read jobs from; only its dq() method is used.
#
# Takes jobs from $Q one at a time and burns some CPU for each as a
# stand-in for real work. The loop ends when dq() yields undef, which is
# what the main thread enqueues (once per worker) to request shutdown.
sub processrefsthread {
    my ($Q) = @_;
    tprint('Starting');

    # Test for definedness, not truth: a job whose value is 0 or '' is a
    # legitimate job and must not be mistaken for the undef shutdown marker.
    while ( defined( my $job = $Q->dq() ) ) {
        tprint("processing job $job");

        # Simulated workload: a random amount of throw-away arithmetic.
        my $d = int( rand(5) );
        for my $i ( 0 .. $d * 500000 ) {
            my $j = $i * $d;
            $j = int( rand($j) );
        }
    }

    tprint('Ending');
}
# Worker count; falls back to 4 when $T has not already been given a value
# (presumably it can be supplied from outside, e.g. via perl's -s switch).
our $T //= 4;

# Queue sized at four slots per worker (per the original author's note, the
# queue limits itself to that many elements).
my $Q = threads::Q->new( $T * 4 );

# Start the workers; every one reads from the same queue.
my @threads;
push @threads, threads->new( \&processrefsthread, $Q ) for 1 .. $T;

# Feed jobs 1 through 50.
for my $job ( 1 .. 50 ) {
    $Q->nq($job);
}

# One undef per worker tells each of them to stop.
$Q->nq( (undef) x $T );

# Wait for all workers to finish.
for my $worker (@threads) {
    $worker->join;
}