#! perl -slw
use strict;
use warnings;
use threads;
use Thread::Queue;
use Thread::Semaphore;

# Interactive demo of a small thread hierarchy:
#   * the main thread (the "king") reads orders from STDIN and queues them;
#   * the "minister" thread interprets the orders and manages the workers;
#   * "worker" threads count once a second and report their progress;
#   * the "media" thread prints everything that arrives on the outbox queue.
#
# Orders are a word (only its first letter matters, case-insensitively)
# optionally followed by an integer worker name:
#   w N  create worker N          k N  kill worker N
#   p N  toggle pause/resume      s N  report worker N's counter
#   q    shut everything down

my $Q_outbox = Thread::Queue->new;
my $Q_inbox  = Thread::Queue->new;

# Starts at 0 so media and minister both block until the king releases them.
my $gag = Thread::Semaphore->new(0);

my $media = threads->new( \&media, $Q_outbox, $gag );

# NOTE(review): the handles are closed after media has been spawned but
# before minister is; presumably this leaves media as the only thread with
# a usable STDOUT -- confirm against the ithreads handle-cloning rules.
close STDOUT;
close STDERR;
$Q_outbox->enqueue("media initiated...");

my $minister = threads->new( \&minister, $Q_inbox, $Q_outbox, $gag );
$Q_outbox->enqueue("minister initiated...");

# It's good to be king
$gag->up(2);    # one unit each for media and minister

while (1) {
    my $line = <STDIN>;

    # End of input: behave as if the user had asked to quit, otherwise the
    # minister and media threads would never be told to stop.
    if ( not defined $line ) {
        $Q_inbox->enqueue('q');
        last;
    }

    chomp $line;
    next unless length $line;

    $Q_inbox->enqueue($line);

    # Same (case-insensitive) quit test the minister applies.
    last if $line =~ /^q/i;
}

# Clean up
$minister->join;
$media->join;

#-----------------------------------------------------------
# media( $queue, $gag )
#   Condenses input from many sources into one stream.
#   Waits on $gag, then prints every item dequeued from $queue to STDOUT
#   (prefixed with '>') until an undef item is dequeued.
sub media {
    my ( $queue, $gag ) = @_;
    close STDIN;
    $gag->down;
    while ( defined( my $message = $queue->dequeue ) ) {
        print STDOUT ">$message";
    }
}

#-----------------------------------------------------------
# minister( $Q_inbox, $Q_outbox, $gag )
#   Receives orders from the king on $Q_inbox and either dispatches
#   specific requests to workers or complains to the media via $Q_outbox.
#   Waits on $gag before processing. Returns after a quit order, having
#   killed and joined every worker and queued the undef that stops media.
sub minister {
    my ( $Q_inbox, $Q_outbox, $gag ) = @_;
    my %workers;    # worker name => thread object
    my %sems;       # worker name => semaphore shared with that worker
    close STDIN;
    $gag->down;

    # Test definedness, not truth: the order "0" is false but must still be
    # reported as a bad command instead of silently ending the loop.
    while ( defined( my $order = $Q_inbox->dequeue ) ) {
        my ( $command, $name );
        if ( $order =~ m[^([a-zA-Z])[a-zA-Z]*\s*(\d+)?] ) {
            ( $command, $name ) = ( lc $1, $2 );
        }
        else {
            my $shown = length $order ? $order : ' ';
            $Q_outbox->enqueue("Bad command $shown");
            next;
        }

        if ( $command eq 'q' ) {
            for my $id ( keys %workers ) {
                $sems{$id}->up;    # let a paused worker reach its handler
                $workers{$id}->kill('KILL');
            }
            $_->join for values %workers;
            $Q_outbox->enqueue(undef);    # Terminate media's loop
            last;
        }

        # Every remaining command needs a worker name.
        if ( not defined $name ) {
            $Q_outbox->enqueue("Non-integer thread name in '$order'");
            next;
        }

        if ( $command eq 'w' ) {
            if ( exists $workers{$name} ) {
                $Q_outbox->enqueue("Worker $name already exists");
                next;
            }
            $sems{$name} = Thread::Semaphore->new;
            $workers{$name} =
              threads->create( \&worker, $name, $Q_outbox, $sems{$name} );
            next;
        }

        if ( not exists $workers{$name} ) {
            $Q_outbox->enqueue("Worker $name does not exist");
            next;
        }

        if ( $command eq 'k' ) {
            $sems{$name}->up;    ## Ensure the worker can respond
            delete( $workers{$name} )->kill('KILL')->join;
            delete $sems{$name};    # drop the dead worker's semaphore too
            next;
        }

        if ( $command eq 'p' ) {
            # Reads the semaphore's current count by dereferencing the
            # object itself: non-zero => take the unit (pause), zero =>
            # give it back (resume). The worker's STOP handler then blocks
            # on, or passes through, the same semaphore.
            ${ $sems{$name} } ? $sems{$name}->down : $sems{$name}->up;
            $workers{$name}->kill('STOP');
            next;
        }

        if ( $command eq 's' ) {
            # NOTE(review): the up/down pair looks intended to let a paused
            # worker run its FPE handler briefly -- statement order kept
            # exactly as found; confirm before changing.
            $workers{$name}->kill('FPE');
            $sems{$name}->up;
            $sems{$name}->down;
            next;
        }

        $Q_outbox->enqueue("Bad command $order");
    }
}

#-----------------------------------------------------------
# worker( $name, $queue, $sem )
#   Follows orders and reports results to the media.
#   Enqueues a "step" message on $queue once a second, forever. It reacts
#   to the signal names the minister sends via threads->kill:
#     KILL - announce and exit the thread
#     STOP - toggle the paused/resumed state and wait on $sem
#     FPE  - report the current counter
sub worker {
    my ( $name, $queue, $sem ) = @_;
    my $cooldown = 0;    # seconds counted so far
    my $state    = 0;    # 1 while paused, 0 while running

    $SIG{KILL} = sub {
        $queue->enqueue("Worker $name dying");
        threads->exit();
    };

    $SIG{STOP} = sub {
        my $string = "Worker $name is ";
        $string .= ( $state ^= 1 ) ? 'paused' : 'resumed';
        $queue->enqueue($string);

        # Blocks here while the minister holds the semaphore's unit.
        $sem->down();
        $sem->up();
    };

    $SIG{FPE} = sub {
        $sem->up();
        $queue->enqueue("Worker $name has cooldown $cooldown");
        $sem->down();
    };

    $queue->enqueue("Worker $name starting");

    while (1) {
        $queue->enqueue("$name step $cooldown");
        sleep 1;
        $cooldown++;
    }
}