use threads; use threads::shared qw(share); use Thread::Queue; use Thread::Semaphore; my $q : shared = Thread::Queue->new(); my $loading : shared = 1; my $threads : shared = 0; my $thr = threads->create( \&load_queue, $q, \$loading ); $thr->detach(); $|++; my @threads; for ( 1 .. 6 ) { my $thr = threads->create( \&{type_issues}, $q, \$loading, $threads ); push @threads, $thr; } $_->join() for @threads; sub load_queue { my $q = shift; my $loading = shift; my $i = 1; my $ary = &share( [] ); while ( my $v = ["this is a test" x 250] ) { # print "$v->[0]\n"; push @$ary, $v->[0]; unless ( $i++ % 10000 ) { print "$i\n"; $q->enqueue($ary); $ary = &share( [] ); while ( $q->pending() > 20 ) { # try not to eat tooo much memory select(undef,undef,undef,.1); } } } $q->enqueue($ary) if @$ary; $$loading = 0; } sub type_issues { my ( $q, $loading ) = @_; #my $DB = IDC::Data->new( bes => 'webdev' ); print "Type issues\n"; my $issues; while ( ( $issues = $q->dequeue() ) ) { print "Got ".@$issues." - ".$q->pending()."\n"; select(undef,undef,undef,.1); } }