PerlMonks  

Perl Multi Processing.

by pritesh_ugrankar (Monk)
on Dec 05, 2021 at 17:14 UTC ( [id://11139408] )

pritesh_ugrankar has asked for the wisdom of the Perl Monks concerning the following question:

Respected Monks,

Original question: I have some basic/intermediate Perl exposure. I am interested in parallel processing. Kindly let me know some good references.

Edited: Respected Monks, I have been writing Perl scripts here and there to get some automation work done. I was always interested in parallel processing (which I understand does not necessarily mean multithreading). I have a very basic understanding of Linux. I am looking for a tutorial on parallel processing with some real-world examples.

While there is no immediate need for me to get into parallel processing, I just need to learn it for the sake of learning. So that when the time comes I would have understood at least the basics of it and take it ahead from there. I would be very thankful for any help/pointers in that direction.

Replies are listed 'Best First'.
Re: Perl Multi Processing.
by kcott (Archbishop) on Dec 05, 2021 at 19:20 UTC
Re: Perl Multi Processing.
by hippo (Bishop) on Dec 05, 2021 at 21:29 UTC
Re: Perl Multi Processing.
by eyepopslikeamosquito (Archbishop) on Dec 05, 2021 at 21:49 UTC
Re: Perl Multi Processing.
by Preceptor (Deacon) on Dec 06, 2021 at 12:02 UTC

    There are, IMO, three core models of parallel processing in Perl. Each has upsides and downsides, so I'll outline them with examples:

    Threads

    Threads in Perl are not lightweight, so put aside any understanding of threads you have from other languages. This misconception is why threads are often considered 'bad'. It's not appropriate to spawn lots and lots of threads - that's very inefficient - but for a queue-and-workers model, it works really well.

    The model is quite well suited to IPC, meaning that passing work around to parallelise is easy. For example, we can use Thread::Queue together with threads:

    #!/usr/bin/env perl
    use strict;
    use warnings;
    use threads;
    use Thread::Queue;
    use IO::Socket;

    my $nthreads = 20;
    my $in_file2 = 'rang.txt';
    my $work_q   = Thread::Queue->new;
    my $result_q = Thread::Queue->new;

    sub ip_checker {
        while ( my $ip = $work_q->dequeue ) {
            chomp($ip);
            my $host = IO::Socket::INET->new(
                PeerAddr => $ip,
                PeerPort => 80,
                Proto    => 'tcp',
                Timeout  => 1
            );
            if ( defined $host ) {
                $result_q->enqueue($ip);
            }
        }
    }

    sub file_writer {
        open( my $output_fh, ">>", "port.txt" ) or die $!;
        while ( my $ip = $result_q->dequeue ) {
            print {$output_fh} "$ip\n";
        }
        close($output_fh);
    }

    my @workers;
    for ( 1 .. $nthreads ) {
        push( @workers, threads->create( \&ip_checker ) );
    }
    my $writer = threads->create( \&file_writer );

    open( my $dat, "<", $in_file2 ) or die $!;
    $work_q->enqueue(<$dat>);
    close($dat);
    $work_q->end;

    foreach my $thr (@workers) { $thr->join(); }
    $result_q->end;
    $writer->join();

    This uses a queue to feed a set of (20) worker threads an IP list; they work their way through it, collating and printing results through the `writer` thread.

    Forking

    The second approach to parallelism in Perl is via forking. Forks are more efficient if you're planning to spin up lots of 'disposable' subprocesses, but inter-process communication (IPC) is harder than with threads.

    You can just use fork() on its own, but I think it's far better to use the excellent Parallel::ForkManager module.

    Doing approximately the same as the above:

    #!/usr/bin/env perl
    use strict;
    use warnings;
    use Fcntl qw( :flock );
    use IO::Socket;
    use Parallel::ForkManager;

    my $in_file2 = 'rang.txt';
    open( my $input,  "<", $in_file2  ) or die $!;
    open( my $output, ">", "port.txt" ) or die $!;

    my $manager = Parallel::ForkManager->new(20);

    foreach my $ip (<$input>) {
        $manager->start and next;
        chomp($ip);
        my $host = IO::Socket::INET->new(
            PeerAddr => $ip,
            PeerPort => 80,
            Proto    => 'tcp',
            Timeout  => 1
        );
        if ( defined $host ) {
            flock( $output, LOCK_EX );    # exclusive (write) lock
            print {$output} $ip, "\n";
            flock( $output, LOCK_UN );    # unlock
        }
        $manager->finish;
    }
    $manager->wait_all_children;

    close($output);
    close($input);

    You need to be particularly careful with file IO when multiprocessing, because the whole point is that your execution sequence is no longer well defined. It's insanely easy to end up with one worker clobbering a file that another has open but hasn't yet flushed to disk.

    But in both multiprocessing paradigms outlined above (there are others; these are the most common) you still have to deal with file IO serialisation. Note that your 'results' will be in a random order in both, because it very much depends on when each task completes. If order matters to you, you'll need to collate and sort after your threads or forks complete.
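    A quick sketch of that collate-and-sort step (my own example, with a hypothetical @ips standing in for whatever the workers gathered): pack each dotted-quad address to a byte string so the sort is numeric rather than ASCII.

```perl
#!/usr/bin/env perl
use strict;
use warnings;

# Hypothetical post-run collation: @ips stands in for whatever the
# workers gathered, in whatever order they happened to finish.
my @ips = qw(10.0.0.2 10.0.0.10 10.0.0.1);

# Schwartzian transform: pack each dotted quad into 4 bytes so a plain
# string compare sorts numerically (an ASCII sort would put 10.0.0.10
# before 10.0.0.2).
my @sorted = map  { $_->[1] }
             sort { $a->[0] cmp $b->[0] }
             map  { [ pack( 'C4', split /\./ ), $_ ] } @ips;

print "$_\n" for @sorted;
```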

    Non-Blocking IO

    More a subset of parallelism - but you can use something like IO::Select and/or IPC::Run, which let you open subprocesses and read/write to them asynchronously. That's sometimes 'parallel enough' for your use case. IMO it's often the IO element that you want to run more efficiently, and that generally doesn't benefit much from parallelism anyway - you don't get more IO from a socket by hammering it 5x in parallel; if anything you slow it down. If that's of particular interest, I'll try and mash up some example code to do that for you, but the code above isn't particularly useful for that.
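    As a rough sketch of that approach (my own example, not from the reply above): IO::Select can multiplex several subprocess pipes, reading from whichever one has output ready instead of draining them sequentially.

```perl
#!/usr/bin/env perl
use strict;
use warnings;
use IO::Select;

# Launch a few subprocesses via pipes; `echo` stands in for real work.
my $sel = IO::Select->new;
for my $word (qw(one two three)) {
    open( my $fh, '-|', 'echo', $word ) or die $!;
    $sel->add($fh);
}

my @results;
while ( $sel->count ) {
    # can_read blocks until at least one handle has data (or EOF) ready
    for my $fh ( $sel->can_read ) {
        my $line = <$fh>;
        if ( defined $line ) {
            chomp $line;
            push @results, $line;
        }
        else {    # EOF: that subprocess is done
            $sel->remove($fh);
            close $fh;
        }
    }
}
print "$_\n" for sort @results;
```

Note this relies on select() working on pipe handles, so it's a Unix-flavoured sketch.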

      For Perl lacking threads support (i.e. not compiled with threads enabled), one may run threads-like code using MCE::Hobo. Here is similar code using MCE::Hobo and MCE::Shared.

      #!/usr/bin/env perl
      use strict;
      use warnings;
      use MCE::Hobo;
      use MCE::Shared;
      use IO::Socket;

      my $nthreads = 20;
      my $in_file2 = 'rang.txt';
      my $work_q   = MCE::Shared->queue;
      my $result_q = MCE::Shared->queue;

      sub ip_checker {
          while ( my $ip = $work_q->dequeue ) {
              chomp($ip);
              my $host = IO::Socket::INET->new(
                  PeerAddr => $ip,
                  PeerPort => 80,
                  proto    => 'tcp',
                  Timeout  => 1
              );
              if ( defined $host ) {
                  $result_q->enqueue($ip);
              }
          }
      }

      sub file_writer {
          open( my $output_fh, ">>", "port.txt" ) or die $!;
          while ( my $ip = $result_q->dequeue ) {
              print {$output_fh} "$ip\n";
          }
          close($output_fh);
      }

      my @workers;
      for ( 1 .. $nthreads ) {
          push( @workers, MCE::Hobo->create( \&ip_checker ) );
      }
      my $writer = MCE::Hobo->create( \&file_writer );

      open( my $dat, "<", $in_file2 ) or die $!;
      $work_q->enqueue(<$dat>);
      close($dat);
      $work_q->end;

      foreach my $thr (@workers) { $thr->join(); }
      $result_q->end;
      $writer->join();

      MCE::Child is also threads-like, but does not require MCE::Shared to run. The complementary module for exchanging data is MCE::Channel.

      #!/usr/bin/env perl
      use strict;
      use warnings;
      use MCE::Child;
      use MCE::Channel;
      use IO::Socket;

      my $nthreads = 20;
      my $in_file2 = 'rang.txt';
      my $work_q   = MCE::Channel->new;
      my $result_q = MCE::Channel->new;

      sub ip_checker {
          while ( my $ip = $work_q->dequeue ) {
              chomp($ip);
              my $host = IO::Socket::INET->new(
                  PeerAddr => $ip,
                  PeerPort => 80,
                  proto    => 'tcp',
                  Timeout  => 1
              );
              if ( defined $host ) {
                  $result_q->enqueue($ip);
              }
          }
      }

      sub file_writer {
          open( my $output_fh, ">>", "port.txt" ) or die $!;
          while ( my $ip = $result_q->dequeue ) {
              print {$output_fh} "$ip\n";
          }
          close($output_fh);
      }

      my @workers;
      for ( 1 .. $nthreads ) {
          push( @workers, MCE::Child->create( \&ip_checker ) );
      }
      my $writer = MCE::Child->create( \&file_writer );

      open( my $dat, "<", $in_file2 ) or die $!;
      $work_q->enqueue(<$dat>);
      close($dat);
      $work_q->end;

      foreach my $thr (@workers) { $thr->join(); }
      $result_q->end;
      $writer->join();

      The modules are interchangeable. One may use threads with MCE::Shared or MCE::Channel. Ditto for using Parallel::ForkManager with MCE::Shared or MCE::Channel. Here, we have a shared array.

      #!/usr/bin/env perl
      use strict;
      use warnings;
      use Parallel::ForkManager;
      use MCE::Shared;
      use IO::Socket;

      my $in_file2 = 'rang.txt';
      open( my $input, "<", $in_file2 ) or die $!;

      my $result  = MCE::Shared->array;
      my $manager = Parallel::ForkManager->new(20);

      foreach my $ip (<$input>) {
          $manager->start and next;

          MCE::Shared->init();    # Optional
          # This is called automatically for threads, MCE, MCE::Hobo,
          # and MCE::Child. Calling MCE::Shared->init assigns a data
          # channel in a round-robin fashion to ForkManager workers.
          # Omitting init has the effect of 1 data channel versus 10.

          chomp($ip);
          my $host = IO::Socket::INET->new(
              PeerAddr => $ip,
              PeerPort => 80,
              proto    => 'tcp',
              Timeout  => 1
          );
          if ( defined $host ) {
              $result->push($ip);
          }
          $manager->finish;
      }
      $manager->wait_all_children;
      close($input);

      # move shared array to unshared array reference
      $result = $result->destroy({ unbless => 1 });

      open( my $output, ">", "port.txt" ) or die $!;
      print {$output} "$_\n" for @{ $result };
      close($output);

      The shared array resides under the shared-manager process, which is where all shared data lives. Upon completion, one can export the data, or export-and-destroy via $result->destroy. The resulting object is an MCE::Shared::Array, or a plain array reference [] with unbless => 1. Not exporting means a separate round trip to the shared-manager for each item. One can also call $result->values, which exports the values as well.

        Here is MCE::Flow where workers read from the input file directly. MCE->gather is called to append to an array.

        #!/usr/bin/env perl
        use strict;
        use warnings;
        use MCE::Flow;
        use IO::Socket;

        my $nthreads = 20;
        my $in_file2 = 'rang.txt';

        sub ip_checker {
            my $ip = $_;
            chomp($ip);
            my $host = IO::Socket::INET->new(
                PeerAddr => $ip,
                PeerPort => 80,
                proto    => 'tcp',
                Timeout  => 1
            );
            if ( defined $host ) {
                MCE->gather($ip);
            }
        }

        my @result = MCE::Flow->run_file(
            { chunk_size => 1, max_workers => $nthreads },
            \&ip_checker, $in_file2,
        );
        MCE::Flow->finish();

        open( my $output, ">", "port.txt" ) or die $!;
        print {$output} "$_\n" for @result;
        close($output);

        For this one, workers write serially to the output file via MCE->print.

        use strict;
        use warnings;
        use MCE::Flow;
        use IO::Socket;

        my $nthreads = 20;
        my $in_file2 = 'rang.txt';

        open( my $output, ">", "port.txt" ) or die $!;

        sub ip_checker {
            my $ip = $_;
            chomp($ip);
            my $host = IO::Socket::INET->new(
                PeerAddr => $ip,
                PeerPort => 80,
                proto    => 'tcp',
                Timeout  => 1
            );
            if ( defined $host ) {
                MCE->print( $output, "$ip\n" );
            }
        }

        MCE::Flow->run_file(
            { chunk_size => 1, max_workers => $nthreads },
            \&ip_checker, $in_file2,
        );
        MCE::Flow->finish();

        close($output);

        Perl provides various ways to achieve parallel processing. There is Parallel::ForkManager. For a threads-like API, there are MCE::Child and MCE::Hobo. MCE itself provides chunking capabilities, plus sugar syntax to gather into an array or hash. For output, MCE->print, MCE->printf, and MCE->say write serially to a file handle (STDOUT if omitted).

        Basically, no problem if the Perl binary lacks threads support.

Re: Perl Multi Processing.
by eyepopslikeamosquito (Archbishop) on Dec 07, 2021 at 00:20 UTC

      Chunking complements event-driven programming; for example, reaching out to millions of devices.

      use strict;
      use warnings;
      use MCE::Flow;
      use MCE::Queue;
      use AnyEvent::SNMP;

      my $work_q     = MCE::Queue->new;
      my $nthreads   = 100;
      my $chunk_size = 300;

      sub consumer {
          while ( defined( my $data = $work_q->dequeue ) ) {
              MCE->printf( "wid %3d, size %d\n", MCE->wid, scalar(@$data) );

              MCE->yield;    # Optional
              # The delay occurs serially among workers.
              # It is helpful to not all blast simultaneously.

              # do event-driven programming to 300 max items
              # ...
          }
      }

      sub provider {
          my @input = ( 1 .. 20_000 );
          while (@input) {
              my @data = splice @input, 0, $chunk_size;
              $work_q->enqueue( \@data );
          }
          $work_q->end;
      }

      MCE::Flow->run(
          # 100 consumers, 1 provider
          { max_workers => [ $nthreads, 1 ], interval => { delay => 0.008 } },
          \&consumer, \&provider,
      );

      It depends on the app. I tend to set nthreads to about 4x the number of logical cores on the box. The delay happens serially behind the scenes: each worker waits its turn (milliseconds) before initiating OS event-driven IO to hundreds of devices.
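      A minimal sketch of that sizing heuristic (my own example, assuming a Linux-ish box where the coreutils `nproc` tool exists; it falls back to 1 core otherwise - MCE also ships a portable MCE::Util::get_ncpu):

```perl
#!/usr/bin/env perl
use strict;
use warnings;

# Detect logical cores via the coreutils `nproc` tool; fall back to 1
# if it is unavailable.
chomp( my $ncpu = `nproc 2>/dev/null` || '' );
$ncpu ||= 1;

# Heuristic from the reply above: ~4x logical cores for IO-bound work.
my $nthreads = 4 * $ncpu;
print "Using $nthreads workers\n";
```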

Re: Perl Multi Processing.
by alexander_lunev (Pilgrim) on Dec 05, 2021 at 18:13 UTC

    Hello!

    It's hard to give an answer that covers such a laconic question, so I'll try telepathy here: I think you should start with threads and Thread::Queue. They're core modules; I've used them for parallel processing and they work as expected.

      Hi,

      Thank you. In that case, I will edit my original question to be more specific.

Re: Perl Multi Processing.
by ForgotPasswordAgain (Priest) on Dec 06, 2021 at 18:54 UTC
Re: Perl Multi Processing.
by karlgoethebier (Abbot) on Dec 06, 2021 at 12:30 UTC

    Starting here might be inspiring. Regards, Karl

    «The Crux of the Biscuit is the Apostrophe»

Re: Perl Multi Processing.
by beautyfulman (Sexton) on Dec 06, 2021 at 21:35 UTC

    Node Type: perlquestion [id://11139408]
    Approved by Corion
    Front-paged by Discipulus