#!/usr/bin/perl

use strict;
use warnings;

# Unbuffer STDERR and STDOUT so that messages printed by the main thread
# and by the workers show up as soon as they are printed.
select(STDERR); $| = 1;
select(STDOUT); $| = 1;

#use lib './';
#use Parler;

# Read the dictionary: one word per line.
open(my $fh, '<', '/usr/share/dict/words')
    or die "could not open dictionary, $!\n";
chomp(my @dictionary = <$fh>);
close($fh);

my $par = Parler->new({
    'dictionary' => \@dictionary,
});

my $results = $par->process({
    'num-threads' => 4,
    'max-items'   => 10,
    'words'       => ['pearl', 'jam'],
});
die "error calling process()" if not defined($results);

# This package does "lexical analysis" (mock) given a huge
# dictionary. In parallel over a number of thread-workers.
# Cost to pay: duplicates the read-only dictionary
# over each thread.
# OO style.
# by bliako
# 19/09/2018
package Parler;

use strict;
use warnings;

use threads;
use threads::shared;
use Thread::Queue;
# NOTE(review): Thread::Stack is not referenced anywhere in this file;
# the import is kept in case other code relies on it being loaded.
use Thread::Stack;
use Scalar::Util qw(reftype);

# Constructor.
#
# Params (hashref):
#   'dictionary' : required, arrayref of words.
#
# Returns the blessed object. Dies if 'dictionary' is not an arrayref.
sub new {
    my $class  = $_[0];
    my $params = $_[1] // {};

    # Fail with a clear message instead of a strict-refs error when the
    # dictionary is missing or is not an array.
    my $dictionary = $params->{'dictionary'};
    die "new() : 'dictionary' (an arrayref of words) needed."
        if (reftype($dictionary) // '') ne 'ARRAY';
    my $dictionary_size = scalar(@$dictionary);

    my $self = {
        'dictionary'      => $dictionary,
        'dictionary-size' => $dictionary_size,
        # the stop-flag is a ref to a shared integer which is set to 1
        # whenever stop() is called. share() is called with a leading '&'
        # on purpose: that bypasses its prototype so a reference can be
        # passed in directly.
        'stop-now-flag'   => &share(\my $whatever),
    };
    bless($self, $class);
    ${ $self->{'stop-now-flag'} } = 0;

    # Here is the complex sub to find synonyms.
    # However, I am using just a mock for now which
    # spits out 1 to 3 random words from the dictionary as an arrayref.
    # The closure captures the dictionary and its size rather than $self:
    # capturing $self would create a reference cycle ($self holds the
    # closure, the closure holds $self) and the object would never be freed.
    $self->{'find-synonyms-sub'} = sub {
        my $aword = shift;
        return [
            map { $dictionary->[ int(rand($dictionary_size)) ] }
                (0 .. int(rand(3)))
        ];
    };

    return $self;
}

# Raises the shared stop flag; workers check it at the top of each loop
# iteration. A signal handler could just call this to abort...
sub stop {
    my $self = $_[0];
    ${ $self->{'stop-now-flag'} } = 1;
}

# Spawns threads to do the processing, waits for the threads and returns
# the results.
# Based on comments by BrowserUK and others (the link to the original
# discussion was not preserved in this copy of the file).
#
# Params (hashref):
#   'words'       : required, arrayref of the initial words to explore.
#   'num-threads' : optional, number of workers to spawn (default 4).
#   'max-items'   : optional, workers shut down once this many synonyms
#                   have been found in total (default 20).
#
# Returns an arrayref of [ $word, \@synonyms ] pairs, one per explored word.
# Dies if 'words' is missing.
sub process {
    my $self   = $_[0];
    my $params = $_[1] // {};

    # required param
    my $inwords = $params->{'words'};
    die "process() : 'words' needed." if !defined($inwords);

    # optional input params:
    my $num_threads = $params->{'num-threads'} // 4;
    my $max_items_to_process : shared = $params->{'max-items'} // 20;

    my $work_Q                 = Thread::Queue->new();
    my $failures_Q             = Thread::Queue->new();
    my $results_Q              = Thread::Queue->new();
    my $currently_working_on_Q = Thread::Queue->new();
    my $num_items_processed_so_far : shared = 0;

    # spawn the workers
    my @thread_pool = ();
    for (1 .. $num_threads) {
        my $athread = threads->create(
            \&Parler::worker,    # subref to worker
            $work_Q,
            $failures_Q,
            $results_Q,
            $currently_working_on_Q,
            \$num_items_processed_so_far,
            $max_items_to_process,
            $self->{'stop-now-flag'},
            $self->{'find-synonyms-sub'},
            time
        );
        if (!defined($athread)) {
            print STDERR "process() : failed to create thread.\n";
        }
        else {
            push(@thread_pool, $athread);
        }
    }

    # now add all the specified input words and let it start
    # lock it and in a block so that the lock goes away when out
    {
        lock($work_Q);
        $work_Q->enqueue($_) for @$inwords;
    }

    # wait until all threads finish
    for my $athread (@thread_pool) {
        $athread->join;
        print "process() : thread terminated and joined.\n";
    }

    my $astr;

    # did we have failures?
    while (defined($astr = $failures_Q->dequeue_nb())) {
        print "process() : got failure : $astr\n";
    }

    # any items still pending?
    while (defined($astr = $currently_working_on_Q->dequeue_nb())) {
        print "process() : words in unfinished items queue: $astr\n";
    }

    # Each result entry is "word<TAB>synonym<TAB>synonym...".
    my @results;
    print "process() : done. Here are the results:\n";
    while (defined($astr = $results_Q->dequeue_nb())) {
        my ($word, @synons) = split(/\t/, $astr);
        print " $word : ".join(", ", @synons)."\n";
        push(@results, [$word, \@synons]);
    }
    print "process() : items found $num_items_processed_so_far (max was $max_items_to_process).\n";

    return \@results;
}

# Body of each worker thread. Repeatedly takes a word from the work queue,
# skips it if it is being processed or has already been explored, otherwise
# finds its synonyms, stores the result and feeds the synonyms back into
# the work queue.
#
# The loop ends when the shared item counter reaches the maximum or when
# the stop flag is raised.
# NOTE: if the work queue drains before either happens, the workers keep
# polling (one second at a time) until stop() is called.
sub worker {
    my (
        $WQ,                         # work queue
        $FQ,                         # failures queue
        $RQ,                         # send-results-back queue
        $CWQ,                        # items-currently-working-on queue
        $num_items_done_ref,         # how many items all workers have completed
        $max_items_to_do,            # how many items we need to shutdown
        $stop_now_ref,               # a flag to stop if set to 1
        $synonyms_finder_sub_ref,    # a coderef to call to find the synonyms
        $time_started,
    ) = @_;

    my $tid = threads->tid();
    print "thread $tid : started in ".(time-$time_started)."s. synonyms_finder_sub_ref=$synonyms_finder_sub_ref\n";
    my $time_started2 = time;

    # Strictly less-than: shut down as soon as the maximum is reached.
    MAINLOOP:
    while ($$num_items_done_ref < $max_items_to_do and $$stop_now_ref == 0) {
        print "thread $tid : looping and stop is ".$$stop_now_ref."\n";

        # wait for work to appear for up to the specified
        # time (do not set to zero or use a sleep)
        # or go back to check the loop conditions
        # our job is to get the next word from the Q
        # and find its synonyms
        my $word = $WQ->dequeue_timed(1);
        next MAINLOOP unless defined($word);
        print "thread $tid : got a word : '$word'\n";

        # Queue entries have the form "word<TAB>...". The word is quoted
        # with \Q..\E so that regex metacharacters in it match literally.
        my $entry_for_word = qr/^\Q$word\E\t/;

        # Check and claim the word as one step under the $CWQ lock, so two
        # workers cannot both decide that the same word is free.
        {
            lock($CWQ);

            # is anyone else working on this word?
            for my $i (reverse 0 .. $CWQ->pending() - 1) {
                next MAINLOOP if $CWQ->peek($i) =~ $entry_for_word;
            }
            print "thread $tid : nobody works on this word: $word\n";

            # word has been explored, don't do it again
            {
                lock($RQ);
                for my $i (reverse 0 .. $RQ->pending() - 1) {
                    next MAINLOOP if $RQ->peek($i) =~ $entry_for_word;
                }
            }
            print "thread $tid : word has not been explored: $word\n";

            # fine we proceed:
            print "thread $tid : working on word $word\n";

            # 1. register we are doing this word
            $CWQ->enqueue($word."\t".$tid);
        }

        # 2. process it calling the sub-ref and possibly others
        # results is an array ref of synonyms
        my $synons = $synonyms_finder_sub_ref->($word);
        if (defined($synons)) {
            print "thread $tid : got synonyms for '$word' : ".join(", ", @$synons)."\n";

            # word has been processed ok
            # 1. insert it to the results (this must happen before the word
            #    is removed from the currently-working queue below, so that
            #    at any moment it is visible in at least one of the two)
            {
                lock($RQ);
                $RQ->enqueue(join("\t", $word, @$synons));
            }

            # 2. insert all synonyms found into the work queue
            #    so all threads to explore them
            {
                lock($WQ);
                $WQ->enqueue($_) for @$synons;
            }

            # 3. update the total number of items completed
            #    (will shutdown when max)
            {
                lock($num_items_done_ref);
                $$num_items_done_ref += scalar(@$synons);
            }
        }
        else {
            # oops error
            print STDERR "thread $tid : call to synonyms_finder_sub_ref() has failed for word : $word. Skipping this word ...\n";
            {
                lock($FQ);
                $FQ->enqueue("thread $tid : failed for word '$word'");
            }
        }

        # 4. remove it from the currently-working queue (even if failed).
        #    Walk from the highest index down so that extracting an entry
        #    does not shift the indices still to be visited.
        {
            lock($CWQ);
            for my $i (reverse 0 .. $CWQ->pending() - 1) {
                $CWQ->extract($i) if $CWQ->peek($i) =~ $entry_for_word;
            }
        }
    }

    print "thread $tid : shut down after ".(time-$time_started2)."s.\n";
}

1;