in reply to threads on Windows

Ignoring for a moment all the other "advice" in this thread, I have a question.

Given that, as your comment suggests, using enqueue() directly:

#$Q_found->enqueue( $cmd ) ; # using this in place of async works

Rather than starting a whole new thread for the sole purpose of doing something that takes approximately 1/1000 of the time it takes to start that thread, the program runs perfectly:

c:\test>junk9
Create terminal watcher...
Awaiting commands...
fred
fred
bill
bill
john
john
quit
Resolving open threads

Why are you starting that thread?


But then, why are you queuing to a second queue from your main loop, when the only thing you do with what you queue is to dequeue it later from that same main loop?

Which also means that you have to use dequeue_nb() rather than dequeue(), which in turn forces you to use sleep to stop it from running away with the cpu. Using non-blocking dequeue makes a complete nonsense of a message passing architecture.
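To make the contrast concrete, here is a minimal sketch (not the OP's program) of the blocking style. It assumes a canned list, @input, in place of lines typed at STDIN so that it runs unattended, and it uses an undef item as an end-of-input marker; both are illustrative choices only.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Assumption: @input stands in for lines that would arrive on STDIN.
my @input = qw( fred bill john quit );

my $Q = Thread::Queue->new;

# Producer thread: post each "command", then an undef end-of-input marker.
my $producer = async {
    $Q->enqueue( $_ ) for @input;
    $Q->enqueue( undef );
};

# Consumer: dequeue() blocks until an item is available, so there is
# no polling loop and no sleep() needed to keep the CPU idle.
my @seen;
while( defined( my $cmd = $Q->dequeue ) ) {
    last if $cmd =~ /^q/i;
    push @seen, $cmd;
    print ">$cmd\n";
}

$producer->join;
```

Run as-is, this should print >fred, >bill and >john and then exit. The consumer wakes only when there is something to do, which is the point of passing messages through a queue in the first place.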


Even your OP code works--as you've designed it, though possibly not as you expect--on Win32. The ONLY change I've made is to prefix the output with '>' to differentiate it from the input:

#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Create terminal watcher
print "Create terminal watcher...\n";
my $Q_stdin = Thread::Queue->new;
async {
    $Q_stdin->enqueue( $_ ) while defined( $_ = <STDIN> );
}->detach;

my $Q_found = Thread::Queue->new;
my $cmd;

print "Awaiting commands...\n";
MAIN_LOOP:
while (not defined $cmd or $cmd !~ /^q/i) {
    sleep(1); # Reduce load

    # Process commands
    $cmd = $Q_stdin->dequeue_nb;
    if (defined $cmd) {
        chomp $cmd;
        if ($cmd =~ /^q/i) {
            print "Resolving open threads\n";
        } else {
            async {
                $Q_found->enqueue( $cmd ) ;
            }->detach;
            #$Q_found->enqueue( $cmd ) ; # using this in place of async works
        }
    }

    # Print announcements
    while (defined(my $output = $Q_found->dequeue_nb)) {
        print ">$output\n";
    }
}
__END__
c:\test>junk9
Create terminal watcher...
Awaiting commands...
fred
bill
john
>fred
mary
>bill
quit
>john
Resolving open threads
>mary

Why do you not get the first output until after the third input? Because that's exactly the way you've designed it to work.

Why does it work differently on Win32 to *nix? Because they are different systems. Specifically, the OS serialises read and write accesses to console devices.

Can I prove this is the case? Yes. If you supply the input via a pipe, rather than from the keyboard, then you'll see the behaviour you are (wrongly) expecting:

c:\test>perl -wle" printf qq[%08d\n], $_ for 1 .. 10; print 'quit' " | perl junk9.pl
Create terminal watcher...
Awaiting commands...
>00000001
>00000002
>00000003
>00000004
>00000005
>00000006
>00000007
>00000008
>00000009
Resolving open threads
>00000010

If you would describe what you are actually trying to do, I can probably suggest a simpler and more consistent architecture.


Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
"Science is about questioning the status quo. Questioning authority".
In the absence of evidence, opinion is indistinguishable from prejudice.
"Too many [] have been sedated by an oppressive environment of political correctness and risk aversion."

Re^2: threads on Windows
by kennethk (Abbot) on Feb 12, 2009 at 17:19 UTC

    First, thank you for your detailed response.

    As you have noted, this code is extremely impractical. It represents the smallest version of a more complex but impractical code that repeated the issue, which is why I posted it. In addition to being a generic learning exercise, my ultimate goal is the design of a main processing engine with worker threads that respond to pause/resume/kill requests. The more complex version follows:

    .

    It mostly comes down to me being unwilling to relinquish control over the scheduling of a user interface. Ultimately, I plan on replacing the CLI with a GUI thread, though this is probably all an abuse of queues. I also note I am abusing the FPE signal, which came down to random selection of a signal on both my Linux and Windows boxes that Windows did not treat as fatal.

      I do not understand how you arrived at this architecture.

      Why start a thread to accept the commands from STDIN and then post them to a queue, and then have your main thread read that queue and action them?

      The only reason I can see for doing this is so that your main thread/loop can make busy work polling two queues, in non-blocking mode, and sleeping.

      Ignoring that the second queue's only purpose seems to be to print messages from the workers to the console (which could just as easily have been directed there from within the worker threads), the actions essentially consist entirely of sending signals, so why not perform those actions in the STDIN thread?

      I spent ages thinking about your statement of intent to move to using a gui, and whether this conflation of loops was some attempt to bring all the interaction, input and output, together into a single thread in preparation for that. But that doesn't make any sense either. You aren't going to use a command line for input and a gui for output (are you!?), and if the interaction is all going to be via the gui, whether button or keyboard based, it's all going to be event driven. So conflating the two now, would just need to be undone when you make the switch.

      Anyway, on the basis of what you've posted, here's how I would do what you are attempting. It all works, though using Thread::Semaphore and those dratted per-thread signals together is awfully clumsy, given that a signal won't even wake a sleep, never mind a semaphore wait.

      This code is somewhat stylistically challenged, because it was a case of tacking things in here and there to see what works. I might have tried to rewrite it a little, but given the total disparity between our coding styles, you would have just rewritten it yourself anyway--so you might as well expend the effort :)

      The sample run at the bottom shows that the commands (w)orker, (s)tatus, (p)ause/resume, (k)ill and (q)uit all work. To some degree at least:

      #! perl -slw
      use strict;
      use threads;
      use Thread::Queue;
      use Thread::Semaphore;

      my $Q = new Thread::Queue;

      my $boss = async {
          my %workers;
          my %sems;
          while( <STDIN> ) {
              ( local( $_ ), my $name ) = m[([a-zA-Z])\s*(\d+)?] or next;

              warn "Non-integer thread '$name'\n" and next
                  if defined $name and $name =~m[\D];

              if( m[^w]i ) {
                  warn "You must give a name for the worker\n" and next
                      unless defined $name;
                  warn "Worker $name already exists\n" and next
                      if exists $workers{ $name };

                  $sems{ $name } = new Thread::Semaphore;
                  $workers{ $name } = threads->create(
                      \&worker, $name, $Q, $sems{ $name }
                  );
                  next;
              }

              warn "Worker $name does not exist\n" and next
                  unless m[^q] or exists $workers{ $name };

              m[^k] and $sems{ $name }->up, ## Ensure the worker can respond
                  delete( $workers{ $name } )->kill( 'KILL' )->join
                  and next;

              if( m[^p] ) {
                  ${ $sems{ $name } }
                      ? $sems{ $name }->down
                      : $sems{ $name }->up;
                  $workers{ $name }->kill( 'STOP' );
                  next;
              }

              m[^s] and $sems{ $name }->up, ## Ensure a paused thread is able to respond
                  $workers{ $name }->kill( 'FPE' ),
                  $sems{ $name }->down      ## and set it back
                  and next;

              m[^q] or next;

              for( keys %workers ) {
                  $sems{ $_ }->up;
                  $workers{ $_ }->kill( 'KILL' )->join;
              }
              $Q->enqueue( undef ); ## Terminate main loop
              last;
          }
      };

      print ">$_" while defined( $_ = $Q->dequeue );

      $boss->join;
      exit;

      sub worker {
          my( $name, $queue, $sem ) = @_;
          my $cooldown = 0;
          my $state = 0;

          $SIG{KILL} = sub {
              warn "Worker $name dying\n";
              threads->exit();
          };
          $SIG{STOP} = sub {
              printf STDERR "Worker $name is %s\n",
                  ( $state ^= 1 ) ? 'paused' : 'resumed';
              $sem->down();
              $sem->up();
          };
          $SIG{FPE } = sub {
              $queue->enqueue("Worker $name has cooldown $cooldown.");
          };

          $queue->enqueue("Worker $name starting");
          my %nodes = ();

          while (1) {
              $queue->enqueue("$name step $cooldown");
              sleep 1;
              $cooldown++;
          }
      }
      __END__
      C:\test>743398.pl
      w1
      >Worker 1 starting
      >1 step 0
      >1 step 1
      p1>1 step 2
      Worker 1 is paused
      s1
      >Worker 1 has cooldown 2.

      w2
      >Worker 2 starting
      >2 step 0
      >2 step 1
      >2 step 2
      p>2 step 3
      2
      Worker 2 is paused
      s2
      >Worker 2 has cooldown 3.
      k1
      >1 step 3
      Worker 1 dying
      q
      >2 step 4
      Worker 2 dying
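For comparison only, here is a hedged sketch of pause/resume done with the semaphore alone and no per-thread signals. It is a different technique from the code above. It assumes the worker can afford to check a "gate" once per step, and the names $gate, $stop and the 0.05 second stand-in for real work are all illustrative.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;
use Thread::Semaphore;

my $Q    = Thread::Queue->new;
my $gate = Thread::Semaphore->new( 1 );   # count 1 = running, 0 = paused
my $stop :shared = 0;                     # hypothetical "please finish" flag

sub worker {
    my $step = 0;
    until( $stop ) {
        $gate->down;                          # waits here while paused
        $gate->up;                            # pass through, gate stays open
        $Q->enqueue( "step " . $step++ );
        select( undef, undef, undef, 0.05 );  # stand-in for real work
    }
    $Q->enqueue( undef );                     # tell the boss we are done
}

$gate->down;                              # pause before the worker starts
my $thr = threads->create( \&worker );
sleep 1;
my $while_paused = $Q->pending;           # nothing is produced while paused

$gate->up;                                # resume
my $first = $Q->dequeue;                  # blocks until the first step arrives

$stop = 1;                                # ask the worker to finish
1 while defined $Q->dequeue;              # drain up to the undef marker
$thr->join;

print "paused: $while_paused queued; first after resume: $first\n";
```

This should print "paused: 0 queued; first after resume: step 0". The trade-off is that a pause only takes effect at the next gate check, so a worker stuck in a long computation or a long sleep will not notice it until that step ends.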


        The architectural concept came from trying to add a layer of abstraction between the interface and the workers. I'd like all GUI events filtered through a boss prior to being delegated appropriately and all threads reporting results to the boss so they don't need to have any visualization coupling. The final application I'm working toward (in a very long-term sense) is complex curve-fitting and visualization in chemical engineering, with poss. Given the individual complexity of each task, I want a clear bulkhead between the two sides.

        That means you are absolutely correct in saying

        this conflation of loops was some attempt to bring all the interaction, input and output, together into a single thread
        My intent was to perform most message passing by way of queues and use thread signaling for tasks that don't lend themselves to that approach. If the general concept (as contrasted with implementation) is flawed, I would love to be set straight. This literally represents my first attempts to use threaded Perl, where I have a reasonable parallel computing background in MPI and OpenMP for scientific Fortran 9x.

        Thanks for the help, insight and homework.