in reply to Re: threads on Windows
in thread threads on Windows

First, thank you for your detailed response.

As you have noted, this code is extremely impractical. It is the smallest version of a more complex (but still impractical) program that reproduced the issue, which is why I posted it. In addition to being a generic learning exercise, my ultimate goal is the design of a main processing engine with worker threads that respond to pause/resume/kill requests. The more complex version follows:

    #!/usr/bin/perl
    use strict;
    use warnings;
    use threads;
    use Thread::Queue;
    use Thread::Semaphore;

    # Create terminal watcher
    print "Create terminal watcher...\n";
    my $Q_stdin = Thread::Queue->new;
    my $T_input = async {
        $Q_stdin->enqueue( $_ ) while defined( $_ = <STDIN> );
    }->detach;

    # Create tracking arrays
    close STDIN;
    my $Q_found = Thread::Queue->new;
    my %workers = ();
    my %sema = ();
    my $cmd;
    #my $i = 0;

    print "Awaiting commands...\n";
    MAIN_LOOP: while (not defined $cmd or $cmd !~ /^q/i) {

        # Process commands
        #print ++$i, "\n";
        $cmd = $Q_stdin->dequeue_nb;
        if (defined $cmd) {
            chomp $cmd;
            if ($cmd =~ /^q/i) { # Quit
                print "Resolving open threads\n";
            } elsif ($cmd =~ /^w/i) { # Work
                my (undef, $thread) = split /\s+/, $cmd;
                if ($thread =~ /\D/) {
                    print "Noninteger thread $thread in $cmd\n";
                } elsif (defined ($workers{$thread}) and $workers{$thread}) {
                    print "Value $thread already a worker\n";
                } else {
                    $sema{$thread} = Thread::Semaphore->new(1);
                    $workers{$thread} = threads->new(\&worker, $thread, $Q_found, $sema{$thread} );
                }
            } elsif ($cmd =~ /^p/i) { # Pause
                my (undef, $thread) = split /\s+/, $cmd;
                if (defined ($workers{$thread}) and $workers{$thread}) {
                    if (${$sema{$thread}}) {
                        $sema{$thread}->down();
                        $workers{$thread}->kill('STOP');
                        print "Thread $thread paused\n";
                    } else {
                        print "Thread $thread already paused\n";
                    }
                } else {
                    print "Unrecognized thread $thread in $cmd\n";
                }
            } elsif ($cmd =~ /^r/i) { # Resume
                my (undef, $thread) = split /\s+/, $cmd;
                if (defined ($workers{$thread}) and $workers{$thread}) {
                    if (${$sema{$thread}}) {
                        print "Thread $thread already running\n";
                    } else {
                        $sema{$thread}->up();
                        print "Thread $thread resumed\n";
                    }
                } else {
                    print "Unrecognized thread $thread in $cmd\n";
                }
            } elsif ($cmd =~ /^s/i) { # Status
                my (undef, $thread) = split /\s+/, $cmd;
                if (defined ($workers{$thread}) and $workers{$thread}) {
                    $workers{$thread}->kill('FPE');
                } else {
                    print "Unrecognized thread $thread in $cmd\n";
                }
            } elsif ($cmd =~ /^k/i) { # Kill
                my (undef, $thread) = split /\s+/, $cmd;
                if (defined ($workers{$thread})) {
                    if ($workers{$thread}) {
                        $workers{$thread}->kill('KILL')->detach;
                        print "Thread $thread killed\n";
                        $workers{$thread} = 0;
                    } else {
                        print "Thread $thread already dead\n";
                    }
                } else {
                    print "Unrecognized thread $thread in $cmd\n";
                }
            } else {
                print "Unknown/misformatted command $cmd\n";
            }
        }

        # Print announcements
        while (defined(my $output = $Q_found->dequeue_nb)) {
            print $output, "\n";
        }

        sleep(1); # Reduce load
    }

    # We're quitting - kill remaining threads
    for my $thread (grep $workers{$_}, keys %workers) {
        $workers{$thread}->kill('KILL')->detach;
    }

    #-------------------------------------------------------------------------------

    sub worker {
        my ($thread, $queue, $semafore) = @_;
        my $cooldown = 0;

        # Thread signal handlers
        $SIG{KILL} = sub { threads->exit(); };                      # Kill
        $SIG{STOP} = sub { $semafore->down(); $semafore->up(); };   # Pause
        $SIG{FPE}  = sub { $queue->enqueue("Thread $thread has cooldown $cooldown."); }; # Report

        $queue->enqueue("Thread name: $thread");
        my %nodes = ();
        while (1) {
            $queue->enqueue("$thread step $cooldown");
            sleep(5 * int(exp(0.1 * $cooldown++)));
        }
    }

It mostly comes down to my being unwilling to relinquish control over the scheduling to a user interface. Ultimately, I plan on replacing the CLI with a GUI thread, though this is probably all an abuse of queues. I also note that I am abusing the FPE signal; I picked it more or less at random from the signals available on both my Linux and Windows boxes that Windows did not treat as fatal.

Re^3: threads on Windows
by BrowserUk (Patriarch) on Feb 13, 2009 at 04:09 UTC

    I do not understand how you arrived at this architecture.

    Why start a thread to accept the commands from STDIN and then post them to a queue, and then have your main thread read that queue and action them?

    The only reason I can see for doing this is so that your main thread/loop can make busy work polling two queues, in non-blocking mode, and sleeping.

    Ignoring that the second queue's only purpose seems to be to print messages from the workers to the console (which could just as easily have been directed there from within the worker threads), the actions essentially consist entirely of sending signals, so why not perform those actions in the STDIN thread?

    I spent ages thinking about your statement of intent to move to using a gui, and whether this conflation of loops was some attempt to bring all the interaction, input and output, together into a single thread in preparation for that. But that doesn't make any sense either. You aren't going to use a command line for input and a gui for output (are you!?), and if the interaction is all going to be via the gui, whether button or keyboard based, it's all going to be event driven. So conflating the two now would just need to be undone when you make the switch.

    Anyway, on the basis of what you've posted, here's how I would do what you are attempting. It all works, though using Thread::Semaphore and those dratted per-thread signals together is awfully clumsy, given that a signal won't even wake a sleep, never mind a semaphore wait.

    This code is somewhat stylistically challenged, because it was a case of tacking things in here and there to see what works. I might have tried to rewrite it a little, but given the total disparity between our coding styles, you would have just rewritten it yourself anyway--so you might as well expend the effort :)

    The sample run at the bottom shows that the commands (w)orker, (s)tatus, (p)ause/resume, (k)ill and (q)uit all work. To some degree at least:

        #! perl -slw
        use strict;
        use threads;
        use Thread::Queue;
        use Thread::Semaphore;

        my $Q = new Thread::Queue;

        my $boss = async {
            my %workers;
            my %sems;

            while( <STDIN> ) {
                ( local( $_ ), my $name ) = m[([a-zA-Z])\s*(\d+)?] or next;

                warn "Non-integer thread '$name'\n" and next
                    if defined $name and $name =~m[\D];

                if( m[^w]i ) {
                    warn "You must give a name for the worker\n" and next
                        unless defined $name;

                    warn "Worker $name already exists\n" and next
                        if exists $workers{ $name };

                    $sems{ $name } = new Thread::Semaphore;
                    $workers{ $name } = threads->create(
                        \&worker, $name, $Q, $sems{ $name }
                    );
                    next;
                }

                warn "Worker $name does not exist\n" and next
                    unless m[^q] or exists $workers{ $name };

                m[^k] and
                    $sems{ $name }->up, ## Ensure the worker can respond
                    delete( $workers{ $name } )->kill( 'KILL' )->join
                        and next;

                if( m[^p] ) {
                    ${ $sems{ $name } }
                        ? $sems{ $name }->down
                        : $sems{ $name }->up;
                    $workers{ $name }->kill( 'STOP' );
                    next;
                }

                m[^s] and
                    $sems{ $name }->up, ## Ensure a paused thread is able to respond
                    $workers{ $name }->kill( 'FPE' ),
                    $sems{ $name }->down ## and set it back
                        and next;

                m[^q] or next;

                for( keys %workers ) {
                    $sems{ $_ }->up;
                    $workers{ $_ }->kill( 'KILL' )->join;
                }
                $Q->enqueue( undef ); ## Terminate main loop
                last;
            }
        };

        print ">$_" while defined( $_ = $Q->dequeue );

        $boss->join;
        exit;

        sub worker {
            my( $name, $queue, $sem ) = @_;
            my $cooldown = 0;
            my $state = 0;

            $SIG{KILL} = sub {
                warn "Worker $name dying\n";
                threads->exit();
            };

            $SIG{STOP} = sub {
                printf STDERR "Worker $name is %s\n",
                    ( $state ^= 1 ) ? 'paused' : 'resumed';
                $sem->down();
                $sem->up();
            };

            $SIG{FPE } = sub {
                $queue->enqueue("Worker $name has cooldown $cooldown.");
            };

            $queue->enqueue("Worker $name starting");
            my %nodes = ();

            while (1) {
                $queue->enqueue("$name step $cooldown");
                sleep 1;
                $cooldown++;
            }
        }

        __END__
        C:\test>743398.pl
        w1
        >Worker 1 starting
        >1 step 0
        >1 step 1
        p1>1 step 2
        Worker 1 is paused
        s1
        >Worker 1 has cooldown 2.
        w2
        >Worker 2 starting
        >2 step 0
        >2 step 1
        >2 step 2
        p>2 step 3
        2
        Worker 2 is paused
        s2
        >Worker 2 has cooldown 3.
        k1
        >1 step 3
        Worker 1 dying
        q
        >2 step 4
        Worker 2 dying
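
    For comparison only, here is a minimal sketch of pause/resume that avoids per-thread signals and Thread::Semaphore altogether. It is not the code above: it swaps in a shared state variable with cond_wait/cond_timedwait from threads::shared, so a paused or "sleeping" worker wakes as soon as the state changes. The names $state, set_state() and this worker() are assumptions made up for the illustration, and a real program would likely need one state variable per worker.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

# Assumed, illustrative names: $state, set_state() and this worker()
# are not taken from the code above.
my $state :shared = 'pause';    # one of 'run', 'pause', 'quit'

sub set_state {
    my ($new) = @_;
    lock($state);
    $state = $new;
    cond_broadcast($state);     # wake anything blocked on $state
    return;
}

sub worker {
    my ( $name, $queue ) = @_;
    my $step = 0;
    while (1) {
        {
            lock($state);
            # Paused: block here without polling until the state changes.
            cond_wait($state) while $state eq 'pause';
            return $step if $state eq 'quit';
        }
        $queue->enqueue("$name step $step");
        $step++;

        # Interruptible delay of up to one second: ends early on a state change.
        lock($state);
        my $deadline = time() + 1;
        1 while $state eq 'run' and cond_timedwait( $state, $deadline );
    }
}

my $Q   = Thread::Queue->new;
my $thr = threads->create( \&worker, 'w1', $Q );

set_state('run');               # resume the worker, which starts paused
my $first = $Q->dequeue;        # blocks until the first step is reported
set_state('quit');              # cooperative shutdown, no KILL signal
my $steps = $thr->join;

print "$first\n";
print "worker finished after $steps step(s)\n";
```

    The trade-off is that the worker only notices a request at the points where it checks $state, so long computations would need to check it periodically.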

    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.

      The architectural concept came from trying to add a layer of abstraction between the interface and the workers. I'd like all GUI events filtered through a boss prior to being delegated appropriately and all threads reporting results to the boss so they don't need to have any visualization coupling. The final application I'm working toward (in a very long-term sense) is complex curve-fitting and visualization in chemical engineering, with poss. Given the individual complexity of each task, I want a clear bulkhead between the two sides.

      That means you are absolutely correct in saying

      "this conflation of loops was some attempt to bring all the interaction, input and output, together into a single thread"

      My intent was to perform most message passing by way of queues and use thread signaling for tasks that don't lend themselves to that approach. If the general concept (as contrasted with implementation) is flawed, I would love to get set straight. This is literally my first attempt at threaded Perl, though I have a reasonable parallel computing background in MPI and OpenMP for scientific Fortran 9x.

      Thanks for the help, insight and homework.

        "My intent was to perform most message passing by way of queues and use thread signaling for tasks that don't lend themselves to that approach. If the general concept (as contrasted with implementation) is flawed, I would love to get set straight."

        In the abstract, it doesn't make sense (to me, in isolation from your ultimate goals) to have a thread whose only purpose in life is to read from the keyboard and then queue that input to another thread to action. Especially when that other thread already has other things to do which, on the basis of what you've told me so far, have no inherent relationship to the input (command) actions.

        You say that your "intent was to perform most message passing by way of queues" but the command input only becomes a "message", if you queue it. If you do not queue it, the input is simply input that needs to be actioned. And that can be done immediately by the reading thread as I've shown above.

        One of the most basic principles of message passing architectures is that you do not poll. If you are going to poll, you are better off using an event loop architecture--though that is rarely a good idea. A message agent should just wait on its input queue until there is something for it to do, and then do it and go back to waiting. Your implementation above where the same agent is polling two queues completely defeats that basic principle.
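
        As a minimal sketch of that principle (the names $inbox, $outbox and agent() are hypothetical, and undef is assumed as the shutdown sentinel), an agent can sit in a blocking dequeue and only run when a message arrives, with no dequeue_nb and no sleep:

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Hypothetical names throughout: $inbox, $outbox and agent() are illustrative.
my $inbox  = Thread::Queue->new;
my $outbox = Thread::Queue->new;

# The agent blocks in dequeue() until a message arrives; undef ends the loop.
sub agent {
    my ( $in, $out ) = @_;
    while ( defined( my $msg = $in->dequeue ) ) {
        $out->enqueue("handled: $msg");
    }
    $out->enqueue(undef);    # tell the consumer there is nothing more to come
    return;
}

my $thr = threads->create( \&agent, $inbox, $outbox );

$inbox->enqueue( 'w 1', 'p 1', 'k 1' );    # three messages
$inbox->enqueue(undef);                    # request shutdown

# The consumer also blocks rather than polls.
my @handled;
while ( defined( my $reply = $outbox->dequeue ) ) {
    push @handled, $reply;
    print "$reply\n";
}
$thr->join;
```

        Each thread here waits on exactly one queue. An agent that needs input from two sources would, under this approach, have both sources post to its single input queue rather than poll two queues.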

        But reading between the lines of your post, your ultimate goals seem to be very different from the code/descriptions that you've posted so far. In that respect, it is difficult, and probably pointless, to make further recommendations.

        If you have a description of the goals and requirements of what you are hoping to achieve, then it might be possible to suggest an architecture that lends itself to development towards those goals, as a starting point.

