in reply to Re^2: How can I force thread switching?
in thread How can I force thread switching?
one of the threads is silently terminated (without intentional error injection) and I can't find why.
Your first thread to run will always terminate immediately because you are pushing a job id of zero:
for my $i ( 0 .. 50 ) { queueProcessRequest( $i ); }
Which, when it is received by:
while (my $job = $dispatcher->dequeue()) {
tests false (0 is a false value in Perl) and terminates the loop.
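To make the difference concrete, here is a minimal sketch that drains the same pre-filled queue twice, once with a defined() test and once with the plain truth test. It runs in the main thread only to keep it short, and the sub names drain_defined and drain_truthy are made up for illustration; they are not from your code.

```perl
use strict;
use warnings;
use Thread::Queue;

# Drain a queue up to the undef terminator, treating 0 as a valid job id.
sub drain_defined {
    my ($q) = @_;
    my @seen;
    while ( defined( my $job = $q->dequeue() ) ) {
        push @seen, $job;
    }
    return @seen;
}

# The same loop with the plain truth test: a job id of 0 ends it at once.
sub drain_truthy {
    my ($q) = @_;
    my @seen;
    while ( my $job = $q->dequeue() ) {
        push @seen, $job;
    }
    return @seen;
}

# Jobs 0 .. 3 followed by the undef terminator, so dequeue() never blocks.
my @jobs = ( 0 .. 3, undef );
my @ok  = drain_defined( Thread::Queue->new(@jobs) );
my @bad = drain_truthy( Thread::Queue->new(@jobs) );

print "defined test saw jobs: @ok\n";
print "truth test saw ", scalar(@bad), " jobs\n";
```

So you either start your job ids at 1, or test with defined() and keep undef as the terminator.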
Aside from that, your code is very confused. I have no idea what syncIdle is meant to be doing. After removing it, along with various other chunks of code that seem to serve no good purpose, I got to this, which runs perfectly:
    #!/usr/bin/perl -lw
    use strict;
    use threads;
    use threads::shared;
    use Thread::Queue;
    use Thread::Semaphore;
    use Time::HiRes ('usleep');

    my $screenaccess :shared;   # Lock for printing to screen
    my $multi :shared;          # Number simultaneous threads
    my $logline;                # Log line content
    my $logsize;                # Log line size (not counting attributes)
    my @log :shared;            # For debugging, incase a thread errors out
    my $dispatcher :shared;     # Job queue
    my $queuelen;               # Max queue length
    my $throttle :shared;       # Queue semaphore
    my @thread;

    my $semStdout :shared;
    $|++;
    sub tprint{
        my $str = shift;
        my $tid = threads->tid();
        lock $semStdout;
        print "[$tid] $str";
    }

    sub initThreadedOperation {
        $multi = shift;
        $queuelen = 1 * $multi;
        $dispatcher = Thread::Queue->new();
        $throttle = Thread::Semaphore->new($queuelen);
        for( 1 .. $multi ) {
            push( @thread, threads->create( \&processrefsthread, $_ ) );
        }
        print STDERR scalar(@thread), ' threads created', "\n";
    }

    sub endThreadedOperation {
        $dispatcher->enqueue((undef) x scalar(@thread));
        foreach (@thread) {
            $_->join();
        }
    }

    sub processrefsthread {
        tprint 'Starting';
        while (my $job = $dispatcher->dequeue()) {
            tprint "processing job $job";
            $throttle->up();    # Job removed from queue
            my $d = int(rand(5));
            for my $i (0 .. $d * 500000) {
                my $j = $i * $d;
                $j = int(rand($j));
            }
        }
        tprint 'Ending';
    }

    sub queueProcessRequest {
        my ($job) = @_;
        $dispatcher->enqueue($job);
        $throttle->down();
        return undef
    }

    initThreadedOperation( 4 );
    for my $i ( 1 .. 50 ) {
        tprint "Qing job $i";
        queueProcessRequest( $i );
    }
    endThreadedOperation();
That said, limiting your queue to 4 elements to feed 4 threads means that your threads are running in lock-step, which pretty much defeats the purpose of using a queue.
And using Thread::Semaphore as the mechanism for limiting the size of the queue is like hiring the Pinkertons to manage the queue for the bathroom in the mornings. It is not just overkill: by forcing lock-step (synchronising your threads), it throws away much of the gain from asynchronous processing.
Here's how I would write the same program:
    #!/usr/bin/perl -lw
    use strict;
    use threads;
    use threads::shared;
    use threads::Q;

    my $semStdout :shared;
    $|++;
    sub tprint{
        my $str = shift;
        my $tid = threads->tid();
        lock $semStdout;
        print "[$tid] $str";
    }

    sub processrefsthread {
        my $Q = shift;
        tprint 'Starting';
        while( my $job = $Q->dq() ) {
            tprint "processing job $job";
            my $d = int(rand(5));
            for my $i (0 .. $d * 500000) {
                my $j = $i * $d;
                $j = int(rand($j));
            }
        }
        tprint 'Ending';
    }

    our $T //= 4;

    my $Q = threads::Q->new( $T * 4 ); ## Q self-limits to 4*$T elements

    my @threads = map threads->new( \&processrefsthread, $Q ), 1 .. $T;

    $Q->nq( $_ ) for 1 .. 50;
    $Q->nq( (undef) x $T );

    $_->join for @threads;
You can get threads::Q (my own, efficient, self-limiting queue implementation) from here.
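If you would rather stay with the core modules, something similar can be sketched with Thread::Queue alone, assuming your installed version is recent enough to provide the limit and end methods (older releases do not have them). This is not threads::Q and makes no claim about matching its performance. The worker sub and the counts are only illustrative.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $T = 4;                          # worker count (illustrative)
my $q = Thread::Queue->new();
$q->limit = $T * 4;                 # enqueue() blocks while 16 jobs are pending

# Hypothetical worker: counts the jobs it handles instead of doing real work.
sub worker {
    my ($queue) = @_;
    my $done = 0;
    # defined() keeps a job id of 0 from ending the loop early
    while ( defined( my $job = $queue->dequeue() ) ) {
        $done++;
    }
    return $done;
}

my @threads = map { threads->create( \&worker, $q ) } 1 .. $T;

$q->enqueue($_) for 0 .. 49;        # blocks whenever the queue is full
$q->end();                          # dequeue() returns undef once drained

my $total = 0;
for my $thr (@threads) {
    my ($n) = $thr->join();
    $total += $n;
}
print "processed $total jobs\n";
```

With the queue four times deeper than the thread count, a worker that finishes early finds another job already waiting instead of stalling until the producer catches up.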