Re: Code brewing for the upcoming MCE 10 year anniversary

by marioroy (Prior)
on Nov 04, 2022 at 06:01 UTC ( [id://11147958] )


in reply to Code brewing for the upcoming MCE 10 year anniversary

I came across a wonderful site by Jason Brownlee demonstrating barriers in Python. That is what I want MCE::Barrier to accomplish. Hold on... I'm not finished yet. The MCE::Barrier, MCE::Semaphore, and MCE::Simple modules will be released once the documentation and test scripts are complete.

The barrier wait method supports two modes: list context, which returns the remaining count and an error, or raising an exception if the barrier is aborted or times out. Notice also that the spawn keyword is used in a loop. That works too :)

Barrier with a Timeout

# Perl replica of using a Barrier with a Timeout
# https://superfastpython.com/multiprocessing-barrier-in-python/

use MCE::Simple -strict, -signatures;
use MCE::Barrier;

use Time::HiRes 'sleep';
use Try::Tiny;

STDOUT->autoflush(1);

sub task ($barrier, $number) {
    # generate a unique value and block for a moment
    my $value = rand() * 10;
    sleep($value);

    # report result
    say "Process ${number} done, got: ${value}";

    # wait on all other processes to complete
    # my ($remaining, $error) = $barrier->wait;

    # or via exception handling
    try {
        $barrier->wait;
    }
    catch {
        # pass
    };
}

sub main () {
    # create a barrier plus one for the main process
    my $barrier = MCE::Barrier->new(5 + 1);

    # create the worker processes
    spawn task($barrier, $_) for 1..5;

    # wait for all processes to finish
    # my ($remaining, $error) = $barrier->wait;

    # or via exception handling
    try {
        $barrier->wait(timeout => 5);
        say "All processes have their result";
    }
    catch {
        # $_ = "Operation timed out ..."
        say "Some processes did not finish on time...";
    };

    # reap the worker processes
    sync;
}

main();

__END__

Process 2 done, got: 2.20488202831806
Process 1 done, got: 4.51354761681618
Some processes did not finish on time...
Process 5 done, got: 5.27888526282371
Process 4 done, got: 7.58755085132183
Process 3 done, got: 9.89621643981994
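For comparison, here is a rough Python sketch of the same timeout pattern. It is not the code from the linked site: it uses threading.Barrier rather than multiprocessing.Barrier so that it runs anywhere without a main guard (the two share the same wait/abort interface), and the helper name run_with_timeout and the delay values are made up for illustration.

```python
import threading
import time


def run_with_timeout(delays, timeout):
    """Start one worker per delay; return True if all reach the barrier in time."""
    # one party per worker, plus one for the caller
    barrier = threading.Barrier(len(delays) + 1)

    def task(delay):
        time.sleep(delay)  # stands in for the real work
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            pass  # the caller timed out, which breaks the barrier for everyone

    workers = [threading.Thread(target=task, args=(d,)) for d in delays]
    for w in workers:
        w.start()

    try:
        barrier.wait(timeout=timeout)
        on_time = True
    except threading.BrokenBarrierError:
        on_time = False

    # reap the workers
    for w in workers:
        w.join()
    return on_time


if __name__ == "__main__":
    print(run_with_timeout([0.01, 0.02], timeout=1.0))
    print(run_with_timeout([0.01, 0.5], timeout=0.1))
```

In Python a timed-out wait leaves the barrier in a broken state, so every other party sees BrokenBarrierError as well. That is why the workers also wrap their wait in a try block, just as the Perl version does.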

Barrier with an Action

# Perl replica of using a Barrier with an Action
# https://superfastpython.com/multiprocessing-barrier-in-python/

use MCE::Simple -strict, -signatures;
use MCE::Barrier;

use Time::HiRes 'sleep';

STDOUT->autoflush(1);

sub report () {
    # report once all processes are done
    say "All processes have their result";
}

sub task ($barrier, $number) {
    # generate a unique value and block for a moment
    my $value = rand() * 10;
    sleep($value);

    # report result
    say "Process ${number} done, got: ${value}";

    # wait on all other processes to complete
    my ($remaining, $error) = $barrier->wait;

    # an alternative to using the "action" argument
    if ($remaining == 0) {
        say "I, process ${number}, was last...";
    }
}

sub main () {
    # create a barrier, not including the main process
    my $barrier = MCE::Barrier->new(5, before_release => \&report);

    # create the worker processes
    spawn task($barrier, $_) for 1..5;

    # reap the worker processes
    sync;
}

main();

__END__

Process 1 done, got: 1.25912054515542
Process 5 done, got: 2.02445819116296
Process 4 done, got: 4.33312377966107
Process 3 done, got: 6.64178936815919
Process 2 done, got: 8.9504549566573
All processes have their result
I, process 2, was last...
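The Python counterpart of before_release is the action argument to the Barrier constructor. Here is a minimal sketch, again using threads instead of processes to keep it self-contained; run_with_action is a made-up helper name.

```python
import threading


def run_with_action(num_workers):
    """Return (events, indexes) after all workers pass a barrier with an action."""
    events = []
    indexes = []
    lock = threading.Lock()

    def report():
        # called once, by one of the waiting threads, just before release
        events.append("All workers have their result")

    barrier = threading.Barrier(num_workers, action=report)

    def task():
        # wait() returns a distinct integer in range(num_workers) per thread
        index = barrier.wait()
        with lock:
            indexes.append(index)

    threads = [threading.Thread(target=task) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return events, sorted(indexes)


if __name__ == "__main__":
    print(run_with_action(5))
```

The integer returned by wait() can single out one thread for follow-up work, much like testing $remaining == 0 above. The Python documentation only promises that the values are distinct, so the sketch does not assume which arrival gets which index.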

Aborting a Barrier

# Perl replica of Aborting a Barrier
# https://superfastpython.com/multiprocessing-barrier-in-python/

use MCE::Simple -strict, -signatures;
use MCE::Barrier;

use Time::HiRes 'sleep';
use Try::Tiny;

STDOUT->autoflush(1);

sub task ($barrier, $number) {
    # generate a unique value and block for a moment
    my $value = rand() * 10;
    sleep($value);

    # report result
    say "Process ${number} done, got: ${value}";

    # check if the result was "bad"
    if ($value > 8) {
        say "Process ${number} aborting...";
        $barrier->abort;
    }
    else {
        # wait on all other processes to complete
        # my ($remaining, $error) = $barrier->wait;

        # or via exception handling
        try {
            $barrier->wait;
        }
        catch {
            # pass
        };
    }
}

sub main () {
    # create a barrier plus one for the main process
    my $barrier = MCE::Barrier->new(5 + 1);

    # create the worker processes
    spawn task($barrier, $_) for 1..5;

    # wait for all processes to finish
    # my ($remaining, $error) = $barrier->wait;

    # or via exception handling
    try {
        $barrier->wait();
        say "All processes have their result";
    }
    catch {
        # $_ = "Barrier aborted ..."
        say "At least one process aborted due to bad results";
    };

    # reap the worker processes
    sync;
}

main();

__END__

Process 1 done, got: 1.12563261603103
Process 5 done, got: 1.89097026203857
Process 4 done, got: 4.19963585053669
Process 3 done, got: 6.5083014390348
Process 2 done, got: 8.81696702753292
Process 2 aborting...
At least one process aborted due to bad results
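The abort case looks like this in Python. This is a sketch with threads; run_with_abort and its fixed input values are illustrative stand-ins for the random results.

```python
import threading


def run_with_abort(values, limit=8):
    """Return True if every worker reached the barrier, False if one aborted."""
    # one party per worker, plus one for the caller
    barrier = threading.Barrier(len(values) + 1)

    def task(value):
        if value > limit:
            # a "bad" result: break the barrier instead of waiting on it
            barrier.abort()
            return
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            pass  # another worker aborted

    workers = [threading.Thread(target=task, args=(v,)) for v in values]
    for w in workers:
        w.start()

    try:
        barrier.wait()
        ok = True
    except threading.BrokenBarrierError:
        ok = False

    for w in workers:
        w.join()
    return ok


if __name__ == "__main__":
    print(run_with_abort([1, 2, 3]))
    print(run_with_abort([1, 9, 3]))
```

Once abort() is called, current and future waiters all get BrokenBarrierError, so nobody hangs waiting for the party that gave up.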

Replies are listed 'Best First'.
Re^2: Code brewing for the upcoming MCE 10 year anniversary
by marioroy (Prior) on Nov 04, 2022 at 10:55 UTC

    Now, I wonder about Python vs Perl. And then, something wonderful...

    Python

    from sys import stdout
    from multiprocessing import Process
    from multiprocessing import Barrier

    def task(barrier, number):
        for i in range(1, 400 + 1):
            print(f'{i}: {number}')
            barrier.wait()

    if __name__ == '__main__':
        stdout.flush()
        num_workers = 500
        workers = list()
        # create a barrier
        barrier = Barrier(num_workers)
        # create the worker processes
        for i in range(1, num_workers + 1):
            workers.append(Process(target=task, args=(barrier, i)))
            workers[-1].start()
        # wait for all processes to finish
        for w in workers:
            w.join()

    Perl

    This seems odd, I know. Folks cannot run this yet. MCE::Simple will be released soon. I want to share the performance gain from having a limiter.

    use MCE::Simple -strict, -signatures;
    use MCE::Barrier;

    sub task ($barrier, $number) {
        for (1 .. 400) {
            say "$_: $number";
            $barrier->wait;
        }
    }

    sub main () {
        STDOUT->autoflush(1);
        my $num_workers = 500;

        # create a barrier
        my $barrier = MCE::Barrier->new($num_workers);

        # create the worker processes
        spawn task($barrier, $_) for 1 .. $num_workers;

        # wait for all processes to finish
        $_->join for MCE::Simple->list;  # same as sync;
    }

    main();

    Results

    With the limiter commented out, Python 3 and Perl both complete in 4.1 seconds. With the limiter enabled in MCE::Barrier, the time drops to 1.5 seconds.

    sub new {
        ...
        # [10] limiter
        (!$is_winenv && !$tid) ? MCE::Semaphore->new(3,1) : (),
        ...
    }

    sub _wait {
        ...
        $obj->[10]->down if $obj->[10];  # limiter
        my ($parties, $synced, $timed_out, $aborted) = $obj->_up($flag);
        $obj->[10]->up if $obj->[10];
        ...
    }
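The limiter idea can be sketched in Python as a counting semaphore wrapped around the contended step. This only illustrates the down/up pairing above and is not how MCE::Barrier is implemented. The limit of 3 mirrors the first argument to MCE::Semaphore->new, on the assumption that it is the initial count. peak_concurrency is a made-up helper that records how many workers were ever inside the guarded section at once.

```python
import threading
import time


def peak_concurrency(num_workers, limit):
    """Run workers through a semaphore-guarded section; return the peak overlap."""
    limiter = threading.Semaphore(limit)
    lock = threading.Lock()
    active = 0
    peak = 0

    def task():
        nonlocal active, peak
        with limiter:                 # "down" on entry, "up" on exit
            with lock:
                active += 1
                peak = max(peak, active)
            time.sleep(0.01)          # stands in for the guarded I/O
            with lock:
                active -= 1

    threads = [threading.Thread(target=task) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak


if __name__ == "__main__":
    print(peak_concurrency(20, 3))
```

However many workers are started, the reported peak never exceeds the limit, which is the property the limiter relies on.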

    The other oddity is that I cannot spin up 1,000 processes in Python. It complains about running out of file handles. Perl, no problem!

      Using strace to watch what python is doing under the hood might give insight to the fd limit. Maybe it’s doing a dup or pipe behind the scenes for some sort of reason.

      The cake is a lie.

        On my box, the open files ulimit is 1024. No wonder! Recently, I experienced the same issue running MCE with init_relay enabled. MCE::Relay requires MCE to configure a read-write channel for each worker.
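A quick way to see that limit from inside Python on Unix-like systems is the standard resource module. This is only a sketch, open_file_limits is a made-up helper name, and the module is not available on Windows.

```python
import resource


def open_file_limits():
    """Return the (soft, hard) limits on open file descriptors for this process."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)


if __name__ == "__main__":
    soft, hard = open_file_limits()
    print(f"open files: soft={soft} hard={hard}")
```

The soft value is what `ulimit -n` reports in the shell. Each worker that needs its own pipe or channel counts against it.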

      Congratulations for your MCE module.

       It complains about running out of file handles.

      I thought that was OS specific, unless P. sets its own limits too?

Re^2: Code brewing for the upcoming MCE 10 year anniversary
by marioroy (Prior) on Nov 04, 2022 at 06:53 UTC

    I have always wanted a Barrier module to complement MCE::Child, MCE::Hobo, or threads. The module works on Unix, Cygwin, and Windows. Tonight, I enhanced MCE::Simple to support the spawn loop syntax.

    # create the worker processes
    spawn task($barrier, $_) for 1..5;

    MCE::Simple rewrites that line to the following. The ident argument defaults to the empty string '' if omitted.

    MCE::Simple::_reap_joinable();
    do { MCE::Simple::_async_i('', \&task, $barrier, $_) } for 1..5;

    At this moment, the code is 100% complete in MCE::Barrier, MCE::Semaphore, and MCE::Simple. It's a relief. I tested MCE::Barrier on Linux with 4,000 workers, trying to break it :). In the output, all the ones come first, then the twos, followed by the threes, and so on, because workers wait for all parties to reach the barrier before starting the next iteration.

    use strict;
    use warnings;

    use MCE;
    use MCE::Barrier;
    use Time::HiRes qw(time);

    my $num_workers = 4000;
    my $barrier = MCE::Barrier->new($num_workers);
    my $start = time();

    MCE->new(
        max_workers => $num_workers,
        user_func => sub {
            my $id = MCE->wid;
            for (1..400) {
                MCE->say("$_: $id");
                $barrier->wait;
            }
        }
    )->run();

    printf {*STDERR} "\nduration: %0.3f\n\n", time() - $start;

    The sync implementation in MCE has two stages: sync and unsync. MCE::Barrier's implementation has a single stage and alternates between two semaphores (1 or 2) on each round via k = 3 - k. A limiter prevents many workers from attempting read IO at the same time. The result is a fast implementation no matter the number of parties.
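To illustrate the k = 3 - k idea, here is a small Python sketch of a reusable one-stage barrier that flips between two semaphores on each round. It only illustrates the general technique and is not MCE::Barrier's code: the class AlternatingBarrier and the helper run_rounds are hypothetical names, it uses threads, and it leaves out the limiter, timeouts, and abort.

```python
import threading


class AlternatingBarrier:
    """Reusable barrier that alternates between two semaphores (illustration)."""

    def __init__(self, parties):
        self._parties = parties
        self._count = 0   # arrivals in the current round
        self._k = 1       # semaphore used by the current round: 1 or 2
        self._lock = threading.Lock()
        self._sems = {1: threading.Semaphore(0), 2: threading.Semaphore(0)}

    def wait(self):
        with self._lock:
            k = self._k
            self._count += 1
            if self._count == self._parties:
                # last arrival: reset, flip to the other semaphore, wake the rest
                self._count = 0
                self._k = 3 - k
                for _ in range(self._parties - 1):
                    self._sems[k].release()
                return
        # everyone else blocks on this round's semaphore
        self._sems[k].acquire()


def run_rounds(num_workers, rounds):
    """Each worker logs (round, id), then waits; return the combined log."""
    barrier = AlternatingBarrier(num_workers)
    log = []
    log_lock = threading.Lock()

    def task(number):
        for i in range(1, rounds + 1):
            with log_lock:
                log.append((i, number))
            barrier.wait()

    threads = [threading.Thread(target=task, args=(n,))
               for n in range(1, num_workers + 1)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log


if __name__ == "__main__":
    seen = [r for r, _ in run_rounds(8, 50)]
    print(seen == sorted(seen))
```

Flipping semaphores is what makes a single stage safe. A fast worker that races ahead into the next round blocks on the other semaphore, so it cannot steal a wake-up meant for a slower worker still leaving the current round. By the time the first semaphore is reused, every worker has already consumed its wake-up from it.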
