Hello all, I am writing a program to convert a large number of XML files into RDF triples and also, based on certain fields in the XML, populate a hash with those fields' values. Once the hash has been populated, I run an algorithm that determines the relative similarity of each string to the others.
For the number of threads, I decided to detect the number of CPUs in the machine, so that the code can be easily ported to another server with more CPUs without any additional configuration.
The code works, both in the original single-threaded and subsequent multi-threaded versions. The issue I am running into is that when the program starts, CPU utilization hovers around 50%, but as the program runs, it peters out and eventually drops to around 3-4%, and the program chugs along, albeit very slowly.
In addition to trying to figure out why the CPU utilization is not staying constant, I would like to find a way to dynamically add threads based on CPU utilization such that more work can be done when the CPU load falls below a certain threshold (arbitrarily, let's say 25%).
I would appreciate any feedback on this code. For brevity, I have not included the subroutine code for the XML conversion, but the rest of the code is here. Also, if there are any ways to speed up the algorithm that is comparing strings, I would welcome any suggestions.
#!/usr/bin/perl
use strict;
use warnings;
use Carp qw(carp cluck croak confess);
use XML::Hash;
use File::Slurp;
use Date::Parse;
binmode STDOUT, ":utf8";
use threads;
use threads::shared;
use Thread::Queue;
use Sys::CPU;
use Devel::Size qw(size total_size);
use List::MoreUtils qw(uniq);
#use Data::Dumper;
## Unbuffer STDOUT so progress output appears immediately.
local $| = 1;

print `/bin/date` . "\n";

## NOTE(review): for CPU-bound work, running 2x as many threads as cores
## adds context-switching overhead without adding throughput; consider
## plain cpu_count(). Left at 2x to preserve existing behavior.
our $THREADS = Sys::CPU::cpu_count() * 2;

## Collect the input files, anchoring the pattern so only names that
## actually END in ".xml" qualify (the old /.*\.xml/ also matched
## e.g. "feed.xml.bak").
my $dir = '/xmlFeeds';
opendir( my $DIR, $dir ) or die "Cannot open directory $dir: $!";
my @files = grep { /\.xml\z/ } readdir $DIR;
closedir $DIR;

my $outFile = './out.nt';
open( my $OUTFILE, '>:utf8', $outFile )
    or die "Cannot open $outFile for writing: $!";

## %similar maps name => type; shared so the phase-1 workers can populate
## it and the phase-2 workers can read it.
## NOTE(review): every access to a :shared hash is serialized through one
## internal lock -- contention on %similar as it grows is the most likely
## cause of the CPU-utilization collapse described above.
my %similar : shared;
my $recordCount : shared = 1;

## Direct method call; the indirect "new Thread::Queue" form is fragile.
my $Qwork = Thread::Queue->new;

## Phase 1: convert each XML file to triples, populating %similar.
my @pool = map { threads->create( \&worker, $Qwork ) } 1 .. $THREADS;
$Qwork->enqueue(@files);
$Qwork->enqueue( (undef) x $THREADS );    # one sentinel per worker
$_->join for @pool;

## Phase 2: pairwise similarity over the names collected in phase 1.
## Each queue item is a one-pair hashref { name => type }.
@pool = map { threads->create( \&worker2, $Qwork ) } 1 .. $THREADS;
$Qwork->enqueue( { $_ => $similar{$_} } ) for keys %similar;
$Qwork->enqueue( (undef) x $THREADS );
$_->join for @pool;

## Buffered-write errors only surface at close, so check it.
close $OUTFILE or die "Error closing $outFile: $!";
print `/bin/date` . "\n";
## Phase-1 worker: pull XML filenames off the queue until the undef
## sentinel arrives, convert each file to N-Triples via procXml(), and
## append the result to the shared output filehandle.
sub worker {
    my ($Qwork) = @_;

    ## Test with defined() so a file named "0" or "" cannot end the loop
    ## early -- only the undef sentinel terminates it.
    while ( defined( my $file = $Qwork->dequeue ) ) {
        my $triple = procXml($file);
        next unless defined $triple;

        ## Perl filehandles are not thread-safe: concurrent prints from
        ## several threads can interleave mid-record. Serialize writes by
        ## locking an existing shared variable.
        lock($recordCount);
        print $OUTFILE $triple;
    }
    return;
}
## Phase-2 worker: each queue item is a one-pair hashref { name => type }.
## Run the similarity scan for that pair and write any resulting triples.
sub worker2 {
    my ($Qwork) = @_;

    ## defined() so only the undef sentinel (not a false value) ends the loop.
    while ( defined( my $item = $Qwork->dequeue ) ) {
        while ( my ( $name, $type ) = each %$item ) {
            my $triple = simAlg( $name, $type );
            next unless defined $triple;

            ## Serialize writes to the shared filehandle (see worker()).
            lock($recordCount);
            print $OUTFILE $triple;
        }
    }
    return;
}
## Compare $dom against every other name in the shared %similar hash using
## Dice's coefficient over character bigrams; emit a "similarName" triple
## for each pair scoring >= 0.9.
##
## Returns the concatenated triples, or undef when nothing matched (the
## caller tests definedness before printing).
sub simAlg {
    my ( $dom, $type ) = @_;
    chomp $dom;

    ## Bigram set for the probe string -- computed once, outside the scan.
    my @w1  = uniq( unpack '(A2)*', $dom );
    my %w1H = map { $_ => 1 } @w1;

    my $triple;
    ## NOTE(review): %similar is :shared, so every read below goes through
    ## its internal lock. This O(N) scan per call (O(N^2) overall)
    ## serializes the worker threads -- the probable cause of the CPU
    ## utilization dropping as the run progresses. Snapshotting %similar
    ## into a per-thread plain hash before scanning would relieve it.
    foreach my $odom ( keys %similar ) {
        ## Look the value up BEFORE chomping: chomping first would break
        ## the lookup for any key that carries a trailing newline.
        my $innerType = $similar{$odom};
        chomp $odom;
        next if $odom eq $dom;

        my @w2     = uniq( unpack '(A2)*', $odom );
        my $common = grep { $w1H{$_} } @w2;           # shared bigram count
        my $score  = ( $common * 2 ) / ( @w1 + @w2 ); # Dice's coefficient

        if ( $score >= 0.9 ) {
            ## Single-line literal: the original source had a wrapped
            ## "p/\n+similarName" artifact inside the string, corrupting
            ## every emitted predicate URI.
            my $line = qq|<http://cs.org/$type#$dom> <http://cs.org/p/similarName> <http://cs.org/$innerType#$odom> .\n|;
            $triple .= $line;
            ## Progress trace: print only the NEW triple. The old code
            ## printed the whole accumulator each time, duplicating every
            ## earlier match on STDOUT.
            print $line;
        }
    }
    return $triple;
}
## Convert one XML feed file into N-Triples and record selected field
## values in the shared %similar hash (per the surrounding code's usage:
## worker() prints its defined return value to the output handle).
## NOTE(review): body elided by the original poster "for brevity" --
## "[code here]" is a placeholder, not valid Perl.
sub procXml {
[code here]
}