Adamba has asked for the wisdom of the Perl Monks concerning the following question:

Hey, I'm trying to build a Jobs Queue Manager that uses inotify to watch a folder for changes. When a job is placed in the new folder it is moved to the progress folder, and if the job hasn't been processed successfully, the program moves it to the failed folder. Now I want to add a feature: after a job has been in the failed folder for 10 minutes, the program should retry it by moving the job back to the new folder. Here's my code:

use strict;
use warnings;

package QueueManager;

# Job queue manager driven by an AnyEvent loop.
#
# A filesystem notifier watches the "new" jobs folder. Newly created job
# files are moved to the "progress" folder, queued, and then checked against
# the routing table reported by `ip route show`. Jobs whose expected routing
# state is not observed are moved to the "failed" folder; the others are
# removed. A second, repeating timer periodically inspects the "failed"
# folder.
#
# The caller is expected to run the event loop after calling run(); nothing
# in this package blocks on it.

use AnyEvent;
use AnyEvent::Filesys::Notify;
use Const::Fast;
use DDP;
use File::Basename;
use File::Copy;
use File::Remove qw(remove);
use File::Slurp;
use File::Stat qw(:stat);
use List::Util qw(first);
use List::MoreUtils qw(natatime);
use Moo;
use MooX::Types::MooseLike::Base qw<Str Num InstanceOf ArrayRef HashRef Int>;
use Regexp::Common qw(net);
use v5.10.1;

with 'MooseX::Role::Loggable';

# Seconds: used both as the notifier's interval and as the delay before a
# batch of newly seen jobs is processed.
has notifier_interval => (
    is      => 'ro',
    isa     => Num,
    default => sub {30.0},
);

# Seconds between two inspections of the failed folder.
has failed_interval => (
    is      => 'ro',
    isa     => Num,
    default => sub { 60 * 10 },
);

# Chunk size used when the queue is handed out for processing.
has number_of_jobs_to_process => (
    is      => 'ro',
    isa     => Int,
    default => sub {20},
);

# Watcher handles. They are stored on the object so they stay alive for as
# long as the object does.
has failed_timer => (
    is     => 'ro',
    writer => 'set_failed_timer',
);

has notifier => (
    is     => 'ro',
    isa    => InstanceOf ['AnyEvent::Filesys::Notify'],
    writer => 'set_notifier',
);

has add_output_file => (
    is      => 'ro',
    isa     => Str,
    default => sub {'.add_route.output'},
);

has del_output_file => (
    is      => 'ro',
    isa     => Str,
    default => sub {'.del_route.output'},
);

has jobs_folder_path => (
    is      => 'ro',
    isa     => Str,
    default => sub {'queue_manager/jobs'},
);

# Hash ref with the keys new, progress and failed (see _build_job_folders).
has job_folders => (
    is      => 'ro',
    isa     => HashRef,
    lazy    => 1,
    builder => '_build_job_folders',
);

# Paths of job files seen by the notifier and not yet moved to progress.
has jobs => (
    is      => 'rw',
    isa     => ArrayRef,
    lazy    => 1,
    default => sub { [] },
    clearer => '_clear_jobs',
);

# "add:<ip>:<next_hop>" / "del:<ip>" strings awaiting processing.
has queue => (
    is      => 'rw',
    isa     => ArrayRef,
    lazy    => 1,
    default => sub { [] },
    clearer => '_clear_queue',
);

has added_jobs => (
    is      => 'rw',
    isa     => ArrayRef [HashRef],
    lazy    => 1,
    default => sub { [] },
    clearer => '_clear_added_jobs',
);

has deleted_jobs => (
    is      => 'rw',
    isa     => ArrayRef [HashRef],
    lazy    => 1,
    default => sub { [] },
    clearer => '_clear_deleted_jobs',
);

# Array ref of { ip_address => ..., next_hop => ... } hashes.
has routing_table => (
    is      => 'rw',
    isa     => ArrayRef [HashRef],
    lazy    => 1,
    builder => '_build_routing_table',
);

has email_contact => (
    is      => 'ro',
    isa     => Str,
    default => sub {'some@user.com'},
);

has '+log_to_file'   => ( default => sub {1} );
has '+log_path'      => ( default => sub {'queue_manager/logs'} );
has '+log_file'      => ( default => sub {'queue_manager.log'} );
has '+log_to_stdout' => ( default => sub {1} );

# Builder for job_folders: derives the three working folders from
# jobs_folder_path.
sub _build_job_folders {
    my $self = shift;

    my $jobs_folder_path = $self->jobs_folder_path;

    return {
        new      => "$jobs_folder_path/new",
        progress => "$jobs_folder_path/progress",
        failed   => "$jobs_folder_path/failed",
    };
}

