PerlMonks  

Code brewing for the upcoming MCE 10 year anniversary

by marioroy (Prior)
on Oct 23, 2022 at 13:12 UTC ( [id://11147606]=perlmeditation )

Greetings, all

The following is a glimpse of what's coming for MCE. There are two new modules: MCE::Semaphore and MCE::Simple. I completed the code tonight. Now I need to finish the docs and do more testing before releasing on MetaCPAN.

MCE Simple

use MCE::Simple -strict, max_workers => 4;

MCE::Simple->init(
    # mce options
    user_begin => sub {
        MCE->say("hello from ", MCE->wid);
    },
    # spawn options
    on_finish => sub {
        my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
        say "@_";
    },
);

mce_foreach my $i ( 1..10 ) {
    MCE->say(MCE->wid, ": $i * 2 = ", $i * 2);
}

spawn "Hello", sub { "one" };
spawn "There", sub { "two" };

foreach my $ident (qw/foo baz/) {
    spawn $ident, sub {
        my $text = "something from $$";
    };
}

sync;

# clear or set options
MCE::Simple->init();

sub fib {
    my $n = shift;
    return $n if $n < 2;

    spawn my $x = fib($n - 1);
    spawn my $y = fib($n - 2);

    sync $x;
    sync $y;

    return $x + $y;
}

say "fib(20) = ", fib(20);

Output

$ perl demo.pl
hello from 1
hello from 2
hello from 3
hello from 4
3: 2 * 2 = 4
4: 1 * 2 = 2
2: 3 * 2 = 6
3: 5 * 2 = 10
4: 6 * 2 = 12
1: 4 * 2 = 8
4: 7 * 2 = 14
3: 8 * 2 = 16
2: 9 * 2 = 18
1: 10 * 2 = 20
10792 0 Hello 0 one
10793 0 There 0 two
10794 0 foo 0 something from 10794
10795 0 baz 0 something from 10795
fib(20) = 6765
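
For readers who want a feel for what a spawn/sync pair has to do, here is a minimal core-Perl sketch. It is not the MCE::Simple implementation: the helper names spawn_value and sync_value are hypothetical, and the approach (fork a child, send the result back through a pipe, read it at sync time) is only an assumption about one way such keywords could behave.

```perl
use strict;
use warnings;

# Hypothetical helper: run $code in a child process and return a handle
# that can later be used to collect the child's scalar result.
sub spawn_value {
    my ($code, @args) = @_;
    pipe(my $r, my $w) or die "pipe error: $!";
    my $pid = fork();
    die "fork error: $!" unless defined $pid;
    if ($pid == 0) {
        # Child: compute, write the result to the pipe, and exit.
        close $r;
        print {$w} scalar $code->(@args);
        close $w;
        exit 0;
    }
    close $w;
    return { pid => $pid, fh => $r };
}

# Hypothetical helper: block until the child is done, then return its result.
sub sync_value {
    my ($job) = @_;
    my $fh = $job->{fh};
    local $/;                    # slurp everything the child wrote
    my $value = <$fh>;
    close $fh;
    waitpid($job->{pid}, 0);     # reap the child
    return $value;
}

sub fib {
    my $n = shift;
    return $n < 2 ? $n : fib($n - 1) + fib($n - 2);
}

# Two children compute the two halves; the parent adds them up.
my $x = spawn_value(\&fib, 19);
my $y = spawn_value(\&fib, 18);
my $sum = sync_value($x) + sync_value($y);

print "fib(20) = $sum\n";
```

Unlike the recursive example above, this sketch forks only at the top level, which keeps the process count at two.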

MCE Semaphore

A fast pure-Perl implementation. The test below runs one million iterations. Notice the mce_foreach_s keyword, for processing a sequence of numbers.

use MCE::Simple -strict, max_workers => 16;
use MCE::Semaphore;
use Time::HiRes 'time';

my $start = time;
my $sem = MCE::Semaphore->new(8);

mce_foreach_s ( 1 .. 1_000_000 ) {
    $sem->down;
    $sem->up;
}

printf "%0.3f seconds\n", time - $start;
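
To illustrate the down/up idea with nothing but core Perl, here is a rough sketch of a counting semaphore built on a pipe. The PipeSemaphore class is hypothetical and is not the MCE::Semaphore source; it simply assumes that each byte sitting in the pipe stands for one available slot.

```perl
use strict;
use warnings;
use Time::HiRes 'sleep';

# Hypothetical class for illustration; not the MCE::Semaphore source.
package PipeSemaphore;

sub new {
    my ($class, $count) = @_;
    pipe(my $r, my $w) or die "pipe error: $!";
    my $self = bless { r => $r, w => $w }, $class;
    $self->up for 1 .. $count;      # preload one token byte per slot
    return $self;
}

# Acquire a slot: blocks until a token byte can be read.
sub down {
    my ($self) = @_;
    my $n = sysread($self->{r}, my $token, 1);
    die "sysread error: $!" unless $n;
    return 1;
}

# Release a slot: put one token byte back.
sub up {
    my ($self) = @_;
    defined syswrite($self->{w}, '1') or die "syswrite error: $!";
    return 1;
}

package main;

my $sem = PipeSemaphore->new(2);
pipe(my $log_r, my $log_w) or die "pipe error: $!";

my @pids;
for my $id (1 .. 4) {
    my $pid = fork();
    die "fork error: $!" unless defined $pid;
    if ($pid == 0) {
        close $log_r;
        $sem->down;
        syswrite($log_w, "enter $id\n");
        sleep 0.05;                 # pretend to do some work
        syswrite($log_w, "leave $id\n");
        $sem->up;
        exit 0;
    }
    push @pids, $pid;
}
close $log_w;

my @events = <$log_r>;              # reads until every child has exited
waitpid($_, 0) for @pids;

# Replay the log to find how many workers were inside at once.
my ($inside, $peak) = (0, 0);
for (@events) {
    $inside += /^enter/ ? 1 : -1;
    $peak = $inside if $inside > $peak;
}
print "peak concurrency: $peak\n";
```

Four children compete for two slots, so the reported peak should never exceed 2.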

Input file

mce_foreach_f ("/path/to/file") {
    MCE->print($_);
}

# what about file handles? no problem...
open my $fh, "<", "/tmp/infile.txt" or die "open error: $!";

mce_foreach_f my $line ( $fh ) {
    MCE->print($line);
}

close $fh;

Replies are listed 'Best First'.
Re: Code brewing for the upcoming MCE 10 year anniversary
by marioroy (Prior) on Oct 23, 2022 at 13:45 UTC
    Behind the scenes, I'm using MCE (chunk_size set to 1).

    mce_foreach my $var ( @list ) { ... }
    mce_foreach ( @list ) { ... }
    mce_foreach_f my $var ( $path or $fh ) { ... }
    mce_foreach_f ( $path or $fh ) { ... }
    mce_foreach_s my $var ( @list ) { ... }
    mce_foreach_s ( @list ) { ... }
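
Until MCE::Simple is released, roughly the same effect can be had with the MCE core API that is already on CPAN. The sketch below assumes the MCE distribution is installed and is only a guess at what a keyword like mce_foreach might expand to; the actual generated code is not shown in this thread.

```perl
use strict;
use warnings;
use MCE;    # assumption: the released MCE distribution from CPAN is installed

my @results;

my $mce = MCE->new(
    max_workers => 4,
    chunk_size  => 1,               # one input element per worker call
    input_data  => [ 1 .. 10 ],
    gather      => \@results,       # values sent via MCE->gather land here
    user_func   => sub {
        my ($mce, $chunk_ref, $chunk_id) = @_;
        my $i = $chunk_ref->[0];    # with chunk_size 1 the chunk holds one item
        MCE->gather($i * 2);
    },
);

$mce->run;

# Workers finish in no particular order, so sort before printing.
print join(' ', sort { $a <=> $b } @results), "\n";
```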

    The best part is the spawn and sync keywords.

    # context scalar, array, or hash
    spawn (my|our)* $var @var or %var = sub { codeblock };
    spawn (my|our)* $var @var or %var = sub { func(@_) }, args;
    spawn (my|our)* $var @var or %var = func(arg1, ...);
    spawn (my|our)* $var @var or %var = func();

    sync $var;
    sync @var;
    sync %var;

    # anonymous code blocks
    spawn $identVar or "quotedString", sub { codeblock };
    spawn sub { codeblock };

    sync;

      Sometimes, the use-case calls for processing at the chunk level inside the code block. For example, the worker receives the chunk as a whole.

      mce_forchunk
      mce_forchunk_f
      mce_forchunk_s

      I will use MCE::Loop behind the scenes.

        The mce_forchunk keywords are working. Sometimes, it's preferable to process at the chunk level. So, here it is.

        mce_forchunk

        use MCE::Simple -strict, max_workers => 4, init_relay => '';

        mce_forchunk (11..19) {
            my ($mce, $chunk_ref, $chunk_id) = @_;
            for my $i (@{ $chunk_ref }) {
                MCE->say($i);
            }
        }

        mce_forchunk (31..39) {
            for my $i (@{ $_ }) {
                MCE->say($i);
            }
        }

        mce_forchunk my $chunk (51..59) {
            for my $i (@{ $chunk }) {
                MCE->say($i);
            }
        }

        mce_forchunk my $chunk (71..79) {
            my $output = "From chunk_id: ".MCE->chunk_id."\n";
            for my $i (@{ $chunk }) {
                $output .= "$i\n";
            }
            MCE::relay {
                # write directly to STDOUT, not involve manager process
                print $output;
            };
        }

        Output

        15
        13
        11
        16
        14
        17
        12
        18
        19
        33
        34
        31
        39
        32
        37
        35
        38
        36
        51
        57
        53
        52
        55
        58
        54
        56
        59
        From chunk_id: 1
        71
        72
        From chunk_id: 2
        73
        74
        From chunk_id: 3
        75
        76
        From chunk_id: 4
        77
        78
        From chunk_id: 5
        79

        Spawn MCE asynchronously

        MCE::Simple expands keywords at compile time to parallel code.

        use MCE::Simple (
            include => [qw/ -strict -signatures /],
            max_workers => 4,
            init_relay => '',
        );

        MCE::Simple->init(
            on_finish => sub ( $pid, $exit, $ident, $signal, $error, @ret ) {
                say "MCE job ** $ident ** completed; status $exit.";
                print join('', @ret);
            },
        );

        spawn "foo", sub {
            my @results;
            MCE::Simple->init(gather => \@results);

            mce_forchunk my $chunk (91..99) {
                my $output = "From chunk_id: ".MCE->chunk_id."\n";
                for my $i (@{ $chunk }) {
                    $output .= "$i\n";
                }
                MCE::relay {
                    MCE->gather($output);
                };
            }

            @results;
        };

        # do something else

        sync;

        Output

        MCE job ** foo ** completed; status 0.
        From chunk_id: 1
        91
        92
        From chunk_id: 2
        93
        94
        From chunk_id: 3
        95
        96
        From chunk_id: 4
        97
        98
        From chunk_id: 5
        99
Re: Code brewing for the upcoming MCE 10 year anniversary
by marioroy (Prior) on Oct 23, 2022 at 20:39 UTC

    I'm still undecided about including the count method in MCE::Semaphore. Most platforms are supported.

    The count method may be helpful for a future MCE::Barrier module. I obtained the FIONREAD value using QEMU. Oh what fun it was running an s390x emulated machine. Likewise for Alpha, AArch64, MIPS, PA-RISC, PowerPC, RISC-V, and SPARC.

    What about the Haiku OS? Yep, this one too is supported :)

    my $FIONREAD = do {
        use Config qw(config_re);
        my ($archname) = config_re('archname');

        if ($archname =~ /(?:irix|mips)/i) {
            # IRIX; Linux on MIPS
            0x467f;
        }
        elsif ($archname =~ /(?:alpha|powerpc|ppc|sparc)/i) {
            # OSF/1, Tru64; Linux on Alpha, PowerPC, SPARC
            0x4004667f;
        }
        elsif ($^O =~ /(?:hp-?ux|linux)/i) {
            # HP-UX; Linux on AArch64, ARM, PA-RISC, RISC-V, s390x, x86_64
            0x541b;
        }
        elsif ($^O =~ /(?:aix|bsd|darwin|dragonfly|mswin|mingw|msys|cygwin|solaris)/i) {
            # AIX, BSD derivatives, macOS, Windows/Cygwin, Solaris
            0x4004667f;
        }
        elsif ($^O =~ /haiku/i) {
            # Haiku (BeOS-like OS)
            0xbe000001;
        }
        else {
            ##
            # MCE::Semaphore::count() unimplemented in this platform.
            # Pull request or email the author $^O, archname, and FIONREAD.
            #
            # $ perl -MConfig -le 'print $^O, ": ", $Config{archname}'
            # $ python3 -c 'import termios; print(hex(termios.FIONREAD))'
            #
            # $ cat fionread.c
            #
            # #include <stdio.h>
            # #include <sys/ioctl.h>
            # int main() {
            #     printf("%#08lx\n", FIONREAD);
            #     return 0;
            # }
            #
            # $ cc -o fionread fionread.c
            # $ ./fionread
            ##
            0x0;
        }
    };

    # $sem->count
    sub count {
        die 'MCE::Semaphore::count() unimplemented in this platform'
            unless $FIONREAD;
        my $count = pack('L', 0);
        ioctl($_[0]->{r_sock}, $FIONREAD, $count);
        unpack('L', $count);
    }
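
The count idea can be tried in isolation with a socket pair. The sketch below is an illustration only: bytes_pending is a hypothetical helper, and it hard-codes 0x541b solely for Linux on x86_64 or AArch64 (taken from the table above), reporting "unknown" everywhere else rather than guessing.

```perl
use strict;
use warnings;
use Config;
use Socket;

# Assumption: 0x541b is FIONREAD on Linux x86_64/AArch64, per the table above.
# Any other platform is treated as unknown in this sketch.
my $FIONREAD = ($^O eq 'linux' && $Config{archname} =~ /x86_64|aarch64/)
    ? 0x541b : 0;

# Hypothetical helper: number of unread bytes on $fh, or undef if unavailable.
sub bytes_pending {
    my ($fh) = @_;
    return unless $FIONREAD;
    my $buf = pack('L', 0);
    defined ioctl($fh, $FIONREAD, $buf) or return;
    return unpack('L', $buf);
}

socketpair(my $r_sock, my $w_sock, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair error: $!";

# Write three one-byte tokens, as a semaphore with three free slots might.
syswrite($w_sock, '1') for 1 .. 3;

my $pending = bytes_pending($r_sock);
print defined $pending
    ? "tokens waiting: $pending\n"
    : "FIONREAD value unknown for this platform\n";
```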
Re: Code brewing for the upcoming MCE 10 year anniversary
by marioroy (Prior) on Nov 04, 2022 at 06:01 UTC

    I came across a wonderful site, by Jason Brownlee, demonstrating Barriers using Python. That's what I want MCE::Barrier to accomplish. Hold on... I'm not finished yet. The MCE::Barrier, MCE::Semaphore, and MCE::Simple modules will be released once I complete the documentation and test scripts.

    The barrier wait method supports two modes: list-context or exception-raised (if aborted or timed out). Notice also the spawn keyword being used in a loop. That works too :)
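
One way a single method can offer both modes in Perl is to check the calling context with wantarray. The sketch below is a guess at the mechanism, not the MCE::Barrier code: wait_demo is a hypothetical stand-in that pretends the wait has already timed out.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a barrier wait that has already timed out.
sub wait_demo {
    my ($remaining, $error) = (2, 'Operation timed out');

    # List context: hand back the details and let the caller decide.
    return ($remaining, $error) if wantarray;

    # Scalar or void context: raise an exception on failure.
    die "$error\n" if $error;
    return $remaining;
}

# Mode 1: list context, no exception.
my ($remaining, $error) = wait_demo();
print "list context: remaining=$remaining error=$error\n";

# Mode 2: void context, failure arrives as an exception.
my $ok = eval { wait_demo(); 1 };
my $caught = $@;
print "exception: $caught" unless $ok;
```

The same call site pattern appears in the examples that follow: the commented-out list assignment is the first mode, and the try/catch block is the second.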

    Barrier with a Timeout

    # Perl replica of using a Barrier with a Timeout
    # https://superfastpython.com/multiprocessing-barrier-in-python/

    use MCE::Simple -strict, -signatures;
    use MCE::Barrier;
    use Time::HiRes 'sleep';
    use Try::Tiny;

    STDOUT->autoflush(1);

    sub task ($barrier, $number) {
        # generate a unique value and block for a moment
        my $value = rand() * 10;
        sleep($value);

        # report result
        say "Process ${number} done, got: ${value}";

        # wait on all other processes to complete
        # my ($remaining, $error) = $barrier->wait;

        # or via exception handling
        try {
            $barrier->wait;
        }
        catch {
            # pass
        };
    }

    sub main () {
        # create a barrier plus one for the main process
        my $barrier = MCE::Barrier->new(5 + 1);

        # create the worker processes
        spawn task($barrier, $_) for 1..5;

        # wait for all processes to finish
        # my ($remaining, $error) = $barrier->wait;

        # or via exception handling
        try {
            $barrier->wait(timeout => 5);
            say "All processes have their result";
        }
        catch {
            # $_ = "Operation timed out ..."
            say "Some processes did not finish on time...";
        };

        # reap the worker processes
        sync;
    }

    main();

    __END__

    Process 2 done, got: 2.20488202831806
    Process 1 done, got: 4.51354761681618
    Some processes did not finish on time...
    Process 5 done, got: 5.27888526282371
    Process 4 done, got: 7.58755085132183
    Process 3 done, got: 9.89621643981994

    Barrier with an Action

    # Perl replica of using a Barrier with an Action
    # https://superfastpython.com/multiprocessing-barrier-in-python/

    use MCE::Simple -strict, -signatures;
    use MCE::Barrier;
    use Time::HiRes 'sleep';

    STDOUT->autoflush(1);

    sub report () {
        # report once all processes are done
        say "All processes have their result";
    }

    sub task ($barrier, $number) {
        # generate a unique value and block for a moment
        my $value = rand() * 10;
        sleep($value);

        # report result
        say "Process ${number} done, got: ${value}";

        # wait on all other processes to complete
        my ($remaining, $error) = $barrier->wait;

        # an alternative to using the "action" argument
        if ($remaining == 0) {
            say "I, process ${number}, was last...";
        }
    }

    sub main () {
        # create a barrier, not including the main process
        my $barrier = MCE::Barrier->new(5, before_release => \&report);

        # create the worker processes
        spawn task($barrier, $_) for 1..5;

        # reap the worker processes
        sync;
    }

    main();

    __END__

    Process 1 done, got: 1.25912054515542
    Process 5 done, got: 2.02445819116296
    Process 4 done, got: 4.33312377966107
    Process 3 done, got: 6.64178936815919
    Process 2 done, got: 8.9504549566573
    All processes have their result
    I, process 2, was last...

    Aborting a Barrier

    # Perl replica of Aborting a Barrier
    # https://superfastpython.com/multiprocessing-barrier-in-python/

    use MCE::Simple -strict, -signatures;
    use MCE::Barrier;
    use Time::HiRes 'sleep';
    use Try::Tiny;

    STDOUT->autoflush(1);

    sub task ($barrier, $number) {
        # generate a unique value and block for a moment
        my $value = rand() * 10;
        sleep($value);

        # report result
        say "Process ${number} done, got: ${value}";

        # check if the result was "bad"
        if ($value > 8) {
            say "Process ${number} aborting...";
            $barrier->abort;
        }
        else {
            # wait on all other processes to complete
            # my ($remaining, $error) = $barrier->wait;

            # or via exception handling
            try {
                $barrier->wait;
            }
            catch {
                # pass
            };
        }
    }

    sub main () {
        # create a barrier plus one for the main process
        my $barrier = MCE::Barrier->new(5 + 1);

        # create the worker processes
        spawn task($barrier, $_) for 1..5;

        # wait for all processes to finish
        # my ($remaining, $error) = $barrier->wait;

        # or via exception handling
        try {
            $barrier->wait();
            say "All processes have their result";
        }
        catch {
            # $_ = "Barrier aborted ..."
            say "At least one process aborted due to bad results";
        };

        # reap the worker processes
        sync;
    }

    main();

    __END__

    Process 1 done, got: 1.12563261603103
    Process 5 done, got: 1.89097026203857
    Process 4 done, got: 4.19963585053669
    Process 3 done, got: 6.5083014390348
    Process 2 done, got: 8.81696702753292
    Process 2 aborting...
    At least one process aborted due to bad results

      Now, I wonder about Python vs Perl. And then, something wonderful...

      Python

      from sys import stdout
      from multiprocessing import Process
      from multiprocessing import Barrier

      def task(barrier, number):
          for i in range(1, 400 + 1):
              print(f'{i}: {number}')
              barrier.wait()

      if __name__ == '__main__':
          stdout.flush()
          num_workers = 500
          workers = list()
          # create a barrier
          barrier = Barrier(num_workers)
          # create the worker processes
          for i in range(1, num_workers + 1):
              workers.append(Process(target=task, args=(barrier, i)))
              workers[-1].start()
          # wait for all processes to finish
          for w in workers:
              w.join()

      Perl

      This seems odd, I know. Folks cannot run this yet; MCE::Simple will be released soon. I want to share the performance gain from having a limiter.

      use MCE::Simple -strict, -signatures;
      use MCE::Barrier;

      sub task ($barrier, $number) {
          for (1 .. 400) {
              say "$_: $number";
              $barrier->wait;
          }
      }

      sub main () {
          STDOUT->autoflush(1);
          my $num_workers = 500;

          # create a barrier
          my $barrier = MCE::Barrier->new($num_workers);

          # create the worker processes
          spawn task($barrier, $_) for 1 .. $num_workers;

          # wait for all processes to finish
          $_->join for MCE::Simple->list; # same as sync;
      }

      main();

      Results

      Python 3 and Perl both complete in 4.1 seconds. That was with the limiter in MCE::Barrier commented out. With the limiter enabled, the Perl time drops to 1.5 seconds.

      sub new {
          ...
          # [10] limiter
          (!$is_winenv && !$tid) ? MCE::Semaphore->new(3,1) : (),
          ...
      }

      sub _wait {
          ...
          $obj->[10]->down if $obj->[10]; # limiter
          my ($parties, $synced, $timed_out, $aborted) = $obj->_up($flag);
          $obj->[10]->up if $obj->[10];
          ...
      }

      The other oddity is that I cannot spin up 1,000 processes in Python. It complains about running out of file handles. Perl, no problem!

        Using strace to watch what python is doing under the hood might give insight to the fd limit. Maybe it’s doing a dup or pipe behind the scenes for some sort of reason.

        The cake is a lie.

        Congratulations for your MCE module.

         It complains about running out of file handles.

        I thought that was OS specific, unless P. sets its own limits too?

      I have always wanted a Barrier module to complement MCE::Child, MCE::Hobo, or threads. The module works on Unix, Cygwin, and Windows. Tonight, I enhanced MCE::Simple to support the spawn loop syntax.

      # create the worker processes
      spawn task($barrier, $_) for 1..5;

      This is transposed to the following. The ident argument defaults to the empty string '' if omitted.

      MCE::Simple::_reap_joinable(); do { MCE::Simple::_async_i('', \&task, $barrier, $_) } for 1..5;

      At this moment, the code is 100% complete in MCE::Barrier, MCE::Semaphore, and MCE::Simple. It's a relief. I tested MCE::Barrier on Linux with 4,000 workers, trying to break it :). All the ones come first, then the twos, followed by the threes, and so on. Workers wait for all parties to reach the barrier.

      use strict;
      use warnings;

      use MCE;
      use MCE::Barrier;
      use Time::HiRes qw(time);

      my $num_workers = 4000;
      my $barrier = MCE::Barrier->new($num_workers);
      my $start = time();

      MCE->new(
          max_workers => $num_workers,
          user_func => sub {
              my $id = MCE->wid;
              for (1..400) {
                  MCE->say("$_: $id");
                  $barrier->wait;
              }
          }
      )->run();

      printf {*STDERR} "\nduration: %0.3f\n\n", time() - $start;

      The sync implementation in MCE has two stages: sync and unsync. MCE::Barrier's implementation has one stage and alternates between two semaphores (1 or 2) using k = 3 - k. A limiter prevents many workers from attempting read IO simultaneously. The result is a fast implementation no matter the number of parties.
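
To make the alternating idea concrete, here is a rough single-stage barrier in core Perl. It is a sketch under assumptions, not the MCE::Barrier source: the PipeBarrier class is hypothetical, the arrival counter lives in a pipe (taking it out doubles as a lock), and two "gate" pipes alternate via k = 3 - k so that a fast worker entering the next round cannot consume a release token meant for a slower worker still leaving the current one.

```perl
use strict;
use warnings;

# Hypothetical class for illustration; not the MCE::Barrier source.
package PipeBarrier;

sub new {
    my ($class, $parties) = @_;
    my %self = (parties => $parties, k => 1);
    for my $name (qw(lock gate1 gate2)) {
        pipe(my $r, my $w) or die "pipe error: $!";
        $self{$name} = [$r, $w];
    }
    my $self = bless \%self, $class;
    $self->_put('lock', pack('L', 0));    # arrival counter starts at zero
    return $self;
}

sub _put {
    my ($self, $name, $bytes) = @_;
    defined syswrite($self->{$name}[1], $bytes) or die "write error: $!";
}

sub _take {
    my ($self, $name, $len) = @_;
    my $n = sysread($self->{$name}[0], my $buf, $len);
    die "read error: $!" unless defined $n && $n == $len;
    return $buf;
}

sub wait {
    my ($self) = @_;
    my $gate = "gate$self->{k}";
    $self->{k} = 3 - $self->{k};          # alternate 1 <-> 2 for the next round

    # Removing the counter from the pipe also serves as the lock.
    my $count = unpack('L', $self->_take('lock', 4)) + 1;

    if ($count == $self->{parties}) {
        $self->_put('lock', pack('L', 0));                 # reset for next round
        $self->_put($gate, 'x' x ($self->{parties} - 1));  # release the others
        return;
    }
    $self->_put('lock', pack('L', $count));
    $self->_take($gate, 1);               # block until the last party arrives
    return;
}

package main;

my ($parties, $rounds) = (3, 4);
my $barrier = PipeBarrier->new($parties);
pipe(my $log_r, my $log_w) or die "pipe error: $!";

my @pids;
for my $id (1 .. $parties) {
    my $pid = fork();
    die "fork error: $!" unless defined $pid;
    if ($pid == 0) {
        close $log_r;
        for my $round (1 .. $rounds) {
            syswrite($log_w, "$round: $id\n");
            $barrier->wait;
        }
        exit 0;
    }
    push @pids, $pid;
}
close $log_w;

my @lines = <$log_r>;                     # reads until every child has exited
waitpid($_, 0) for @pids;
print @lines;
```

As in the 4,000-worker test above, all the ones should appear before any of the twos, and so on. This sketch has no limiter, timeout, or abort support.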

Re: Code brewing for the upcoming MCE 10 year anniversary
by marioroy (Prior) on Nov 11, 2022 at 11:25 UTC

    I completed usability testing. The following is a demonstration running MCE in the foreground and background. The spawn keyword is also used for spinning up a child process for each element.

    use MCE::Simple -strict, (
        spawn_limit => 2,  # MCE::Child processes
        max_workers => 2,  # number of MCE workers
        init_relay => 1,   # define to enable MCE::relay
    );

    # Run MCE in the foreground.
    mce_foreach my $i (10..18) {
        MCE::relay {
            # output orderly
            say "$$: $i";
        };
    }

    # Run MCE in the background.
    spawn sub {
        mce_foreach my $i (20..28) {
            MCE::relay {
                say "$$: $i";
            };
        }
    };

    sync;

    # Spin a worker per each input element.
    # Up to "spawn_limit" will run simultaneously.
    # Blocking otherwise, until next availability.
    spawn my $i (30..38) {
        say "$$: $i";
    }

    sync;

    output:

    60264: 10  # alternating PIDs
    60265: 11  # two MCE workers
    60264: 12
    60265: 13
    60264: 14
    60265: 15
    60264: 16
    60265: 17
    60264: 18
    60267: 20  # alternating PIDs
    60268: 21  # two MCE workers
    60267: 22
    60268: 23
    60267: 24
    60268: 25
    60267: 26
    60268: 27
    60267: 28
    60269: 30  # spawn keyword
    60270: 31  # unique PIDs
    60271: 32
    60272: 33
    60273: 34
    60274: 35
    60275: 36
    60276: 37
    60277: 38

    spawn keyword:

    spawn my $i (30..38) {
        say "$$: $i";
    }

    # filtered/transposed to this, without altering line no.
    foreach my $i (30..38) { MCE::Simple::_spawn_a('', sub {
        say "$$: $i";
    }); }
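
The "one child per element, at most spawn_limit at a time" behavior can be approximated with plain fork and waitpid. This is a sketch of the idea only, not what MCE::Simple::_spawn_a does internally; the variable $spawn_limit and the helper reap_one are hypothetical names chosen to mirror the option above.

```perl
use strict;
use warnings;

my $spawn_limit = 2;    # hypothetical name mirroring the option above
my %running;            # pid => input element
my @finished;           # [ element, exit status ] per reaped child

# Block until any one child exits, then record it.
sub reap_one {
    my $pid = waitpid(-1, 0);
    return if $pid <= 0;
    push @finished, [ delete $running{$pid}, $? >> 8 ];
}

for my $i (30 .. 38) {
    # At the limit: wait for a slot to free up before spawning again.
    reap_one() while keys(%running) >= $spawn_limit;

    my $pid = fork();
    die "fork error: $!" unless defined $pid;
    if ($pid == 0) {
        print "$$: $i\n";
        exit 0;
    }
    $running{$pid} = $i;
}

# The "sync" step: reap whatever is still running.
reap_one() while %running;

printf "reaped %d children\n", scalar @finished;
```

Each element gets its own PID, as in the output above, while no more than two children exist at any moment.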

      Update: Changed to "spawn" without the "_each" suffix. Thank you, choroba.

      I updated the prior example. Seeing "spawneach" made me rename it to "spawn_each". I can leave out the "_each" suffix, but preferred to be clear that "spawn_each" will be spinning a child process per each element.

      # serial code
      foreach my $i (30..38) {
          say "$$: $i";
      }

      # this seems clear?
      spawn_each my $i (30..38) {
          say "$$: $i";
      }

      # will this confuse folks?
      spawn my $i (30..38) {
          say "$$: $i";
      }

      Which one do you prefer (spawn_each, spawn) or something else? The "spawn" without the "_each" suffix is elegant, but I wonder whether folks will realize that it spawns a child process per element.

      MCE::Simple is quite nice. It does not alter the line count when filtering / transposing to parallel code.

        Hi Mario

        Maybe spawn_for_each ... ?


        The way forward always starts with a minimal test.

Node Type: perlmeditation [id://11147606]
Approved by 1nickt
Front-paged by 1nickt