note
Anonymous Monk
<p> I don't understand this part. I think you have too many queues; you should probably have just two: one for files to process, and one for the results of that processing.
<p> Also, the closures don't make sense to me.
<p> The meat of the threading code :) <c>
# Starts the progress thread and then one worker thread per configured
# slot ($opts->{threads}), giving each worker its own Thread::Queue.
#
# Argument: the number of files to digest; it is handed unchanged to the
#           'threads_progress' thread and is not used for anything else here.
#
# Side effects: registers every worker's queue in $worker_queues, keyed by
#               the worker's thread id, and bumps $threads_init under lock.
sub create_thread_pool
{
    my ( $file_total ) = @_;

    # the progress thread only needs to know how many files are expected
    threads->create( 'threads_progress', $file_total );

    for ( 1 .. $opts->{threads} )
    {
        # one private inbox per worker, looked up later by thread id
        my $inbox  = Thread::Queue->new;
        my $worker = threads->create( 'worker', $inbox );
        $worker_queues->{ $worker->tid } = $inbox;
    }

    # the lock is held until the sub returns
    lock( $threads_init );
    $threads_init++;
}
# Takes the next thread id off $pool_queue and returns it.
#
# Declared at file level on purpose: a named sub written inside another sub
# is still compiled once at package scope and is NOT a closure, so nesting
# it only obscures that.
#
# NOTE(review): $pool_queue is presumably a Thread::Queue fed by the workers;
# if so, dequeue blocks until an id is available and yields undef once the
# queue has been ended -- confirm against the worker code.
sub get_tid
{
    return $pool_queue->dequeue;
}

# Distributes every file in $size_dups to the worker threads, waits for the
# workers to process them, and returns the groups of files sharing a digest.
#
# Argument: hash ref whose values are array refs of file names (the keys are
#           used only for iteration here).
#
# Returns:  hash ref mapping each digest found in $digests to a sorted array
#           ref of its files; digests with exactly one file are dropped.
#
# Side effects: creates the thread pool, shuts it down again through
#               end_wait_thread_pool(), prunes and then undefines $digests.
sub get_dup_digests
{
    my $size_dups = shift;

    my $dup_count = 0;
    $dup_count += @$_ for values %$size_dups;

    # creates thread pool, passing in as an argument the number of files
    # that the pool needs to digest. this is NOT equivalent to the number
    # of threads to be created; that is determined in the options ($opts)
    create_thread_pool( $dup_count );

    my $enqueued = 0;    # files actually handed to a worker
    my $batch    = 0;    # files handed to the current worker so far
    my $tid      = get_tid();

    SIZESCAN: for my $size ( keys %$size_dups )
    {
        # no worker available: nothing can be queued
        last SIZESCAN unless defined $tid;

        for my $file ( @{ $size_dups->{ $size } } )
        {
            # stop feeding the pool as soon as termination is requested,
            # rather than counting files that were never queued
            last SIZESCAN if $thread_term;

            $worker_queues->{ $tid }->enqueue( $file );
            $enqueued++;
            $batch++;

            # NOTE(review): rotation happens after qsize - 1 files, so with
            # qsize == 1 it never triggers -- confirm this matches what the
            # worker side expects before changing it.
            if ( $batch == $opts->{qsize} - 1 )
            {
                $tid = get_tid();
                last SIZESCAN unless defined $tid;
                $batch = 0;
            }
        }
    }

    # wait for threads to finish. only the files that were really queued can
    # ever be counted, and a termination request must not leave us spinning.
    # NOTE(review): assumes $d_counter counts processed files -- confirm.
    while ( !$thread_term && $d_counter < $enqueued )
    {
        usleep( 1000 );    # sleep for 1 millisecond
    }

    # ...tell the threads to exit
    end_wait_thread_pool();

    # get rid of non-dupes
    my @singletons = grep { @{ $digests->{ $_ } } == 1 } keys %$digests;
    delete $digests->{ $_ } for @singletons;

    # sort dup groupings into a private copy
    my $priv_digests = {};
    for my $digest ( keys %$digests )
    {
        $priv_digests->{ $digest } = [ sort @{ $digests->{ $digest } } ];
    }

    undef $digests;
    return $priv_digests;
}
</c>
1069338
1069338
14