# Builder for routing_table: the current routes, wrapped in the array ref
# that the attribute's type requires.
sub _build_routing_table {
    my $self = shift;

    return [ $self->get_routing_table ];
}

# Runs $code and logs, rather than propagates, any exception it throws.
# Used for every event-loop callback, because AnyEvent callbacks must not die.
sub _run_safely {
    my ( $self, $what, $code ) = @_;

    my $ok    = eval { $code->(); 1 };
    my $error = $@;
    $self->log("$what failed: $error") unless $ok;

    return;
}

# Installs the two watchers: a repeating timer that inspects the failed
# folder and a filesystem notifier on the "new" folder. Returns the notifier.
# The callbacks close over $self, so the object stays alive while the
# watchers exist.
sub run {
    my $self = shift;

    my %job_folders = %{ $self->job_folders };

    $self->log("Queue Manager is running...");

    $self->set_failed_timer(
        AnyEvent->timer(
            after    => 0,    # explicit: first check as soon as the loop runs
            interval => $self->failed_interval,
            cb       => sub {
                $self->_run_safely( 'Failed jobs check',
                    sub { $self->post_process_failed_jobs() } );
            },
        )
    );

    $self->set_notifier(
        AnyEvent::Filesys::Notify->new(
            dirs     => [ $job_folders{'new'} ],
            interval => $self->notifier_interval,
            cb       => sub {
                my (@events) = @_;

                # Remember every newly created, non-hidden file.
                for my $event (@events) {
                    if ( $event->is_created
                        and basename( $event->path ) !~ /^\./ )
                    {
                        push @{ $self->jobs }, $event->path;
                    }
                }

                # One-shot timer; it releases itself after firing, even when
                # processing fails.
                my $queue_timer;
                $queue_timer = AE::timer $self->notifier_interval, 0, sub {
                    $self->_run_safely( 'Processing new jobs',
                        sub { $self->process_new_jobs() } );
                    undef $queue_timer;
                };
            },
        )
    );

    return $self->notifier;
}

# Moves every collected job file into the progress folder, queues it, then
# starts processing. A file that cannot be moved is logged and skipped.
sub process_new_jobs {
    my $self = shift;

    my %job_folders = %{ $self->job_folders };

    for my $new_job ( @{ $self->jobs } ) {
        my $progress_path = "$job_folders{'progress'}/" . basename($new_job);

        if ( !move( $new_job, $progress_path ) ) {
            $self->log("Could not move $new_job to $progress_path: $!");
            next;
        }

        $self->enter_queue($progress_path);
    }

    $self->_clear_jobs;
    $self->start_new_jobs();

    return;
}

# Logs how long ago each file in the failed folder was modified.
# TODO: the retry described for this queue (moving a job back to the "new"
# folder once it is older than failed_interval) is not implemented here.
sub post_process_failed_jobs {
    my $self = shift;

    my %job_folders = %{ $self->job_folders };

    for my $failed_file ( read_dir( $job_folders{'failed'} ) ) {
        my $file_stat = stat("$job_folders{'failed'}/$failed_file");

        # The file may have disappeared since the directory was read.
        next unless defined $file_stat;

        my $age = time() - $file_stat->mtime;
        $self->log( "Modified " . int( $age / 60 ) . " mins ago" );
    }

    return;
}

# Reads a job file and queues it as an "add" or "del" job.
# The job type is taken from the file name, so directory names cannot
# influence it. An "add" file holds "<ip> <next_hop>"; a "del" file holds the
# ip address only. Files whose name matches neither are ignored.
sub enter_queue {
    my ( $self, $job_to_process ) = @_;

    my $file_name = basename($job_to_process);
    my $args      = read_file($job_to_process);
    chomp $args;

    if ( $file_name =~ /add/ ) {
        my ( $ip_address, $next_hop ) = split( / /, $args );
        push @{ $self->added_jobs },
            {
            ip_address => $ip_address,
            next_hop   => $next_hop,
            file_name  => $file_name,
            };
        push @{ $self->queue }, "add:$ip_address:$next_hop";
    }
    elsif ( $file_name =~ /del/ ) {
        my ($ip_address) = $args;
        push @{ $self->deleted_jobs },
            {
            ip_address => $ip_address,
            file_name  => $file_name,
            };
        push @{ $self->queue }, "del:$ip_address";
    }

    return;
}

