one of the threads is silently terminated (without intentional error injection) and I can't find why.
Your first thread to run will always terminate immediately because you are pushing a job id of zero:
for my $i ( 0 .. 50 ) { queueProcessRequest( $i ); }
Which when it is received:
while (my $job = $dispatcher->dequeue()) {
Test false and terminates the loop.
Aside from that, your code is very confused. I have no idea what syncIdle is meant to be doing, and removing it, along with various other chunks of code that seem to serve no good purpose, I got to this which runs perfectly:
That said, limiting your queue to 4 elements to feed 4 threads means that your threads are running in lock-step, which pretty much defeats the purpose of using a queue.
And using Thread::Semaphore as the mechanism for limiting the size of the queue is like hiring the Pinkerton's to manage the queue for the bathroom in the mornings. Not just overkill, but by forcing lock-step (synchronising your threads), throws away much of the gain from asynchronous processing.
Here's how I would write the same program:
#!/usr/bin/perl -lw use strict; use threads; use threads::shared; use threads::Q; my $semStdout :shared; $|++; sub tprint{ my $str = shift; my $tid = threads->tid(); lock $semStdout; print "[$tid] $str"; } sub processrefsthread { my $Q = shift; tprint 'Starting'; while( my $job = $Q->dq() ) { tprint "processing job $job"; my $d = int(rand(5)); for my $i (0 .. $d * 500000) { my $j = $i * $d; $j = int(rand($j)); } } tprint 'Ending'; } our $T //= 4; my $Q = threads::Q->new( $T * 4 ); ## Q self-limits to 4*$T elements my @threads = map threads->new( \&processrefsthread, $Q ), 1 .. $T; $Q->nq( $_ ) for 1 .. 50; $Q->nq( (undef) x $T ); $_->join for @threads;
You can get threads::Q (my own, efficient, self-limiting queue implementation) from here.
In reply to Re^3: How can I force thread switching?
by BrowserUk
in thread How can I force thread switching?
by ajl52
| For: | Use: | ||
| & | & | ||
| < | < | ||
| > | > | ||
| [ | [ | ||
| ] | ] |