in reply to Re: Perl Multi Processing.
in thread Perl Multi Processing.

For a Perl lacking threads support (i.e. not compiled with threads enabled), one can run threads-like code using MCE::Hobo. Here is similar code using MCE::Hobo and MCE::Shared.

#!/usr/bin/env perl

use strict;
use warnings;

use MCE::Hobo;
use MCE::Shared;
use IO::Socket;

my $nthreads = 20;
my $in_file2 = 'rang.txt';

my $work_q   = MCE::Shared->queue;
my $result_q = MCE::Shared->queue;

# Workers pull IPs from the work queue and probe TCP port 80.
sub ip_checker {
    while ( my $ip = $work_q->dequeue ) {
        chomp($ip);
        my $host = IO::Socket::INET->new(
            PeerAddr => $ip,
            PeerPort => 80,
            Proto    => 'tcp',
            Timeout  => 1
        );
        if ( defined $host ) {
            $result_q->enqueue($ip);
        }
    }
}

# A single writer appends reachable IPs to the output file.
sub file_writer {
    open( my $output_fh, ">>", "port.txt" ) or die $!;
    while ( my $ip = $result_q->dequeue ) {
        print {$output_fh} "$ip\n";
    }
    close($output_fh);
}

my @workers;
for ( 1 .. $nthreads ) {
    push( @workers, MCE::Hobo->create( \&ip_checker ) );
}
my $writer = MCE::Hobo->create( \&file_writer );

open( my $dat, "<", $in_file2 ) or die $!;
$work_q->enqueue(<$dat>);
close($dat);
$work_q->end;

foreach my $thr (@workers) {
    $thr->join();
}
$result_q->end;
$writer->join();

MCE::Child is also threads-like, but does not require MCE::Shared to run. The complementary module for exchanging data is MCE::Channel.

#!/usr/bin/env perl

use strict;
use warnings;

use MCE::Child;
use MCE::Channel;
use IO::Socket;

my $nthreads = 20;
my $in_file2 = 'rang.txt';

my $work_q   = MCE::Channel->new;
my $result_q = MCE::Channel->new;

# Workers pull IPs from the work channel and probe TCP port 80.
sub ip_checker {
    while ( my $ip = $work_q->dequeue ) {
        chomp($ip);
        my $host = IO::Socket::INET->new(
            PeerAddr => $ip,
            PeerPort => 80,
            Proto    => 'tcp',
            Timeout  => 1
        );
        if ( defined $host ) {
            $result_q->enqueue($ip);
        }
    }
}

# A single writer appends reachable IPs to the output file.
sub file_writer {
    open( my $output_fh, ">>", "port.txt" ) or die $!;
    while ( my $ip = $result_q->dequeue ) {
        print {$output_fh} "$ip\n";
    }
    close($output_fh);
}

my @workers;
for ( 1 .. $nthreads ) {
    push( @workers, MCE::Child->create( \&ip_checker ) );
}
my $writer = MCE::Child->create( \&file_writer );

open( my $dat, "<", $in_file2 ) or die $!;
$work_q->enqueue(<$dat>);
close($dat);
$work_q->end;

foreach my $thr (@workers) {
    $thr->join();
}
$result_q->end;
$writer->join();

The modules are interchangeable. One may use threads with MCE::Shared or MCE::Channel. Ditto for Parallel::ForkManager with MCE::Shared or MCE::Channel. The next example pairs Parallel::ForkManager with a shared array.

#!/usr/bin/env perl

use strict;
use warnings;

use Parallel::ForkManager;
use MCE::Shared;
use IO::Socket;

my $in_file2 = 'rang.txt';
open( my $input, "<", $in_file2 ) or die $!;

my $result  = MCE::Shared->array;
my $manager = Parallel::ForkManager->new(20);

foreach my $ip (<$input>) {
    $manager->start and next;

    # Optional. MCE::Shared->init() is called automatically for
    # threads, MCE, MCE::Hobo, and MCE::Child. Calling it here
    # assigns a data channel in a round-robin fashion to the
    # ForkManager workers. Omitting init has the effect of 1 data
    # channel versus 10.
    MCE::Shared->init();

    chomp($ip);
    my $host = IO::Socket::INET->new(
        PeerAddr => $ip,
        PeerPort => 80,
        Proto    => 'tcp',
        Timeout  => 1
    );
    if ( defined $host ) {
        $result->push($ip);
    }
    $manager->finish;
}

$manager->wait_all_children;
close($input);

# Move the shared array to an unshared array reference.
$result = $result->destroy({ unbless => 1 });

open( my $output, ">", "port.txt" ) or die $!;
print {$output} "$_\n" for @{ $result };
close($output);

The shared array resides under the shared-manager process, which is where all shared data lives. Upon completion, one can export the data, or export and destroy in one step via $result->destroy. The result is an MCE::Shared::Array object, or a plain array reference [] when passing unbless => 1. Without exporting, each item access involves an individual trip to the shared-manager. Calling $result->values likewise exports the values.
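A small sketch of the export options just described (the sample values here are for illustration only; push, get, values, and destroy are documented MCE::Shared::Array methods):

```perl
use strict;
use warnings;

use MCE::Shared;

my $shared = MCE::Shared->array;
$shared->push('10.0.0.1', '10.0.0.2');

# Individual element access makes a trip to the shared-manager.
my $first = $shared->get(0);

# Export a copy of all values in one trip.
my @values = $shared->values;

# Export and destroy: $plain is an ordinary array reference
# (unbless => 1) and the shared-manager copy is gone afterward.
my $plain = $shared->destroy({ unbless => 1 });
print "$_\n" for @{ $plain };
```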

Re^3: Perl Multi Processing.
by marioroy (Prior) on Dec 07, 2021 at 04:27 UTC

    Here is an MCE::Flow example where workers read from the input file directly. MCE->gather is called to append to an array.

    #!/usr/bin/env perl

    use strict;
    use warnings;

    use MCE::Flow;
    use IO::Socket;

    my $nthreads = 20;
    my $in_file2 = 'rang.txt';

    # Each worker receives one line at a time (chunk_size => 1)
    # and gathers reachable IPs back to the manager process.
    sub ip_checker {
        my $ip = $_;
        chomp($ip);
        my $host = IO::Socket::INET->new(
            PeerAddr => $ip,
            PeerPort => 80,
            Proto    => 'tcp',
            Timeout  => 1
        );
        if ( defined $host ) {
            MCE->gather($ip);
        }
    }

    my @result = MCE::Flow->run_file(
        { chunk_size => 1, max_workers => $nthreads },
        \&ip_checker, $in_file2,
    );

    MCE::Flow->finish();

    open( my $output, ">", "port.txt" ) or die $!;
    print {$output} "$_\n" for @result;
    close($output);

    For this one, workers write serially to the output file via MCE->print.

    use strict;
    use warnings;

    use MCE::Flow;
    use IO::Socket;

    my $nthreads = 20;
    my $in_file2 = 'rang.txt';

    open( my $output, ">", "port.txt" ) or die $!;

    # MCE->print serializes writes, so workers may share the
    # output handle safely.
    sub ip_checker {
        my $ip = $_;
        chomp($ip);
        my $host = IO::Socket::INET->new(
            PeerAddr => $ip,
            PeerPort => 80,
            Proto    => 'tcp',
            Timeout  => 1
        );
        if ( defined $host ) {
            MCE->print($output, "$ip\n");
        }
    }

    MCE::Flow->run_file(
        { chunk_size => 1, max_workers => $nthreads },
        \&ip_checker, $in_file2,
    );

    MCE::Flow->finish();
    close($output);

    Perl provides various ways to achieve parallel processing. There is Parallel::ForkManager. For a threads-like API, there are MCE::Child and MCE::Hobo. MCE itself provides chunking capabilities, along with sugar syntax to gather results into an array or hash. For output, MCE->print, MCE->printf, and MCE->say write serially to a file handle (STDOUT if omitted).

    In short, there is no problem if the Perl binary lacks threads support.
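    For reference, one can check whether a given perl was built with threads support via the standard Config module (not part of the code above; useithreads is the documented build flag):

    ```perl
    use strict;
    use warnings;
    use Config;

    print $Config{useithreads}
        ? "this perl supports ithreads\n"
        : "this perl was built without ithreads\n";
    ```

    Either way, the MCE::Hobo, MCE::Child, and Parallel::ForkManager examples above run unchanged.
    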