http://qs1969.pair.com?node_id=554254


in reply to A perl daemon

This node gets a fair bit of traffic, so I thought I'd post the code that I used to implement the above. I record the number of attempts because a job that runs frequently may still be running when its next run tries to start. If it is STILL running after 5 attempts, something is wrong and you should probably kill the process.
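The attempt counting can be sketched on its own, without the database. This is only a minimal illustration: the in-memory hash stands in for the attempts column of the jobs table, and the names record_blocked_start and record_clean_start are made up for this sketch, not part of the module.

```perl
use strict;
use warnings;

# Hypothetical in-memory stand-in for the 'attempts' column of the jobs table.
my %attempts;
my $MAX_ATTEMPTS = 5;

# Called when a job is found to be still running at its next scheduled start.
# Returns 'warn' while the job is within the limit, and 'die' once it has
# been blocked more than $MAX_ATTEMPTS times in a row.
sub record_blocked_start {
    my ($jobname) = @_;
    my $count = ++$attempts{$jobname};
    return $count > $MAX_ATTEMPTS ? 'die' : 'warn';
}

# Called when a job starts normally: reset its counter.
sub record_clean_start {
    my ($jobname) = @_;
    $attempts{$jobname} = 0;
    return;
}
```

In the real daemon the counter has to live somewhere that all the forked children can see, which is why the module keeps it in a table rather than in memory.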

The only difference in the logic is that the check for whether the job is already running happens after the fork, instead of before, as I was getting locking issues with database handles being passed from parent to child.

It also emails you if any errors occur, or a job fails to run.

I'd welcome comments on the code. It could be CPANed quite easily - just need to add hooks for the database calls.

#===============================================
# Daemon module - you need to implement your own database calls
#===============================================
package My::Daemon;

use strict;
use warnings FATAL => 'all', NONFATAL => 'redefine';

use base qw (My::Base);
use Schedule::Cron;
use Proc::PID::File();
use POSIX();
use MIME::Lite();

require Exporter;
our @ISA = qw(Exporter);
our @EXPORT = qw(logmsg);

our $Daemon;

#===================================
sub new {
#===================================
    my $proto = shift;
    my $class = ref ($proto) || $proto;
    my $self = {};
    bless($self,$class);
    my $params = ref $_[0] ? shift : {@_};
    foreach my $key (keys %$params) {
        $self->$key($params->{$key});
    }
    $Daemon = $self;
    return $self;
}

# Start cron daemon
#===================================
sub start {
#===================================
    my $self = shift;
    my $name = $self->name;
    if (my $pid = fork()) {
        # Parent
        exit 0;
    }
    # Child
    if (Proc::PID::File->running({name=>$name})) {
        die "Couldn't start : '$name' already running";
    }
    $self->init_cron_daemon;
}

# Stop cron daemon
#===================================
sub stop {
#===================================
    my $self = shift;
    my $name = $self->name;
    my $pid;
    unless ($pid = Proc::PID::File->running({name=>$name})) {
        print "'$name' not running\n";
        return;
    }
    $|=1;
    print "Shutting down '$name'";
    kill -3 => $pid;
    my $i = 0;
    while (kill (-0 => $pid) && $i++ < 30) {
        print '.';
        sleep 1;
    }
    unless (kill -0 => $pid) {
        print "\nShut down complete\n";
        return 0;
    }
    print "\nNot responding - sending kill signal\n";
    kill -9 => $pid;
    return;
}

# Report whether the cron daemon is running
#===================================
sub status {
#===================================
    my $self = shift;
    my $name = $self->name;
    if (Proc::PID::File->running({name=>$name})) {
        print "'$name' is running\n";
    } else {
        print "'$name' is not running\n"
    }
    exit 0;
}

#===================================
sub init_cron_daemon {
#===================================
    my $self = shift;
    my $name = $self->name;
    my $cron;
    print "Starting '$name'\n";
    *CORE::GLOBAL::warn = \&warn_to_log;
    $SIG{__DIE__} = \&die_to_log;
    $SIG{__WARN__} = \&warn_to_log;
    eval {
        chdir '/' or die $!;
        # Parentheses are needed here: without them 'or' binds to the filename
        open(STDIN, '/dev/null') or die $!;
        open(STDOUT, ">>".$self->logfile);
        open(STDERR, "+>&STDOUT");
        logmsg('Starting');
        POSIX::setsid or die $!;
        my $preload_class = $self->preload_class;
        eval "require $preload_class";
        $cron = new Schedule::Cron(\&dispatcher);
        # Setup cron file and load required modules
        foreach my $job (@{$self->jobs}) {
            my $class = $job->[1]||'';
            eval "require $class";
            die "Couldn't load job '$class' : $@" if $@;
            $cron->add_entry(@$job);
        }
    };
    if ($@) {
        die "Couldn't start child '$name' : $@";
    }
    local $SIG{CHLD} = 'IGNORE';
    $cron->run( detach => 0 );
}

#===================================
sub dispatcher {
#===================================
    my $self = $Daemon;
    my $class = shift;
    my $handler = shift;
    my $args = [@_];
    local $SIG{CHLD} = 'DEFAULT';
    my $jobname = $class.'::'.$handler;
    logmsg("Starting : $jobname");
    my $DB = $self->db_class;
    if (Proc::PID::File->running({name=>$jobname})) {
        # Pseudocode whose implementation depends on your system
        # Increment number of attempts
        # SQL => 'UPDATE jobs SET attempts=attempts+1 WHERE name = ?'
        # If no of rows affected == 1
        #     SQL => 'SELECT attempts FROM jobs WHERE name = ?'
        #     If attempts > 5
        #         die "Attempted to start job '$jobname' $attempts times"
        #     else
        #         warn "Couldn't start : '$jobname' already running";
        # else
        #     die "WEIRD Couldn't find job '$jobname' in the jobs table"
        exit 0;
    }

    # SQL => 'UPDATE jobs SET attempts = 0 WHERE name = ?'

    # get the last ID processed, or the last time job was run (depends on the job type)
    # SQL => 'SELECT last_id,last_run FROM jobs WHERE name = ?'
    # ($results below is the row returned by that SELECT; fetching it is
    # part of the database code you need to supply)

    my $current_time = timestamp();
    my $last_run = $results->{last_run}
        ? $results->{last_run}->strftime('%F %T')
        : '1970-01-01';
    my $last = {
        id   => $results->{last_id}||0,
        run  => $last_run,
        time => $current_time,
    };
    chdir '/' or die $!;
    open(STDIN, '/dev/null') or die $!;

    # Run handler which does job and updates last_id (if required)
    eval {$class->$handler($last,$args)};
    if ($@) {
        die "Error running '$jobname' : $@";
    }

    # Update jobs table
    # SQL => <<SQL});
    #     REPLACE INTO jobs (
    #           name
    #         , attempts
    #         , last_id
    #         , last_run
    #     ) VALUES (?,0,?,?)

    logmsg ("Ending '$jobname'");
    exit 0;
}

#===================================
sub logmsg {
#===================================
    print format_msg($_[0]);
}

#===================================
sub warn_to_log {
#===================================
    print format_msg('**** '.$_[0]);
}

#===================================
sub die_to_log {
#===================================
    return if $^S;
    my $error = $_[0];
    my $self = $Daemon;
    eval {
        my $email = $self->email;
        my $msg = MIME::Lite->new(
            From     => $email->{from},
            To       => $email->{to},
            Subject  => 'Error running cron daemon',
            Data     => $error,
            Encoding => 'quoted-printable',
        );
        $msg->attr('content-type' => 'text/plain; charset=utf-8; format=flowed');
        $msg->send_by_sendmail;
    };
    if ($@) {
        $error.= ' Additionally, an error occurred sending the alert email : '.$@;
    }
    die format_msg('*** DIE!!! **** '.$error);
}

#===================================
sub format_msg {
#===================================
    return (timestamp(),' [',(caller(1))[0]," $$] : ",$_[0]."\n");
}

#===================================
sub timestamp {
#===================================
    my ($sec,$min,$hour,$day,$mon,$year) = localtime;
    return sprintf ("%4d-%02d-%02d %02d:%02d:%02d",
        $year+1900,++$mon,$day,$hour,$min,$sec);
}

#===================================
sub logfile {
#===================================
    my $self = shift;
    $self->{logfile} = shift if $_[0];
    return $self->{logfile};
}

#===================================
sub email {
#===================================
    my $self = shift;
    $self->{email} = shift if $_[0];
    return $self->{email};
}

#===================================
sub jobs {
#===================================
    my $self = shift;
    $self->{jobs} = shift if $_[0];
    return $self->{jobs};
}

# You can subclass this module and override this name call to
# run several cron daemons
#===================================
sub name { __PACKAGE__ }
#===================================

1;

#===============================================
# MySQL table : jobs
#===============================================
# For recording the number of attempts to run each job, and the point
# where the job should start reprocessing (ie everything after last_id, or
# everything changed after time last_run)

+----------+----------------------+------+-----+---------------------+-------+
| Field    | Type                 | Null | Key | Default             | Extra |
+----------+----------------------+------+-----+---------------------+-------+
| name     | varchar(200)         |      | PRI |                     |       |
| last_id  | bigint(20) unsigned  |      |     | 0                   |       |
| last_run | timestamp            | YES  |     | 0000-00-00 00:00:00 |       |
| attempts | smallint(5) unsigned |      |     | 0                   |       |
+----------+----------------------+------+-----+---------------------+-------+

#===============================================
# Cron config file - in YAML
#===============================================
---
logfile: /my/dir/cron.log
email:
    from: crondaemon@domain.com
    to: me@domain.com
jobs:
    - - '* * * * * */10'
      - My::Cron::Job
      - update_indexes
    - - '* * * * * */10'
      - My::Cron::Job
      - clean_cache
    - - '* * * * * */10'
      - My::Cron::Job2
      - do_queue

#===============================================
# Script for starting/stopping daemon
#===============================================
#!/usr/bin/perl

use strict;
use warnings FATAL => 'all',NONFATAL => 'redefine';

use My::Daemon;

# Placeholder: load the YAML config file into $config here
my $config = {};

my $command = shift @ARGV||'';
my $daemon = My::Daemon->new($config);

if ($command eq 'start') {
    exit $daemon->start;
} elsif ($command eq 'restart') {
    $daemon->stop;
    sleep 5;
    exit $daemon->start;
} elsif ($command eq 'stop') {
    $daemon->stop;
} elsif ($command eq 'status') {
    $daemon->status;
} else {
    die <<USAGE;
Usage : $0 stop|start|restart|status
USAGE
}

Re: A perl daemon
by trm (Novice) on Jan 20, 2009 at 19:00 UTC
    I found this code potentially very handy. However, I found that the dispatcher function within the class is a new process every time it executes. So if I update the cron, the changes are lost once the function completes.

    If I create something similar as a script (not as a daemon), the schedule doesn't fork off a new process each time the function is called.

    Is there a way to mimic this functionality (non-forking) by using a daemon similar to the above example?

      However, I found that the dispatcher function within the class is a new process every time it executes. So if I update the cron, the changes are lost once the function completes.

      You mean, if you were to update the cron table from one of the jobs? Yes, those changes would be lost.

      I've been using the example code above in production for 2 years now, and it runs beautifully. I have a large application, which gets loaded at startup, and using fork to launch each process on linux is really cheap (as it makes use of Copy on Write). It also means that different jobs can run simultaneously.
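      To make the fork-per-job idea concrete, here is a minimal sketch that is separate from the daemon code. The function name run_jobs_in_children and the name => code ref shape of its argument are assumptions made for this example. Unlike the daemon, which ignores SIGCHLD in the parent, this version waits for each child so that the exit statuses can be inspected.

```perl
use strict;
use warnings;
use POSIX ();

# Run each job in its own child process and collect the exit statuses.
# $jobs is a hash ref of name => code ref (hypothetical shape for this sketch).
sub run_jobs_in_children {
    my ($jobs) = @_;
    local $SIG{CHLD} = 'DEFAULT';    # make sure waitpid can reap the children
    my %pid_to_name;
    for my $name (sort keys %$jobs) {
        my $pid = fork();
        die "fork failed: $!" unless defined $pid;
        if ($pid == 0) {
            # Child: run the job, then leave without returning to the caller
            my $ok = eval { $jobs->{$name}->(); 1 };
            POSIX::_exit($ok ? 0 : 1);
        }
        $pid_to_name{$pid} = $name;    # Parent: remember which child is which
    }
    my %status;
    for my $pid (keys %pid_to_name) {
        waitpid($pid, 0);
        $status{ $pid_to_name{$pid} } = $? >> 8;
    }
    return \%status;
}
```

      All children are started before any of them is waited for, so the jobs overlap in time. Anything a child changes in memory, including the cron table, disappears when that child exits.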

      In Schedule::Cron there is an option nofork, which (not surprisingly) launches the jobs without forking :) - this would let you alter your cron table, but would only run jobs sequentially.

      Instead of that, you could consider making your daemon re-read the cron table whenever it receives a signal of your choosing, e.g. by installing a handler in $SIG{USR1}. Your child job could then update the cron table and signal its parent.
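      A rough sketch of the signalling side, using only core Perl. The names reload_if_requested and signal_parent are invented for this example, and the sketch assumes the child has already written its changes somewhere the parent can re-read them. Where the parent checks the flag depends on how the scheduler loop is driven, so that part is left out.

```perl
use strict;
use warnings;

# Flag set by the signal handler and checked later by the parent. Keeping
# the handler this small avoids doing real work inside a signal handler.
my $reload_requested = 0;
$SIG{USR1} = sub { $reload_requested = 1 };

# Hypothetical hook for the parent: $reload is a code ref that rebuilds the
# cron table, for example by re-reading the config. Returns 1 if it ran.
sub reload_if_requested {
    my ($reload) = @_;
    return 0 unless $reload_requested;
    $reload_requested = 0;
    $reload->();
    return 1;
}

# Called from a child job after it has changed the stored cron table.
# getppid() is the daemon, because each job is forked directly from it.
sub signal_parent {
    return kill 'USR1', getppid();
}
```

      The handler is inherited across fork, so a child that receives USR1 would set its own copy of the flag. That is harmless here, since only the parent ever calls reload_if_requested.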

        Thank you for your response.

        I just need "something" to update the cron. I was thinking that I could get the parent to update the cron in a while loop, but it looks like after I "run" the schedule, the schedule seems to have control - i.e. until it completes, the next lines in what you called "init_cron_daemon" won't execute.

        Perhaps I just need the schedule to signal the parent to update the cron. I just need to research how I signal that parent.

        The only thing preventing me from using this is updating the cron on the fly. I found that deleting crons caused table problems, but I wrote workarounds for that. Now I need to avoid the forking - or precisely as you state, work around it somehow.