Re: Multi-thread combining the results together
by GrandFather (Saint) on Jul 25, 2019 at 07:06 UTC
|
use strict;
use warnings;
use threads;
use Thread::Queue;
my $workQueue = Thread::Queue->new();
my $doneQueue = Thread::Queue->new();
my @threads;
push @threads, threads->create(sub{DoWork($workQueue, $doneQueue)}) fo
+r 1 .. 4;
for (1 .. 10000) {
my $work = int(rand(10000));
$workQueue->enqueue($work);
}
$workQueue->end();
$_->join() for @threads;
print "$_\n" while $doneQueue->pending() && ($_ = $doneQueue->dequeue(
+));
exit;
sub DoWork {
my ($work, $done) = @_;
while (my $item = $work->dequeue()) {
$done->enqueue($item) if $item >= 10 && $item <= 20;
}
}
Prints (YMMV):
17
14
11
19
12
10
13
10
11
Optimising for fewest key strokes only makes sense transmitting to Pluto or beyond
| [reply] [d/l] [select] |
|
|
use strict;
use warnings;
use MCE::Hobo;
use MCE::Shared;
my $workQueue = MCE::Shared->queue();
my $doneQueue = MCE::Shared->queue();
my $numWorkers = 4;
MCE::Hobo->create(sub{DoWork($workQueue, $doneQueue)}) for 1 .. $numWo
+rkers;
for (1 .. 10000) {
my $work = int(rand(10000));
$workQueue->enqueue($work);
}
$workQueue->end();
my $count_done = 0;
while () {
my $result = $doneQueue->dequeue();
last if (!$result && ++$count_done == $numWorkers);
print "$result\n" if $result;
}
$_->join() for MCE::Hobo->list;
exit;
sub DoWork {
my ($work, $done) = @_;
while (my $item = $work->dequeue()) {
$done->enqueue($item) if $item >= 10 && $item <= 20;
}
$done->enqueue(0);
}
Regards, Mario | [reply] [d/l] |
|
|
Thank you very much!
I did have to merge some of your ideas together with TreadQueue.
My code works and is 3+x faster with 4 threads. That is about what I expected in terms of performance increase.
Thanks again - your code is much clearer than the Perl Doc and is a much easier to follow set of model code.
Update: some code:
| [reply] [d/l] |
|
|
Thank you Grandfather! This was perfect! This was a lot easier for me to understand than one example I saw in the docs. I was able to successfully adapt my code and meet my anticipated goal of 30 min with 4 cores. That is fast enough for me to continue on with the analysis work that I'm doing right now. I'd like to develop a better algorithm, but that will be the subject of another SOPW question - There has got to be some kind of tree structure that will work much, much faster but I'm not sure how to build something like that with wildcards. Thanks Again!
Oops, I see that I already posted a thank you. So this is redundant.
| [reply] |
|
|
| [reply] |
|
|
Anonymous Monk:
Yes, but that's done after all the work has been queued. So the main thread simply waits for all the work to complete before exiting.
In a "real" application, that would let your main thread collect some statistics and report before ending, to let you know what's going on.
...roboticus
When your only tool is a hammer, all problems look like your thumb.
| [reply] |
|
|
| [reply] |
|
|
No need for extra wrappers
push @threads, threads->create(\&DoWork, $workQueue, $doneQueue) for 1
+ .. 4;
| [reply] [d/l] |
Re: Multi-thread combining the results together
by marioroy (Prior) on Jul 25, 2019 at 11:44 UTC
|
Hi Marshall,
Inbound two MCE examples in response to monks Discipulus and 1nickt suggesting MCE. The 1st example stores to a hash.
use strict;
use warnings;
use MCE::Loop;
use Data::Dumper;
my @tokens = qw( ab bc cd de ef fg gh );
sub build_regex {
my ($token) = @_;
chop $token;
}
MCE::Loop->init(
chunk_size => 1,
max_workers => 4,
);
my %result = mce_loop {
my $token = $_;
my $regex = build_regex($token);
my @line_results = grep { $_ ne $token and /$regex/ } @tokens;
MCE->gather( $token => \@line_results ) if @line_results;
} @tokens;
MCE::Loop->finish;
print Dumper(\%result), "\n";
__END__
$VAR1 = {
'ef' => [
'fg'
],
'fg' => [
'gh'
],
'bc' => [
'cd'
],
'ab' => [
'bc'
],
'cd' => [
'de'
],
'de' => [
'ef'
]
};
The 2nd example stores to an array and maintains order, possible with MCE::Relay which runs orderly.
use strict;
use warnings;
use MCE::Loop;
use Data::Dumper;
my @tokens = qw( ab bc cd de ef fg gh );
sub build_regex {
my ($token) = @_;
chop $token;
}
MCE::Loop->init(
chunk_size => 1,
max_workers => 4,
init_relay => 1,
);
my @result = mce_loop {
my $token = $_;
my $regex = build_regex($token);
my @line_results = grep { $_ ne $token and /$regex/ } @tokens;
MCE::relay {
MCE->gather(\@line_results) if @line_results;
};
} @tokens;
MCE::Loop->finish;
print Dumper(\@result), "\n";
__END__
$VAR1 = [
[
'bc'
],
[
'cd'
],
[
'de'
],
[
'ef'
],
[
'fg'
],
[
'gh'
]
];
Regards, Mario | [reply] [d/l] [select] |
|
|
use strict;
use warnings;
use Parallel::ForkManager;
use MCE::Shared;
use Data::Dumper;
my @tokens = qw( ab bc cd de ef fg gh );
sub build_regex {
my ($token) = @_;
chop $token;
}
my $pm = Parallel::ForkManager->new(4);
$pm->set_waitpid_blocking_sleep(0);
tie my %result, 'MCE::Shared';
DATA_LOOP:
foreach my $token ( @tokens ) {
# forks and returns the pid for the child
my $pid = $pm->start and next DATA_LOOP;
my $regex = build_regex($token);
my @line_results = grep { $_ ne $token and /$regex/ } @tokens;
$result{ $token } = \@line_results if @line_results;
$pm->finish(0);
}
$pm->wait_all_children;
print Dumper( tied(%result)->export({ unbless => 1 }) ), "\n";
__END__
$VAR1 = {
'ef' => [
'fg'
],
'fg' => [
'gh'
],
'cd' => [
'de'
],
'de' => [
'ef'
],
'bc' => [
'cd'
],
'ab' => [
'bc'
]
};
Regards, Mario
| [reply] [d/l] |
Re: Multi-thread combining the results together
by bliako (Abbot) on Jul 25, 2019 at 09:41 UTC
|
I usually start with this: How to create thread pool of ithreads
I have also asked a question about sharing complex/nested data between threads here ithreads, locks, shared data: is that OK?.
You seem to need to share only an array and hash of scalars, which is so much simpler and faster for you and can be done via Thread::Queue (edit: see also: Re: Multi-thread combining the results together) for the array and see below for the hash. You may be tempted to share a hash between threads to store the %result. I would say don't in this case, because data will be duplicated in each thread. It is not clear to me if using threads::shared will actually share a reference with locking and semaphors or it does a transparent and sophisticated data duplication behind the scenes, from manual:
By default, variables are private to each thread, and each newly created thread gets a private copy of each existing variable. This module allows you to share variables across different threads
In order to avoid sharing a hash I use this trick: push into the done queue a string like "$token=[@line_results]". When all threads are joined I convert the strings to the results hash.
It will make a huge difference in performance if you minimise your reads/writes to the Queue by, for example, read (edit: and dequeue!) all the data you will need for that particular thread once at the beginning instead of in a loop. Do processing and write results to a temporary thread-private variable. Write that variable in one go to the Queue when done in order to eliminate the locking and unlocking each time you write to the Queue...
So, reducing your running time proportionally to the number of threads is a holy grail as there are data read/write costs. Which proves that parallelism can sometimes be worse for performance! Ah the eternal battle between cooks and romantics ("too many cooks in the kitchen" vs "many hands make light work"). Aim to share as little as possible...
Of course nothing stops you from requesting a memory segment shareable to all processes/threads via IPC::Shareable. There you can de-/serialise any complex data structure to be shared but you will need to implement your own locking. Recent article with some code: Re: IPC::Shareable sometimes leaks memory segments
bw, bliako | [reply] [d/l] [select] |
|
|
Thanks for the input! You, Grandfather and 1nickt have given some ideas to work on.
My single threaded code uses a hash for the output, but I don't need to do that. Each thread can push a ref to Array
(a row) onto a common output queue and I can deal with that after everybody is finished. Converting 80K
rows to a hash or sorting this is a "no brainer" compared with the time it takes to run the regex.
I wrote the build_regex() function back in 2007 and I'm at the point where what was easily fast enough
a decade ago no longer is. I will be rethinking the algorithm, but if I can "juice this baby up by a factor of 3-4",
that will give me enough time to ponder a new approach to the problem.
| [reply] |
|
|
Also, I found this very useful Threads From Hell #1: How To Share A Hash [SOLVED].
This is far-fetched but in case you want to run a server (as a separate script) which provides data to workers (separate single-thread processes) then this can get you started: Re: Disc burning. Good for allowing for DB access and distributing over a cluster - a grand design for sure.
| [reply] |
Re: Multi-thread combining the results together
by marioroy (Prior) on Jul 26, 2019 at 03:22 UTC
|
Hello, fellow monks of the monastary,
This post measures how long using the various parallel implementations. Please compare from left to right and not so much between OS'es. The results were captured on a MacBook Pro (late 2013 model). It's an i7 core CPU with 4 real cores. The specs says 2.6 GHz but may run up to 3.6 GHz on one core with Turbo Boost in effect.
Serial threads MCE::Hobo MCE::Loop MCE::Child
Linux 74.488s 25.345s 20.217s 17.932s 17.291s
macOS 84.010s 24.844s 24.505s 23.786s 23.069s
Windows 102.055s 29.702s 37.913s 28.376s 27.839s
Cygwin 162.474s ------- 50.092s 45.318s 43.949s
FreeBSD 107.569s ------- 31.138s 30.530s 30.050s
threads is slow on Cygwin, so stopped due to taking too long.
My FreeBSD has an older Thread::Queue missing the ->end method.
Serial Code
I very much like to know how long running on a single core. These days, processors have Turbo Boost capabilities. Thus, not easy comparing total time between serial and parallel. The reason is that the processor might be running at a higher clock speed when utilizing one core only. So be mindful of that.
use strict;
use warnings;
use Time::HiRes 'time';
my @tokens = ('aaa'...'zzz');
my $start = time;
sub build_regex {
my ($token) = @_;
chop $token;
$token .= 'a';
}
my %result;
foreach my $token ( @tokens ) {
my $regex = build_regex($token);
my @line_results = grep { $_ ne $token and /$regex/ } @tokens;
$result{$token} = \@line_results if @line_results;
};
printf "duration: %0.03f seconds\n", time - $start;
print scalar(keys %result), "\n"; # 16900
threads
This is based off Grandfather's example. It was modified to run the serial demonstration in parallel (i.e. same input data). Workers enqueuing key => val pair means that the manager process must also dequeue(2) items.
use strict;
use warnings;
use threads;
use Thread::Queue;
use Time::HiRes 'time';
my $workQueue = Thread::Queue->new();
my $doneQueue = Thread::Queue->new();
my $numWorkers = 4;
my @tokens = ('aaa'...'zzz');
my $start = time;
sub build_regex {
my ($token) = @_;
chop $token;
$token .= 'a';
}
threads->create(sub{DoWork($workQueue, $doneQueue)}) for 1..$numWorker
+s;
$workQueue->enqueue($_) for @tokens;
$workQueue->end();
my $count_finished = 0;
my %result;
while () {
my ($key, $val) = $doneQueue->dequeue(2);
last if (!$key && ++$count_finished == $numWorkers);
$result{$key} = $val if $key;
}
$_->join() for threads->list;
printf "duration: %0.03f seconds\n", time - $start;
print scalar(keys %result), "\n"; # 16900
exit;
sub DoWork {
my ($work, $done) = @_;
while (my $token = $work->dequeue()) {
my $regex = build_regex($token);
my @line_results = grep { $_ ne $token and /$regex/ } @tokens;
$done->enqueue($token => \@line_results) if @line_results;
}
$done->enqueue(0,0);
}
MCE::Hobo
Fortunately, one can run similarly using MCE::Hobo if your Perl binary lacks threads support.
use strict;
use warnings;
use MCE::Hobo;
use MCE::Shared;
use Time::HiRes 'time';
my $workQueue = MCE::Shared->queue();
my $doneQueue = MCE::Shared->queue();
my $numWorkers = 4;
my @tokens = ('aaa'...'zzz');
my $start = time;
sub build_regex {
my ($token) = @_;
chop $token;
$token .= 'a';
}
MCE::Hobo->create(sub{DoWork($workQueue, $doneQueue)}) for 1..$numWork
+ers;
$workQueue->enqueue($_) for @tokens;
$workQueue->end();
my $count_finished = 0;
my %result;
while () {
my ($key, $val) = $doneQueue->dequeue(2);
last if (!$key && ++$count_finished == $numWorkers);
$result{$key} = $val if $key;
}
$_->join() for MCE::Hobo->list;
printf "duration: %0.03f seconds\n", time - $start;
print scalar(keys %result), "\n"; # 16900
exit;
sub DoWork {
my ($work, $done) = @_;
while (my $token = $work->dequeue()) {
my $regex = build_regex($token);
my @line_results = grep { $_ ne $token and /$regex/ } @tokens;
$done->enqueue($token => \@line_results) if @line_results;
}
$done->enqueue(0,0);
}
MCE::Loop
Think of MCE's input mechanism as the workQueue. Likewise, think of MCE's gathering capability as sending to a doneQueue. This is basically what happens internally and handled efficiently.
use strict;
use warnings;
use MCE::Loop;
use Time::HiRes 'time';
my @tokens = ('aaa'...'zzz');
my $start = time;
sub build_regex {
my ($token) = @_;
chop $token;
$token .= 'a';
}
MCE::Loop->init(
chunk_size => 1,
max_workers => 4,
);
my %result = mce_loop {
my $token = $_;
my $regex = build_regex($token);
my @line_results = grep { $_ ne $token and /$regex/ } @tokens;
MCE->gather( $token => \@line_results ) if @line_results;
} @tokens;
MCE::Loop->finish;
printf "duration: %0.03f seconds\n", time - $start;
print scalar(keys %result), "\n"; # 16900
MCE::Child
Welcome to MCE::Child and MCE::Channel (both new). Unlike the prior two queue demonstrations, there is one important consideration to remember about MCE::Channel. The channel behaves more like a pipe (i.e. the data resides in the OS-level socket buffer and not in memory). There are some advantage to this such as not having to worry about the producer running faster than consumers.
I omitted the doneQueue. Instead, workers store the results locally and send upon termination. This does shorten the code if anything. The real reason (regarding MCE::Channel) is that I'm not having to worry about a dead-lock situation. Instantiating a doneQueue using MCE::Channel could lead to workers blocking because the manager process is not yet dequeuing.
use strict;
use warnings;
use MCE::Child;
use MCE::Channel;
use Time::HiRes 'time';
my $workQueue = MCE::Channel->new(impl => 'Mutex');
my $numWorkers = 4;
my @tokens = ('aaa'...'zzz');
my $start = time;
sub build_regex {
my ($token) = @_;
chop $token;
$token .= 'a';
}
MCE::Child->create(sub{DoWork($workQueue)}) for 1..$numWorkers;
$workQueue->enqueue($_) for @tokens;
$workQueue->end();
my %result = map { %{ $_->join() } } MCE::Child->list;
printf "duration: %0.03f seconds\n", time - $start;
print scalar(keys %result), "\n"; # 16900
exit;
sub DoWork {
my ($workQueue) = @_;
my %result; # store locally
while (my $token = $workQueue->dequeue()) {
my $regex = build_regex($token);
my @line_results = grep { $_ ne $token and /$regex/ } @tokens;
$result{$token} = \@line_results if @line_results;
}
return \%result;
}
The take-away from this is that one may have workers store result locally and send upon termination. This is a way of decreasing IPC overhead. Not shown here is MCE's chunking capabilities.
Regards, Mario | [reply] [d/l] [select] |
Re: Multi-thread combining the results together -- MCE
by Discipulus (Canon) on Jul 25, 2019 at 07:43 UTC
|
Hello Marshall,
> I'd appreciate some advice on multi-threading.
Unfortunately I'm not able to provide an example, but you should give a try to MCE by marioroy (if you are lucky and if he has time he will provide fo sure an up to date example).
You can see some link about parallel programming in my bibliotheca
L*
There are no rules, there are no thumbs..
Reinvent the wheel, then learn The Wheel; may be one day you reinvent one of THE WHEELS.
| [reply] [d/l] |
Re: Multi-thread combining the results together
by 1nickt (Canon) on Jul 25, 2019 at 09:46 UTC
|
Hi Marshall,
When you think parallel Perl, think MCE! It's easy to distribute work among child workers that all write to a shared data structure!
Here is an example I shared some time ago that could easily be adapted to your case: Re^2: (Easy Perl concurrency with MCE), although the latest version of MCE includes MCE::Channel which according to Mario is more suited for queue-like work.
Hope this helps!
The way forward always starts with a minimal test.
| [reply] |
Re: Multi-thread combining the results together
by vr (Curate) on Jul 25, 2019 at 10:13 UTC
|
It may be obvious and you have already considered this (then I'm sorry, and skip what follows), but you are starting regex engine 6+ billion times. If the %result is relatively sparsely populated in the end, and if tokens can be joined using clearly "alien" separator symbol (or sequence) to prevent matching across tokens, then matching against concatenated string (regex engine starts just N times) can help. In code below, if line A is un-commented, then block B is executed N*N = 1e6 times as expected, and each token "matches" all other tokens -- very uninteresting. Otherwise, with more picky criteria of a token to be related to another token, your goal of "at least 3x faster" is easily achieved even before parallelization.
use strict;
use warnings;
use feature 'say';
use Data::Dump 'dd';
use Time::HiRes 'time';
my $N = 1000;
srand 123;
my @tokens = map { int rand 1_000_000 } 1 .. $N;
sub build_regex {
# return qr/\d+/; # line A
my $s = shift;
my $d = substr $s, 0, 1;
qr/[0-9]$d\d{0,9}?$d/
}
{ # case 1
my $t = time;
my %result;
foreach my $token (@tokens)
{
my $regex = build_regex($token);
my @line_results = grep {$_ ne $token and /$regex/ }@tokens;
$result{$token} = [@line_results];
}
say time - $t;
}
{ # case 2
my $t = time;
my $count = 0;
my $sep = '~';
my $sep_len = length $sep;
my @idx;
for ( 0 .. $#tokens ) {
my $L = length $tokens[ $_ ];
@idx[ map{ $sep_len + @idx + $_ } 0 .. $L - 1 ] = ( $_ ) x $L
}
my $concat = join $sep, '', @tokens, '';
my %result;
for my $i ( 0 .. $#tokens ) {
my $token = $tokens[ $i ];
my $regex = build_regex( $token );
$result{ $token } = [];
my $prev = -1;
while ( $concat =~ /$regex/g ) { # block B
my $j = $idx[ $-[ 0 ]];
push @{ $result{ $token }}, $tokens[ $j ]
if $j != $i and $j != $prev;
$prev = $j;
$count ++;
}
}
say time - $t;
say $count;
}
__END__
# Output with "A" line un-commented
0.978141069412231
1.23276996612549
1000000
# Output with "A" line commented-out
0.648768901824951
0.150562047958374
78176
Edit: (1) replaced separator character with more neutral "~" from "|", so it doesn't look like
regex alternation; (2) added "comments" to output section, so it's more clear they are different runs. | [reply] [d/l] [select] |
|
|
Interesting.
Here is one example of what this does:
testing: "k5bai"
0: ^.5BAI$
1: ^K.BAI$
2: ^K5.AI$
3: ^K5B.I$
4: ^K5BA.$
5: .*5BAI\z
6: ^K5ABI\z <-N2 error
7: ^K5BIA\z <-N2 error
8: ^K5IBA\z <- really bad
9: ^K5AIB\z
10: ^KB5AI\z
11: ^K5BA\z
12: ^K5BI\z
13: ^K5AI\z
14: ^K5B\z
my regex = (^.5BAI$)|(^K.BAI$)|(^K5.AI$)|(^K5B.I$)|(^K5BA.$)|(.*5BAI\z
+)|(^K5ABI\z)|(^K5BIA\z)|(^K5IBA\z)|(^K5AIB\z)|(^KB5AI\z)|(^K5BA\z)|(^
+K5BI\z)|(^K5AI\z)|(^K5B\z)
Instead of running for each @tokens, I suspect that it would be faster to run the regex against a single string of the concatenation of all of the tokens.
I haven't thought about this code for many moons. Time for a re-think. | [reply] [d/l] |
|
|
| [reply] |
|
|
If original build_regex can return expression with "start of string" or "end of string" markers, these should be replaced, for my approach, with lookarounds for $sep, of course.
Huge regex with many alternations is different to what I suggested, and may suit better your data/expected output.
| [reply] [d/l] [select] |
|
|
|
|
|
Re: Multi-thread combining the results together
by karlgoethebier (Abbot) on Jul 25, 2019 at 13:30 UTC
|
Starting here (press the search button) might be helpful and inspiring for older soldiers 🤪. My notorious "Threads From Hell" series. Best regards and good luck, Karl
Minor update: Added help to use Super Search. Thanks roboticus.
«The Crux of the Biscuit is the Apostrophe»
perl -MCrypt::CBC -E 'say Crypt::CBC->new(-key=>'kgb',-cipher=>"Blowfish")->decrypt_hex($ENV{KARL});'Help
| [reply] [d/l] |
Re: Multi-thread combining the results together
by karlgoethebier (Abbot) on Jul 26, 2019 at 08:41 UTC
|
Starting here might be helpful and inspiring for older soldiers 🤪. My notorious "Threads From Hell" series. Best regards and good luck, Karl
«The Crux of the Biscuit is the Apostrophe»
perl -MCrypt::CBC -E 'say Crypt::CBC->new(-key=>'kgb',-cipher=>"Blowfish")->decrypt_hex($ENV{KARL});'Help
| [reply] [d/l] |