in reply to Re^2: Tread::Queue enqueue blocks all of my threads
in thread Tread::Queue enqueue blocks all of my threads

Indeed, Thread::Queues relatively recently acquired ability to convey unshared lexical structures by automatically sharing them leaks like the proverbial sieve. As demonstrated by this:

#! perl -slw use strict; use Data::Dump qw[ pp ]; use threads; use threads::shared; use thread::queue; our $THREADS ||= 5; --$THREADS; my @Qins = map Thread::Queue->new(), 0 .. $THREADS; my $Qout = Thread::Queue->new; sub worker { my $tid = threads->tid; warn "$tid: starting\n"; my( $Qin, $Qout ) = @_; while( my $work = $Qin->dequeue ) { my %h = ( 'a'..'z' ); $Qout->enqueue( \%h ); } warn "$tid: ending\n"; } my @threads = map threads->new( \&worker, $Qins[ $_ ], $Qout ), 0 .. $ +THREADS; my $tOut = async{ while( my $out = $Qout->dequeue ) { pp $out; } warn "output thread finished\n"; }; my $i = 0; while( <> ) { $Qins[ $i ]->enqueue( $_ ); $i %= $THREADS; } warn "Finished reading & queing file\n"; $_->enqueue( undef ) for @Qins; $_->join for @threads; $Qout->enqueue( undef ); $tOut->join; warn "main finished\n";

Contrast that with this version that queues simple scalars which doesn't leak at all:

#! perl -slw use strict; use Data::Dump qw[ pp ]; use threads qw[ yield ]; use threads::shared; use thread::queue; our $THREADS ||= 5; our $MAX ||= 1e3; --$THREADS; my $Qin = Thread::Queue->new; my $Qout = Thread::Queue->new; sub worker { my $tid = threads->tid; warn "$tid: starting\n"; my( $Qin, $Qout ) = @_; while( my $work = $Qin->dequeue ) { $Qout->enqueue( "$work - processed by thread $tid" ); } warn "$tid: ending\n"; } my @threads = map threads->new( \&worker, $Qin, $Qout ), 0 .. $THREADS +; my $tOut = async{ while( my $out = $Qout->dequeue ) { print $out; } warn "output thread finished\n"; }; my $i = 0; while( <> ) { printf STDERR "\rPending: %d (%d)", $Qin->pending, $i++; $Qin->enqueue( $_ ); # yield while $Qin->pending > $MAX; } warn "Finished reading & queing file\n"; $Qin->enqueue( undef ) for @threads; $_->join for @threads; $Qout->enqueue( undef ); $tOut->join; warn "main finished\n";

You will have to raise a bug report against the module and hope that it can be fixed. That said, from my breif reading of the code, I wouldn't get my hopes up for a quick fix. If at all.

The only alternative I can offer you is to avoid queueing hashes. If I need to pass more than one piece of information between threads, it has been my habit to join them into a scalar before enquing them and spliting them on dequeue. I've found this to be far more efficient that either Storable freeze/thaw or the automatic sharing of structures as currently implemented. Most importantly, it doesn't leak!


Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
"Science is about questioning the status quo. Questioning authority".
In the absence of evidence, opinion is indistinguishable from prejudice.
RIP PCW It is as I've been saying!(Audio until 20090817)

Replies are listed 'Best First'.
Re^4: Tread::Queue enqueue blocks all of my threads
by sivert (Initiate) on Nov 11, 2009 at 16:23 UTC

    That was really helpful, thanks! My computer's tied up running this program on a large file at the moment, but I'll try the modified version out soon and see if it works