in reply to Re: Thread::Queue locking question
in thread Thread::Queue locking question

I do not claim to have found a bug with any of the perl threading modules. I'm sure the bug is in my usage of it (which is limited at best). Here is the method which processes output:
################################################################# sub process_output { ################################################################# my $self = shift; use Text::CSV; my $csv = Text::CSV->new(); if (scalar @{$self->{redo_inotify_output}} > 1) { my $do_overs = scalar @{$self->{redo_inotify_output}} - 1; $self->debug(qq{Appending do-overs from previous run: $do_over +s.\n}); $q->enqueue(@{$self->{redo_inotify_output}}); @{$self->{redo_inotify_output}} = ''; } NOTIFY: while ( my $line = $q->dequeue_nb() ) { chomp $line; $self->debug(qq{Reading: $line\n}); my ($dir, $file, $event, $file_path); if ($csv->parse($line)) { my @columns = $csv->fields(); $dir = $columns[0]; $event = $columns[1]; $file = $columns[2]; $file_path = $dir . $file; } ### Go to next item in queue if we don't have a file or direct +ory. if (! $dir || ! $file) { $self->debug(qq{Dir: $dir or File: $file is undefined.\n}) +; next NOTIFY; } ### Go to next item in queue if source file not longer exists. if (! -e $file_path && $event ne "DELETE") { $self->debug(qq{Source file no longer exists: $file_path.\ +n}); next NOTIFY; } ### Go to next item in queue if $file_path matches exclusion. for my $pattern (@{$self->{patterns}}) { if ($file =~ /$pattern/) { $self->debug(qq{Exclusion pattern matched: $pattern.\n +}); next NOTIFY; } } ### For files under 100MB, only transfer them every minute. ### For files over 100MB, only transfer them every 5 minutes. ### These items are moved to the end of the queue. if ($self->{transfer_times}{$file_path} && $event ne "DELETE") + { if ((stat($file_path))[7] > 104857600 && time() - $self->{transfer_times}{$file_path} < 300) +{ $self->debug(qq{Skipping $file_path. Transferred < 30 +0 } . qq{seconds ago.\n}); ### only push to redo list if not in list. if (! grep { /$line/ } @{$self->{redo_inotify_output}} +) { push @{$self->{redo_inotify_output}}, $line; } next NOTIFY; } if (time() - $self->{transfer_times}{$file_path} < 30) { $self->debug(qq{Skipping $file_path. Transferred < 30 + } . qq{seconds ago.\n}); ### only push to redo list if not in list. if (! grep { /$line/ } @{$self->{redo_inotify_output}} +) { push @{$self->{redo_inotify_output}}, $line; } next NOTIFY; } } ### Populate Transfer_times with the current epoch. $self->{transfer_times}{$file_path} = time() if $event ne "DEL +ETE"; ### Rsync the file the the target host. my $cmd = 'rsync -aH --delete' . ' ' . $file_path . ' ' . $self->{target_host} . $self->{sync_path} . ':' . $file_path; if ($event eq "DELETE") { $cmd = 'rsh ' . $self->{target_host} . ' ' . qq{"[ -f \"$file_path\" ] && rm } . $self->{sync_path} . $file_path . '"'; } system $cmd; $self->debug(qq{$cmd\n}); } }

Replies are listed 'Best First'.
Re^3: Thread::Queue locking question
by ikegami (Patriarch) on Apr 21, 2008 at 18:10 UTC

    I do not claim to have found a bug with any of the perl threading modules.

    I suppose you could have meant that you thought were misusing the module. But that doesn't change anything. The question I asked ("How did you arrive to that conclusion?") still applied. You specified a very specific problem (the lock on the queue isn't being released) without showing any evidence of that. You didn't even mention where the code blocks! It's hard to debug a problem without knowing anything about it.

    In fact, from looking at the code you have since posted, I suspect the opposite problem. It's not one of your threads blocking itself to oblivion, it's one of your threads not waiting at all.

    Unless there's already something in the queue, that function will exit immediately after being called since $q->dequeue_nb will return undef. The function will also exit as soon as the queue is empties, even if more items will later be added to it.

    That's not necessarily a problem. I don't know how process_output is used. But if I were to venture a guess at what the rest of the code looks like, $q->dequeue_nb should be $q->dequeue.

    There's a second issue. If you put an empty string or the number zero in the queue, the loop will exit prematurely.
    while ( my $line = $q->dequeue_nb() ) or
    while ( my $line = $q->dequeue() )
    should be
    while ( defined( my $line = $q->dequeue_nb() ) ) or
    while ( defined( my $line = $q->dequeue() ) )
    since those methods could return false value when they aren't

    Finally, it's really weird to see use Text::CSV; inside a function. Do you understand that the statement will execute when process_output is compiled (even if it's never called), and not when process_output is called? It would be better if you put it at the top of your program.

    Update: Readability enhancements.