Re^5: shared scalar freed early

by marioroy (Prior)
on Feb 25, 2017 at 11:03 UTC (id://1182812)


in reply to Re^4: shared scalar freed early
in thread shared scalar freed early

Hello, chris212. Welcome to the monastery.

Hello fellow monks. I've been busy for some time, but I saw this thread and wanted to pass on some knowledge. This is an interesting problem. The following is a demonstration one might write using the MCE module. Note the absence of semaphores and locking at the application level. The run time is similar to the testa example by Chris. The output file generated here (m.txt) matches (a.txt).

The script runs on all the major platforms, including Windows. The workers persist between chunks. Output order is preserved, as required. Both the input and output iterators are handled by the MCE manager process (the main process). Workers request the next input chunk, compute data, then submit their results via MCE->gather. This works reasonably well.

#!/opt/perl/bin/perl

## http://www.perlmonks.org/?node_id=1182580

use strict;
use warnings;

use MCE;
use Time::HiRes 'time';

my $iterations = 100;
my $chunksize  = 50;
my $threads    = 5;
my $output     = "m.txt";
my %data       = ();

foreach ('a'..'z') {
    $data{$_} = $_ x 200;
}

test_mce();

sub test_mce {
    my $start = time;

    my $mce = MCE->new(
        max_workers => $threads,
        chunk_size  => $chunksize,
        input_data  => make_iter_input($chunksize, $iterations),
        gather      => make_iter_output($output),
        user_func   => \&work,
    )->run();

    printf STDERR "testa done in %0.02f seconds\n", time - $start;
}

# MCE task to run in parallel

sub work {
    my ($mce, $chunk_ref, $chunk_id) = @_;
    my $data = $chunk_ref->[0];

    my @ret = ();
    foreach my $chunk (@$data) {
        my %output = ();
        foreach my $key (keys %$chunk) {
            if ($key eq '.') {
                $output{$key} = $$chunk{$key};
                next;
            }
            my $val = $$chunk{$key};
            my $uc  = uc($key);
            $val =~ s/$key/$uc/g;
            $output{$key} = $val;
        }
        push(@ret,\%output);
    }

    MCE->gather($chunk_id, \@ret);
}

# make an input closure, returns an iterator

sub make_iter_input {
    my ($chunk_size, $iterations) = @_;
    my $seq_a = 1;

    return sub {
        return if $seq_a > $iterations;
        my ($chunk_size) = @_;
        my @chunk = ();
        foreach my $seq_b ( 1 .. $chunk_size ) {
            my %retdata = %data;
            $retdata{'.'} = $seq_a * $seq_b;
            push @chunk, \%retdata;
        }
        $seq_a += 1;
        return \@chunk;
    };
}

# make an output closure, returns an iterator

sub make_iter_output {
    my ($path) = @_;
    my %hold;
    my $order_id = 1;

    open my $fh, '>', $path or die "open error: $!";

    return sub {
        my $chunk_id = shift;

        # hold temporarily, until orderly
        $hold{$chunk_id} = shift;    # \@ret

        while (1) {
            last unless exists $hold{$order_id};
            foreach my $data (@{ delete $hold{$order_id} }) {
                foreach my $key (sort keys %$data) {
                    print {$fh} $$data{$key};
                }
                print {$fh} "\n";
            }
            $order_id++;
        }
    };
}
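
For readers who want to see the ordering logic of make_iter_output on its own, here is a minimal sketch that runs without MCE. The helper name make_ordered_sink and the $emit callback are hypothetical, introduced only for illustration. The sketch assumes chunk ids start at 1 and have no gaps, as in the script above.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of MCE): returns a closure that accepts
# ($chunk_id, $payload) in any order and hands payloads to $emit in
# ascending chunk-id order.
sub make_ordered_sink {
    my ($emit) = @_;
    my %hold;
    my $next_id = 1;    # assumption: ids start at 1, with no gaps

    return sub {
        my ($chunk_id, $payload) = @_;

        # hold temporarily, until it is this chunk's turn
        $hold{$chunk_id} = $payload;

        # flush every chunk that is now contiguous with what was emitted
        while (exists $hold{$next_id}) {
            $emit->(delete $hold{$next_id});
            $next_id++;
        }
        return;
    };
}

# Simulate results arriving out of order, as they might from parallel workers.
my @seen;
my $sink = make_ordered_sink(sub { push @seen, $_[0] });
$sink->(@$_) for [3, 'c'], [1, 'a'], [2, 'b'], [5, 'e'], [4, 'd'];

print join(',', @seen), "\n";    # prints a,b,c,d,e
```

Because only the manager process calls the gather closure, the %hold hash needs no locking. The cost is memory: a slow early chunk keeps every later chunk buffered until it arrives.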

Regards, Mario.

Re^6: shared scalar freed early
by marioroy (Prior) on Feb 25, 2017 at 11:56 UTC

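    Upon closer inspection, the manager process appears to be doing some heavy lifting inside the output iterator: it sorts the keys and prints every value of every row. The change below moves that formatting into the workers, so each worker gathers a single string buffer and the manager only prints it in order. This frees the manager process from unnecessary overhead.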

    # MCE task to run in parallel

    sub work {
        my ($mce, $chunk_ref, $chunk_id) = @_;
        my $data = $chunk_ref->[0];

        my @ret = ();
        foreach my $chunk (@$data) {
            my %output = ();
            foreach my $key (keys %$chunk) {
                if ($key eq '.') {
                    $output{$key} = $$chunk{$key};
                    next;
                }
                my $val = $$chunk{$key};
                my $uc  = uc($key);
                $val =~ s/$key/$uc/g;
                $output{$key} = $val;
            }
            push(@ret,\%output);
        }

        my $buf = '';
        foreach my $data (@ret) {
            foreach my $key (sort keys %$data) {
                $buf .= $$data{$key};
            }
            $buf .= "\n";
        }

        MCE->gather($chunk_id, $buf);
    }

    # make an output closure, returns an iterator

    sub make_iter_output {
        my ($path) = @_;
        my %hold;
        my $order_id = 1;

        open my $fh, '>', $path or die "open error: $!";

        return sub {
            my $chunk_id = shift;

            # hold temporarily, until orderly
            $hold{$chunk_id} = $_[0];    # $buf

            while (1) {
                last unless exists($hold{$order_id});
                print {$fh} delete($hold{$order_id});
                $order_id++;
            }
        };
    }

    Regards, Mario.
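
The worker-side formatting in the reply above can be tried in isolation. The following is a minimal sketch that needs no MCE. The helper name rows_to_buffer and the sample rows are hypothetical, chosen only to show the shape of the buffer a worker would pass to MCE->gather.

```perl
use strict;
use warnings;

# Hypothetical helper: serialize a list of hash refs into one string,
# values concatenated in sorted-key order, one row per line.
sub rows_to_buffer {
    my ($rows) = @_;
    my $buf = '';
    foreach my $row (@$rows) {
        $buf .= join('', map { $row->{$_} } sort keys %$row) . "\n";
    }
    return $buf;
}

# Sample rows (made up for illustration); '.' sorts before the letters.
my $buf = rows_to_buffer([
    { '.' => 1, a => 'AA', b => 'BB' },
    { '.' => 2, a => 'AA', b => 'BB' },
]);

print $buf;
```

Sending one string per chunk instead of an array of hashes also means less data to serialize between worker and manager, which is presumably part of why the change helps.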
