in reply to Re^4: shared scalar freed early
in thread shared scalar freed early
my $request_q; my $response_q; sub testc { my $t0 = time; $request_q = Thread::Queue->new(); $response_q = Thread::Queue->new(); my $merge_thread = threads->create(\&outputc); my @worker_threads; for (1..$threads) { push @worker_threads, async { while ( my $job = $request_q->dequeue() ) { $response_q->enqueue([ $job->[0], worka($job->[1]) ]); } }; } inputc(); $request_q->end(); $_->join() for @worker_threads; $response_q->end(); $merge_thread->join(); printf STDERR "testc done in %0.02f seconds\n",time - $t0; } sub inputc { my $id = -1; foreach my $a(1..$iterations) { my @chunk = (); foreach my $b(1..$chunksize) { my %adata = %data; $adata{'.'} = $a * $b; push(@chunk,\%adata); } $request_q->enqueue([ ++$id, \@chunk ]); } } sub outputc { my $next_id = 0; my %cache; open(my $fh,'>',$aoutput); while( my $job = $response_q->dequeue() ) { if ($job->[0] != $next_id) { $cache{$job->[0]} = $job; next; } do { foreach my $data(@{ $job->[1] }) { foreach my $key(sort keys %$data) { print {$fh} $$data{$key}; } print {$fh} "\n"; } $job = delete($cache{++$next_id}); } while $job; } close($fh); }
Untested. Untimed.
By the way, notice how the worker didn't have to change at all (testc calls worka)? That's good. That means the work is independent of the communication.
|
|---|
| Replies are listed 'Best First'. | |
|---|---|
|
Re^6: shared scalar freed early
by chris212 (Scribe) on Feb 23, 2017 at 21:11 UTC | |
by ikegami (Patriarch) on Feb 23, 2017 at 21:59 UTC |