in reply to Re^5: threads on Windows
in thread threads on Windows
I've thought about what you've said and played with code for a few days. And I come up with some new material, posted below. I have multiple threads, which I call king, minister, worker, and media. The king (the main thread) makes proclamations, i.e. takes keyboard input and sticks it on the input queue. The minister implements these by sitting on the input queue, deciding policy and delegating to workers or sticking errors on the output queue. The worker threads are unchanged from the code you posted a few nodes up. All output is handled by the media thread, which sits on the output queue. In this way, I tried to keep the message agents (minister and media) working only on one queue. If this does gain a GUI, the king would be where this would be implemented.
There are a few problems I see with what I have written:
As a final note, if I do not explicitly close all thread-inappropriate channels before proceeding, I get blocking behavior between the <> in the King thread and the new thread generation in the Minister thread. I still do not grok why this happens - if you know of any good materials that discuss how I/O is implemented on a MS system, particularly with regard to Perl behaviors, I'd appreciate a shove in that direction. I hate solutions that say "Just don't do that".
Any wisdom you care to share would be appreciated.
#! perl -slw use strict; use threads; use Thread::Queue; use Thread::Semaphore; my $Q_outbox = new Thread::Queue; my $Q_inbox = new Thread::Queue; my $gag = new Thread::Semaphore(0); my $media = new threads(\&media, $Q_outbox, $gag ); close STDOUT; close STDERR; $Q_outbox->enqueue("media initiated..."); my $minister = new threads(\&minister, $Q_inbox, $Q_outbox, $gag, ); $Q_outbox->enqueue("minister initiated..."); # It's good to be king $gag->up(2); #while ($minister->is_running) { $_ = 'something irrelevant'; while (not /^q/) { $_ = <STDIN>; chop; if (length) { $Q_inbox->enqueue( $_ ); } } # Clean up $minister->join; $media->join; #----------------------------------------------------------- sub media { # Condenses input from many sources into one stream my ($queue, $gag) = @_; close STDIN; $gag->down; while ( defined( $_ = $queue->dequeue ) ) { print STDOUT ">$_"; } } #----------------------------------------------------------- sub minister { # Receives orders from king and either dispatches specific # requests to workers or complains to media my ($Q_inbox, $Q_outbox, $gag) = @_; my %workers; my %sems; close STDIN; $gag->down; while (my $order = $Q_inbox->dequeue) { local $_; my $name; if ($order =~ m[^([a-zA-Z])[a-zA-Z]*\s*(\d+)?]) { $_ = $1; $name = $2; } else { $order ||= ' '; $Q_outbox->enqueue("Bad command $order"); next; } if (m[^q]i) { for( keys %workers ) { $sems{ $_ }->up; $workers{ $_ }->kill( 'KILL' ); } $_->join for values %workers; $Q_outbox->enqueue( undef ); # Terminate main loop last; } if (not defined $name) { $Q_outbox->enqueue("Non-integer thread name in '$order'"); next; } if( m[^w]i ) { if (not defined $name) { $Q_outbox->enqueue("You must give a name for the worke +r"); next; } if (exists $workers{ $name }) { $Q_outbox->enqueue("Worker $name already exists"); next; } $sems{ $name } = new Thread::Semaphore; $workers{ $name } = threads->create( \&worker, $name, $Q_outbox, $sem +s{ $name } ); next; } if (not exists $workers{ $name }) { $Q_outbox->enqueue("Worker $name does not exist"); next; } if (m[^k]i) { $sems{ $name }->up; ## Ensure the worker can respond delete( $workers{ $name } )->kill( 'KILL' )->join; next; } if( m[^p]i) { ${ $sems{ $name } } ? $sems{ $name }->down : $sems{ $name }->up; $workers{ $name }->kill( 'STOP' ); next; } if (m[^s]i) { $workers{ $name }->kill( 'FPE' ); $sems{ $name }->up; $sems{ $name }->down; next; } $Q_outbox->enqueue("Bad command $order"); } } #----------------------------------------------------------- sub worker { # Follow orders and report results to the media my( $name, $queue, $sem ) = @_; my $cooldown = 0; my $state = 0; $SIG{KILL} = sub { $queue->enqueue("Worker $name dying"); threads->exit(); }; $SIG{STOP} = sub { my $string = "Worker $name is "; $string .= ( $state ^= 1 ) ? 'paused' : 'resumed'; $queue->enqueue($string); $sem->down(); $sem->up(); }; $SIG{FPE } = sub { $sem->up(); $queue->enqueue("Worker $name has cooldown $cooldown"); $sem->down(); }; $queue->enqueue("Worker $name starting"); my %nodes = (); while (1) { $queue->enqueue("$name step $cooldown"); sleep 1; $cooldown++; } }
|
|---|
| Replies are listed 'Best First'. | |
|---|---|
|
Re^7: threads on Windows
by BrowserUk (Patriarch) on Feb 18, 2009 at 15:22 UTC | |
by kennethk (Abbot) on Feb 18, 2009 at 16:38 UTC | |
by BrowserUk (Patriarch) on Feb 19, 2009 at 04:34 UTC | |
by kennethk (Abbot) on Feb 19, 2009 at 14:38 UTC |