pmarcoen has asked for the wisdom of the Perl Monks concerning the following question:

Dear Perl Gods, I have a Perl script that launches 2 threads, one for each processor. I need it to wait for a thread to end; when one thread ends, a new one is spawned. It seems that the join method blocks the rest of the program, so the second thread can't end until everything the first thread does is done, which sort of defeats its purpose. I tried the is_joinable method, but that doesn't seem to do it either. Here is some of my code:

use threads;
use threads::shared;

@file_list = @ARGV;            #Our file list
$nofiles = $#file_list + 1;    #Real number of files
$currfile = 1;                 #Current number of file to process
my %MSG : shared;              #shared hash

$thr0 = threads->new(\&process, shift(@file_list));
$currfile++;
$thr1 = threads->new(\&process, shift(@file_list));
$currfile++;

while(1){
    if ($thr0->is_joinable()) {
        $thr0->join;
        #check if there are files left to process
        if($currfile <= $nofiles){
            $thr0 = threads->new(\&process, shift(@file_list));
            $currfile++;
        }
    }
    if ($thr1->is_joinable()) {
        $thr1->join;
        #check if there are files left to process
        if($currfile <= $nofiles){
            $thr1 = threads->new(\&process, shift(@file_list));
            $currfile++;
        }
    }
}

sub process{
    print "Opening $currfile of $nofiles\n";
    #do some stuff
    if(some condition){
        lock(%MSG);
        #write stuff to hash
    }
    print "Closing $currfile of $nofiles\n";
}

The output of this is:

Opening 1 of 4
Opening 2 of 4
Closing 1 of 4
Opening 3 of 4
Closing 3 of 4
Opening 4 of 4
Closing 2 of 4
Closing 4 of 4

Re: wait for threads to end in parallel
by BrowserUk (Patriarch) on Mar 16, 2010 at 10:34 UTC

    I think you're misinterpreting the output from your program.

    Opening 1 of 4    Thread 1 starts processing file 1
    Opening 2 of 4    thread 2 starts processing file 2
    Closing 1 of 4    Thread 1 finished processing file 1 and ends
    Opening 3 of 4    Thread 3 starts processing file 3; thread 2 is still processing file 2
    Closing 3 of 4    thread 3 finishes processing file 3 and ends; thread 2 is still processing file 2
    Opening 4 of 4    Thread 4 starts processing file 4; thread 2 is still processing file 2
    Closing 2 of 4    Thread 2 finishes processing file 2 and ends.
    Closing 4 of 4    Thread 4 finishes processing file 4 and ends.

    That all looks pretty much as might be expected to me. What makes you think something is wrong?

    The only problem I see is that you have no way to exit the loop when all files have been processed.
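
    One way to give the polling loop an exit is to loop only while there is still work to hand out or a thread still running. The following is a minimal sketch of that idea, not the OP's actual script: the file names, the @running and @finished arrays, $max_workers, and the body of process() are all made up for illustration.

```perl
use strict;
use warnings;
use threads;

# Hypothetical stand-ins for the real file list and the real work.
my @file_list   = map { "file$_" } 1 .. 5;
my $max_workers = 2;

sub process {
    my ($file) = @_;
    select(undef, undef, undef, 0.1);    # pretend to work for 100 ms
    return $file;                        # hand the name back through join()
}

my @running;     # thread objects currently in flight
my @finished;    # results collected from join()

# Keep going while there is work left to hand out or threads left to reap.
while (@file_list || @running) {
    # Top up to $max_workers running threads.
    while (@file_list && @running < $max_workers) {
        push @running, threads->create(\&process, shift @file_list);
    }

    # Reap whichever threads have finished; join() will not block on these.
    my @still_running;
    for my $thr (@running) {
        if ($thr->is_joinable()) {
            push @finished, $thr->join();
        }
        else {
            push @still_running, $thr;
        }
    }
    @running = @still_running;

    # Short pause so the polling loop does not spin at full speed.
    select(undef, undef, undef, 0.05) if @running;
}

print "Processed ", scalar(@finished), " files\n";
```

    The loop ends once the list is empty and the last thread has been joined, so nothing is left running when the program exits.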


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
Re: wait for threads to end in parallel
by ikegami (Patriarch) on Mar 16, 2010 at 13:48 UTC

    There's no need to create a new thread for each file.

    use threads;
    use threads::shared;
    use Thread::Queue;

    my @file_list = @ARGV;
    my $num_workers = 2;
    my %MSG :shared;

    sub process {
        my ($file) = @_;
        print "Opening $file\n";
        if(some condition){
            lock(%MSG);
            #write stuff to hash
        }
        print "Closing $file\n";
    }

    my $q = Thread::Queue->new(
        @file_list,
        (undef)x$num_workers,
    );

    for (1..$num_workers) {
        async {
            while (defined(my $file = $q->dequeue())) {
                process($file);
            }
        }
    }

    $_->join() for threads::list();

    Or since you never add anything to @file_list,

    use threads;
    use threads::shared;
    use Thread::Queue;

    my @file_list = @ARGV;
    my $num_workers = 2;
    my %MSG :shared;

    sub process {
        my ($file) = @_;
        print "Opening $file\n";
        if(some condition){
            lock(%MSG);
            #write stuff to hash
        }
        print "Closing $file\n";
    }

    my $q = Thread::Queue->new(@file_list);

    for (1..$num_workers) {
        async {
            while (defined(my $file = $q->dequeue_nb())) {
                process($file);
            }
        }
    }

    $_->join() for threads::list();

    ( Oops, way too slow at posting this )

Re: wait for threads to end in parallel
by 7stud (Deacon) on Mar 16, 2010 at 11:31 UTC

    It seems to me that there should be a problem--thr1 could be sitting there idle while thr0->join is blocking--but I don't know how you can see that problem in the output.

    If it will work for you, you don't need to create new threads. You can just use the same two threads over and over:

    use strict;
    use warnings;
    use 5.010;

    use threads;
    use threads::shared;
    use Thread::Queue;

    my $q : shared;
    $q = Thread::Queue->new;  #a Queue is thread safe, which means
                              #threads can read from it without
                              #interfering with each other.

    my @files = (
        'file1',
        'file2',
        'file3',
        'file4',
        'file5',
    );

    my $thread_count = 2;
    my @threads;

    for (1 .. $thread_count) {
        my $thread_name = "thread$_";
        push @threads, threads->create(\&do_stuff, $thread_name);
    }

    $q->enqueue(@files);  #...and the starting gun sounds!

    for (1 .. $thread_count) {
        $q->enqueue("END_OF_QUEUE");
    }

    for my $thr (@threads) {
        $thr->join();
    }

    sub do_stuff {
        my $thr_name = shift;

        while ( (my $file = $q->dequeue) ne "END_OF_QUEUE" ) {
            #$q->dequeue() blocks until there is something to retrieve
            #from the queue.
            say "$thr_name is opening file: $file";

            #doing some work:
            sleep int(rand 4);

            say "$thr_name is is done with: $file";
        }
    }

    --output:--
    thread1 is opening file: file1
    thread2 is opening file: file2
    thread2 is is done with: file2
    thread2 is opening file: file3
    thread1 is is done with: file1
    thread1 is opening file: file4
    thread2 is is done with: file3
    thread2 is opening file: file5
    thread2 is is done with: file5
    thread1 is is done with: file4

      It seems to me that there should be a problem--thr1 could be sitting there idle while thr0->join is blocking

      Why would thr0->join block, when he has tested thr0->is_joinable?

      $thr->is_joinable()
          Returns true if the thread has finished running, is not detached and has not yet been joined. In other words, the thread is ready to be joined, and a call to $thr->join() will not block.
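
      To see that behaviour in isolation, here is a small sketch. It assumes a made-up "gate" queue whose only job is to hold the worker back until the main thread says go, so the worker cannot possibly be finished at the first check. The names $gate, $before, $after and $result are illustrative only.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $gate = Thread::Queue->new();    # hypothetical "go" signal for the worker

# The worker blocks on the queue, so it cannot finish until main enqueues.
my $thr = threads->create(sub {
    my $msg = $gate->dequeue();
    return "worker got $msg";
});

# The worker is still blocked here, so it is not joinable yet.
my $before = $thr->is_joinable() ? 1 : 0;

$gate->enqueue('go');

# Poll, with a short pause, until the thread has actually finished running.
select(undef, undef, undef, 0.01) until $thr->is_joinable();

my $after  = $thr->is_joinable() ? 1 : 0;
my $result = $thr->join();    # the thread is already done, so no waiting here

print "joinable before: $before, after: $after, result: $result\n";
```

      The join() is only reached once is_joinable() has returned true, which is the same guarantee the OP's loop relies on.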



        Ah, I see. I read is_joinable() in the code, but I didn't understand its significance. So the while loop polls each thread to discover when a thread is finished processing a file.

        With some slight modifications of the OP's code, the output shows that the threads do not appear to be waiting for each other (though the code does get stuck in an infinite loop once the threads are done processing the files):

        use strict;
        use warnings;
        use 5.010;

        use threads;
        use threads::shared;

        my @file_list = (
            'file1',
            'file2',
            'file3',
            'file4',
            'file5',
        );

        my $nofiles = @file_list;  #number of files
        my $currfile = 1;          #current number of file to process
        my %MSG : shared;          #shared hash

        my $thr0 = threads->new(\&process, shift(@file_list), 'thr0');
        $currfile++;
        my $thr1 = threads->new(\&process, shift(@file_list), 'thr1');
        $currfile++;

        while(1) {
            if ($thr0->is_joinable()) {
                $thr0->join;
                #check if there are files left to process
                if($currfile <= $nofiles){
                    $thr0 = threads->new(\&process, shift(@file_list), 'thr0');
                    $currfile++;
                }
            }
            if ($thr1->is_joinable()) {
                $thr1->join;
                #check if there are files left to process
                if($currfile <= $nofiles){
                    $thr1 = threads->new(\&process, shift(@file_list), 'thr1');
                    $currfile++;
                }
            }
        }

        sub process{
            my($file, $thr_name) = @_;
            print "$thr_name opening $currfile of $nofiles\n";
            #do some stuff
            sleep int(rand 5);
            print "$thr_name done with $currfile of $nofiles\n";
        }

        --output:--
        thr0 opening 1 of 5
        thr1 opening 2 of 5
        thr1 done with 2 of 5
        thr1 opening 3 of 5
        thr1 done with 3 of 5
        thr1 opening 4 of 5
        thr0 done with 1 of 5
        thr0 opening 5 of 5
        thr1 done with 4 of 5
        thr0 done with 5 of 5

        Ugh. That is really icky code.

Re: wait for threads to end in parallel
by pmarcoen (Novice) on Mar 16, 2010 at 12:54 UTC
    I ended up having each thread pick files off the array until it's empty, instead of creating a new thread for every file.

    A solution suggested by a fellow monk here

    Thanks anyway!
      Ah, like this?

      use threads;
      use threads::shared;

      my @file_list :shared = @ARGV;
      my $num_workers = 2;
      my %MSG :shared;

      sub process {
          my ($file) = @_;
          print "Opening $file\n";
          if(some condition){
              lock(%MSG);
              #write stuff to hash
          }
          print "Closing $file\n";
      }

      for (1..$num_workers) {
          async {
              for (;;) {
                  my $file = do {
                      lock @file_list;
                      return if !@file_list;
                      shift(@file_list)
                  };
                  process($file);
              }
          }
      }

      $_->join() for threads::list();

      This will work as long as you never add anything to the file list once the threads have started. (If you do, you might have worker threads exiting prematurely.)

      Thread::Queue does the same, but hides a couple of the details.

Re: wait for threads to end in parallel
by 7stud (Deacon) on Mar 16, 2010 at 13:45 UTC

    Ah. So you cross posted. And then you ignored the posts here. Pox on you.

      Hi 7stud, it isn't polite to curse; besides, syphilis is curable with penicillin.
Re: wait for threads to end in parallel
by 7stud (Deacon) on Mar 16, 2010 at 14:17 UTC

    What about:

    You'll also note that there's a semicolon after the closing brace. That's because async() treats the following block as an anonymous subroutine, so the semicolon is necessary.

    and:

    threads->list() returns a list of thread objects, one for each thread that's currently running and not detached. Handy for a number of things, including cleaning up at the end of your program (from the main Perl thread, of course):

    # Loop through all the threads
    foreach my $thr (threads->list()) {
        $thr->join();
    }

    Is there any difference between threads->create() and async {...}?

      Syntax. async is shorter, but can only pass args via capture. Then again, passing args via create is known to be problematic.

      Without args:

      async(\&process);
      async { process(); };
      threads->create(\&process);

      With args:

      async { process($arg); };
      threads->create(\&process, $arg);  # Safe??

        Syntax. async is shorter, but can only pass args via capture.

        Not so. async is identical to threads->create(), except that it is invoked as a function rather than a class method:

        # 'async' is a function alias for the 'threads->create()' method
        sub async (&;@)
        {
            unshift(@_, 'threads');
            # Use "goto" trick to avoid pad problems from 5.8.1 (fixed in 5.8.2)
            goto &create;
        }

        So async \&sub, $arg, 1, 'fred'; or even async{ ... } $arg, 1, 'fred';

        are identical in effect to threads->create( \&sub, $arg, 1, 'fred' );
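
        A quick way to check this is to start the same worker through each form and compare what it received. This is only a sketch: show_args() and the @args values are hypothetical, chosen so the arguments are easy to compare after join().

```perl
use strict;
use warnings;
use threads;

# Hypothetical worker: joins its arguments so each thread's view can be compared.
sub show_args { return join('|', @_) }

my @args = ('file1', 1, 'fred');

my $t1 = threads->create(\&show_args, @args);    # class-method form
my $t2 = async \&show_args, @args;               # function form with a code ref
my $t3 = async { show_args(@_) } @args;          # block form; args arrive in @_
my $t4 = async { show_args(@args) };             # block form; args captured by closure

# Each thread was created in scalar context, so join() hands back one string.
my @seen = map { scalar $_->join() } $t1, $t2, $t3, $t4;

print "$_\n" for @seen;
```

        The last two lines differ only in how the arguments reach the block: passed after it and read from @_, or captured from the enclosing scope.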

