Re: wait for threads to end in parallel
by BrowserUk (Patriarch) on Mar 16, 2010 at 10:34 UTC
Opening 1 of 4    Thread 1 starts processing file 1
Opening 2 of 4    Thread 2 starts processing file 2
Closing 1 of 4    Thread 1 finishes processing file 1 and ends
Opening 3 of 4    Thread 3 starts processing file 3; thread 2 is still processing file 2
Closing 3 of 4    Thread 3 finishes processing file 3 and ends; thread 2 is still processing file 2
Opening 4 of 4    Thread 4 starts processing file 4; thread 2 is still processing file 2
Closing 2 of 4    Thread 2 finishes processing file 2 and ends
Closing 4 of 4    Thread 4 finishes processing file 4 and ends
That all looks pretty much as might be expected to me. What makes you think something is wrong?
The only problem I see is that you have no way to exit the loop when all files have been processed.
Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
"Science is about questioning the status quo. Questioning authority".
In the absence of evidence, opinion is indistinguishable from prejudice.
Re: wait for threads to end in parallel
by ikegami (Patriarch) on Mar 16, 2010 at 13:48 UTC
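use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

my @file_list   = @ARGV;
my $num_workers = 2;
my %MSG :shared;

sub process {
    my ($file) = @_;
    print "Opening $file\n";
    my $some_condition = 0;    # placeholder: replace with your own test
    if ($some_condition) {
        lock(%MSG);
        # write stuff to hash
    }
    print "Closing $file\n";
}

# One undef per worker acts as an end-of-work marker.
my $q = Thread::Queue->new(
    @file_list,
    (undef) x $num_workers,
);

for (1 .. $num_workers) {
    async {
        # dequeue() blocks until an item is available; undef ends the loop.
        while (defined(my $file = $q->dequeue())) {
            process($file);
        }
    };
}

$_->join() for threads->list();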
Or, since you never add anything to @file_list, you can skip the undef markers and use dequeue_nb():
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

my @file_list   = @ARGV;
my $num_workers = 2;
my %MSG :shared;

sub process {
    my ($file) = @_;
    print "Opening $file\n";
    my $some_condition = 0;    # placeholder: replace with your own test
    if ($some_condition) {
        lock(%MSG);
        # write stuff to hash
    }
    print "Closing $file\n";
}

my $q = Thread::Queue->new(@file_list);

for (1 .. $num_workers) {
    async {
        # dequeue_nb() returns undef at once when the queue is empty.
        while (defined(my $file = $q->dequeue_nb())) {
            process($file);
        }
    };
}

$_->join() for threads->list();
(Oops, way too slow at posting this.)
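A third variant, offered only as a sketch: later releases of Thread::Queue (3.01 and up, so this assumes a newer module than the one available when this thread was written) provide an end() method. Once a queue is ended, a blocking dequeue() returns undef as soon as the queue is empty, so no undef markers are needed. The process_file sub and the @done array are hypothetical stand-ins for process() and %MSG.

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue 3.01;    # end() is not available in older releases

# Fall back to sample names when no files are given (illustration only).
my @file_list   = @ARGV ? @ARGV : qw(file1 file2 file3 file4);
my $num_workers = 2;
my @done :shared;          # names of processed files, for illustration only

# Hypothetical stand-in for process(): just records the file name.
sub process_file {
    my ($file) = @_;
    lock(@done);
    push @done, $file;
}

my $q = Thread::Queue->new(@file_list);
$q->end();    # declare that no more items will be added

my @workers = map {
    threads->create(sub {
        # After end(), dequeue() returns undef once the queue is empty.
        while (defined(my $file = $q->dequeue())) {
            process_file($file);
        }
    });
} 1 .. $num_workers;

$_->join() for @workers;
print scalar(@done), " of ", scalar(@file_list), " files processed\n";
```

Keeping the thread objects in @workers also avoids relying on the global thread list when joining.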
Re: wait for threads to end in parallel
by 7stud (Deacon) on Mar 16, 2010 at 11:31 UTC
It seems to me that there should be a problem--thr1 could be sitting there idle while thr0->join is blocking--but I don't know how you can see that problem in the output.
If it will work for you, you don't need to create new threads. You can just use the same two threads over and over:
use strict;
use warnings;
use 5.010;
use threads;
use threads::shared;
use Thread::Queue;

my $q : shared;
$q = Thread::Queue->new;    # a Queue is thread safe, which means
                            # threads can read from it without
                            # interfering with each other.

my @files = (
    'file1',
    'file2',
    'file3',
    'file4',
    'file5',
);

my $thread_count = 2;
my @threads;

for (1 .. $thread_count) {
    my $thread_name = "thread$_";
    push @threads, threads->create(\&do_stuff, $thread_name);
}

$q->enqueue(@files);    # ...and the starting gun sounds!

# One end marker per worker thread.
for (1 .. $thread_count) {
    $q->enqueue("END_OF_QUEUE");
}

for my $thr (@threads) {
    $thr->join();
}

sub do_stuff {
    my $thr_name = shift;

    while ( (my $file = $q->dequeue) ne "END_OF_QUEUE" ) {
        # $q->dequeue() blocks until there is something to retrieve
        # from the queue.
        say "$thr_name is opening file: $file";

        # doing some work:
        sleep int(rand 4);

        say "$thr_name is done with: $file";
    }
}

--output:--
thread1 is opening file: file1
thread2 is opening file: file2
thread2 is done with: file2
thread2 is opening file: file3
thread1 is done with: file1
thread1 is opening file: file4
thread2 is done with: file3
thread2 is opening file: file5
thread2 is done with: file5
thread1 is done with: file4
Ah, I see. I read is_joinable() in the code, but I didn't understand its significance. So the while loop polls each thread to discover when a thread is finished processing a file.
With some slight modifications of the OP's code, the output shows that the threads do not appear to be waiting for each other (though the code does get stuck in an infinite loop once the threads are done processing the files):
use strict;
use warnings;
use 5.010;
use threads;
use threads::shared;

my @file_list = (
    'file1',
    'file2',
    'file3',
    'file4',
    'file5',
);

my $nofiles  = @file_list;    # number of files
my $currfile = 1;             # current number of file to process
my %MSG : shared;             # shared hash

my $thr0 = threads->new(\&process, shift(@file_list), 'thr0');
$currfile++;
my $thr1 = threads->new(\&process, shift(@file_list), 'thr1');
$currfile++;

while (1) {
    if ($thr0->is_joinable()) {
        $thr0->join;
        # check if there are files left to process
        if ($currfile <= $nofiles) {
            $thr0 = threads->new(\&process, shift(@file_list), 'thr0');
            $currfile++;
        }
    }
    if ($thr1->is_joinable()) {
        $thr1->join;
        # check if there are files left to process
        if ($currfile <= $nofiles) {
            $thr1 = threads->new(\&process, shift(@file_list), 'thr1');
            $currfile++;
        }
    }
}

sub process {
    my ($file, $thr_name) = @_;
    print "$thr_name opening $currfile of $nofiles\n";
    # do some stuff
    sleep int(rand 5);
    print "$thr_name done with $currfile of $nofiles\n";
}
--output:--
thr0 opening 1 of 5
thr1 opening 2 of 5
thr1 done with 2 of 5
thr1 opening 3 of 5
thr1 done with 3 of 5
thr1 opening 4 of 5
thr0 done with 1 of 5
thr0 opening 5 of 5
thr1 done with 4 of 5
thr0 done with 5 of 5
Ugh. That is really icky code.
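For what it's worth, here is one way the polling loop could be made to terminate. This is only a sketch, not the OP's code: the @slots array, the @finished array, and the short select() pause are assumptions added for illustration. The loop exits once no files are left and no slot holds a running thread.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

my @file_list = qw(file1 file2 file3 file4 file5);
my $total     = @file_list;
my $max_slots = 2;
my @finished :shared;    # filled in by the workers, for illustration only

sub process {
    my ($file, $slot) = @_;
    print "thr$slot opening $file\n";
    select(undef, undef, undef, rand 0.2);    # simulate some work
    print "thr$slot done with $file\n";
    lock(@finished);
    push @finished, $file;
}

my @slots;    # $slots[$i] holds the thread currently running in slot $i
while (1) {
    for my $i (0 .. $max_slots - 1) {
        # Reap the thread in this slot once it has finished.
        if (defined $slots[$i] && $slots[$i]->is_joinable()) {
            $slots[$i]->join();
            $slots[$i] = undef;
        }
        # Refill an empty slot while files remain.
        if (!defined $slots[$i] && @file_list) {
            $slots[$i] = threads->create(\&process, shift(@file_list), $i);
        }
    }
    # The exit test the loop above lacks: nothing queued, nothing running.
    last if !@file_list && !grep { defined $_ } @slots;
    threads->yield();    # give the workers a chance to run while polling
}

print scalar(@finished), " of $total files processed\n";
```

It still polls, so the queue-based versions elsewhere in this thread remain the tidier option.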
Re: wait for threads to end in parallel
by pmarcoen (Novice) on Mar 16, 2010 at 12:54 UTC
I ended up having each thread pick files off the array until it's empty, instead of creating a new thread for every file.
A solution suggested by a fellow monk here.
Thanks anyway!
use strict;
use warnings;
use threads;
use threads::shared;

my @file_list :shared = @ARGV;
my $num_workers = 2;
my %MSG :shared;

sub process {
    my ($file) = @_;
    print "Opening $file\n";
    my $some_condition = 0;    # placeholder: replace with your own test
    if ($some_condition) {
        lock(%MSG);
        # write stuff to hash
    }
    print "Closing $file\n";
}

for (1 .. $num_workers) {
    async {
        for (;;) {
            my $file = do {
                lock @file_list;
                return if !@file_list;    # list exhausted: end this thread
                shift(@file_list)
            };
            process($file);
        }
    };
}

$_->join() for threads->list();
This will work as long as you never add anything to the file list once the threads have started. (If you do, you might have worker threads exiting prematurely.)
Thread::Queue does the same, but hides a couple of the details.
Re: wait for threads to end in parallel
by 7stud (Deacon) on Mar 16, 2010 at 13:45 UTC
Hi 7stud, it isn't polite to curse; besides, syphilis is curable with penicillin.
Re: wait for threads to end in parallel
by 7stud (Deacon) on Mar 16, 2010 at 14:17 UTC
async(\&process);
async { process(); };
threads->create(\&process);
With args:
async { process($arg); };
threads->create(\&process, $arg); # Safe??
# 'async' is a function alias for the 'threads->create()' method
sub async (&;@)
{
    unshift(@_, 'threads');
    # Use "goto" trick to avoid pad problems from 5.8.1 (fixed in 5.8.2)
    goto &create;
}
So async \&sub, $arg, 1, 'fred'; or even async{ ... } $arg, 1, 'fred';
are identical in effect to threads->create( \&sub, $arg, 1, 'fred' );
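A small sketch of that equivalence, using a hypothetical add() worker that is not part of the original discussion. Each form passes the same two arguments to the thread, and each join() returns the sub's result:

```perl
use strict;
use warnings;
use threads;

# Hypothetical worker used only for this illustration.
sub add { my ($x, $y) = @_; return $x + $y }

my $t1 = threads->create(\&add, 2, 3);    # method form
my $t2 = async { add(@_) } 2, 3;          # block form; args follow the block
my $t3 = async(\&add, 2, 3);              # code-ref form

# Each thread was created in scalar context, so join() returns one value.
my @results = map { scalar $_->join() } $t1, $t2, $t3;
print "@results\n";
```

Note that in the block form the arguments follow the closing brace with no comma in between, and arrive in the block as @_.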