Beefy Boxes and Bandwidth Generously Provided by pair Networks
XP is just a number
 
PerlMonks  

Re^5: shared scalar freed early

by ikegami (Patriarch)
on Feb 23, 2017 at 20:33 UTC ( [id://1182670]=note: print w/replies, xml ) Need Help??


in reply to Re^4: shared scalar freed early
in thread shared scalar freed early

# Queues shared by the producer (inputc), the worker threads and the
# writer thread (outputc). They are (re)created on every call to testc().
my $request_q;
my $response_q;

# testc - run the benchmark using two Thread::Queue objects.
#
# Jobs are array refs of the form [ $id, $payload ]. Workers pull jobs from
# $request_q, pass the payload to worka() and push [ $id, <result> ] onto
# $response_q; outputc() restores the original order using $id.
# Reads the file-level settings $threads, $iterations, $chunksize, %data and
# $aoutput, and prints the elapsed wall-clock time to STDERR.
sub testc {
    my $t0 = time;

    $request_q  = Thread::Queue->new();
    $response_q = Thread::Queue->new();

    # The writer is started before the workers so results can be drained
    # while they are still being produced.
    my $merge_thread = threads->create(\&outputc);

    my @worker_threads;
    for (1..$threads) {
        push @worker_threads, async {
            # dequeue() returns undef once the queue is ended and empty,
            # which terminates the loop.
            while ( my $job = $request_q->dequeue() ) {
                $response_q->enqueue([ $job->[0], worka($job->[1]) ]);
            }
        };
    }

    inputc();

    # Shutdown order matters: end the request queue, wait for every worker
    # to finish, and only then end the response queue so no result is lost.
    $request_q->end();
    $_->join() for @worker_threads;
    $response_q->end();
    $merge_thread->join();

    printf STDERR "testc done in %0.02f seconds\n",time - $t0;
}

# inputc - build $iterations chunks of $chunksize records and enqueue each
# chunk on $request_q, tagged with a sequential id starting at 0.
sub inputc {
    my $id = -1;

    # Loop variables are deliberately not named $a/$b, which would shadow
    # the special variables used by sort.
    foreach my $iteration (1..$iterations) {
        my @chunk = ();
        foreach my $row (1..$chunksize) {
            my %adata = %data;    # fresh copy of the template record
            $adata{'.'} = $iteration * $row;
            push(@chunk, \%adata);
        }
        $request_q->enqueue([ ++$id, \@chunk ]);
    }
}

# outputc - writer thread body. Consumes [ $id, $records ] jobs from
# $response_q and writes them to $aoutput in ascending id order.
# Jobs that arrive early are parked in %cache until their turn comes.
# Dies if the output file cannot be opened or closed.
sub outputc {
    my $next_id = 0;
    my %cache;

    open(my $fh, '>', $aoutput)
        or die "Can't open '$aoutput' for writing: $!";

    while ( my $job = $response_q->dequeue() ) {
        if ($job->[0] != $next_id) {
            # Out of order: hold on to it until the gap is filled.
            $cache{ $job->[0] } = $job;
            next;
        }

        # Write the expected job, then any directly following jobs that
        # were already waiting in the cache.
        do {
            foreach my $record (@{ $job->[1] }) {
                foreach my $key (sort keys %$record) {
                    print {$fh} $$record{$key};
                }
                print {$fh} "\n";
            }
            $job = delete($cache{ ++$next_id });
        } while $job;
    }

    # Buffered write errors only surface at close time.
    close($fh)
        or die "Can't close '$aoutput': $!";
}

Untested. Untimed.

By the way, notice how the worker didn't have to change at all (testc calls worka)? That's good. That means the work is independent of the communication.

Replies are listed 'Best First'.
Re^6: shared scalar freed early
by chris212 (Scribe) on Feb 23, 2017 at 21:11 UTC
    Your testc does look cleaner than my testb and runs a little faster than testb, but is still about 5x slower than testa.

      Interesting. Will have to ponder that (later). Some factors, ranked in estimated descending order of effect:

      • Because the thread does basically no work, the output thread is the bottleneck, and there's definitely less work being done there in testa.
      • By using a hash instead of thread->join, we're moving work from the OS into Perl, which is going to be slower.
      • Later tests are hurt by the variables created by the earlier tests. This should be tiny, though.
      • You have next to no variables in your test program. That's going to help testa.

      Here's another solution, but I doubt it'll catch up to testa.

      # Queues shared by the producer (inputd), the worker threads and the
      # writer thread (outputd). They are (re)created on every call to testd().
      my $request_q;
      my $response_q;

      # testd - run the benchmark with a Thread::OrderedQueue for the results.
      #
      # Request jobs are array refs of the form [ $token, $payload ]. Workers
      # pass the payload to worka() and hand the result to the ordered queue
      # under the same token; the ordered queue releases items in token order.
      # Reads the file-level settings $threads, $iterations, $chunksize, %data
      # and $aoutput, and prints the elapsed wall-clock time to STDERR.
      sub testd {
          my $t0 = time;

          $request_q  = Thread::Queue->new();
          $response_q = Thread::OrderedQueue->new();

          # The writer is started before the workers so results can be drained
          # while they are still being produced.
          my $merge_thread = threads->create(\&outputd);

          my @worker_threads;
          for (1..$threads) {
              push @worker_threads, async {
                  # dequeue() returns undef once the queue is ended and empty,
                  # which terminates the loop.
                  while ( my $job = $request_q->dequeue() ) {
                      $response_q->enqueue( $job->[0], worka($job->[1]) );
                  }
              };
          }

          inputd();

          # Shutdown order matters: end the request queue, wait for every
          # worker, and only then end the response queue (enqueue dies once
          # the ordered queue has been ended).
          $request_q->end();
          $_->join() for @worker_threads;
          $response_q->end();
          $merge_thread->join();

          printf STDERR "testd done in %0.02f seconds\n",time - $t0;
      }

      # inputd - build $iterations chunks of $chunksize records and enqueue
      # each chunk on $request_q, tagged with a token from the ordered queue.
      sub inputd {
          # Loop variables are deliberately not named $a/$b, which would
          # shadow the special variables used by sort.
          foreach my $iteration (1..$iterations) {
              my @chunk = ();
              foreach my $row (1..$chunksize) {
                  my %adata = %data;    # fresh copy of the template record
                  $adata{'.'} = $iteration * $row;
                  push(@chunk, \%adata);
              }
              $request_q->enqueue([ $response_q->get_token(), \@chunk ]);
          }
      }

      # outputd - writer thread body. Each item dequeued from $response_q is
      # treated as an array ref of hash refs; the values of every hash are
      # written in sorted-key order, one line per hash, to $aoutput.
      # Dies if the output file cannot be opened or closed.
      sub outputd {
          open(my $fh, '>', $aoutput)
              or die "Can't open '$aoutput' for writing: $!";

          while ( my $job = $response_q->dequeue() ) {
              foreach my $record (@$job) {
                  foreach my $key (sort keys %$record) {
                      print {$fh} $$record{$key};
                  }
                  print {$fh} "\n";
              }
          }

          # Buffered write errors only surface at close time.
          close($fh)
              or die "Can't close '$aoutput': $!";
      }

      Thread/OrderedQueue.pm:

      package Thread::OrderedQueue;

      # A queue whose items are dequeued in token order rather than in
      # arrival order. A producer obtains a token with get_token(), the item
      # is later enqueue()d under that token, and dequeue() hands items out
      # strictly in ascending token order, waiting for gaps to be filled.

      use strict;
      use warnings;

      use threads::shared;
      use Carp qw(croak);

      # new() - create an empty queue. Tokens start at 0.
      sub new {
          my $class = shift;
          my $self  = bless(\( my %self :shared ), $class);

          $self->{ q } = \( my %q: shared );    # token => item

          # The token counter lives in its own shared scalar so get_token()
          # can lock it without taking the lock on the whole object.
          $self->{ last_given_ref } = \( my $last_given :shared = -1 );

          $self->{ next_read } = 0;    # token dequeue() will hand out next
          $self->{ ended }     = 0;    # set to 1 by end()
          return $self;
      }

      # get_token() - return the next unused token (0, 1, 2, ...).
      sub get_token {
          my $self = shift;
          lock(${ $self->{last_given_ref} });
          # We can't check if ended because we didn't lock %$self.
          return ++${ $self->{last_given_ref} };
      }

      # end() - mark the queue as finished. Once the next expected item is
      # not available, dequeue() returns undef instead of waiting.
      sub end {
          my $self = shift;
          lock(%$self);
          $self->{ended} = 1;
          cond_signal(%$self);
      }

      # enqueue($token, $item) - store a shared clone of $item under $token.
      # Croaks if the queue has already been ended.
      sub enqueue {
          my ($self, $token, $item) = @_;
          lock(%$self);
          croak("Thread::OrderedQueue: enqueue() called after end()")
              if $self->{ended};
          $self->{q}{$token} = shared_clone($item);

          # Only the item the reader is waiting for is worth a wake-up.
          cond_signal(%$self) if $token == $self->{next_read};
      }

      # dequeue() - return the item for the next token in sequence, waiting
      # until it has been enqueued. Returns undef once the queue has been
      # ended and the next expected item is not present.
      sub dequeue {
          my $self = shift;
          lock(%$self);
          while (1) {
              if (exists( $self->{q}{ $self->{next_read} } )) {
                  my $rv = delete( $self->{q}{ $self->{next_read}++ } );
                  cond_signal(%$self);    # pass the wake-up on to another waiter
                  return $rv;
              }

              if ($self->{ended}) {
                  cond_signal(%$self);    # pass the wake-up on to another waiter
                  # Explicit undef is kept so the return value is a single
                  # element in list context too, as in the original contract.
                  return undef;
              }

              cond_wait(%$self);
          }
      }

      1;

Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Node Status?
node history
Node Type: note [id://1182670]
help
Chatterbox?
and the web crawler heard nothing...

How do I use this?Last hourOther CB clients
Other Users?
Others goofing around in the Monastery: (5)
As of 2024-04-19 07:09 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    No recent polls found