in reply to Re^2: Tread::Queue enqueue blocks all of my threads
in thread Tread::Queue enqueue blocks all of my threads
Indeed, Thread::Queues relatively recently acquired ability to convey unshared lexical structures by automatically sharing them leaks like the proverbial sieve. As demonstrated by this:
#! perl -slw use strict; use Data::Dump qw[ pp ]; use threads; use threads::shared; use thread::queue; our $THREADS ||= 5; --$THREADS; my @Qins = map Thread::Queue->new(), 0 .. $THREADS; my $Qout = Thread::Queue->new; sub worker { my $tid = threads->tid; warn "$tid: starting\n"; my( $Qin, $Qout ) = @_; while( my $work = $Qin->dequeue ) { my %h = ( 'a'..'z' ); $Qout->enqueue( \%h ); } warn "$tid: ending\n"; } my @threads = map threads->new( \&worker, $Qins[ $_ ], $Qout ), 0 .. $ +THREADS; my $tOut = async{ while( my $out = $Qout->dequeue ) { pp $out; } warn "output thread finished\n"; }; my $i = 0; while( <> ) { $Qins[ $i ]->enqueue( $_ ); $i %= $THREADS; } warn "Finished reading & queing file\n"; $_->enqueue( undef ) for @Qins; $_->join for @threads; $Qout->enqueue( undef ); $tOut->join; warn "main finished\n";
Contrast that with this version that queues simple scalars which doesn't leak at all:
#! perl -slw use strict; use Data::Dump qw[ pp ]; use threads qw[ yield ]; use threads::shared; use thread::queue; our $THREADS ||= 5; our $MAX ||= 1e3; --$THREADS; my $Qin = Thread::Queue->new; my $Qout = Thread::Queue->new; sub worker { my $tid = threads->tid; warn "$tid: starting\n"; my( $Qin, $Qout ) = @_; while( my $work = $Qin->dequeue ) { $Qout->enqueue( "$work - processed by thread $tid" ); } warn "$tid: ending\n"; } my @threads = map threads->new( \&worker, $Qin, $Qout ), 0 .. $THREADS +; my $tOut = async{ while( my $out = $Qout->dequeue ) { print $out; } warn "output thread finished\n"; }; my $i = 0; while( <> ) { printf STDERR "\rPending: %d (%d)", $Qin->pending, $i++; $Qin->enqueue( $_ ); # yield while $Qin->pending > $MAX; } warn "Finished reading & queing file\n"; $Qin->enqueue( undef ) for @threads; $_->join for @threads; $Qout->enqueue( undef ); $tOut->join; warn "main finished\n";
You will have to raise a bug report against the module and hope that it can be fixed. That said, from my breif reading of the code, I wouldn't get my hopes up for a quick fix. If at all.
The only alternative I can offer you is to avoid queueing hashes. If I need to pass more than one piece of information between threads, it has been my habit to join them into a scalar before enquing them and spliting them on dequeue. I've found this to be far more efficient that either Storable freeze/thaw or the automatic sharing of structures as currently implemented. Most importantly, it doesn't leak!
|
|---|
| Replies are listed 'Best First'. | |
|---|---|
|
Re^4: Tread::Queue enqueue blocks all of my threads
by sivert (Initiate) on Nov 11, 2009 at 16:23 UTC |