imrags has asked for the wisdom of the Perl Monks concerning the following question:

I have some code that I want to fork 2 threads at a time (I'm just trying out threads/semaphores).
The code I have spawns 2 threads only the first time; I want it to happen in a loop.
Your wisdom is appreciated!
    #!
    use strict;
    use warnings;
    use threads qw(yield);
    use Thread::Semaphore;

    my @array = qw(a b c d e f g h);
    my @children;
    my @results;
    my $semaphore = Thread::Semaphore->new(2);

    for my $i (1..5) {
        for my $j(1..2) {
            threads->create(sub{\&sub_fork(threads->tid(),@array)});
        }
        sleep(1);
        threads->yield();
    }

    sub sub_fork {
        my ($tid,@temp) = (shift,@_);
        $semaphore->down(1);
        my $pid = fork();
        if ($pid) {
            push (@children, $pid);
            print "$tid -- Running another child process - $pid.\n";
        }
        else {
            print "@array\n";
        }
        $semaphore->up(1);
        # sleep (2);
    }
Raghu

Replies are listed 'Best First'.
Re: Perl thread/semaphore
by poolpi (Hermit) on Jan 22, 2009 at 09:28 UTC

    From perlthrtut

    Thinking of mixing fork() and threads? Please lie down and wait until the feeling passes. Be aware that the semantics of fork() vary between platforms. For example, some UNIX systems copy all the current threads into the child process, while others only copy the thread that called fork(). You have been warned!

    ;)


    hth,
    PooLpi

    'Ebry haffa hoe hab im tik a bush'. Jamaican proverb
Re: Perl thread/semaphore
by gone2015 (Deacon) on Jan 22, 2009 at 11:47 UTC

    As brother poolpi has indicated, your code is an ugly mish mosh of threads and process forking...

    ...assuming you mean threads, then the code can be reduced somewhat:

        use strict ;
        use warnings ;
        use threads ;

        my @array = qw(a b c d e f g h);
        my @children;
        my @results;

        for (1..5) {
            for (1..2) {
                push @children, threads->create(\&sub_thread, @array) ;
            } ;
            while (@children) {
                my $child = shift(@children) ;
                push @results, $child->join() ;
            } ;
        } ;

        sub sub_thread {
            my (@temp) = (@_) ;
            my $tid = threads->tid() ;
            print "child $tid: @temp\n" ;
            sleep (2) ;
            return 'whatever' ;
        } ;

    where:

    • no semaphore is required. The main thread creates two child threads (note that there's no need to create an anonymous subroutine to do that), and then waits for them both to return (using $child->join()).

      This example shows the thread returning a single scalar. It is important to understand that when a thread is started it receives a copy of the parent's state (including all variables) -- and once started, it cannot change the parent's state. The thread can return stuff to the parent as it terminates, but if you want any other communication between thread and parent (or other thread) you need shared variables or queues or semaphores or ...

      Note especially that the context in which threads->create() is called is implicitly the context for the return from the thread -- unless you use the explicit context setting form. In this case the push actually provides a list context... even though the thread is returning a single value. (This works, but you may wish to be less sloppy!) Beware, however, of calling threads->create() in void context!

    • the thread subroutine does not need to fork. Note that the parameters passed to the thread subroutine are the parameters passed to threads->create() less the subroutine reference parameter (the first parameter).

    • no sleeping or yielding is required in the parent, the $child->join() will block the parent until the next child returns.

      Note that the way this is done means that the parent will wait for the children to complete (a) indefinitely, and (b) in the order they were created. If the second child finishes first, the parent will still wait for the first to finish.
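
      The point about context can be checked with a small sketch. The helper name report_context is made up for illustration; it relies on the documented behaviour that the thread's entry function sees the creation context through wantarray(). The last case uses the explicit context form of threads->create().

```perl
use strict;
use warnings;
use threads;

# Hypothetical helper: reports the context the thread was created in.
sub report_context {
    return wantarray ? 'list' : defined(wantarray) ? 'scalar' : 'void';
}

# Assignment to a scalar: the thread runs in scalar context.
my $thr_scalar = threads->create(\&report_context);

# Parentheses on the left-hand side: list context.
my ($thr_list) = threads->create(\&report_context);

# push supplies list context to its arguments, as in the example above.
my @pushed;
push @pushed, threads->create(\&report_context);

# Explicit form: list context is requested regardless of how create() is called.
my $thr_forced = threads->create({ 'context' => 'list' }, \&report_context);

my $s   = $thr_scalar->join();
my ($l) = $thr_list->join();
my ($p) = $pushed[0]->join();
my ($f) = $thr_forced->join();

print "scalar assignment: $s\n";
print "list assignment:   $l\n";
print "push:              $p\n";
print "explicit context:  $f\n";
```

      A thread created in void context has its return value thrown away, which is why a bare threads->create(...) statement is the case to watch out for when you intend to join() for a result.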

    If you want to have two threads on the go at the same time at all times, then you need a mechanism for each thread to signal to the parent that it has finished. One way of doing that is a queue. For example:

        use strict ;
        use warnings ;
        use threads ;
        use Thread::Queue ;

        my @array = qw(a b c d e f g h);
        my @results;
        my $finished_queue = new Thread::Queue ;

        my $c = 2 ;   # Number active at once
        my $n = 10 ;  # Total number to dispatch
        my $m = $n ;  # Number to collect

        while ($m) {
            while ($n && $c) {
                my $child = threads->create(\&sub_thread, @array) ;
                $n-- ;
                $c-- ;
            } ;
            my $tid = $finished_queue->dequeue() ;
            my $child = threads->object($tid) ;
            push @results, $child->join() ;
            $m-- ;
            $c++ ;
        } ;

        sub sub_thread {
            my (@temp) = (@_) ;
            my $tid = threads->tid() ;
            print "child $tid: @temp\n" ;
            sleep(rand(2)+2) ;
            $finished_queue->enqueue($tid) ;
            print "child $tid: finished\n" ;
            return "whatever from child $tid" ;
        } ;

    noting: (a) that the order of the results is not necessarily the same as the order of dispatch (which requires some extra information related to the $tid); and (b) that even though it doesn't need the $child while it is creating each one, it does need to set scalar context !
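
    One possible way to get the results back in dispatch order, as point (a) hints, is to remember which job each $tid belongs to. This is only a sketch: the %pending hash, the job numbers and the counter names are my own additions, and the thread objects are kept in the hash rather than looked up with threads->object().

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $finished_queue = Thread::Queue->new();

my $active_max = 2;   # threads running at once
my $total      = 6;   # jobs to dispatch
my $dispatched = 0;
my $collected  = 0;
my $running    = 0;

my %pending;          # tid => { job => job number, thread => thread object }
my @results;          # indexed by job number, i.e. by order of dispatch

while ($collected < $total) {
    # Top up to $active_max running threads.
    while ($dispatched < $total && $running < $active_max) {
        my $job   = $dispatched++;
        my $child = threads->create(\&sub_thread, $job);   # scalar context
        $pending{ $child->tid() } = { job => $job, thread => $child };
        $running++;
    }

    # Wait for any one thread to announce that it is about to finish.
    my $tid  = $finished_queue->dequeue();
    my $done = delete $pending{$tid};
    $results[ $done->{job} ] = $done->{thread}->join();
    $collected++;
    $running--;
}

print "job $_: $results[$_]\n" for 0 .. $#results;

sub sub_thread {
    my ($job) = @_;
    sleep(int(rand(2)));                        # stand-in for real work
    $finished_queue->enqueue(threads->tid());   # tell the parent we are done
    return "result of job $job";
}
```

    The threads may still finish in any order; only the slot each result is stored in follows the order of dispatch.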

    Finally, the other way to run two threads is to start two and pass "parameters" and results back and forth using two queues... along the lines of:

        use strict ;
        use warnings ;
        use threads ;
        use Thread::Queue ;

        my @array = qw(a b c d e f g h);
        my @results;
        my $param_queue  = new Thread::Queue ;
        my $result_queue = new Thread::Queue ;

        my $c = 2 ;   # concurrent threads
        my $n = 10 ;  # amount of work

        my @children ;
        for (1..$c) {     # Start the threads
            push @children, threads->create(\&sub_thread) ;
        } ;
        for (1..$n) {     # Dispatch the work
            $param_queue->enqueue([@array]) ;
        } ;
        for (1..$n) {     # Collect the results
            push @results, $result_queue->dequeue() ;
        } ;

        $param_queue->enqueue((undef) x $c) ;   # Signal the threads to stop

        foreach (@children) {   # Collect the terminated threads
            $_->join() ;
        } ;

        sub sub_thread {
            my $task = 0 ;
            my $tid = threads->tid() ;
            print "child $tid: started\n" ;
            while (my $p = $param_queue->dequeue()) {
                $task++ ;
                my @temp = @$p ;
                print "child $tid/$task: @temp\n" ;
                sleep(rand(2)+2) ;
                $result_queue->enqueue("whatever from child $tid/$task") ;
                print "child $tid/$task: done\n" ;
            } ;
            print "child $tid: terminated\n" ;
        } ;

    which I hope is reasonably self explanatory.

      Dear oshalla, thank you for your code; it really helped me to grasp several ideas about threading in Perl. My only addition would be that in your 1st and 2nd examples you clone the arguments (@array) in the first two cycles, meaning you're creating two threads with the same arguments: for (1..2) in the 1st example, and while ($c), where $c=2, in the 2nd example. To fix that I used a new variable to count dispatched threads.
          #!/usr/bin/perl
          use strict ;
          use warnings ;
          use threads ;
          use Thread::Queue ;

          my @array = qw(a b c d e f g h);
          my @results;
          my $finished_queue = new Thread::Queue ;

          my $c = 5 ;   # Number active at once
          my $n = 10 ;  # Total number to dispatch
          my $m = $n ;  # Number to collect
          my $d = 0;    #count created threads

          while ($n) {
              while ($m) {
                  $d++;
                  my $child = threads->create(\&sub_thread, @array) ;
                  my $tid;
                  my $childa;
                  if ($d == $c) {
                      $tid = $finished_queue->dequeue() ;
                      $childa = threads->object($tid) ;
                      push @results, $childa->join() ;
                      $d--;
                      $n--;
                  }
                  $m-- ;
              }
              if (!$m) {
                  my $tid = $finished_queue->dequeue() ;
                  my $childa = threads->object($tid) ;
                  push @results, $childa->join() ;
                  $n--;
              }
          }

          sub sub_thread {
              my (@temp) = (@_) ;
              my $tid = threads->tid() ;
              print "child $tid: @temp\n" ;
              sleep(rand(2)+2) ;
              $finished_queue->enqueue($tid) ;
              print "child $tid: finished\n" ;
              return "whatever from child $tid" ;
          } ;
          exit;

      It's not elegant, but it works and all I needed was a quick fix.

      P.S. It's an old thread, but it was the most useful for me just now, so I'd rather share the solution in this old thread.
Re: Perl thread/semaphore
by weismat (Friar) on Jan 22, 2009 at 08:54 UTC
    You may use the same values or the default values for constructor, up and down operations...
      That is to control number of threads...
      in this, max number of threads can be 2 and not more than 2...at least that is what i want to achieve..
      Logic is thus:
      Initially a thread goes to the subroutine, decrements semaphore (which = 2 earlier), another thread also decrements it (so sem = 0)
      so no more threads can access it...
      that's why i used the sem down<->up ...
      Raghu
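
      For what it's worth, the down/up logic described above can be sketched with threads only (no fork). This is an illustration, not the original poster's code: the $running and $peak counters are added purely to show that no more than two workers are inside the guarded section at once, and the job count of 6 is arbitrary.

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Semaphore;

# Semaphore starting at 2: at most two workers may hold it at the same time.
my $semaphore = Thread::Semaphore->new(2);

my $running :shared = 0;   # workers currently inside the guarded section
my $peak    :shared = 0;   # highest value $running ever reached

sub worker {
    my ($job) = @_;

    $semaphore->down();    # blocks while two other workers hold the semaphore
    {
        lock($running);
        $running++;
        $peak = $running if $running > $peak;
    }

    sleep 1;               # stand-in for real work, done while holding the semaphore

    {
        lock($running);
        $running--;
    }
    $semaphore->up();      # let the next waiting worker in

    return "job $job done";
}

# All six threads are created immediately; the semaphore only limits
# how many of them are doing the work at any moment.
my @threads = map { threads->create(\&worker, $_) } 1 .. 6;
my @results = map { $_->join() } @threads;

print "$_\n" for @results;
print "peak concurrency: $peak\n";
```

      Note that the work has to happen between down() and up(); if the semaphore is released before the slow part (as with the commented-out sleep in the original post), it limits nothing useful.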