#!/usr/bin/perl
#
# File loader daemon.
#
# Watches $sourceFile for incoming files.  Once a file has stopped growing
# it is handed to $loaderDcProgram, which runs as a child process with its
# standard output redirected to "$logsDir/<child pid>.lock".  The first line
# of that lock file reports the outcome ("OK" or "FAIL"); the input file is
# then moved to $moveFile2Dir or $errorFilesDir accordingly.
#
# Per-file STATUS values kept in %files:
#   A (active  - still being watched to see whether its size changes)
#   R (ready   - size is stable, waiting for a free loader slot)
#   L (loading - a loader child is processing the file)
#   X (loaded  - loader finished, file can be archived)
#   F (failed  - loader reported FAIL or had to be killed)

use strict;
use warnings;
use Data::Dumper;
use POSIX 'setsid';
use File::Copy;

$| = 1;    # unbuffered print

my $basedir           = '/home/emf003/loader';     # base loader dir
my $sourceFile        = "$basedir/raw";            # dir to check
my $logsDir           = "$basedir/logs";           # where lock and log files will be in
my $loaderDcProgram   = "$basedir/t.pl";           # program to run
my $moveFile2Dir      = "$basedir/arch";           # where files will be moved after being processed
my $errorFilesDir     = "$basedir/errorFiles";     # where files that could not be loaded will be
my $sleep             = 5;                         # how long to sleep for (seconds)
my $maxProcess        = 10;                        # max number of simultaneous loader children
my $processCount      = 0;                         # loader children currently accounted for
my $timeToKill        = 180;                       # seconds (3 minutes) a child may run before it is killed
my $maxInternalErrors = 100;                       # max number of error messages before this daemon kills itself
my $internalErrors    = 0;                         # internal error counter, to avoid an endless loop
my %files             = ();                        # file name => { FILESIZE, TIMESTAMP, STATUS, PROCESS, STARTPROCESS }

my $statusLog;    # append handle on $logsDir/LOADERSTATUS, opened by check_env()
my $daemonPid;    # PID of the daemonized process; lets END ignore every other process

$SIG{INT}  = \&catchKiller;    # deal with ctrl+c
$SIG{TERM} = \&catchKiller;    # deal with kill process
# SIGKILL cannot be caught, so no handler is installed for it.
$SIG{CHLD} = 'IGNORE';         # let the system reap finished children (no zombies)

# The environment is checked before detaching so that a configuration
# problem is reported on the terminal instead of vanishing into /dev/null.
check_env();
daemonize();
$daemonPid = $$;

_writeStatus( "\nloader started at " . localtime() . "\n" );

while (1) {
    createInfoFile();
    _registerNewFiles();

    # Main idea: remember each file's size, wait a few seconds and check
    # whether the size changed.  If it did not, the file can be loaded.
    sleep $sleep;
    mark2Load( \%files );

    for my $file ( keys %files ) {
        my $status = $files{$file}{STATUS};

        if ( $status eq 'R' ) {
            _startLoader($file) if $processCount < $maxProcess;
        }
        elsif ( $status eq 'L' ) {
            _superviseLoader($file);
        }
        elsif ( $status eq 'X' ) {
            _retireFile( $file, $moveFile2Dir, 0 );
        }
        elsif ( $status eq 'F' ) {
            _retireFile( $file, $errorFilesDir, 1 );
        }
    }
}

#------------------------------------------------------------
# Detach from the controlling terminal: chdir to /, point the standard
# streams at /dev/null, fork (the parent exits) and start a new session.
sub daemonize {
    chdir '/' or die "Can't chdir to /: $!";
    open STDIN,  '<', '/dev/null' or die "Can't read /dev/null: $!";
    open STDOUT, '>', '/dev/null' or die "Can't write to /dev/null: $!";
    defined( my $pid = fork ) or die "Can't fork: $!";
    exit if $pid;
    setsid() or die "Can't start a new session: $!";
    open STDERR, '>&', \*STDOUT or die "Can't dup stdout: $!";
    return;
}

#------------------------------------------------------------
# Promote watched files from 'A' to 'R'.
#   $files - reference to the tracking hash
# A file becomes ready when time has passed since its last observation and
# its size is unchanged.  If the size changed, the observation is refreshed
# so the file is compared against its new size on the next pass.  Entries
# whose file has disappeared are dropped.
sub mark2Load {
    my $files = shift;

    for my $file ( keys %{$files} ) {
        my $entry = $files->{$file};
        next unless $entry->{STATUS} eq 'A';

        my $fileSize = _fileSize("$sourceFile/$file");
        if ( !defined $fileSize ) {
            delete $files->{$file};
            next;
        }

        if ( $entry->{FILESIZE} == $fileSize ) {
            $entry->{STATUS} = 'R' if ( time - $entry->{TIMESTAMP} ) > 0;
        }
        else {
            $entry->{FILESIZE}  = $fileSize;
            $entry->{TIMESTAMP} = time;
        }
    }
    return;
}

