# Spin up the digesting thread pool: one progress-reporting thread plus
# $opts->{threads} worker threads, each worker owning its own Thread::Queue.
#
# Args:
#   $files_to_digest - total number of files the pool will be asked to
#                      digest (passed to the progress thread); this is NOT
#                      the thread count, which comes from $opts->{threads}.
#
# Side effects (on file-level shared state -- assumed declared elsewhere in
# this file as :shared globals; TODO confirm):
#   - populates $worker_queues->{tid} with each worker's input queue
#   - increments $threads_init under lock() to signal pool readiness
#
# NOTE(review): `worker` and `threads_progress` are presumably sibling subs
# in this file used as thread entry points -- verify they exist.
sub create_thread_pool {
    my $files_to_digest = shift;

    # Progress thread: receives the expected total so it can report
    # completion percentage (presumably against $d_counter; confirm).
    threads->create( threads_progress => $files_to_digest );

    # One dedicated input queue per worker thread, keyed by the worker's
    # thread id so the producer below can target a specific worker.
    for ( 1 .. $opts->{threads} )
    {
        my $thread_queue  = Thread::Queue->new;
        my $worker_thread = threads->create( worker => $thread_queue );

        $worker_queues->{ $worker_thread->tid } = $thread_queue;
    }

    # Announce that pool construction finished.  lock() implies
    # $threads_init is a :shared variable -- TODO confirm declaration.
    lock $threads_init;

    $threads_init++;
}

# Digest every file in the size-grouped duplicate candidates and return only
# the groups that are genuine duplicates (same digest, more than one file).
#
# Args:
#   $size_dups - hashref: file size => arrayref of file paths sharing
#                that size (candidate duplicates).
#
# Returns:
#   hashref: digest => sorted arrayref of file paths with that digest,
#   containing only digests shared by two or more files.
#
# Relies on file-level shared state written by the worker threads:
#   $pool_queue  - queue on which idle workers advertise their tid
#                  (presumably; verify against the `worker` sub)
#   $d_counter   - count of files digested so far (:shared, presumably)
#   $digests     - hashref accumulated by workers: digest => [files]
#   $thread_term - flag set when the pool is being torn down
sub get_dup_digests
{
    my $size_dups = shift;
    my $dup_count = 0;
    my $queued    = 0;

    # Total number of candidate files across all size groups.
    # (Style note: `values %$size_dups` would avoid the keys+map round trip.)
    $dup_count += @$_ for map { $size_dups->{ $_ } } keys %$size_dups;

    # creates thread pool, passing in as an argument the number of files
    # that the pool needs to digest.  this is NOT equivalent to the number
    # of threads to be created; that is determined in the options ($opts)
    create_thread_pool( $dup_count );

    # Dequeue the tid of the next idle worker.  Blocks until a worker is
    # available; presumably returns undef once the queue is end()-ed during
    # termination -- TODO confirm against end_wait_thread_pool().
    # NOTE(review): named subs are package-scoped in Perl, so despite being
    # written inside get_dup_digests this is NOT a nested closure; it works
    # here only because $pool_queue is a file-level global.
    sub get_tid
    {
        my $tid = $pool_queue->dequeue;

        return $tid;
    }

    my $tid = get_tid();

    # Producer loop: fan files out to workers in batches of up to
    # $opts->{qsize} - 1 per idle-worker announcement.
    SIZESCAN: for my $size ( keys %$size_dups )
    {
        my $group = $size_dups->{ $size };

        for my $file ( @$group )
        {
            # Stop feeding work once teardown has begun.
            $worker_queues->{ $tid }->enqueue( $file ) if !$thread_term;

            $queued++;

            # Batch is full: block for the next idle worker and reset the
            # batch counter.  NOTE(review): relies on low-precedence `and` --
            # parses as (($tid = get_tid()) and ($queued = 0)); if get_tid
            # returns undef, $queued is NOT reset, but the next line exits
            # the scan anyway.
            $tid = get_tid() and $queued = 0 if $queued == $opts->{qsize} - 1;

            # An undef tid means the pool is shutting down -- abort the scan.
            last SIZESCAN unless defined $tid;
        }
    }

    # wait for threads to finish (busy-wait on the shared digested-file
    # counter incremented by the workers)
    while ( $d_counter < $dup_count )
    {
        usleep 1000; # sleep for 1 millisecond
    }

    # ...tell the threads to exit
    end_wait_thread_pool();

    # get rid of non-dupes: a digest seen by exactly one file is unique.
    delete $digests->{ $_ }
        for grep { @{ $digests->{ $_ } } == 1 } keys %$digests;

    my $priv_digests = {};

    # sort dup groupings so each group's file list is in stable
    # lexicographic order for the caller.
    for my $digest ( keys %$digests )
    {
        my @group = @{ $digests->{ $digest } };

        $priv_digests->{ $digest } = [ sort { $a cmp $b } @group ];
    }

    # Release the shared accumulator; return a private (non-shared) copy.
    undef $digests;

    return $priv_digests;
}