#!/opt/perl/bin/perl use strict; use threads; use Thread::Queue; use Thread::Semaphore; use threads::shared; use Time::HiRes qw( time ); my $iterations = 100; my $chunksize = 50; my $threads = 5; my $aoutput = "a.txt"; my $boutput = "b.txt"; my $q; my $s; my $outq; my %data = (); foreach('a'..'z') { $data{$_} = $_ x 200; } testa(); testb(); system($^O eq 'MSWin32' ? 'fc' : 'diff',$aoutput,$boutput); sub testa { my $t0 = time; $q = Thread::Queue->new; $s = Thread::Semaphore->new($threads); my ($outth) = threads->create(\&outputa); inputa(); $q->end; $outth->join(); printf STDERR "testa done in %0.02f seconds\n",time - $t0; } sub worka { my ($data) = @_; my @ret = (); foreach my $chunk(@$data) { my %output = (); foreach my $key(keys %$chunk) { if($key eq '.') { $output{$key} = $$chunk{$key}; next; } my $val = $$chunk{$key}; my $uc = uc($key); $val =~ s/$key/$uc/g; $output{$key} = $val; } push(@ret,\%output); } return(\@ret); } sub inputa { foreach my $a(1..$iterations) { my @chunk = (); foreach my $b(1..$chunksize) { my %adata = %data; $adata{'.'} = $a * $b; push(@chunk,\%adata); } $s->down(); my ($th) = threads->create(\&worka,\@chunk); $q->enqueue($th); } } sub outputa { open(my $fh,'>',$aoutput); while(1) { my $th = $q->dequeue(); last unless(defined $th); my ($output) = $th->join(); $s->up(); foreach my $data(@$output) { foreach my $key(sort keys %$data) { print {$fh} $$data{$key}; } print {$fh} "\n"; } } close($fh); } sub testb { my $t0 = time; $q = Thread::Queue->new; $outq = Thread::Queue->new; $s = Thread::Semaphore->new($threads); my @threads = (); foreach(1..$threads) { my $th = threads->create(\&workb); push(@threads,$th); } my ($outth) = threads->create(\&outputb); inputb(); $q->end; $outq->end; $_->join() foreach(@threads,$outth); printf STDERR "testb done in %0.02f seconds\n",time - $t0; } sub workb { while(1) { my $arr = $q->dequeue; last unless(defined $arr); $s->up(); my ($input,$output,$sem) = @$arr; foreach my $data(@$input) { my %ret :shared = (); foreach my $key(keys %$data) { if($key eq '.') { $ret{$key} = $$data{$key}; next; } my $val = $$data{$key}; my $uc = uc($key); $val =~ s/$key/$uc/g; $ret{$key} = $val; } push(@$output,\%ret); } $sem->up(); } } sub inputb { foreach my $a(1..$iterations) { my @input = (); foreach my $b(1..$chunksize) { my %bdata = %data; $bdata{'.'} = $a * $b; push(@input,\%bdata); } my @output :shared = (); my $sem = Thread::Semaphore->new(0); $s->down(); $outq->enqueue([\@output,$sem]); $q->enqueue([\@input,\@output,$sem]); } } sub outputb { open(my $fh,'>',$boutput); while(1) { my $arr = $outq->dequeue; last unless(defined $arr); my ($output,$sem) = @$arr; $sem->down(); foreach my $data(@$output) { foreach my $key(sort keys %$data) { print {$fh} $$data{$key}; } print {$fh} "\n"; } } close($fh); }