in reply to Re: Thread::Queue locking question
in thread Thread::Queue locking question
#################################################################
# process_output
#
# Drain the inotify event queue and mirror each event to the
# target host.
#
# Each queue item is one CSV line of the form: directory, event,
# file name.  For every item this method:
#   * skips it when the line cannot be parsed, the source file is
#     gone (unless the event is DELETE), or the file name matches
#     one of the exclusion patterns in $self->{patterns};
#   * defers it to $self->{redo_inotify_output} when the same path
#     was transferred too recently (see the throttle below);
#   * otherwise runs rsync (or, for DELETE, a remote rm via rsh)
#     and records the transfer time in $self->{transfer_times}.
#
# Deferred items are re-queued at the start of the next call.
#
# Takes no arguments besides the invocant; the return value is not
# meaningful.  Relies on the file-level queue object $q (presumably
# a Thread::Queue declared elsewhere -- confirm) and on
# $self->debug for logging.
#################################################################
sub process_output {
    my $self = shift;

    use Text::CSV;
    my $csv = Text::CSV->new();

    ### Files larger than this many bytes (100MB) use the longer interval.
    my $large_file_bytes = 104857600;

    ### Quote a string for a POSIX shell so it is passed as a single word.
    my $shell_quote = sub {
        my ($text) = @_;
        $text =~ s/'/'\\''/g;
        return qq{'$text'};
    };

    ### Re-queue the items deferred by the previous run.  Empty entries
    ### are dropped so that a placeholder left in the list by older code
    ### never reaches the queue.
    my @do_overs = grep { defined $_ && length $_ }
                   @{ $self->{redo_inotify_output} || [] };
    if (@do_overs) {
        my $do_overs = scalar @do_overs;
        $self->debug(qq{Appending do-overs from previous run: $do_overs.\n});
        $q->enqueue(@do_overs);
    }
    ### Clear in place (rather than assigning a new reference) so that
    ### anything else holding this array keeps seeing the same one.
    @{ $self->{redo_inotify_output} } = ();

    ### Test definedness, not truth: a false item such as "0" or an empty
    ### string must not end the drain while the queue still has entries.
    NOTIFY:
    while ( defined( my $line = $q->dequeue_nb() ) ) {
        chomp $line;
        $self->debug(qq{Reading: $line\n});

        my ($dir, $event, $file);
        if ($csv->parse($line)) {
            ($dir, $event, $file) = $csv->fields();
        }

        ### Go to next item in queue if we don't have a file or directory.
        if (   ! defined $dir  || ! length $dir
            || ! defined $file || ! length $file) {
            my $shown_dir  = defined $dir  ? $dir  : '';
            my $shown_file = defined $file ? $file : '';
            $self->debug(
                qq{Dir: $shown_dir or File: $shown_file is undefined.\n});
            next NOTIFY;
        }
        $event = '' if ! defined $event;
        my $file_path = $dir . $file;

        ### Go to next item in queue if source file no longer exists.
        if (! -e $file_path && $event ne "DELETE") {
            $self->debug(qq{Source file no longer exists: $file_path.\n});
            next NOTIFY;
        }

        ### Go to next item in queue if the file name matches an exclusion.
        for my $pattern (@{ $self->{patterns} }) {
            if ($file =~ /$pattern/) {
                $self->debug(qq{Exclusion pattern matched: $pattern.\n});
                next NOTIFY;
            }
        }

        ### Throttle repeat transfers of the same path:
        ###   files over 100MB  - at most once every 300 seconds;
        ###   all other files   - at most once every 30 seconds.
        ### Throttled items go on the redo list for the next run.
        my $last_transfer = $self->{transfer_times}{$file_path};
        if ($last_transfer && $event ne "DELETE") {
            my $size         = (-s $file_path) || 0;
            my $min_interval = $size > $large_file_bytes ? 300 : 30;

            if (time() - $last_transfer < $min_interval) {
                $self->debug(
                    qq{Skipping $file_path. Transferred < $min_interval }
                    . qq{seconds ago.\n});

                ### Only push to redo list if not in list.  Compare as
                ### strings: the line is data, not a regular expression.
                if (! grep { $_ eq $line } @{ $self->{redo_inotify_output} }) {
                    push @{ $self->{redo_inotify_output} }, $line;
                }
                next NOTIFY;
            }
        }

        ### Populate transfer_times with the current epoch.
        $self->{transfer_times}{$file_path} = time() if $event ne "DELETE";

        ### Build the command as an argument list so the local shell is
        ### bypassed and file names with spaces or metacharacters are safe.
        my @cmd;
        if ($event eq "DELETE") {
            ### rsh hands the command to a remote shell, so the paths are
            ### quoted for that shell.
            ### NOTE(review): the existence test uses $file_path while rm
            ### uses sync_path . $file_path, as in the original -- confirm
            ### that the two are meant to differ.
            my $remote_cmd =
                  '[ -f ' . $shell_quote->($file_path) . ' ] && rm '
                . $shell_quote->($self->{sync_path} . $file_path);
            @cmd = ('rsh', $self->{target_host}, $remote_cmd);
        }
        else {
            ### Rsync the file to the target host.
            @cmd = (
                'rsync', '-aH', '--delete',
                $file_path,
                $self->{target_host} . $self->{sync_path} . ':' . $file_path,
            );
        }

        my $status = system @cmd;
        $self->debug(join(' ', @cmd) . "\n");
        if ($status != 0) {
            $self->debug(qq{Command failed (wait status $status): $cmd[0]\n});
        }
    }
}
|
|---|
| Replies are listed 'Best First'. | |
|---|---|
|
Re^3: Thread::Queue locking question
by ikegami (Patriarch) on Apr 21, 2008 at 18:10 UTC |