#---------------------------------------------------------
# Signal handler for INT/TERM: log the signal and terminate the daemon.
#   $sigName - name of the signal, as passed by perl to the handler
sub catchKiller {
    my $sigName = shift;
    makeLog( "\nRecebendo sinal de kill $sigName as " . localtime() );
    die "\n I've been killed $sigName";
}

#--------------------------------------------------------
# Basic environment check: every working directory must exist, the loader
# program must exist and be executable, and the status log must be
# appendable.  Dies with a message on the first problem found.
sub check_env {
    for my $dir ( $basedir, $sourceFile, $logsDir, $moveFile2Dir, $errorFilesDir ) {
        -e $dir or die "\n Can't open $dir $!";
        -d $dir or die "\n $dir is not a directory";
    }

    -e $loaderDcProgram or die "\n Can't open $loaderDcProgram $!";
    -x $loaderDcProgram or die " $loaderDcProgram has no exec permission";

    open $statusLog, '>>', "$logsDir/LOADERSTATUS"
        or die "\n Can't open $logsDir/LOADERSTATUS $!";

    # Flush every log line immediately so the log stays useful even if the
    # daemon is killed.
    my $previous = select $statusLog;
    $| = 1;
    select $previous;
    return;
}

#---------------------------------------------------------
# Log an error message and count it.  Once $maxInternalErrors messages have
# been logged, the tracking hash is dumped to the log and the daemon dies.
#   message - text to append to the status log
sub makeLog {
    my ($message) = @_;

    _writeStatus($message);
    $internalErrors++;
    _writeStatus("'\nErrors $internalErrors process: $processCount");

    if ( $internalErrors >= $maxInternalErrors ) {
        _writeStatus( "\nkilling myself ($$) . too many errors."
              . " DEBUG info: "
              . Dumper( \%files ) );
        die "\n too many internal errors ($internalErrors)\n";
    }
    return;
}

#--------------------------------------------------------
# Note the shutdown in the status log.  Only the daemon process itself does
# this; the launching parent and any forked child leave the log alone.
END {
    if ( defined $daemonPid && $$ == $daemonPid && defined $statusLog ) {
        print {$statusLog} "\n closing loader " . localtime();
        close $statusLog;
    }
}

#--------------------------------------------------------
# Rewrite $logsDir/STATUS with a snapshot of the configuration, the counters
# and the tracking hash.  Uses its own file handle so the running status log
# stays open.
sub createInfoFile {
    my $info;
    if ( !open $info, '>', "$logsDir/STATUS" ) {
        makeLog("\n Could not write $logsDir/STATUS $!");
        return;
    }

    print {$info} <<EOF;
Last Update: @{[scalar localtime()]}
process: $$
basedir: $basedir
sourcefile: $sourceFile
logs: $logsDir
program: $loaderDcProgram
move good: $moveFile2Dir
move error: $errorFilesDir
sleep time: $sleep
max process: $maxProcess
process count: $processCount
time to kill child: $timeToKill
Max errors: $maxInternalErrors
internal errors: $internalErrors
@{[Dumper(\%files)]}
EOF
    close $info;
    return;
}

#--------------------------------------------------------
# Append text to the status log, if it is open.
sub _writeStatus {
    my ($text) = @_;
    return unless defined $statusLog;
    print {$statusLog} $text;
    return;
}

#--------------------------------------------------------
# Size of $path in bytes, or undef when it cannot be stat'ed.  Unlike
# opening the file for append, this never creates or touches the file.
sub _fileSize {
    my ($path) = @_;
    my @stat = stat $path;
    return @stat ? $stat[7] : undef;
}

#--------------------------------------------------------
# Start tracking every plain file in $sourceFile that is not tracked yet.
sub _registerNewFiles {
    my $dir;
    if ( !opendir $dir, $sourceFile ) {
        makeLog("\n Can't open dir $sourceFile: $!");
        return;
    }
    my @names = grep { !/^\.{1,2}$/ } readdir $dir;    # any entry, minus . and ..
    closedir $dir;

    for my $name (@names) {
        next if exists $files{$name};

        my $path = "$sourceFile/$name";
        next unless -f $path;                          # the loader works on plain files only

        my $size = _fileSize($path);
        next unless defined $size;

        $files{$name} = {
            FILESIZE  => $size,
            TIMESTAMP => time,
            STATUS    => 'A',
        };
    }
    return;
}

