#!/usr/bin/perl use strict; use warnings; use threads; use Thread::Queue; use Thread::Semaphore; # Create terminal watcher print "Create terminal watcher...\n"; my $Q_stdin = Thread::Queue->new; my $T_input = async { $Q_stdin->enqueue( $_ ) while defined( $_ = ); }->detach; # Create tracking arrays close STDIN; my $Q_found = Thread::Queue->new; my %workers = (); my %sema = (); my $cmd; #my $i = 0; print "Awaiting commands...\n"; MAIN_LOOP: while (not defined $cmd or $cmd !~ /^q/i) { # Process commands #print ++$i, "\n"; $cmd = $Q_stdin->dequeue_nb; if (defined $cmd) { chomp $cmd; if ($cmd =~ /^q/i) { # Quit print "Resolving open threads\n"; } elsif ($cmd =~ /^w/i) { # Work my (undef, $thread) = split /\s+/, $cmd; if ($thread =~ /\D/) { print "Noninteger thread $thread in $cmd\n"; } elsif (defined ($workers{$thread}) and $workers{$thread}) { print "Value $thread already a worker\n"; } else { $sema{$thread} = Thread::Semaphore->new(1);; $workers{$thread} = threads->new(\&worker, $thread, $Q_found, $sema{$thread} ); } } elsif ($cmd =~ /^p/i) { # Pause my (undef, $thread) = split /\s+/, $cmd; if (defined ($workers{$thread}) and $workers{$thread}) { if (${$sema{$thread}}) { $sema{$thread}->down(); $workers{$thread}->kill('STOP'); print "Thread $thread paused\n"; } else { print "Thread $thread already paused\n"; } } else { print "Unrecognized thread $thread in $cmd\n"; } } elsif ($cmd =~ /^r/i) { #Resume my (undef, $thread) = split /\s+/, $cmd; if (defined ($workers{$thread}) and $workers{$thread}) { if (${$sema{$thread}}) { print "Thread $thread already running\n"; } else { $sema{$thread}->up(); print "Thread $thread resumed\n"; } } else { print "Unrecognized thread $thread in $cmd\n"; } } elsif ($cmd =~ /^s/i) { # Status my (undef, $thread) = split /\s+/, $cmd; if (defined ($workers{$thread}) and $workers{$thread}) { $workers{$thread}->kill('FPE'); } else { print "Unrecognized thread $thread in $cmd\n"; } } elsif ($cmd =~ /^k/i) { # Kill my (undef, $thread) = split /\s+/, $cmd; if (defined ($workers{$thread})) { if ($workers{$thread}) { $workers{$thread}->kill('KILL')->detach; print "Thread $thread killed\n"; $workers{$thread} = 0; } else { print "Thread $thread already dead\n"; } } else { print "Unrecognized thread $thread in $cmd\n"; } } else { print "Unknown/misformatted command $cmd\n"; } } # Print announcements while (defined(my $output = $Q_found->dequeue_nb)) { print $output, "\n"; } sleep(1); # Reduce load } # We're quitting - kill remaining threads for my $thread (grep $workers{$_}, keys %workers) { $workers{$thread}->kill('KILL')->detach; } #------------------------------------------------------------------------------- sub worker { my ($thread, $queue, $semafore) = @_; my $cooldown = 0; # Thread signal handlers $SIG{KILL} = sub { threads->exit(); }; # Kill $SIG{STOP} = sub { $semafore->down(); $semafore->up(); }; # Pause $SIG{FPE} = sub { $queue->enqueue("Thread $thread has cooldown $cooldown."); }; # Report $queue->enqueue("Thread name: $thread"); my %nodes = (); while (1) { $queue->enqueue("$thread step $cooldown"); sleep(5 * int(exp(0.1 * $cooldown++))); } }