I notice that in the code you provided the hash is not being reduced at each thread's creation.
If by that you mean that, as well as id:1 being compared against id:2, id:2 is later compared against id:1, then that is easily addressed:
#! perl -slw
use strict;
use threads;
use threads::shared;
use Time::HiRes qw[ sleep time ];
use Text::LevenshteinXS qw(distance);
our $M //= 4;
our $S //= 25;
our $L //= 100;
our $C //= 50;
srand 1;
my $minimum_score = $C;
my $max_childs = $M;
my %contacts :shared;
get_db_data( \%contacts );
my @ids = keys %contacts;
print STDERR "Starting with max: $max_childs concurrent threads";
my $start = time;
for my $idn1 ( 0 .. $#ids ) {
    my $id = $ids[ $idn1 ];
    ## Throttle: wait until fewer than $max_childs threads are running
    sleep 0.1 while threads->list >= $max_childs;
    async( sub {
        my $source_name = shift;
        my $counter = 0;
        ## Start at $idn1+1 so that each pair is compared only once
        for my $idn2 ( $idn1+1 .. $#ids ) {
            my $contact = $ids[ $idn2 ];
            my $dest_name = $contacts{ $contact };
            my $distance = distance( $source_name, $dest_name );
            ## Parenthesised so that || 1 guards the divisor, not the quotient
            my $score = 100 - ( $distance * 100 / ( length( $dest_name ) || 1 ) );
            if( $score >= $minimum_score ) {
                print "$id looks like $contact";
            }
            ++$counter;
        }
        printf "Thread %d compared $id with $counter contacts\n", threads->tid;
        threads->self->detach;
    }, $contacts{ $id } );
}
sleep 0.1 while threads->list;
printf STDERR "Took %6f seconds\n", time() - $start;
exit;
sub get_db_data { ## mock_up
    my $ref = shift;
    $ref->{ $_ } = join '', map{ ( 'A', 'C', 'G', 'T' )[ rand 4 ] } 1 .. $L
        for 1 .. $S;
    return;
}
__END__
c:\test>866682
Starting with max: 4 concurrent threads
Thread 1 compared 11 with 24 contacts
21 looks like 23
Thread 2 compared 21 with 23 contacts
Thread 3 compared 7 with 22 contacts
17 looks like 12
Thread 4 compared 17 with 21 contacts
2 looks like 12
Thread 5 compared 2 with 20 contacts
Thread 6 compared 22 with 19 contacts
Thread 7 compared 1 with 18 contacts
Thread 8 compared 18 with 17 contacts
23 looks like 14
23 looks like 4
Thread 9 compared 23 with 16 contacts
16 looks like 12
16 looks like 15
Thread 10 compared 16 with 15 contacts
Thread 11 compared 13 with 14 contacts
Thread 12 compared 25 with 13 contacts
6 looks like 12
Thread 13 compared 6 with 12 contacts
Thread 14 compared 3 with 11 contacts
Thread 15 compared 9 with 10 contacts
Thread 16 compared 12 with 9 contacts
Thread 17 compared 20 with 8 contacts
Thread 18 compared 14 with 7 contacts
Thread 19 compared 15 with 6 contacts
Thread 20 compared 8 with 5 contacts
4 looks like 5
Thread 21 compared 4 with 4 contacts
Thread 22 compared 24 with 3 contacts
Thread 23 compared 19 with 2 contacts
Thread 24 compared 10 with 1 contacts
Thread 25 compared 5 with 0 contacts
Took 0.438257 seconds
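To see the pair reduction in isolation: the only change that matters is that the inner loop starts at $idn1+1, so each unordered pair is visited exactly once. Here is a minimal standalone sketch of just that idea. The ids and the unique_pairs() helper are made up for illustration; they are not part of the program above.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical ids, standing in for the keys of %contacts.
my @ids = qw( a b c d e );

# Return every unordered pair exactly once by starting the inner
# index one past the outer index.
sub unique_pairs {
    my @items = @_;
    my @pairs;
    for my $i ( 0 .. $#items ) {
        for my $j ( $i + 1 .. $#items ) {
            push @pairs, [ $items[ $i ], $items[ $j ] ];
        }
    }
    return @pairs;
}

my @pairs = unique_pairs( @ids );
print scalar( @pairs ), " pairs\n";
print join( ' ', map { "$_->[0]-$_->[1]" } @pairs ), "\n";
```

For n ids that is n*(n-1)/2 comparisons rather than n*n. With the 25 ids of the run above, the per-thread counts go from 24 down to 0, which sums to 300.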
As for locking: there is no actual need for user-level locking in this code, though there is obviously some internal locking going on. How effective the concurrency is will depend on the ratio between the time taken to spawn a thread and the time spent performing the comparisons. And that is totally dependent upon the length of the sequences you are comparing.
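To give a feel for how the comparison cost depends on sequence length, here is a rough sketch that counts the work done by a textbook dynamic-programming edit distance. Note the assumptions: lev_with_cost() and random_seq() are hypothetical helpers written for this illustration, and the pure-Perl routine is only a stand-in for Text::LevenshteinXS. I am assuming the XS version does a comparable, roughly quadratic, amount of work, just much faster per step.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use List::Util qw( min );

# Textbook edit distance, keeping only two rows of the table.
# Returns ( distance, number of table cells filled ).
sub lev_with_cost {
    my( $s, $t ) = @_;
    my @prev  = ( 0 .. length( $t ) );
    my $cells = 0;
    for my $i ( 1 .. length( $s ) ) {
        my @cur = ( $i );
        for my $j ( 1 .. length( $t ) ) {
            my $subst = substr( $s, $i - 1, 1 ) eq substr( $t, $j - 1, 1 ) ? 0 : 1;
            $cur[ $j ] = min(
                $prev[ $j ] + 1,           # deletion
                $cur[ $j - 1 ] + 1,        # insertion
                $prev[ $j - 1 ] + $subst,  # substitution or match
            );
            ++$cells;
        }
        @prev = @cur;
    }
    return ( $prev[ length( $t ) ], $cells );
}

# Random ACGT string, built the same way as the mock-up data.
sub random_seq {
    my( $len ) = @_;
    return join '', map { ( 'A', 'C', 'G', 'T' )[ rand 4 ] } 1 .. $len;
}

for my $len ( 100, 200, 400 ) {
    my( $dist, $cells ) = lev_with_cost( random_seq( $len ), random_seq( $len ) );
    printf "length %3d: distance %3d, %6d cells\n", $len, $dist, $cells;
}
```

Doubling the sequence length quadruples the number of cells filled, so the time spent comparing grows much faster than the fixed cost of spawning a thread.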
If the sequences are short (say, less than a couple of hundred characters), one thread per id will not be very effective, because the spawn overhead dominates. In those circumstances, and actually even with longer sequences, I would opt for a different strategy: just start one thread per core and queue the sequences to them. I only posted that version first because it mirrored your forking version, which makes it easy to contrast the approaches.
So here is a thread-pool-queue version. It's actually simpler:
#! perl -slw
use strict;
use threads;
use threads::shared;
use Thread::Queue;
use Time::HiRes qw[ sleep time ];
use Text::LevenshteinXS qw(distance);
our $M //= 4;
our $S //= 25;
our $L //= 100;
our $C //= 50;
my $minimum_score = $C;
my $max_childs = $M;
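sub worker {
    my $Q = shift;
    ## An undef item ends the loop; one is queued per worker at shutdown
    while( $_ = $Q->dequeue ) {
        ## Each item is "srcId:srcSeq dstId:dstSeq"
        my( $srcId, $srcSeq, $dstId, $dstSeq ) = map split( ':' ), split;
        my $dist = distance( $srcSeq, $dstSeq );
        ## Parenthesised so that || 1 guards the divisor, not the quotient
        my $score = 100 - ( $dist * 100 / ( length( $dstSeq ) || 1 ) );
        if( $score >= $minimum_score ) {
            print "$srcId looks like $dstId";
        }
    }
}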
srand 1;
my $Q = Thread::Queue->new;
my @workers = map async( \&worker, $Q ), 1 .. $max_childs;
my %contacts;
get_db_data( \%contacts );
my @ids = keys %contacts;
print STDERR "Starting with max: $max_childs concurrent threads";
my $start = time;
for my $idn1 ( 0 .. $#ids ) {
    my $srcId = $ids[ $idn1 ];
    for my $idn2 ( $idn1 + 1 .. $#ids ) {
        my $dstId = $ids[ $idn2 ];
        ## Keep the queue short so that it does not grow without bound
        sleep 0.1 while $Q->pending > 10;
        $Q->enqueue( "$srcId:$contacts{ $srcId } $dstId:$contacts{ $dstId }" );
    }
}
$Q->enqueue( (undef) x $max_childs );
$_->join for @workers;
printf STDERR "Took %6f seconds\n", time() - $start;
exit;
sub get_db_data { ## mock_up
    my $ref = shift;
    $ref->{ $_ } = join '', map{ ( 'A', 'C', 'G', 'T' )[ rand 4 ] } 1 .. $L
        for 1 .. $S;
    return;
}
__END__
c:\test>866682-2
Starting with max: 4 concurrent threads
21 looks like 23
17 looks like 12
2 looks like 12
23 looks like 21
23 looks like 14
23 looks like 4
16 looks like 12
16 looks like 15
6 looks like 12
12 looks like 17
12 looks like 2
12 looks like 16
12 looks like 6
14 looks like 23
15 looks like 16
4 looks like 23
4 looks like 5
5 looks like 4
Took 1.167890 seconds
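One detail of the queue version worth spelling out is the work-item format: each pair travels through the queue as a single string, "srcId:srcSeq dstId:dstSeq", which the worker splits on whitespace and then on ':'. The sketch below shows that round trip on its own, without any threads. pack_item() and unpack_item() are hypothetical names used only here, and the sketch assumes ids contain neither ':' nor whitespace and that sequences contain no whitespace.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Pack one work item as "srcId:srcSeq dstId:dstSeq".
sub pack_item {
    my( $srcId, $srcSeq, $dstId, $dstSeq ) = @_;
    return "$srcId:$srcSeq $dstId:$dstSeq";
}

# Reverse of pack_item: split on whitespace, then split each half
# into at most two fields at the first ':'.
sub unpack_item {
    my( $item ) = @_;
    my @fields;
    push @fields, split( /:/, $_, 2 ) for split ' ', $item;
    return @fields;
}

my $item = pack_item( 7, 'ACGT', 12, 'ACGA' );
my( $srcId, $srcSeq, $dstId, $dstSeq ) = unpack_item( $item );
print "$item\n";
print "$srcId ($srcSeq) vs $dstId ($dstSeq)\n";
```

Passing a plain string keeps the queued items simple, at the cost of having to choose separators that cannot occur in the data.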