cormanaz has asked for the wisdom of the Perl Monks concerning the following question:

Good day, monks. I have an application that is threaded with Parallel::ForkManager. I am trying to use a counter to keep track of progress like so (partial code):
our $ctr = 0;
say "processing duplicates...";
LOOP: foreach my $pid (@duplicated) {
    $pm->start and next LOOP; # do the fork
    unless ($ctr % 1000) {say $ctr;}
    $ctr++;
    #do stuff
    $pm->finish; # exit the child process
}
The counter is printing, but it's always zero. Why isn't it incrementing properly?

Re: Why is my counter not updating in a threaded application?
by Corion (Patriarch) on Mar 29, 2024 at 20:28 UTC

    You are not threading, you are forking. Your incrementing of the counter only happens in the child.

    If you move the counter code before $pm->start and next LOOP, it will increment properly, because that part runs in the main process:

    our $ctr = 0;
    say "processing duplicates...";
    LOOP: foreach my $pid (@duplicated) {
        unless ($ctr % 1000) {say $ctr;}
        $ctr++;
        $pm->start and next LOOP; # do the fork
        #do stuff
        $pm->finish; # exit the child process
    }
      I realized after posting this that I probably should have done that. But I thought child processes had access to the global variables. Does this mean that anything I push onto our @results (declared before this code segment) in the #do stuff section won't be retained either?

        I thought child processes have access to the global variables.

        The child process is a copy of the parent process, made at the point of fork(). So any variables (not just globals) will have the same value at the start of the child process as they did in the parent process at that time. But changes made in the child affect only the copy, not the original in the parent.
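To see this copy-on-fork behaviour without Parallel::ForkManager in the way, here is a minimal core-Perl sketch using plain fork and waitpid. The child bumps its own copy of $ctr and prints 10; the parent, after waiting for it, still prints 0:

```perl
use strict;
use warnings;

our $ctr = 0;

my $pid = fork();
die "fork failed: $!" unless defined $pid;

if ($pid == 0) {
    # Child: works on its own private copy of $ctr.
    $ctr += 10;
    print "child sees ctr = $ctr\n";
    exit 0;    # the child's copy disappears with the child
}

waitpid($pid, 0);    # parent waits for the child to finish
print "parent sees ctr = $ctr\n";
```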

        The same is true for threads, except that there you can use a special mechanism called sharing to nominate specific variables to be shared across threads, so that a change made in one thread is visible in the other threads (see threads::shared). This is costly, however: access to such variables must be protected by locking, so sharing should be used sparingly if you hope to retain any speed benefit from making an application multi-threaded.
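A minimal sketch of a shared counter with threads::shared, assuming a perl built with ithreads (the script just prints a note and exits otherwise). Each increment takes lock() first so that no updates are lost; the 4 workers and 1000 increments per worker are arbitrary choices for illustration:

```perl
use strict;
use warnings;
use Config;

# threads and threads::shared need a perl built with ithreads.
BEGIN {
    unless ($Config{useithreads}) {
        print "skipped: this perl was built without ithreads\n";
        exit 0;
    }
}
use threads;
use threads::shared;

my $ctr :shared = 0;    # one variable seen by all threads

my @workers;
for (1 .. 4) {
    push @workers, threads->create(sub {
        for (1 .. 1000) {
            lock($ctr);    # released at the end of this loop iteration
            $ctr++;
        }
    });
}
$_->join for @workers;

print "ctr = $ctr\n";    # 4000
```

Without the lock() call the final count could come out lower than 4000, since $ctr++ is a separate read and write. Taking that lock on every increment is the cost described above.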

        There are other, more explicit mechanisms for sharing data between threads or processes: think, for example, of pipes, shared memory, or databases. Collectively, such mechanisms are known as "IPC" (inter-process communication), and there are lots of modules in the IPC:: namespace that may give you ideas.
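Pipes are the simplest of these to try from core Perl. The sketch below forks one child per item (with no process limit, unlike Parallel::ForkManager), has each child write its answer as one line to a shared pipe, and collects the lines into @results in the parent, which is where the push has to happen for it to be retained. Doubling each item is a hypothetical stand-in for the real work:

```perl
use strict;
use warnings;

my @duplicated = (10 .. 20);    # same sample items as in the replies below
my @results;

# One pipe shared by every child; the parent reads, the children write.
pipe(my $reader, my $writer) or die "pipe failed: $!";

my @kids;
for my $item (@duplicated) {
    my $pid = fork();
    die "fork failed: $!" unless defined $pid;
    if ($pid == 0) {
        close $reader;                # child only writes
        my $ans = $item * 2;          # hypothetical stand-in for "do stuff"
        print {$writer} "$ans\n";     # one short line per child
        close $writer;                # flush before exiting
        exit 0;
    }
    push @kids, $pid;
}

close $writer;    # parent only reads; EOF arrives once every child has exited
while (my $line = <$reader>) {
    chomp $line;
    push @results, $line;    # this push happens in the parent, so it is kept
}
waitpid($_, 0) for @kids;

print scalar(@results), " results\n";
```

Each child writes a single short line, and POSIX guarantees that a pipe write of at most PIPE_BUF bytes is not interleaved with other writers, so lines from different children do not get mixed up. @results ends up in whatever order the children finish.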

Re: Why is my counter not updating in a threaded application?
by marioroy (Prior) on Mar 30, 2024 at 15:45 UTC

    1. Passing data back to the parent. This includes Corion's $ctr suggestion.

    use v5.32;
    use Parallel::ForkManager;
    use Data::Dumper;

    my $pm = Parallel::ForkManager->new(8);
    $pm->set_waitpid_blocking_sleep(0);

    our @results;

    $pm->run_on_finish( sub {
        my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_ref) = @_;
        push @results, ${$data_ref};
    });

    our @duplicated = (10..20);
    our $ctr = 0;

    say "processing duplicates...";

    LOOP: foreach my $pid (@duplicated) {
        unless ($ctr % 1000) {say $ctr;}
        $ctr++;
        $pm->start and next LOOP; # do the fork
        #do stuff
        my $ans = $pid;
        $pm->finish(0, \$ans); # exit the child process
    }

    $pm->wait_all_children;

    say scalar(@results);
    # print Dumper(\@results);

    2. Using MCE::Child, whose interface is similar to Parallel::ForkManager's.

    use v5.32;
    use MCE::Child;
    use Data::Dumper;

    our @results;

    MCE::Child->init(
        max_workers => 8,
        posix_exit  => 1,
        on_finish   => sub {
            my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_ref) = @_;
            push @results, ${$data_ref};
        }
    );

    our @duplicated = (10..20);
    our $ctr = 0;

    say "processing duplicates...";

    foreach my $pid (@duplicated) {
        unless ($ctr % 1000) {say $ctr;}
        $ctr++;
        MCE::Child->create(sub {
            #do stuff
            my $ans = $pid;
            return \$ans;
        });
    }

    MCE::Child->wait_all;

    say scalar(@results);
    # print Dumper(\@results);

    3. Using MCE::Shared to construct a shared array.

    use v5.32;
    use MCE::Shared;
    use Parallel::ForkManager;
    use Data::Dumper;

    my $pm = Parallel::ForkManager->new(8);
    $pm->set_waitpid_blocking_sleep(0);

    # create a shared array object
    # this starts the shared-manager process
    our $results = MCE::Shared->array();

    our @duplicated = (10..20);
    our $ctr = 0;

    say "processing duplicates...";

    LOOP: foreach my $pid (@duplicated) {
        unless ($ctr % 1000) {say $ctr;}
        $ctr++;
        $pm->start and next LOOP; # do the fork
        #do stuff
        my $ans = $pid;
        $results->push($ans);
        $pm->finish; # exit the child process
    }

    $pm->wait_all_children;

    # destroy the shared object and retrieve the array
    # unblesses MCE::Shared::Array, becoming plain array ref
    $results = $results->destroy({ unbless => 1 });

    # stop the shared-manager process
    MCE::Shared->stop;

    say scalar(@{$results});
    # print Dumper($results);

    4. Using MCE and a shared counter variable.

    use v5.32;
    use MCE;
    use MCE::Shared;
    use Data::Dumper;

    our @results;
    our $shared_ctr = MCE::Shared->scalar(0);

    my $mce = MCE->new(
        gather      => \@results,
        chunk_size  => 1,
        max_workers => 8,
        posix_exit  => 1,
        user_func   => sub {
            my ($mce, $chunk_ref, $chunk_id) = @_;
            my $ctr = $shared_ctr->getincr;
            unless ($ctr % 1000) {MCE->say($ctr);}
            #do stuff
            my $ans = $chunk_ref->[0]; # or $_
            MCE->gather($ans);
        }
    )->spawn;

    our @duplicated = (10..20);

    $mce->process({ input_data => \@duplicated });
    $mce->shutdown;

    # stop the shared-manager process
    MCE::Shared->stop;

    say scalar(@results);
    # print Dumper(\@results);
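If you would rather keep plain fork or Parallel::ForkManager and skip MCE::Shared, a counter stored in a lock-protected file is one swapped-in, core-only alternative for a process-shared counter. This is not how MCE::Shared works internally; it is a sketch that assumes a local filesystem where flock is reliable, and the file_getincr name and counter path are hypothetical. Each process takes an exclusive lock, reads the value, writes back value + 1, and releases the lock:

```perl
use strict;
use warnings;
use Fcntl qw(:flock);
use File::Spec;

# Hypothetical counter file; any path on a local filesystem would do.
my $ctr_file = File::Spec->catfile(File::Spec->tmpdir, "ctr.$$");

open(my $init, '>', $ctr_file) or die "open $ctr_file: $!";
print {$init} "0\n";
close $init;

# Return the current value and store value + 1, holding an exclusive lock
# for the whole read-modify-write (hypothetical helper, modelled on getincr).
sub file_getincr {
    open(my $fh, '+<', $ctr_file) or die "open $ctr_file: $!";
    flock($fh, LOCK_EX)           or die "flock: $!";
    chomp(my $ctr = <$fh>);
    seek($fh, 0, 0)               or die "seek: $!";
    truncate($fh, 0)              or die "truncate: $!";
    print {$fh} $ctr + 1, "\n";
    close $fh;    # flushes the new value, then releases the lock
    return $ctr;
}

my @duplicated = (10 .. 20);
my @kids;
for my $item (@duplicated) {
    my $pid = fork() // die "fork failed: $!";
    if ($pid == 0) {
        my $ctr = file_getincr();
        print "progress: $ctr\n" unless $ctr % 5;    # every 5 items here
        # ... do stuff with $item ...
        exit 0;
    }
    push @kids, $pid;
}
waitpid($_, 0) for @kids;

open(my $in, '<', $ctr_file) or die "open $ctr_file: $!";
chomp(my $final = <$in>);
close $in;
unlink $ctr_file;

print "final count: $final\n";
```

Taking a file lock for every item is slow compared with counting in the parent, as in Corion's fix, so this only pays off when the increment really has to happen inside the workers.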

    5. Using MCE without MCE::Shared.

    use v5.32;
    use MCE;
    use Data::Dumper;

    our @results;
    our $global_ctr = 0;

    sub getincr {
        my $ctr = $global_ctr++;
        return $ctr;
    }

    my $mce = MCE->new(
        gather      => \@results,
        chunk_size  => 1,
        max_workers => 8,
        posix_exit  => 1,
        user_func   => sub {
            my ($mce, $chunk_ref, $chunk_id) = @_;
            my $ctr = MCE->do('getincr');
            unless ($ctr % 1000) {MCE->say($ctr);}
            #do stuff
            my $ans = $chunk_ref->[0]; # or $_
            MCE->gather($ans);
        }
    )->spawn;

    our @duplicated = (10..20);

    $mce->process({ input_data => \@duplicated });
    $mce->shutdown;

    say scalar(@results);
    # print Dumper(\@results);