Okay. You took my point about your jobs queue only being accessed by a single thread and reverted that to being just an array rather than a queue. You also took my point about the fan of 1-per-worker queues, and did away with them. But now, as you've done away with the latter, it is necessary for the workers to access the former, so you've made the jobs array shared. Of course, that means that you need to lock the shared array before removing jobs from it, so you've added the get_next_job() subroutine that takes care of the locking. And what you've ended up doing is incompletely re-implementing Thread::Queue, which is just a shared array that does locking.
What you could (should?) have done is just do away with the per-worker queues and retained the jobs queue. Then your workers get their jobs from that queue using dequeue in the normal way and get_next_job() is unnecessary.
In your thread subroutine you push the tid and zero each time around the work loop. And when the work loop ends (no more jobs), you push the tid and -1. And in the do_work() sub you push the tid and 1.
Why? What code is reading and acting upon these messages?
The only place where you read this queue is in the main() sub. After you've placed all the jobs in the array and started all the workers, you enter a loop that dequeues values from the idle queue. But, it dequeues one value at a time and labels it $tid, even though each time you enqueued a tid you also enqueued a magic number (-1,0 or 1). It then decides that if the number (referred to as $tid, but half the time, just one of the magic numbers), is less than 0, then it exits that loop.
Lets say you queue up 4 jobs, and start 2 workers, and each worker processes 2 jobs. Then the contents of the idle queue--ignoring anything being removed for a moment--will be something like (the ordering is non-deterministic):
tid. magic 1 0 Worker 1 first job in while loop 1 1 Worker 1 first job in do_work 2 0 Worker 2 first job in while loop 2 1 Worker 2 first job in do_work 1 0 Worker 1 second job in while loop 1 1 Worker 1 second job in do_work 2 0 Worker 2 second job in while loop 2 1 Worker 2 second job in do_work 1 -1 Worker 1 after exiting while loop 2 -1 Worker 2 after exiting while loop
But, as mentioned above, that ordering is non-determanistic, so could also be:
tid. magic 1 0 Worker 1 first job in while loop 1 1 Worker 1 first job in do_work 1 0 Worker 1 second job in while loop 1 1 Worker 1 second job in do_work 1 -1 Worker 1 after exiting while loop 2 0 Worker 2 first job in while loop 2 1 Worker 2 first job in do_work 2 0 Worker 2 second job in while loop 2 1 Worker 2 second job in do_work 2 -1 Worker 2 after exiting while loop
tid. magic 2 0 Worker 2 first job in while loop 2 1 Worker 2 first job in do_work 2 0 Worker 2 second job in while loop 2 1 Worker 2 second job in do_work 2 -1 Worker 2 after exiting while loop 1 0 Worker 1 first job in while loop 1 1 Worker 1 first job in do_work 1 0 Worker 1 second job in while loop 1 1 Worker 1 second job in do_work 1 -1 Worker 1 after exiting while loop
Or any of a dozen other variations. So what is it telling the main loop?
Remember, that the loop in main will terminate the first time it sees a value less than zero. And if you look at the above possibilities, that will never actually indicate that all the threads have completed. Which I what I think was your intent. Even with only two threads and the most optimistic ordering, there will always be at least two values that will never be removed from that queue. In the worst case scenario, which will happen far more often that statistics alone would suggest, the while loop in main() will terminate when all the workers except one are still processing many jobs.
What affect will the worse case scenario have upon your program?
Actually, nothing. Because as soon as you exit the while loop in main(), you call shutdown_engine(), and the first thing it does is loop over all the thread objects and waits for them to join. Ie end.
The net results is that the entire idle queue mechanism is not only totally broken, it is entirely redundant.
Or at least it would be if you weren't detaching your threads which makes the joins redundant. But that also means that those return values you set up and return from your workers are also redundant.
Why are you queueing all your messages to a separate thread in order to have them printed?
Your answer will be something to the effect that you need to serialise the output from multiple threads to the single resource--the screen or file attached to STDOUT. And that is true. But do you need to use a separate thread and queue to do this?
And the answer is no. It is a waste of resource and an additional level of redundant complexity. A simple print wrapper combined with a single shared variable to act as a mutex is all that is required. Eg;
my $semSTDOUT : shared; sub tprint{ lock $semSTDOUT; print @_; } tprint "$tid: Some text";
Simpler to write and maintain and far more flexible and convenient to use.
sub run_external_command { my ( $job, $tid ) = @_; $q_PRINT->enqueue( "Thread $tid is running exteral job $job" ); local( *w_IN, *w_OUT, *w_ERR ); my $pid = open3( \*w_IN, \*w_OUT, \*w_ERR, '/bin/sleep', int( rand +($DEFAULTS->{'random'} ) ) ); waitpid( $pid, 0 ); return $DEFAULTS->{'ret_success'}; }
Why are you using open3? Instead of say, just system which does everything your code currently does.
I appreciate that you were originally limiting the time the command ran for, which means you need a pid in order to use kill, but you can also get a pid from piped open, and that doesn't suffer the problems associated with open3. (Namely, that juggling two output streams either of which can block the process if they fill, is notoriously difficult to get right.)
is never used.
Why? If all you are going to do is terminate, why not just let that happen?
You set up this hash named %DEFAULTS containing a bunch of named values.
But, there is no mechanism for modifying them, so they aren't defaults, but rather constants. So why not use constant?
In addition, none of those return values is ever assigned anywhere, much less checked or acted upon.
So, here is how I would write a program to do what I believe you intend your program to do.
Stylistically, it is written in my (infinitely preferable and superior :) style, but essentially does everything yours attempts to do. But it does correctly and in a simple, clear, easily understood and maintainable way.
#! perl -slw use strict; use threads; use threads::shared; use Thread::Queue; use constant { RANDOM => 4, THREADS => 4, JOBS => 20, }; my $semSTD :shared; sub tprint { my $tid = threads->tid; lock $semSTD; print "[$tid] ", @_; } my $die_early :shared = 0; $SIG{ INT } = sub { tprint "Early termination requested"; $die_early = 1; }; sub worker { tprint "worker started"; my( $Q ) = @_; while( !$die_early and defined( my $job = $Q->dequeue ) ) { tprint "processing job:$job"; my $pid = open my $PIPE, '-|', "sleep " . int( rand RANDOM ) o +r die $!; tprint "waiting for pid: $pid"; waitpid $pid, 0; tprint "pid: $pid done"; } tprint "Worker ending"; return 1; } my $Q = new Thread::Queue; $Q->enqueue( map "JOB-$_", 1 .. JOBS ); $Q->enqueue( (undef) x THREADS ); tprint "Queue populated"; my @threads = map threads->new( \&worker, $Q ), 1 .. THREADS; tprint "Workers started; waiting..."; $_->join for @threads; print "Program complete"; __END__ C:\test>881217 [0] Queue populated [1] worker started [1] processing job:JOB-1 [1] waiting for pid: 1740 [2] worker started [2] processing job:JOB-2 [1] waiting for pid: 1056 [2] pid: 2684 done [2] processing job:JOB-4 [1] waiting for pid: 2116 [3] worker started [3] processing job:JOB-6 [3] waiting for pid: 3244 [0] Workers started; waiting... [4] worker started [4] processing job:JOB-7 ... [3] pid: 1112 done [3] processing job:JOB-20 [3] waiting for pid: 4048 [2] pid: 1728 done [1] pid: 3432 done [2] Worker ending [1] Worker ending [4] pid: 3924 done [4] Worker ending [3] pid: 4048 done [3] Worker ending Program complete
In reply to Re^5: RFC: Using 'threads' common aspects
by BrowserUk
in thread RFC: Using 'threads' common aspects
by DeadPoet
| For: | Use: | ||
| & | & | ||
| < | < | ||
| > | > | ||
| [ | [ | ||
| ] | ] |