Re: Code brewing for the upcoming MCE 10 year anniversary
by marioroy (Prior) on Oct 23, 2022 at 13:45 UTC
Behind the scenes, I'm using MCE (with chunk_size set to 1).
mce_foreach my $var ( @list ) { ... }
mce_foreach ( @list ) { ... }
mce_foreach_f my $var ( $path or $fh ) { ... }
mce_foreach_f ( $path or $fh ) { ... }
mce_foreach_s my $var ( @list ) { ... }
mce_foreach_s ( @list ) { ... }
The best part is the spawn and sync keywords.
# scalar, array, or hash context
spawn (my|our)* $var @var or %var = sub { codeblock };
spawn (my|our)* $var @var or %var = sub { func(@_) }, args;
spawn (my|our)* $var @var or %var = func(arg1, ...);
spawn (my|our)* $var @var or %var = func();
sync $var;
sync @var;
sync %var;
# anonymous code blocks
spawn $identVar or "quotedString", sub { codeblock };
spawn sub { codeblock };
sync;
mce_forchunk
mce_forchunk_f
mce_forchunk_s
I will use MCE::Loop behind the scenes.
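For comparison, chunked iteration with the already-released MCE::Loop module can be sketched as follows. This is only an illustration of the chunk_ref idea, not what the mce_forchunk keywords will expand to. The helper name squares_in_chunks and the chunk size are assumptions made for the example, and the MCE distribution from CPAN must be installed.

```perl
use strict;
use warnings;
use MCE::Loop;

# Hypothetical helper: square every input element in parallel,
# handing each worker an array reference holding up to $chunk_size items.
sub squares_in_chunks {
    my ($chunk_size, @input) = @_;

    MCE::Loop->init(max_workers => 4, chunk_size => $chunk_size);

    # In list context, mce_loop returns whatever the workers gather.
    my @out = mce_loop {
        my ($mce, $chunk_ref, $chunk_id) = @_;
        MCE->gather(map { $_ * $_ } @{ $chunk_ref });
    } @input;

    MCE::Loop->finish;

    # Gather order is not guaranteed, so sort before returning.
    return sort { $a <=> $b } @out;
}

print join(' ', squares_in_chunks(2, 11 .. 19)), "\n";
```

Setting chunk_size to 1 gives the one-element-per-call behavior that the mce_foreach keywords rely on.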
use MCE::Simple -strict, max_workers => 4, init_relay => '';
mce_forchunk (11..19) {
my ($mce, $chunk_ref, $chunk_id) = @_;
for my $i (@{ $chunk_ref }) {
MCE->say($i);
}
}
mce_forchunk (31..39) {
for my $i (@{ $_ }) {
MCE->say($i);
}
}
mce_forchunk my $chunk (51..59) {
for my $i (@{ $chunk }) {
MCE->say($i);
}
}
mce_forchunk my $chunk (71..79) {
my $output = "From chunk_id: ".MCE->chunk_id."\n";
for my $i (@{ $chunk }) {
$output .= "$i\n";
}
MCE::relay {
# write directly to STDOUT without involving the manager process
print $output;
};
}
Output
15
13
11
16
14
17
12
18
19
33
34
31
39
32
37
35
38
36
51
57
53
52
55
58
54
56
59
From chunk_id: 1
71
72
From chunk_id: 2
73
74
From chunk_id: 3
75
76
From chunk_id: 4
77
78
From chunk_id: 5
79
Spawn MCE asynchronously
MCE::Simple expands its keywords into parallel code at compile time.
use MCE::Simple (
include => [qw/ -strict -signatures /],
max_workers => 4, init_relay => '',
);
MCE::Simple->init(
on_finish => sub ( $pid, $exit, $ident, $signal, $error, @ret ) {
say "MCE job ** $ident ** completed; status $exit.";
print join('', @ret);
},
);
spawn "foo", sub {
my @results;
MCE::Simple->init(gather => \@results);
mce_forchunk my $chunk (91..99) {
my $output = "From chunk_id: ".MCE->chunk_id."\n";
for my $i (@{ $chunk }) {
$output .= "$i\n";
}
MCE::relay {
MCE->gather($output);
};
}
@results;
};
# do something else
sync;
Output
MCE job ** foo ** completed; status 0.
From chunk_id: 1
91
92
From chunk_id: 2
93
94
From chunk_id: 3
95
96
From chunk_id: 4
97
98
From chunk_id: 5
99
Re: Code brewing for the upcoming MCE 10 year anniversary
by marioroy (Prior) on Oct 23, 2022 at 20:39 UTC
I'm still undecided about including the count method in MCE::Semaphore. Most platforms are supported.
The count method may be helpful for a future MCE::Barrier module. I obtained the FIONREAD values using QEMU. Oh, what fun it was running an emulated s390x machine. Likewise for Alpha, AArch64, MIPS, PA-RISC, PowerPC, RISC-V, and SPARC.
What about the Haiku OS? Yep, this one too is supported :)
my $FIONREAD = do {
use Config qw(config_re);
my ($archname) = config_re('archname');
if ($archname =~ /(?:irix|mips)/i) {
# IRIX; Linux on MIPS
0x467f;
}
elsif ($archname =~ /(?:alpha|powerpc|ppc|sparc)/i) {
# OSF/1, Tru64; Linux on Alpha, PowerPC, SPARC
0x4004667f;
}
elsif ($^O =~ /(?:hp-?ux|linux)/i) {
# HP-UX; Linux on AArch64, ARM, PA-RISC, RISC-V, s390x, x86_64
0x541b;
}
elsif ($^O =~ /(?:aix|bsd|darwin|dragonfly|mswin|mingw|msys|cygwin|solaris)/i) {
# AIX, BSD derivatives, macOS, Windows/Cygwin, Solaris
0x4004667f;
}
elsif ($^O =~ /haiku/i) {
# Haiku (BeOS-like OS)
0xbe000001;
}
else {
##
# MCE::Semaphore::count() unimplemented in this platform.
# Pull request or email the author $^O, archname, and FIONREAD.
#
# $ perl -MConfig -le 'print $^O, ": ", $Config{archname}'
# $ python3 -c 'import termios; print(hex(termios.FIONREAD))'
#
# $ cat fionread.c
#
# #include <stdio.h>
# #include <sys/ioctl.h>
# int main() {
# printf("%#08lx\n", (unsigned long) FIONREAD);
# return 0;
# }
#
# $ cc -o fionread fionread.c
# $ ./fionread
##
0x0;
}
};
# $sem->count
sub count {
die 'MCE::Semaphore::count() unimplemented in this platform'
unless $FIONREAD;
my $count = pack('L', 0);
ioctl($_[0]->{r_sock}, $FIONREAD, $count);
unpack('L', $count);
}
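The FIONREAD technique used by count can be tried with core Perl alone. The sketch below is not MCE::Semaphore code. The helper name pending_bytes is hypothetical, and the constant is a simplification of the table above: it assumes Linux on x86_64 or AArch64, or otherwise a BSD-style platform such as macOS. Linux on MIPS, Alpha, PowerPC, or SPARC needs the other values listed above.

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_STREAM PF_UNSPEC);

# Assumption: simplified platform check; see the full table for other systems.
my $FIONREAD = $^O eq 'linux' ? 0x541b : 0x4004667f;

# Hypothetical helper: number of unread bytes waiting on a socket handle.
sub pending_bytes {
    my ($fh) = @_;
    my $count = pack('L', 0);    # buffer that the kernel fills in
    ioctl($fh, $FIONREAD, $count) // die "ioctl FIONREAD failed: $!";
    return unpack('L', $count);
}

socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";

# Write three bytes, then ask how many are waiting on the read side.
syswrite($w, "abc") // die "syswrite: $!";
print pending_bytes($r), "\n";
```

A semaphore that keeps one byte per available token in such a socket can report its current count this way without consuming any tokens.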
Re: Code brewing for the upcoming MCE 10 year anniversary
by marioroy (Prior) on Nov 04, 2022 at 06:01 UTC
I came across a wonderful site, by Jason Brownlee, demonstrating barriers using Python. That's what I want MCE::Barrier to accomplish. Hold on... I'm not finished yet. The MCE::Barrier, MCE::Semaphore, and MCE::Simple modules will be released once the documentation and test scripts are complete.
The barrier wait method supports two modes: returning a list in list context, or raising an exception (if aborted or timed out). Notice also that the spawn keyword is used in a loop. That works too :)
Barrier with a Timeout
# Perl replica of using a Barrier with a Timeout
# https://superfastpython.com/multiprocessing-barrier-in-python/
use MCE::Simple -strict, -signatures;
use MCE::Barrier;
use Time::HiRes 'sleep';
use Try::Tiny;
STDOUT->autoflush(1);
sub task ($barrier, $number) {
# generate a unique value and block for a moment
my $value = rand() * 10; sleep($value);
# report result
say "Process ${number} done, got: ${value}";
# wait on all other processes to complete
# my ($remaining, $error) = $barrier->wait;
# or via exception handling
try {
$barrier->wait;
}
catch {
# pass
};
}
sub main () {
# create a barrier plus one for the main process
my $barrier = MCE::Barrier->new(5 + 1);
# create the worker processes
spawn task($barrier, $_) for 1..5;
# wait for all processes to finish
# my ($remaining, $error) = $barrier->wait;
# or via exception handling
try {
$barrier->wait(timeout => 5);
say "All processes have their result";
}
catch {
# $_ = "Operation timed out ..."
say "Some processes did not finish on time...";
};
# reap the worker processes
sync;
}
main();
__END__
Process 2 done, got: 2.20488202831806
Process 1 done, got: 4.51354761681618
Some processes did not finish on time...
Process 5 done, got: 5.27888526282371
Process 4 done, got: 7.58755085132183
Process 3 done, got: 9.89621643981994
Barrier with an Action
# Perl replica of using a Barrier with an Action
# https://superfastpython.com/multiprocessing-barrier-in-python/
use MCE::Simple -strict, -signatures;
use MCE::Barrier;
use Time::HiRes 'sleep';
STDOUT->autoflush(1);
sub report () {
# report once all processes are done
say "All processes have their result";
}
sub task ($barrier, $number) {
# generate a unique value and block for a moment
my $value = rand() * 10; sleep($value);
# report result
say "Process ${number} done, got: ${value}";
# wait on all other processes to complete
my ($remaining, $error) = $barrier->wait;
# an alternative to using the "action" argument
if ($remaining == 0) {
say "I, process ${number}, was last...";
}
}
sub main () {
# create a barrier, not including the main process
my $barrier = MCE::Barrier->new(5, before_release => \&report);
# create the worker processes
spawn task($barrier, $_) for 1..5;
# reap the worker processes
sync;
}
main();
__END__
Process 1 done, got: 1.25912054515542
Process 5 done, got: 2.02445819116296
Process 4 done, got: 4.33312377966107
Process 3 done, got: 6.64178936815919
Process 2 done, got: 8.9504549566573
All processes have their result
I, process 2, was last...
Aborting a Barrier
# Perl replica of Aborting a Barrier
# https://superfastpython.com/multiprocessing-barrier-in-python/
use MCE::Simple -strict, -signatures;
use MCE::Barrier;
use Time::HiRes 'sleep';
use Try::Tiny;
STDOUT->autoflush(1);
sub task ($barrier, $number) {
# generate a unique value and block for a moment
my $value = rand() * 10; sleep($value);
# report result
say "Process ${number} done, got: ${value}";
# check if the result was "bad"
if ($value > 8) {
say "Process ${number} aborting...";
$barrier->abort;
}
else {
# wait on all other processes to complete
# my ($remaining, $error) = $barrier->wait;
# or via exception handling
try {
$barrier->wait;
}
catch {
# pass
};
}
}
sub main () {
# create a barrier plus one for the main process
my $barrier = MCE::Barrier->new(5 + 1);
# create the worker processes
spawn task($barrier, $_) for 1..5;
# wait for all processes to finish
# my ($remaining, $error) = $barrier->wait;
# or via exception handling
try {
$barrier->wait();
say "All processes have their result";
}
catch {
# $_ = "Barrier aborted ..."
say "At least one process aborted due to bad results";
};
# reap the worker processes
sync;
}
main();
__END__
Process 1 done, got: 1.12563261603103
Process 5 done, got: 1.89097026203857
Process 4 done, got: 4.19963585053669
Process 3 done, got: 6.5083014390348
Process 2 done, got: 8.81696702753292
Process 2 aborting...
At least one process aborted due to bad results
Python
from sys import stdout
from multiprocessing import Process
from multiprocessing import Barrier
def task(barrier, number):
    for i in range(1, 400 + 1):
        print(f'{i}: {number}')
        barrier.wait()
if __name__ == '__main__':
    stdout.flush()
    num_workers = 500
    workers = list()
    # create a barrier
    barrier = Barrier(num_workers)
    # create the worker processes
    for i in range(1, num_workers + 1):
        workers.append(Process(target=task, args=(barrier, i)))
        workers[-1].start()
    # wait for all processes to finish
    for w in workers:
        w.join()
Perl
This seems odd, I know. Folks cannot run this yet, because MCE::Simple has not been released (it will be soon). I want to share the performance gain from having a limiter.
use MCE::Simple -strict, -signatures;
use MCE::Barrier;
sub task ($barrier, $number) {
for (1 .. 400) {
say "$_: $number";
$barrier->wait;
}
}
sub main () {
STDOUT->autoflush(1);
my $num_workers = 500;
# create a barrier
my $barrier = MCE::Barrier->new($num_workers);
# create the worker processes
spawn task($barrier, $_) for 1 .. $num_workers;
# wait for all processes to finish
$_->join for MCE::Simple->list; # same as sync;
}
main();
Results
Python 3 and Perl both complete in 4.1 seconds. That was with the limiter commented out. With the limiter enabled, the Perl time drops to 1.5 seconds.
sub new {
...
# [10] limiter
(!$is_winenv && !$tid) ? MCE::Semaphore->new(3,1) : (),
...
}
sub _wait {
...
$obj->[10]->down if $obj->[10]; # limiter
my ($parties, $synced, $timed_out, $aborted) = $obj->_up($flag);
$obj->[10]->up if $obj->[10];
...
}
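The limiter idea, capping how many workers are inside a section at once, can be sketched with a pipe-backed counting semaphore in core Perl. This is not the MCE::Semaphore implementation. The PipeSemaphore package and the run_limited helper are hypothetical names, and the short sleep stands in for the guarded work.

```perl
use strict;
use warnings;
use POSIX ();

# Hypothetical counting semaphore: one byte in the pipe per free token.
package PipeSemaphore {
    sub new {
        my ($class, $count) = @_;
        pipe(my $r, my $w) or die "pipe: $!";
        syswrite($w, 't' x $count) if $count;
        return bless { r => $r, w => $w }, $class;
    }
    sub down { sysread($_[0]{r}, my $tok, 1) }   # blocks while no token is free
    sub up   { syswrite($_[0]{w}, 't') }         # hand the token back
}

# Fork $workers children; at most $limit may be inside the guarded
# section at once. Returns the peak number observed inside.
sub run_limited {
    my ($workers, $limit) = @_;
    my $sem = PipeSemaphore->new($limit);
    pipe(my $log_r, my $log_w) or die "pipe: $!";

    my @pids;
    for my $id (1 .. $workers) {
        my $pid = fork() // die "fork: $!";
        if ($pid == 0) {
            $sem->down;
            syswrite($log_w, "enter $id\n");
            select(undef, undef, undef, 0.05);   # stand-in for guarded work
            syswrite($log_w, "leave $id\n");
            $sem->up;
            POSIX::_exit(0);
        }
        push @pids, $pid;
    }

    waitpid($_, 0) for @pids;
    close $log_w;

    # Replay the log to find the highest number of workers inside together.
    my ($inside, $peak) = (0, 0);
    while (my $line = <$log_r>) {
        $inside += $line =~ /^enter/ ? 1 : -1;
        $peak = $inside if $inside > $peak;
    }
    return $peak;
}

print "peak concurrency: ", run_limited(6, 3), "\n";
```

Because each worker logs "enter" only after taking a token and logs "leave" before returning it, the replayed peak can never exceed the limit.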
The other oddity is that I cannot spin up 1,000 processes in Python; it complains about running out of file handles. Perl, no problem!
Congratulations on your MCE module.
It complains about running out of file handles.
I thought that was OS specific, unless P. sets its own limits too?
I have always wanted a Barrier module to complement MCE::Child, MCE::Hobo, or threads. The module works on Unix, Cygwin, and Windows. Tonight, I enhanced MCE::Simple to support the spawn loop syntax.
# create the worker processes
spawn task($barrier, $_) for 1..5;
That is transposed to the following. The ident argument defaults to an empty string ('') if omitted.
MCE::Simple::_reap_joinable();
do { MCE::Simple::_async_i('', \&task, $barrier, $_) } for 1..5;
At this point, the code for MCE::Barrier, MCE::Semaphore, and MCE::Simple is 100% complete. It's a relief. I tested MCE::Barrier on Linux with 4,000 workers, trying to break it :). All the ones come first, then the twos, followed by the threes, and so on. Workers wait for all parties to reach the barrier.
use strict;
use warnings;
use MCE;
use MCE::Barrier;
use Time::HiRes qw(time);
my $num_workers = 4000;
my $barrier = MCE::Barrier->new($num_workers);
my $start = time();
MCE->new(
max_workers => $num_workers,
user_func => sub {
my $id = MCE->wid;
for (1..400) {
MCE->say("$_: $id");
$barrier->wait;
}
}
)->run();
printf {*STDERR} "\nduration: %0.3f\n\n", time() - $start;
The sync implementation in MCE has two stages: sync and unsync. MCE::Barrier's implementation has a single stage that alternates between two semaphores (1 or 2), toggled with k = 3 - k. A limiter prevents many workers from attempting read I/O simultaneously. The result is a fast implementation regardless of the number of parties.
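To illustrate why alternating between two release channels is enough for a reusable single-stage barrier, here is a minimal sketch in core Perl. It is not MCE::Barrier's implementation: it assumes a coordinating parent process and plain pipes, and run_barrier_demo is a hypothetical name. With only one release pipe, a fast worker re-entering the barrier could take a token meant for a slower worker still leaving the previous round. Toggling k = 3 - k sends each round's tokens to the other pipe.

```perl
use strict;
use warnings;
use POSIX ();

# Hypothetical demo: $parties children meet at a barrier $rounds times.
# Returns the log lines ("round: id") in the order they were written.
sub run_barrier_demo {
    my ($parties, $rounds) = @_;

    pipe(my $arrive_r, my $arrive_w) or die "pipe: $!";
    pipe(my $log_r,    my $log_w)    or die "pipe: $!";

    my @release;    # slots 1 and 2: the two alternating release pipes
    for my $k (1, 2) {
        pipe(my $r, my $w) or die "pipe: $!";
        $release[$k] = { r => $r, w => $w };
    }

    my @pids;
    for my $id (1 .. $parties) {
        my $pid = fork() // die "fork: $!";
        if ($pid == 0) {
            my $k = 1;
            for my $round (1 .. $rounds) {
                syswrite($log_w, "$round: $id\n");
                syswrite($arrive_w, 'a');               # announce arrival
                sysread($release[$k]{r}, my $tok, 1);   # wait for release
                $k = 3 - $k;                            # 1 -> 2 -> 1 ...
            }
            POSIX::_exit(0);
        }
        push @pids, $pid;
    }

    # Coordinator: release a round only after every party has arrived.
    my $k = 1;
    for my $round (1 .. $rounds) {
        my $seen = 0;
        while ($seen < $parties) {
            my $n = sysread($arrive_r, my $buf, $parties - $seen);
            die "sysread failed" unless $n;
            $seen += $n;
        }
        syswrite($release[$k]{w}, 'r' x $parties);
        $k = 3 - $k;
    }

    waitpid($_, 0) for @pids;
    close $log_w;
    chomp(my @lines = <$log_r>);
    return @lines;
}

print "$_\n" for run_barrier_demo(3, 2);
```

Every round-1 line is written before any round-2 line, which mirrors the "all the ones come first" behavior described above. The order of worker ids within a round varies from run to run.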
Re: Code brewing for the upcoming MCE 10 year anniversary
by marioroy (Prior) on Nov 11, 2022 at 11:25 UTC
I completed usability testing. The following demonstrates running MCE in the foreground and in the background. The spawn keyword is also used to spin up a child process for each element.
use MCE::Simple -strict, (
spawn_limit => 2, # MCE::Child processes
max_workers => 2, # number of MCE workers
init_relay => 1, # define to enable MCE::relay
);
# Run MCE in the foreground.
mce_foreach my $i (10..18) {
MCE::relay {
# output in order
say "$$: $i";
};
}
# Run MCE in the background.
spawn sub {
mce_foreach my $i (20..28) {
MCE::relay {
say "$$: $i";
};
}
};
sync;
# Spin up a worker for each input element.
# Up to "spawn_limit" workers run simultaneously.
# Otherwise, spawn blocks until a slot becomes available.
spawn my $i (30..38) {
say "$$: $i";
}
sync;
output:
60264: 10 # alternating PIDs
60265: 11 # two MCE workers
60264: 12
60265: 13
60264: 14
60265: 15
60264: 16
60265: 17
60264: 18
60267: 20 # alternating PIDs
60268: 21 # two MCE workers
60267: 22
60268: 23
60267: 24
60268: 25
60267: 26
60268: 27
60267: 28
60269: 30 # spawn keyword
60270: 31 # unique PIDs
60271: 32
60272: 33
60273: 34
60274: 35
60275: 36
60276: 37
60277: 38
spawn keyword:
spawn my $i (30..38) {
say "$$: $i";
}
# filtered/transposed to this, without altering line numbers
foreach my $i (30..38) { MCE::Simple::_spawn_a('', sub {
say "$$: $i";
}); }
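How a keyword can be rewritten without changing the line count can be sketched with a regex and a simple brace counter. This is a deliberately naive illustration, not MCE::Simple's actual filter: transpose_spawn is a hypothetical name, it handles only the first "spawn my $var (list) {" block, and it ignores braces inside strings or comments.

```perl
use strict;
use warnings;

# Hypothetical, naive rewrite of the first "spawn my $var (LIST) { ... }".
sub transpose_spawn {
    my ($src) = @_;

    # Match the header on a single line so no newlines are consumed.
    return $src
        unless $src =~ /\bspawn[ \t]+(my[ \t]+\$\w+[ \t]*\([^()\n]*\))[ \t]*\{/;
    my ($start, $open_end, $head) = ($-[0], $+[0], $1);

    # Walk forward to the closing brace that matches the opening one.
    my ($depth, $i) = (1, $open_end);
    while ($depth > 0 && $i < length $src) {
        my $c = substr($src, $i++, 1);
        $depth++ if $c eq '{';
        $depth-- if $c eq '}';
    }
    return $src if $depth;    # unbalanced: leave the source untouched

    my $body = substr($src, $open_end, $i - 1 - $open_end);

    # All added text stays on existing lines, so line numbers are kept.
    return substr($src, 0, $start)
         . 'foreach ' . $head . q[ { MCE::Simple::_spawn_a('', sub {]
         . $body . q[}); }]
         . substr($src, $i);
}

my $src = <<'END';
spawn my $i (30..38) {
    say "$$: $i";
}
END

print transpose_spawn($src);
```

Because the replacement text is spliced into the first and last lines of the block, error messages and warnings still point at the line numbers of the original source.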
Update: Changed to "spawn" without the "_each" suffix. Thank you, choroba.
I updated the prior example. Seeing "spawneach" made me rename it to "spawn_each". I could leave out the "_each" suffix, but I preferred to make it clear that "spawn_each" spins up a child process for each element.
# serial code
foreach my $i (30..38) {
say "$$: $i";
}
# this seems clear?
spawn_each my $i (30..38) {
say "$$: $i";
}
# will this confuse folks?
spawn my $i (30..38) {
say "$$: $i";
}
Which one do you prefer (spawn_each, spawn), or something else? The "spawn" without the "_each" suffix is elegant, but I wonder whether folks will realize that it spawns a child process per element.
MCE::Simple is quite nice. It does not alter the line count when filtering / transposing to parallel code.