TRoderic has asked for the wisdom of the Perl Monks concerning the following question:
1. I have a very large quantity ( 1.2 million) of images that will need to have mappings done of them, based on an as-yet-to-be-written process determining which ones to promote to the processor intensive stage.
2. the determination process itself is expected to take up to .2 seconds on a 3.4 ghz processor per file wholly in memory, based on some early prototypes
3. previous attempts have hit major stability and time snags, even at the prototyping stage due to the sheer volume of files that make up a comprehensive sample.
4. I believe this is a valid application of a threaded program ( though i'll freely admit I could be wrong), alowing e.g. 4 evaluation processes to be running at once on a 4 core system (and possibly distributed beyond one machine, but that's waaay beyond me right now)
5. the first performance snag I hit was the Disk I/O wall, which i'm trying to work around by using RAM as much as possible
6. because the full evaluation process to appply to all these is not yet complete, I need a way of quickly running prototypes on large samples of the data, which I see as being done by using one thread to consume the contents of a test directory directly into memory, while n other threads try processing them when there's something to process.
7. unfortunately, solving this through threaded directory structures consumption has not been successful, as it either goes above the physical limits of the hardware, or as the following code shows ( at least on my system), halts at a limit which is not being handled properly.
use strict;use warnings;use threads; use threads::shared; use File::Slurp; use Digest::MD5; use Thread::Queue;use Thread::Semaphore; main(@ARGV); sub main { my ($dir,$limit) = @_; my $q= Thread::Queue->new(); unless ($dir) {$dir = 'F:/';} unless($limit){$limit = 3145728; } #example my $done:shared; my %mem:shared; $done = 'n'; %mem = ( limit => $limit, total => 0, t_sem=> Thread::Semaphore->new(), f_count => 0, ); my $thread = threads->create(\&slurp_directory_as_bin,$dir,\$q,\% +mem,\$done); my $thread2 = threads->create(\&procqueue,\$q,\$done,\%mem); #my $thread3 = threads->create(\&procqueue,\$q,\$done,\%mem); #my $thread4 = threads->create(\&procqueue,\$q,\$done,\%mem); $thread->join(); print("concluded directory listing\n"); $thread2->join(); #$thread3->join(); #$thread4->join(); print("\nit is done, move on.") } sub procqueue{ my ($q,$done,$mem) = @_; while ($$done eq 'n'){ while ( my $bl = $$q->dequeue_nb()){ #to be replaced with darker magic once this works print("$$bl{name} ($$bl{fpath}) is $$bl{size} and it's md5 + is : " . Digest::MD5::md5_hex($$bl{bin}) . "\n"); $$mem{t_sem}->down(); $$mem{total} = $$mem{total} - $$bl{size}; print("popped to $$mem{total} with ". $$q->pending +()."left\n"); $$mem{t_sem}->up(); if($$mem{total} < 0){print("HAAALT!"); exit;} } sleep 1; print("thread waiting...\n"); } } sub slurp_directory_as_bin{ my ($dir,$queue,$mem,$r) = @_; unless($dir and (-d $dir)){die("first parameter to slurp_directory +_as_bin must be a valid directory [$!]"); } unless($dir =~ m|/$|){$dir .= '/';} # add on trailing slash for go +od measure my @subdirs = ();#list of subdirectories found in the directory, o +nly used in [r]ecursive mode opendir(DIR,$dir); while (my $file = readdir(DIR)){ next if ($file eq '.'|| $file eq '..' ); #skip self and parent + dir next if ($file eq 'RECYCLER' || $file eq 'System Volume Inform +ation'); #skip problem directories on windows root paths my $fpath =$dir.$file; if (-d $fpath){#if the current specimen is a directory if ($r){#if in recursive mode push (@subdirs,$fpath); #put the reference to the s +ubdirectory in storage for later } next;#dont try to process (yet) } my $bina = File::Slurp::read_file( $fpath, binmode => ':raw' ) +; #consume the file as it's component parts into memory my %data = ( bin => $bina, name => $file, fpath => $fpath, size => -s $fpath, ); my $nqd = 1; while ($nqd == 1){ #wait here until we can add the fil +e into the queue unless($data{size} < $$mem{limit}){die("$data{fpath} is la +rger than the total memory limit"); } my $new_total = $$mem{total} += $data{size}; if ($new_total <= $$mem{limit} ){ $$mem{t_sem}->down(); $$mem{total} += $data{size}; $$queue->enqueue(\%data); $$mem{t_sem}->up(); $$mem{f_count}++; $nqd = 0; }else{ print("waiting for space in the queue for $data{fpath} +, trying to add $data{size} to $$mem{total} to make $new_total with a + limit of $$mem{limit} \n"); print($$queue->pending() . "items left \n"); #sleep 1; $new_total = $$mem{total} + $data{size}; # recheck in +case it's changed since then... } } } # end of files in directory foreach my $sdir (@subdirs){ slurp_directory_as_bin($sdir,$queue,$mem,$r); } }
i'm sure i've missed some specifically thread handling documentation somewhere, as pretty much every other problem (e.g. 1 + 0 = 72) i've encountered making this was solved through perldocs or here
|
|---|