# Logs the queue in chunks of number_of_jobs_to_process entries, empties it
# and moves on to post-processing.
sub start_new_jobs {
    my $self = shift;

    my $iterator = natatime $self->number_of_jobs_to_process,
        @{ $self->queue };

    while ( my @values = $iterator->() ) {
        my $arguments_line = join( ' ', @values );
        $self->log("Processing: $arguments_line");
    }

    $self->_clear_queue;
    $self->jobs_post_process();

    return;
}

# Finishes one job: a successful job's file is removed from the progress
# folder; a failed one is moved to the failed folder and reported.
# NOTE(review): send_email is not defined in the code shown here; presumably
# it is provided elsewhere - confirm.
sub _settle_job {
    my ( $self, $job, $succeeded, $output_file ) = @_;

    my %job_folders = %{ $self->job_folders };
    my $file_name   = $job->{'file_name'};
    my $in_progress = "$job_folders{'progress'}/$file_name";

    if ($succeeded) {
        $self->log("Finished processing job: $file_name");
        remove($in_progress);
        return;
    }

    $self->log("FAILED job: $file_name");
    move( $in_progress, "$job_folders{'failed'}/$file_name" )
        or $self->log("Could not move $file_name to the failed folder: $!");
    $self->send_email( $output_file, $job );

    return;
}

# Verifies every processed job against the routing table: an added address
# must be present, a deleted address must be absent.
sub jobs_post_process {
    my $self = shift;

    for my $added_job ( @{ $self->added_jobs } ) {
        my $is_in_routing_table
            = $self->check_is_in_routing_table( $added_job->{'ip_address'} );
        $self->_settle_job( $added_job, $is_in_routing_table ? 1 : 0,
            $self->add_output_file );
    }
    $self->_clear_added_jobs;

    for my $deleted_job ( @{ $self->deleted_jobs } ) {
        my $is_in_routing_table
            = $self->check_is_in_routing_table( $deleted_job->{'ip_address'} );
        $self->_settle_job( $deleted_job, $is_in_routing_table ? 0 : 1,
            $self->del_output_file );
    }
    $self->_clear_deleted_jobs;

    return;
}

# Refreshes routing_table and returns the entry whose ip_address equals the
# address part of "<ipv4>/32", or undef when there is none. An address that
# is undefined or does not end in /32 never matches.
sub check_is_in_routing_table {
    my ( $self, $ip_address ) = @_;

    # get_routing_table returns a list; the attribute stores an array ref.
    $self->routing_table( [ $self->get_routing_table ] );

    my ($comparable_ip)
        = defined $ip_address
        ? $ip_address =~ /($RE{net}{IPv4})\/32$/
        : ();

    my $ret_val;
    if ( defined $comparable_ip ) {
        $ret_val = first { $_->{'ip_address'} eq $comparable_ip }
            @{ $self->routing_table };
    }

    return $ret_val;
}

# Runs `ip route show` and returns a list of
# { ip_address => ..., next_hop => ... } hashes, one per line of the form
# "<ipv4> via <ipv4> ... proto zebra". All other lines are ignored.
sub get_routing_table {
    my $self = shift;

    my @ret_val;
    for my $line (`ip route show`) {

        # Example of a matching line: 1.1.1.1 via 2.2.2.2 dev eth0 proto zebra
        my ( $ip_address, $next_hop )
            = $line =~ /^($RE{net}{IPv4}) via ($RE{net}{IPv4}) .*proto zebra\s+$/;

        next unless defined $ip_address and defined $next_hop;

        push @ret_val, { ip_address => $ip_address, next_hop => $next_hop };
    }

    return @ret_val;
}

1;

Now, my problem is that failed_timer runs only once, when the program starts, and that's it... What have I done wrong?

BTW, I've asked the same question on Stack Overflow, in this thread: StackOverflow

Replies are listed 'Best First'.
Re: Can't run two AnyEvent::timers in the same time?
by zwon (Abbot) on Aug 29, 2013 at 15:36 UTC
    That's a lot of code and you don't really show how you run it. In general you can run several AnyEvent timers at the same time:
    use 5.010;
    use strict;
    use warnings;
    use AnyEvent;

    # Two timer watchers sharing one event loop: a repeating one and a
    # one-shot one that ends the program.
    my $done = AnyEvent->condvar;

    # Fires after 0.2 seconds and then every 0.2 seconds.
    my $ticker = AnyEvent->timer(
        after    => 0.2,
        interval => 0.2,
        cb       => sub { say "t1" },
    );

    # Fires once after 1 second and releases the condvar wait below.
    my $stopper = AnyEvent->timer(
        after => 1,
        cb    => sub {
            say "t2";
            $done->send;
        },
    );

    # Runs the event loop until send is called on the condvar.
    $done->recv;
    Perhaps you're blocking somewhere?