bago has asked for the wisdom of the Perl Monks concerning the following question:
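I have some questions about multi-threading with Perl.
I have to parse large text files for information and produce output in a different format; each line is one data record. After observing that the parsing alone took about three quarters of the run time, I changed my implementation: the main thread reads the file and feeds the lines through a Thread::Queue to two worker threads, which parse them and send the resulting hashes back to the main thread through a second queue.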
It worked, but it took longer than the single-threaded version. Some research and testing pointed to Thread::Queue's enqueue: when you enqueue a hash, a deep copy of it is made. So I converted the hashes to strings, and then everything worked as expected.
I wrote two example programs that show my approach. I left out reading and parsing the text file; instead, a sub generates a random hash that is large enough. On my notebook, version 1 takes about 4 s and version 2 takes 1.8 s.
I am not satisfied with the second version, because it is a poor man's workaround: it only works for flat key/value pairs and depends on choosing a string delimiter that never occurs in the data.
I am on Windows 7 64-bit with ActivePerl 5.1008 and Strawberry Perl 5.12.
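The string conversion described above can be sketched without threads. This is a minimal sketch, assuming flat hashes of plain scalars like the ones ParseDok produces; flatten_hash and unflatten_hash are hypothetical helper names, not part of Thread::Queue. It rebuilds the hash the same way version 2 does (join and split on '::') and also shows what happens when a value contains the delimiter.

```perl
use strict;
use warnings;

# Hypothetical helpers (not from the original post) illustrating version 2:
# flatten a flat hash into one string before enqueueing, rebuild it afterwards.
my $DELIM = '::';    # assumed never to occur in any key or value

sub flatten_hash {
    my ($href) = @_;
    return join( $DELIM, %$href );    # key1::value1::key2::value2 ...
}

sub unflatten_hash {
    my ($str) = @_;
    # limit -1 keeps a trailing empty value instead of dropping it
    my %hash = split( /\Q$DELIM\E/, $str, -1 );
    return \%hash;
}

# A flat record survives the round trip
my %record = ( queue_id => 7, 4213 => 'Test42' );
my $copy   = unflatten_hash( flatten_hash( \%record ) );
print "queue_id=$copy->{queue_id} 4213=$copy->{4213}\n";

# A value containing the delimiter is silently split into extra entries
my $broken = unflatten_hash( flatten_hash( { x => 'a::b::c' } ) );
for my $key ( sort keys %$broken ) {
    print "$key=$broken->{$key}\n";
}
```

The last loop prints b=c and x=a: the single key x has turned into two entries, which is the delimiter dependence that makes this approach fragile.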
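One possible way around the delimiter problem, not taken from the original code, is to let the core Storable module serialize each record: freeze turns a hash reference (including nested data) into a single string, and thaw rebuilds an independent copy from it. This is a minimal sketch, assuming each record fits comfortably in memory; whether it is as fast as the join/split version on real data would have to be measured.

```perl
use strict;
use warnings;
use Storable qw(freeze thaw);

# A record that the join/split approach could not handle:
# a value containing '::' and a nested hash.
my %record = (
    queue_id => 7,
    note     => 'a::b::c',
    fields   => { 4213 => 'Test42' },
);

# freeze() returns one plain (binary) string, so no delimiter is needed
my $frozen = freeze( \%record );

# ... $frozen would be enqueued here and dequeued by the receiving thread ...

# thaw() rebuilds an independent copy of the whole structure
my $copy = thaw($frozen);
print "$copy->{queue_id} $copy->{note} $copy->{fields}{4213}\n";
```

In the programs below, the worker would enqueue freeze($temp) instead of the joined string, and RecieveAnswer would call thaw on each dequeued item before looking at queue_id.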
    use strict;
    use threads;
    use Thread::Queue;

    my ( $cnt, $_th_count_parse, $_queue_parse, $_queue_qualify, @th );

    # get time of start
    use Win32;
    my ($runtime) = Win32::GetTickCount();

    # feed worker
    for ( my $i = 0 ; $i < 10000 ; $i++ ) {
        # read from file and put in $_ here
        my $dummy = 'dummy';
        $cnt++;
        $_queue_parse->enqueue( [ $cnt, \$dummy ] );
    }

    # send command: stop for every thread
    $_queue_parse->enqueue( [ -1, 0 ] );
    $_queue_parse->enqueue( [ -1, 0 ] );

    # receive answer
    RecieveAnswer();

    # join threads
    $th[0]->join();
    $th[1]->join();

    $runtime = ( Win32::GetTickCount() - $runtime ) / 1000;
    print "Time: $runtime sec\n";

    #
    sub RecieveAnswer {
        my $thclosed;
        while ( my $temp = $_queue_qualify->dequeue ) {
            if ( $$temp{'queue_id'} == -1 ) {
                $thclosed++;
            }
            if ( $thclosed == 2 ) { return 0 }
            # do work here
        }
    }

    # mock-sub for producing a hash
    sub ParseDok {
        my %hash;
        for ( my $i = int( rand(20) ) ; $i < 100 ; $i++ ) {
            $hash{ $i * 100 + int( rand(100) ) } = 'Test' . $i;
        }
        return \%hash;
    }

    # thread handler
    sub handle_thread_parser {
        my $whoiam = shift;
        my $wq     = shift;
        my $qq     = shift;
        my ( $temp, $element, $wrkcnt );

        # work loop
        while ( $element = $wq->dequeue ) {
            # work finished?
            if ( $$element[0] == -1 ) {
                my $t = { "queue_id" => -1 };
                $qq->enqueue($t);
                print "Thread $whoiam closing on command with workload $wrkcnt\n";
                return 0;
            }
            # do work
            $wrkcnt++;
            $temp = ParseDok( ${ $$element[1] } ) || '';
            $$temp{'queue_id'} = $$element[0];
            # deliver
            $qq->enqueue($temp);
        }
    }

    # setup worker threads
    BEGIN {
        $_th_count_parse = 2;
        # Queues
        $_queue_parse   = Thread::Queue->new;
        $_queue_qualify = Thread::Queue->new;
        # go!
        for ( my $i = 0 ; $i < $_th_count_parse ; $i++ ) {
            $th[$i] = threads->new( \&handle_thread_parser, $i, $_queue_parse, $_queue_qualify );
        }
    }
Here is the second version.
I changed the subs RecieveAnswer and handle_thread_parser to pass a string instead of a hash.
    use strict;
    use threads;
    use Thread::Queue;

    my ( $cnt, $_th_count_parse, $_queue_parse, $_queue_qualify, @th );

    # get time of start
    use Win32;
    my ($runtime) = Win32::GetTickCount();

    # feed worker
    for ( my $i = 0 ; $i < 10000 ; $i++ ) {
        # read from file and put in $_ here
        my $dummy = 'dummy';
        $cnt++;
        $_queue_parse->enqueue( [ $cnt, \$dummy ] );
    }

    # send command: stop for every thread
    $_queue_parse->enqueue( [ -1, 0 ] );
    $_queue_parse->enqueue( [ -1, 0 ] );

    # receive answer
    RecieveAnswer();

    # join threads
    $th[0]->join();
    $th[1]->join();

    $runtime = ( Win32::GetTickCount() - $runtime ) / 1000;
    print "Time: $runtime sec\n";

    #
    sub RecieveAnswer {
        my $thclosed;
        while ( my $temp = $_queue_qualify->dequeue ) {
            # rebuild the hash from the "key::value::key::value" string
            my %current = split( '::', $$temp );
            if ( $current{'queue_id'} == -1 ) {
                $thclosed++;
            }
            if ( $thclosed == 2 ) { return 0 }
            # do work here
        }
    }

    # mock-sub for producing a hash
    sub ParseDok {
        my %hash;
        for ( my $i = int( rand(20) ) ; $i < 100 ; $i++ ) {
            $hash{ $i * 100 + int( rand(100) ) } = 'Test' . $i;
        }
        return \%hash;
    }

    # thread handler
    sub handle_thread_parser {
        my $whoiam = shift;
        my $wq     = shift;
        my $qq     = shift;
        my ( $temp, $element, $wrkcnt );

        # work loop
        while ( $element = $wq->dequeue ) {
            # work finished?
            if ( $$element[0] == -1 ) {
                my $t     = { "queue_id" => -1 };
                my $temp1 = join( '::', %$t );
                $qq->enqueue( \$temp1 );
                print "Thread $whoiam closing on command with workload $wrkcnt\n";
                return 0;
            }
            # do work
            $wrkcnt++;
            $temp = ParseDok( ${ $$element[1] } ) || '';
            $$temp{'queue_id'} = $$element[0];
            # deliver: flatten the hash into one string before enqueueing
            my $temp1 = join( '::', %$temp );
            $qq->enqueue( \$temp1 );
        }
    }

    # setup worker threads
    BEGIN {
        $_th_count_parse = 2;
        # Queues
        $_queue_parse   = Thread::Queue->new;
        $_queue_qualify = Thread::Queue->new;
        # go!
        for ( my $i = 0 ; $i < $_th_count_parse ; $i++ ) {
            $th[$i] = threads->new( \&handle_thread_parser, $i, $_queue_parse, $_queue_qualify );
        }
    }
Re: passing hashes between threads
by BrowserUk (Patriarch) on Sep 18, 2011 at 01:21 UTC
by bago (Scribe) on Sep 18, 2011 at 11:56 UTC
by BrowserUk (Patriarch) on Sep 18, 2011 at 12:22 UTC
by bago (Scribe) on Sep 18, 2011 at 12:33 UTC
by BrowserUk (Patriarch) on Sep 18, 2011 at 12:52 UTC