#--------------------------------------------------------
# Fork a loader child for $file and mark the file as 'L'.
sub _startLoader {
    my ($file) = @_;

    my $pid = fork;
    if ( !defined $pid ) {

        # A failed fork must not fall through to the child branch: that
        # would replace the daemon itself with the loader program.
        makeLog("\n Can't fork loader for $file: $!");
        return;
    }

    if ($pid) {
        $files{$file}{STATUS}       = 'L';
        $files{$file}{PROCESS}      = $pid;
        $files{$file}{STARTPROCESS} = time;
        _writeStatus("\n PID $pid processing file $file");
        $processCount++;
        return;
    }

    # Child: the loader's output becomes the lock file named after this PID.
    # POSIX::_exit is used so the daemon's END block never runs here.
    my $lock = sprintf '%s/%d.lock', $logsDir, $$;
    open STDOUT, '>', $lock or POSIX::_exit(1);

    # List-form exec bypasses the shell, so odd characters in the file name
    # cannot be interpreted as shell syntax.
    exec( $loaderDcProgram, "$sourceFile/$file" ) or do {
        syswrite STDOUT, "FAIL\n";    # report the failure through the lock file
        POSIX::_exit(1);
    };
}

#--------------------------------------------------------
# Read the loader's lock file and update the status of $file:
# missing lock -> 'X', first line OK -> lock removed and 'X',
# first line FAIL -> 'F', anything else -> still 'L'.
sub _checkLoader {
    my ($file) = @_;
    my $lock = "$logsDir/$files{$file}{PROCESS}.lock";

    my $fh;
    if ( !open $fh, '<', $lock ) {
        $files{$file}{STATUS} = 'X';
        return;
    }
    my $firstLine = <$fh>;
    close $fh;
    return unless defined $firstLine;    # loader has not reported anything yet

    if ( $firstLine =~ /^OK$/ ) {
        unlink $lock
            or makeLog( "\n Could not unlink file $lock" . "$!" );
        $files{$file}{STATUS} = 'X';
    }
    elsif ( $firstLine =~ /^FAIL$/ ) {
        $files{$file}{STATUS} = 'F';
    }

    # Any other first line leaves the file in 'L'; more result codes could
    # be handled here.
    return;
}

#--------------------------------------------------------
# Check a running loader; if it produced no verdict within $timeToKill
# seconds, kill it and mark the file as failed.
sub _superviseLoader {
    my ($file) = @_;

    _checkLoader($file);
    return unless $files{$file}{STATUS} eq 'L';
    return if ( time - $files{$file}{STARTPROCESS} ) <= $timeToKill;

    my $pid = $files{$file}{PROCESS};
    kill 'KILL', $pid;    # signal first, then the PID
    _writeStatus("\n PID $pid killed after $timeToKill seconds on file $file");

    # Marking the file failed releases its process slot on the next pass.
    $files{$file}{STATUS} = 'F';
    return;
}

#--------------------------------------------------------
# Move a finished file out of $sourceFile into $targetDir, stop tracking it
# and release its process slot.  When $keepLock is true, the loader's lock
# file is kept next to it as "<file>.err".
sub _retireFile {
    my ( $file, $targetDir, $keepLock ) = @_;

    move( "$sourceFile/$file", "$targetDir/$file" )
        or makeLog("\n Could not move $sourceFile/$file to $targetDir/$file $!");

    if ($keepLock) {
        my $lock = "$logsDir/$files{$file}{PROCESS}.lock";
        move( $lock, "$targetDir/$file.err" )
            or makeLog("\n Could not move $lock to $targetDir/$file.err $!");
    }

    delete $files{$file};
    $processCount--;
    return;
}

# Nothing below this marker is Perl: it is page text that followed the script.
__END__
In reply to Re: File Loader (load the content of a file and insert into DB)
by r34d0nl1
in thread File Loader (load the content of a file and insert into DB)
by r34d0nl1
| For: | Use:    |
|------|---------|
| `&`  | `&amp;` |
| `<`  | `&lt;`  |
| `>`  | `&gt;`  |
| `[`  | `&#91;` |
| `]`  | `&#93;` |