in reply to Re^2: Problem in Inter Process Communication
in thread Problem in Inter Process Communication

Okay, I guess the question I have is: why do you need to fork before creating threads? Are you running this on win32? Some *nix flavor? If it's win32, then the forks are actually threads (Perl emulates fork with threads there), so mixing the two isn't really saving you much of anything.

Note, in the code you've posted, you have a scope problem:
    if ( $thr->tid && !threads::equal( $thr, threads->self ) ) {
        ##########Not Sure #######
        foreach my $client ( keys %sharedHash ) {
            print " client $client\n";
            my $thrnew = thread->new( \&compare, $client );
            $sema->up;
        }

$sema is not defined in this context; it's local to the previous loop. There are numerous other bugs, but I'll leave those for you to find ;)
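A minimal sketch of one way around that scope problem, assuming Thread::Semaphore is what was intended: declare $sema once in the enclosing scope, so that both the code starting the threads and the code releasing them can see it. The worker body and the count of 3 are placeholders, not taken from the original program.

```perl
use strict;
use warnings;
use threads;
use Thread::Semaphore;

# Declared once, outside any loop, so every later block can see it.
my $sema = Thread::Semaphore->new(0);

my @thr;
for my $n ( 1 .. 3 ) {
    # Each thread blocks on the semaphore until the main thread releases it.
    push @thr, threads->create( sub { $sema->down; return $n * 2 } );
}

# Still in scope here: release all three waiting threads.
$sema->up(3);

my @results = map { $_->join } @thr;
print "results: @results\n";
```

Had $sema been declared with my inside the for loop, the $sema->up call would not compile under strict.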

Again, I would refer you to the thread primer http://perlmonks.org/?node_id=691785 and the like http://perlmonks.org/?node_id=704138, but I've made a stab at doing what I _think_ you are looking to have done:

#!/usr/bin/perl
use strict;
use threads;
use threads::shared;
use Time::HiRes qw(usleep);
use POSIX;

my @clientList = qw(client1 client2 client3 client4 client5 client6
                    client7 client8 client9 client10 client11 client12);
my @dmServers  = qw(A B C);
my @queryPath  = qw(AA BB CC DD EE FF);
my @shared;
my $queryIndex     = 0;
my $maxnoofThreads = 3;
my %sharedHash : shared;

for ( my $i = 0 ; $i < @dmServers ; $i++ ) {
    my $cpid = fork();
    die unless defined $cpid;
    if ( !$cpid ) {
        # This is the child
        # my $wait = int rand 10;
        # sleep $wait;
        callChild( $queryPath[ $queryIndex++ ], $dmServers[$i], $i );
        #print "Child $$ exiting after seconds\n";
        exit;
    }
}

# Just parent code, after this
while ( ( my $cpid = wait() ) != -1 ) {
    #print "Waited for child $cpid\n";
    #print " length of array shared",length(@shared);
}
#print "Parent Exiting\n";

sub callChild {
    my $query    = shift;
    my $dmserver = shift;
    my $i        = shift;
    my %shash;
    #print " query fired $query on dmserver $dmserver index $i\n";

    # each child handles every @dmServers-th query, starting at its own index
    for ( my $index = $i ; $index < @queryPath ; $index = $index + @dmServers ) {
        print "process $$ query at $index is ", $queryPath[$index], "\n";

        # fresh copy of the client list for every query
        my @clientList_temp = @clientList;

        #prepare all of the threads
        for ( my $a = 1 ; $a <= $maxnoofThreads ; $a++ ) {
            share $shash{$$}{$a}{'data'};
            $shash{$$}{$a}{'data'} = undef;
            share $shash{$$}{$a}{'go'};
            $shash{$$}{$a}{'go'} = 0;
            share $shash{$$}{$a}{'done'};
            $shash{$$}{$a}{'done'} = 0;

            # hand each thread its slice of the clients
            my @temp = splice( @clientList_temp, 0,
                ceil( @clientList / $maxnoofThreads ) );
            $shash{$$}{$a}{'thread'} =
              threads->new( \&worker, $a, \@temp, $queryPath[$index], $$ );
            print "pid: $$ tid is: ", $shash{$$}{$a}{'thread'}->tid, "\n";
        }

        #launch all of the threads
        for ( my $a = 1 ; $a <= $maxnoofThreads ; $a++ ) {
            print "launching pid:$$ tid: $a\n";
            $shash{$$}{$a}{'go'} = 1;
        }

        #now wait on them to finish
        for ( my $a = 1 ; $a <= $maxnoofThreads ; $a++ ) {
            $shash{$$}{$a}{'thread'}->join;
        }
    }

    sub worker {
        my $thr_num          = shift;
        my $myClientArrayRef = shift;
        my $query            = shift;
        my $pid              = shift;

      WORKER_LOOP:
        while (1) {
            #wait for $go_control
            if ( $shash{$pid}{$thr_num}{'go'} ) {
                # visit every client in this thread's slice exactly once
                foreach my $client (@$myClientArrayRef) {
                    print " $client client \n";
                    $shash{$pid}{$thr_num}{'data'} = "$client";
                    sleep 1;
                }
                $shash{$pid}{$thr_num}{'done'} = 1;
                #print "done\n";
                last WORKER_LOOP;
            }
            else {
                usleep 5000;    #essential sleep or will flog cpu waiting on while loop
            }
            # sleep until awakened
        }    #end WORKER_LOOP
        return;
    }

    sub compare {
        my $client = shift;
        print " hash value ", $sharedHash{$client}, "\n";
    }
}
Note that the control structure of what I've done is _extremely_ sloppy and I would not recommend it.

Note that I don't quite understand why you need to thread your compares, but I guess if that is also a time-consuming operation, it could make sense.

Also note that I made your worker sub and compare sub local to the sub that is running in the fork. I did this for my own readability, as well as to remind myself that they are really running in the scope of the forked process and not the main program.

In that vein, in my shared hash, I did not need the pid as a key in the hash; I just put it there for clarity. Each forked process is local unto itself, only copying the values, etc. that are available at the time the process is forked (at least the way I understand the world).
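To illustrate the point about each forked process working on its own copy, here is a small sketch. The %seen hash and the pipe are only there for the demonstration and are not part of the program above. The child adds a key to its copy and reports its view back through the pipe; the parent's hash stays as it was before the fork.

```perl
use strict;
use warnings;

my %seen = ( parent => 1 );    # populated before the fork

pipe( my $reader, my $writer ) or die "pipe failed: $!";

my $pid = fork();
die "fork failed: $!" unless defined $pid;

if ( $pid == 0 ) {
    # Child: this change lands in the child's private copy only.
    close $reader;
    $seen{child} = 1;
    print {$writer} join( ',', sort keys %seen ), "\n";
    close $writer;
    exit 0;
}

# Parent: read what the child saw, then compare with our own copy.
close $writer;
chomp( my $child_view = <$reader> );
close $reader;
waitpid( $pid, 0 );

my $parent_view = join ',', sort keys %seen;
print "child saw:  $child_view\n";
print "parent has: $parent_view\n";
```

This is also why the pid key in the hash is redundant: no other process ever sees that hash.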

anyhow, hope this is helpful

Re^4: Problem in Inter Process Communication
by libvenus (Sexton) on Aug 21, 2008 at 06:29 UTC
    Hi JoeKamel Thanks a ton for your help !!!

    >>> why do you need to fork before creating threads ?

    Well, the kind of model that I have thought of is this: the application starts and, based on the number of children I can fork (the app starter defines it based on the server load), I fork that many children. Each child notes the index of the query that has been passed to it by the parent and increments it by the total number of children spawned to get to its next query. Each child then has to do the following tasks: pick the query, build it (replace client placeholders with real values), fire it at prod/test, and then compare the results (time consuming).

    So each child constructs 3 threads (user defined), divides the client array into 3 equal parts, and hands one part to each thread, along with the query that it must build (replace client placeholders with real values) and fire. Each thread then loops over its clients (1/3 of the total). In each iteration it builds the query, fires it, stores the results in some data structure, lets its main process know that it has the result of one query+client, and then suspends itself (if I do not suspend the thread, all the results would pile up in memory). The main process now knows that it has a result, which it can either work on itself or hand to a new thread so the comparison runs in parallel and does not block it. In each comparison thread, if differences are found between the prod and test results, they are appended to a file and the memory is deallocated.

    >>>>Are you running this on win32? some nix flavor?

    linux

    >>>>Note, in the code you've posted, you have a scope problem

    yeah i know , i was trying to do the suspend/resume via semaphore and i didn't succeed. :-((

    >>>>note that i don't quite understand why you need to thread your compares

    i guess i have answered that above

      Well, the kind of model that I have thought of is this: the application starts and, based on the number of children I can fork (the app starter defines it based on the server load), I fork that many children.

      I have to concur that there is no benefit to mixing forks and threads in the way you have. If you want 9 threads to run, start 9 threads in a single process rather than forking 3 times and starting 3 in each.

      I've spent a good while going over the code you've posted, and reading your descriptions of the application, but I still can't make sense of what you are actually trying to do. You've described how you think you should do something, but no real detail of either what you are doing, or why you think you should do it this way.

      For example: You start your query threads with a subsection of the work items. Then your architecture calls for those threads to process one item, then signal the main thread and suspend, whilst the main thread starts another thread to further process the results obtained. And, presumably, once the started thread finishes that further processing, it signals the main thread and dies, and the main thread signals the suspended thread to move on to the next work item.

      That's way too complicated and very wasteful of resources. You are using two threads to process each work item, but only one of them can actually run at any given time. And you are going to have to start a second thread (an expensive process) to finish processing each work item, whilst the thread that started processing that work item sits around idle. Not to mention all the complexities of the signalling.

      It would be far better to have the worker threads:

      1. pick one item off a shared queue;
      2. perform the query for that item;
      3. perform the comparison for that item;
      4. perform whatever outputting and clean up is required.
      5. Loop back to 1 and process the next work item.

      The basic pseudo code for the main thread is:

      1. Create a queue (Thread::Queue).
      2. Start N worker threads passing the queue handle. (Storing the thread handles.)
      3. Push the list of work items (clients) onto the queue.
      4. Push N x undef onto the queue (to terminate the threads when there are no more work items).
      5. Call join() on the accumulated array of thread handles. (Thereby blocking until the workers are done).

      And basic pseudo-code for the worker threads is:

      sub worker {
          my( $Q ) = shift;
          while( my $workItem = $Q->dequeue ) {
              ## Perform query
              ## Perform comparison
              ## Perform output/cleanup
          }
      }

      No signalling, no locking, no forking, no user-explicit sharing, and completely scalable. The queue manages the entire process without any further effort.
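For what it's worth, the main-thread steps and the worker loop above can be put together roughly like this. It is only a sketch: the value of $N, the client names, and the stubbed-out processing are placeholders, and the per-thread count returned from worker is added here purely so the result can be checked.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $N       = 3;                             # number of workers (placeholder)
my @clients = map { "client$_" } 1 .. 12;    # placeholder work items

sub worker {
    my ($Q) = @_;
    my $count = 0;

    # dequeue blocks until an item is available; undef means "no more work"
    while ( defined( my $workItem = $Q->dequeue ) ) {
        ## Perform query, comparison, output/cleanup (stubbed out here)
        printf "thread %d processed %s\n", threads->tid, $workItem;
        $count++;
    }
    return $count;
}

my $Q = Thread::Queue->new;                                # 1. create the queue

my @workers;
push @workers, threads->create( \&worker, $Q ) for 1 .. $N;    # 2. start workers

$Q->enqueue(@clients);                                     # 3. queue the work
$Q->enqueue( (undef) x $N );                               # 4. one undef per worker

my $handled = 0;
for my $thr (@workers) {                                   # 5. wait for the workers
    my ($count) = $thr->join;
    $handled += $count;
}
print "handled $handled items\n";
```

The sketch tests defined() on the dequeued value rather than plain truth, so that a work item such as "0" would not end a worker early.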

      Just start with one worker thread until you are sure that the processing logic is correct. Then increase the number slowly until you see no further improvement in the throughput. The processing of each item is completely linear, but multiple work items are processed concurrently. Very low complexity, no timing issues or deadlock possibilities.

      The only additional complexity I foresee, reading between the lines of your various posts, is that if you are outputting your results to a single file, then you would need to employ a mutex to prevent the output from the worker threads getting interleaved. But that involves just a single shared variable and a simple lock:

      ## in the main thread:
      my $outputMutex : shared;
      ...
      open OUTFILE, '>', ...

      ## In the worker threads
      ...
      {
          lock $outputMutex;
          print OUTFILE ...
      }
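A runnable sketch of that locking pattern, under a few assumptions of mine: the file name results.txt, the three workers, and the five lines each writes are made up for the demonstration, and a lexical filehandle with autoflush stands in for the bareword OUTFILE.

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use IO::Handle;

my $outputMutex : shared;        # never assigned; exists only to be locked
my $outfile = 'results.txt';     # hypothetical output file

open my $OUT, '>', $outfile or die "Cannot open $outfile: $!";
$OUT->autoflush(1);              # flush each print while the lock is still held

sub worker {
    my ($id) = @_;
    for my $n ( 1 .. 5 ) {
        lock($outputMutex);      # released when this loop body is left
        print {$OUT} "worker $id: result $n\n";
    }
}

my @workers;
push @workers, threads->create( \&worker, $_ ) for 1 .. 3;
$_->join for @workers;
close $OUT;

# Read the file back to see that every line arrived whole.
open my $IN, '<', $outfile or die "Cannot read $outfile: $!";
my @lines = <$IN>;
close $IN;
print "wrote ", scalar(@lines), " lines\n";
```

The order of the lines varies from run to run; the lock only ensures that each one is written in a single piece.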

      I seriously urge you to consider what benefits you think you will get from mixing forks and threads? Actually, on the basis of the information available so far, you could probably write your application to use either, but mixing the two is completely unnecessary as far as I can tell.

      Likewise, what benefit is there in suspending one thread and starting another to finish the processing of a single work item? Especially in the light of the cost of starting and discarding use-once threads, and the complexities of the signalling it requires.


      Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
      "Science is about questioning the status quo. Questioning authority".
      In the absence of evidence, opinion is indistinguishable from prejudice.

        Thanks for your inputs...

        Well, I still haven't finalized the strategy yet... I just wanted some input. In my case there are thousands of queries and numerous clients. Each query has a <CLIENT> placeholder which must be replaced at runtime; the query is then fired and the outputs compared. I just wanted to separate the query firing and the comparison processes.

        Also, in the model that you have proposed, how do I add support for handling multiple queries? I can use the same strategy as I had used before...

        use strict;
        use threads;
        use threads::shared;
        use Thread::Queue;

        my $maxnoofThreads = 1;
        my @clientList = qw(client1 client2 client3 client4 client5 client6
                            client7 client8 client9 client10 client11 client12);
        my %hash;
        my $q = Thread::Queue->new;
        $q->enqueue(@clientList);

        # start $maxnoofThreads workers
        for ( my $i = 0 ; $i < $maxnoofThreads ; $i++ ) {
            $hash{$i} = threads->new( \&worker, $q, $i );
        }

        # one undef per worker, so that every worker's dequeue loop terminates
        $q->enqueue( (undef) x $maxnoofThreads );

        foreach my $thr ( values %hash ) {
            # Don't join the main thread or ourselves
            #if ($thr->tid && !threads::equal($thr, threads->self))
            # {
            $thr->join;
            # }
        }

        sub worker {
            my ($Q) = shift;
            my $i = shift;
            # the loop ends when an undef is dequeued
            while ( my $workItem = $Q->dequeue ) {
                print " workitem from thread $i -->$workItem\n";
                ## Perform query
                ## Perform comparison
                ## Perform output/cleanup
            }
        }