in reply to Perl Multi Processing.

There are, IMO, 3 core models of parallel processing in perl. Each has upsides and downsides, so I'll outline them with examples:

Threads

Threads in perl are not lightweight, so put aside any understanding of threads you have from other languages. This misconception is why threads are often considered 'bad'. It's not appropriate to spawn lots and lots of threads - that's very inefficient - but for a queue and workers model, it works really well.

It's quite well suited for IPC, meaning that passing things around to parallelise is easy. So for example, we can use Thread::Queue to go with threads:

#!/usr/bin/env perl
use strict;
use warnings;

use threads;
use Thread::Queue;
use IO::Socket;

my $nthreads = 20;
my $in_file2 = 'rang.txt';

my $work_q   = Thread::Queue->new;
my $result_q = Thread::Queue->new;

sub ip_checker {
    while ( my $ip = $work_q->dequeue ) {
        chomp($ip);
        my $host = IO::Socket::INET->new(
            PeerAddr => $ip,
            PeerPort => 80,
            Proto    => 'tcp',
            Timeout  => 1
        );
        if ( defined $host ) {
            $result_q->enqueue($ip);
        }
    }
}

sub file_writer {
    open( my $output_fh, ">>", "port.txt" ) or die $!;
    while ( my $ip = $result_q->dequeue ) {
        print {$output_fh} "$ip\n";
    }
    close($output_fh);
}

my @workers;
for ( 1 .. $nthreads ) {
    push( @workers, threads->create( \&ip_checker ) );
}
my $writer = threads->create( \&file_writer );

open( my $dat, "<", $in_file2 ) or die $!;
$work_q->enqueue(<$dat>);
close($dat);
$work_q->end;

foreach my $thr (@workers) {
    $thr->join();
}
$result_q->end;
$writer->join();

This uses a queue to feed a set of (20) worker threads an IP list; they work their way through it, collating and printing results through the `writer` thread.

Forking

The second approach to parallelism in perl is via forking. Forks are more efficient if you're planning on spinning up lots of 'disposable' subprocesses, but IPC between them is harder than between threads.

You can just use fork() on its own, but I think it's far better to use the excellent Parallel::ForkManager library.
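For reference, here's a minimal sketch of what the bare fork() approach looks like (a hypothetical stand-alone example, not tied to the IP-checking task):

```perl
#!/usr/bin/env perl
use strict;
use warnings;

my @pids;
for my $task ( 1 .. 4 ) {
    my $pid = fork();
    die "fork failed: $!" unless defined $pid;
    if ( $pid == 0 ) {
        # Child process: do the work, then exit - never fall through.
        print "child $$ handling task $task\n";
        exit 0;
    }
    push @pids, $pid;    # Parent: remember the child's pid
}

# Reap every child; a forgotten waitpid leaves zombie processes behind.
waitpid( $_, 0 ) for @pids;
```

Parallel::ForkManager does this bookkeeping for you - plus throttling how many children are alive at once.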

Doing approximately the same as the above:

#!/usr/bin/env perl
use strict;
use warnings;

use Fcntl qw( :flock );
use IO::Socket;
use Parallel::ForkManager;

my $in_file2 = 'rang.txt';

open( my $input,  "<", $in_file2 )  or die $!;
open( my $output, ">", "port.txt" ) or die $!;

my $manager = Parallel::ForkManager->new(20);

foreach my $ip (<$input>) {
    $manager->start and next;
    chomp($ip);
    my $host = IO::Socket::INET->new(
        PeerAddr => $ip,
        PeerPort => 80,
        Proto    => 'tcp',
        Timeout  => 1
    );
    if ( defined $host ) {
        flock( $output, LOCK_EX );    # exclusive (write) lock
        print {$output} $ip, "\n";
        flock( $output, LOCK_UN );    # unlock
    }
    $manager->finish;
}
$manager->wait_all_children;

close($output);
close($input);

You need to be particularly careful of file IO when multiprocessing, because the whole point is your execution sequence is no longer well defined. So it's insanely easy to end up with different threads clobbering files that another thread has open, but hasn't flushed to disk.

But in both multiprocessing paradigms I outlined above (there are others, these are the most common) you still have to deal with the file IO serialisation. Note that your 'results' will be in a random order in both, because it'll very much depend on when the task completes. If that's important to you, then you'll need to collate and sort after your threads or forks complete.
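If ordering matters, that collation step can be sketched like this (assuming results were written one IP per line to port.txt, as in the examples above):

```perl
#!/usr/bin/env perl
use strict;
use warnings;

# Read back the unordered results.
open( my $in, "<", "port.txt" ) or die $!;
chomp( my @results = <$in> );
close($in);

# Sort IPv4 addresses by octet value rather than as plain strings,
# so 9.0.0.1 sorts before 10.0.0.1.
my @sorted = sort {
    pack( 'C4', split /\./, $a ) cmp pack( 'C4', split /\./, $b )
} @results;

# Rewrite the file in sorted order.
open( my $out, ">", "port.txt" ) or die $!;
print {$out} "$_\n" for @sorted;
close($out);
```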

Non Blocking IO

More a subset of parallelism than a model in its own right - but you can use something like IO::Select and/or IPC::Run, which allow you to open subprocesses and read/write to them asynchronously. That's sometimes 'parallel enough' for your use case. IMO it's often the IO element of a job that you want to run more efficiently, and that generally doesn't benefit much from parallelism anyway - you don't get more IO out of a socket just by hammering it 5x in parallel; if anything, you slow it down. If that's of particular interest, I'll try and mash up some example code to do that for you, but the thing I used from the above isn't particularly 'useful' for that.
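To give a flavour of it in the meantime, here's a rough sketch using IPC::Run (the hostnames are made up; assumes IPC::Run from CPAN is installed and ping is available):

```perl
#!/usr/bin/env perl
use strict;
use warnings;
use IPC::Run qw( start finish );

my @hosts = qw( host-a host-b host-c );    # hypothetical targets
my @jobs;

for my $host (@hosts) {
    my ( $in, $out ) = ( '', '' );
    # start() returns immediately, so the subprocesses run concurrently.
    my $h = start [ 'ping', '-c', '1', $host ], \$in, \$out;
    push @jobs, { host => $host, handle => $h, out => \$out };
}

for my $job (@jobs) {
    finish $job->{handle};    # wait for that subprocess to exit
    print "$job->{host}:\n", ${ $job->{out} };
}
```

The pings all run at once, but the script itself stays single-threaded - the asynchrony lives in the subprocess IO.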

Re^2: Perl Multi Processing.
by marioroy (Prior) on Dec 07, 2021 at 03:37 UTC

    For Perl lacking threads support (i.e. not compiled with threads enabled), one may run threads-like code using MCE::Hobo. Here is similar code using MCE::Hobo and MCE::Shared.

    #!/usr/bin/env perl
    use strict;
    use warnings;

    use MCE::Hobo;
    use MCE::Shared;
    use IO::Socket;

    my $nthreads = 20;
    my $in_file2 = 'rang.txt';

    my $work_q   = MCE::Shared->queue;
    my $result_q = MCE::Shared->queue;

    sub ip_checker {
        while ( my $ip = $work_q->dequeue ) {
            chomp($ip);
            my $host = IO::Socket::INET->new(
                PeerAddr => $ip,
                PeerPort => 80,
                Proto    => 'tcp',
                Timeout  => 1
            );
            if ( defined $host ) {
                $result_q->enqueue($ip);
            }
        }
    }

    sub file_writer {
        open( my $output_fh, ">>", "port.txt" ) or die $!;
        while ( my $ip = $result_q->dequeue ) {
            print {$output_fh} "$ip\n";
        }
        close($output_fh);
    }

    my @workers;
    for ( 1 .. $nthreads ) {
        push( @workers, MCE::Hobo->create( \&ip_checker ) );
    }
    my $writer = MCE::Hobo->create( \&file_writer );

    open( my $dat, "<", $in_file2 ) or die $!;
    $work_q->enqueue(<$dat>);
    close($dat);
    $work_q->end;

    foreach my $thr (@workers) {
        $thr->join();
    }
    $result_q->end;
    $writer->join();

    MCE::Child is also threads-like, but does not require MCE::Shared to run. The complementary module for exchanging data is MCE::Channel.

    #!/usr/bin/env perl
    use strict;
    use warnings;

    use MCE::Child;
    use MCE::Channel;
    use IO::Socket;

    my $nthreads = 20;
    my $in_file2 = 'rang.txt';

    my $work_q   = MCE::Channel->new;
    my $result_q = MCE::Channel->new;

    sub ip_checker {
        while ( my $ip = $work_q->dequeue ) {
            chomp($ip);
            my $host = IO::Socket::INET->new(
                PeerAddr => $ip,
                PeerPort => 80,
                Proto    => 'tcp',
                Timeout  => 1
            );
            if ( defined $host ) {
                $result_q->enqueue($ip);
            }
        }
    }

    sub file_writer {
        open( my $output_fh, ">>", "port.txt" ) or die $!;
        while ( my $ip = $result_q->dequeue ) {
            print {$output_fh} "$ip\n";
        }
        close($output_fh);
    }

    my @workers;
    for ( 1 .. $nthreads ) {
        push( @workers, MCE::Child->create( \&ip_checker ) );
    }
    my $writer = MCE::Child->create( \&file_writer );

    open( my $dat, "<", $in_file2 ) or die $!;
    $work_q->enqueue(<$dat>);
    close($dat);
    $work_q->end;

    foreach my $thr (@workers) {
        $thr->join();
    }
    $result_q->end;
    $writer->join();

    The modules are interchangeable. One may use threads with MCE::Shared or MCE::Channel. Ditto for using Parallel::ForkManager with MCE::Shared or MCE::Channel. Here, we have a shared array.

    #!/usr/bin/env perl
    use strict;
    use warnings;

    use Parallel::ForkManager;
    use MCE::Shared;
    use IO::Socket;

    my $in_file2 = 'rang.txt';
    open( my $input, "<", $in_file2 ) or die $!;

    my $result  = MCE::Shared->array;
    my $manager = Parallel::ForkManager->new(20);

    foreach my $ip (<$input>) {
        $manager->start and next;

        MCE::Shared->init();    # Optional
        # This is called automatically for threads, MCE, MCE::Hobo,
        # and MCE::Child. Calling MCE::Shared->init assigns a data
        # channel in a round-robin fashion to ForkManager workers.
        # Omitting init has the effect of 1 data channel versus 10.

        chomp($ip);
        my $host = IO::Socket::INET->new(
            PeerAddr => $ip,
            PeerPort => 80,
            Proto    => 'tcp',
            Timeout  => 1
        );
        if ( defined $host ) {
            $result->push($ip);
        }
        $manager->finish;
    }
    $manager->wait_all_children;
    close($input);

    # move shared array to unshared array reference
    $result = $result->destroy( { unbless => 1 } );

    open( my $output, ">", "port.txt" ) or die $!;
    print {$output} "$_\n" for @{$result};
    close($output);

    The shared array resides under the shared-manager process, which is where all shared data lives. Upon completion, one can export the data, or export-and-destroy via $result->destroy. The result is an MCE::Shared::Array object, or a plain array reference [] with unbless => 1. Not exporting means making an individual trip to the shared-manager for each item; alternatively, $result->values exports all of the values at once.

      Here is MCE::Flow where workers read from the input file directly. MCE->gather is called to append to an array.

      #!/usr/bin/env perl
      use strict;
      use warnings;

      use MCE::Flow;
      use IO::Socket;

      my $nthreads = 20;
      my $in_file2 = 'rang.txt';

      sub ip_checker {
          my $ip = $_;
          chomp($ip);
          my $host = IO::Socket::INET->new(
              PeerAddr => $ip,
              PeerPort => 80,
              Proto    => 'tcp',
              Timeout  => 1
          );
          if ( defined $host ) {
              MCE->gather($ip);
          }
      }

      my @result = MCE::Flow->run_file(
          { chunk_size => 1, max_workers => $nthreads },
          \&ip_checker, $in_file2,
      );
      MCE::Flow->finish();

      open( my $output, ">", "port.txt" ) or die $!;
      print {$output} "$_\n" for @result;
      close($output);

      For this one, workers write serially to the output file via MCE->print.

      use strict;
      use warnings;

      use MCE::Flow;
      use IO::Socket;

      my $nthreads = 20;
      my $in_file2 = 'rang.txt';

      open( my $output, ">", "port.txt" ) or die $!;

      sub ip_checker {
          my $ip = $_;
          chomp($ip);
          my $host = IO::Socket::INET->new(
              PeerAddr => $ip,
              PeerPort => 80,
              Proto    => 'tcp',
              Timeout  => 1
          );
          if ( defined $host ) {
              MCE->print( $output, "$ip\n" );
          }
      }

      MCE::Flow->run_file(
          { chunk_size => 1, max_workers => $nthreads },
          \&ip_checker, $in_file2,
      );
      MCE::Flow->finish();

      close($output);

      Perl provides various ways to do parallel processing. There is Parallel::ForkManager. For a threads-like API, there is MCE::Child and MCE::Hobo. MCE itself provides chunking capabilities, and has sugar syntax to gather into an array or hash. For output, MCE->print, MCE->printf, and MCE->say write serially to a file handle (STDOUT if omitted).

      Basically, no problem if the Perl binary lacks threads support.