#!/usr/bin/perl

use strict;
use warnings;

use Data::Dumper qw(Dumper);
use re::engine::RE2;
use List::MoreUtils qw(uniq);
use Sort::Naturally qw(nsort);
use MCE;

# This program reads an abstract sentence file made of blank-line separated
# records and scans every line of a record except the first for tokens of
# the form
#
#     D*<id>*<text>*D
#
# For each distinct key "D<id>" it collects the <text> values and writes one
# line per key to the output file, keys in natural sort order:
#
#     D<id>=><text1>|<text2>|...
#
# Duplicate values for a key are written only once.

if (@ARGV != 2) {
    # Stop here: continuing without both file names only produces
    # confusing "uninitialized value" failures further down.
    die "usage: $0 <inputfile> <outputfile>\n";
}

my $inputfile1 = $ARGV[0];
my $outputfile = $ARGV[1];

unless (-e $inputfile1) {
    die "Can't open $inputfile1: No such file or directory";
}

# Merged results from all workers: key "D<id>" => '|'-joined values.
my %hashunique;

# Make gather routine for the manager process. It returns a closure for
# preserving append-order as if processing serially.
#
# The returned closure takes ($chunk_id, $hashref). Results arriving out of
# order are parked in %tmp until every lower chunk id has been merged into
# %hashunique.
sub make_gather {
    my ($order_id, %tmp) = (1);

    return sub {
        my ($chunk_id, $hashref) = @_;
        $tmp{$chunk_id} = $hashref;

        # Drain every chunk that is now contiguous with what was merged.
        while (exists $tmp{$order_id}) {
            my $ready = delete $tmp{$order_id};

            for my $k (keys %{ $ready }) {
                $hashunique{$k} = exists $hashunique{$k}
                    ? $hashunique{$k} . '|' . $ready->{$k}
                    : $ready->{$k};
            }

            $order_id++;
        }

        return;
    };
}

# The user function for MCE workers. Workers open a file handle to a scalar
# ref due to using MCE option use_slurpio => 1.
#
# Arguments: ($mce, $slurp_ref, $chunk_id) as passed by MCE.
# Gathers ($chunk_id, \%localunique) exactly once per chunk.
sub user_func {
    my ($mce, $slurp_ref, $chunk_id) = @_;
    my %localunique;

    open my $rf, '<', $slurp_ref
        or die "Can't open chunk $chunk_id for reading: $!";

    # A shared-hash is not necessary. The gist of it all is batching to a
    # local hash. Otherwise, a shared-hash inside a loop involves high IPC
    # overhead.

    local $/ = '';    # blank line, paragraph break

    # In the event the worker receives 2 or more records.
    while (my $record = <$rf>) {
        my @one = split /\n/, $record;

        # Index 0 (the first line of the record) is not scanned.
        for my $i (1 .. $#one) {
            next if $one[$i] =~ /^--$/;

            while ($one[$i] =~ m/\bD\*(.*?)\*(.*?)\*D\b/g) {
                # Copy the captures before anything else can clobber them.
                my ($key, $value) = ("D$1", "$2");

                $localunique{$key} = exists $localunique{$key}
                    ? $localunique{$key} . '|' . $value
                    : $value;
            }
        }
    }

    close $rf;

    # Each worker must call gather one time when preserving order is
    # desired which is the case for this demonstration.
    MCE->gather($chunk_id, \%localunique);

    return;
}

# Am using the core MCE API. Workers read the input file directly and
# sequentially, one worker at a time.
my $mce = MCE->new(
    max_workers => 3,
    input_data  => $inputfile1,
    chunk_size  => 2 * 1024 * 1024,    # 2 MiB
    RS          => '',                 # important, blank line, paragraph break
    gather      => make_gather(),
    user_func   => \&user_func,
    use_slurpio => 1,
);

$mce->run();

# Results.
open my $wf, '>', $outputfile
    or die "Can't open $outputfile: $!";

foreach my $k (nsort keys %hashunique) {
    # Values were accumulated as one '|'-joined string; split, de-duplicate
    # (first occurrence wins) and re-join for output.
    my $values = join '|', uniq split /\|/, $hashunique{$k};
    print {$wf} "$k=>$values\n";
}

# Buffered write errors surface at close, so check it.
close $wf
    or die "Can't close $outputfile: $!";