# Init code here: loading modules, enabling strictures, etc.
....
....
# Daemon entry point.
#
# Creates the PID-file object, daemonizes through Proc::Daemon and, in the
# branch where Init returned a false value (presumably the daemonized child --
# confirm against the Proc::Daemon documentation), does the following:
#   1. appends STDERR to /tmp/diro_listener.log,
#   2. evaluates the decrypted setup code downloaded with curl,
#   3. finishes any work already present in %db_ids_in_progress,
#   4. loops forever: executes $sth, reads pg_notifies from $dbh_scan and, when
#      notified under our own name (or when $auto_retry is set), loads up to
#      500 rows from daemon_sending into %db_ids_in_progress and hands them to
#      process_thread(); otherwise it refreshes keep_alive and sleeps a second.
#
# $sth, $dbh_scan, $daemon_id, $cipher and $auth_key are not defined in this
# part of the file; presumably the init section and the eval'd setup code
# provide them -- verify.
my $pid = Proc::PID::File->new(name => "$who_am_i");
my $daemon = Proc::Daemon->new( work_dir => $mydir);
my $Kid_PID = $daemon->Init;
unless ( $Kid_PID ) {
    $pid->touch();
    print "Starting....\n";

    # Three-argument open keeps the mode separate from the path, and a failure
    # is no longer silently ignored.
    open(STDERR, '>>', '/tmp/diro_listener.log')
        or warn "Cannot append STDERR to /tmp/diro_listener.log: $!\n";

    # INITIATE THREAD AND PG LISTEN
    $can_use_threads = eval 'use threads \'exit\' => \'threads_only\'; 1'; warn $@ if $@;
    warn "Threads support: $can_use_threads\n";

    # SECURITY NOTE(review): $auth_key is interpolated into a shell command
    # line, and the response (fetched over plain http) is decrypted and then
    # executed as Perl code.  Left as-is because the daemon's configuration
    # depends on it, but this deserves a dedicated review.
    my $content = `/usr/bin/curl -s 'http://domain.com/daemon_setup.cgi?auth_key=$auth_key&act=db_connections_listen&v=test'`;
    eval($cipher->decrypt($content));
    warn "Remote setup code failed: $@" if $@;    # previously swallowed
    undef $cipher; undef $auth_reload_url;

    # FINISH UNFINISHED SENDING
    $process_thread_running = 0;
    # NOTE(review): called without a connection name, unlike the 'pg1' call
    # inside the loop below -- confirm this is intended.
    if (keys( %db_ids_in_progress )) { &process_thread(); }

    &dbconn('pg1');
    for (;;) {
        # Run the prepared polling statement and drain its rows.
        $sth->execute ();
        while (my $res = $sth->fetchrow_arrayref) { }

        my $notify = $dbh_scan->func('pg_notifies');

        # Explicit grouping: (notification addressed to us) OR (retry pending).
        # $notify->[0] replaces the one-element slice @{$notify}[0].
        if (($notify && $notify->[0] eq $who_am_i) || $auto_retry) {
            $sql = "SELECT id,uid,msg,\"from\",\"to\",dlr_track_id FROM daemon_sending WHERE daemon_id=$daemon_id ORDER BY id ASC LIMIT 500";
            $sth_gen = $dbhpg1->prepare($sql);
            $sth_gen->execute;
            while ( my $ary_ref = $sth_gen->fetchrow_arrayref) {
                # Skip rows that are already queued.
                next if ($db_ids_in_progress{$ary_ref->[0]});
                $db_ids_in_progress{$ary_ref->[0]}{'uid'} = $ary_ref->[1];
                $db_ids_in_progress{$ary_ref->[0]}{'msg'} = $ary_ref->[2];
                $db_ids_in_progress{$ary_ref->[0]}{'from'} = $ary_ref->[3];
                $db_ids_in_progress{$ary_ref->[0]}{'to'} = $ary_ref->[4];
                $db_ids_in_progress{$ary_ref->[0]}{'dlr_track_id'} = $ary_ref->[5];
            }
            $sth_gen->finish();

            # Number of queued records (keys in scalar context).
            my $found_db_records = scalar keys %db_ids_in_progress;
            if ($found_db_records) {
                leaktrace{
                    &process_thread('pg1');
                };
                # Poll again immediately: more rows may be waiting.
                $auto_retry = 1;
            }
            else { $auto_retry = 0; }
        }
        else {
            $auto_retry = 0;
            &simple_sql('pg1',"UPDATE daemon_instances SET keep_alive=now() WHERE id=$daemon_id RETURNING 1;");
            sleep 1;
        }
    }

    # NOTE(review): nothing in the loop above leaves it (no last/return), so
    # these two statements are never reached.
    &dbdisconn('pg1');
    $dbh_scan->disconnect;
}
close(STDERR);
$pid->release();
####
# Acquire the database connection registered under $cnf, optionally per thread.
#
# Parameters:
#   $cnf    - connection name; the handle is kept in the package variable
#             named "dbh<cnf>" (or "dbh<cnf><thread>" when $thread is true).
#   $thread - optional thread name selecting a per-thread handle and counter.
#
# The first acquisition connects; later ones only bump the use counter kept in
# %db_connections ($db_connections{$cnf}{$thread} for threaded callers,
# $db_connections{$cnf} otherwise).  Dies with $DBI::errstr when the connect
# fails.  Returns the new use count (always true).
#
# The former empty prototype "()" was dropped: the sub takes arguments, and it
# only worked because callers use the prototype-bypassing &dbconn(...) form.
#
# NOTE(review): the threaded and non-threaded counters share the slot
# $db_connections{$cnf} (hash ref vs. plain number), so mixing both styles for
# the same $cnf corrupts the bookkeeping.  Fixing that needs a coordinated
# change with dbdisconn and is deliberately not attempted here.
sub dbconn {
    my ($cnf, $thread) = @_;

    # Handles are stored in package variables addressed by name.
    no strict 'refs';

    my $control = "dbh" . $cnf . ($thread ? $thread : '');
    my $in_use  = $thread ? $db_connections{$cnf}{$thread}
                          : $db_connections{$cnf};

    if (!$in_use) {
        # Only the non-threaded path announces the connection.
        warn "Connecting DB $control\n" if !$thread;
        $$control = DBI->connect("dbi:Pg:database=dirotext;host=domain.com", 'user', 'pass' ) or die $DBI::errstr;
        $in_use = 0;
    }

    my $count = $in_use + 1;
    if ($thread) { $db_connections{$cnf}{$thread} = $count; }
    else         { $db_connections{$cnf}          = $count; }

    return $count;
}
# Release one use of the connection registered under $cnf.
#
# Parameters:
#   $cnf    - connection name; the handle lives in the package variable
#             "dbh<cnf>" (or "dbh<cnf><thread>" for threaded callers).
#   $thread - optional thread name selecting the per-thread handle/counter.
#
# When the use counter is 1 the counter is reset to 0 and the handle is
# disconnected; when it is greater than 1 it is only decremented; when it is
# 0 or unset nothing happens.  Returns nothing.
#
# Fixes over the previous version:
#   * A threaded release whose counter was 0/unset used to fall through into
#     the non-threaded branches, where the hash ref in $db_connections{$cnf}
#     compared "> 1" and was decremented -- turning the per-thread counter
#     hash into a number.  Threaded calls now never touch the shared slot.
#   * A non-threaded release no longer decrements a slot holding the
#     per-thread counter hash.
#   * The empty prototype "()" was dropped; the sub takes arguments and only
#     worked because callers use the &dbdisconn(...) form.
sub dbdisconn {
    my ($cnf, $thread) = @_;

    # Handles are stored in package variables addressed by name.
    no strict 'refs';

    if ($thread) {
        my $count = $db_connections{$cnf}{$thread} || 0;
        if ($count == 1) {
            $db_connections{$cnf}{$thread} = 0;
            my $control = "dbh".$cnf.$thread;
            $$control->disconnect;
        }
        elsif ($count > 1) {
            $db_connections{$cnf}{$thread}--;
        }
        return;
    }

    my $count = $db_connections{$cnf} || 0;

    # The slot holds per-thread counters, not a plain use count: leave it alone.
    return if ref $count;

    if ($count == 1) {
        $db_connections{$cnf} = 0;
        my $control = "dbh".$cnf;
        $$control->disconnect;
    }
    elsif ($count > 1) {
        $db_connections{$cnf}--;
    }
    return;
}
# Execute one SQL statement with DBI's do() on the named connection.
#
# Parameters:
#   $cnf    - connection name (handle variable "dbh<cnf>").
#   $sql    - statement text passed to do() unchanged.
#   $thread - optional thread name (handle variable "dbh<cnf><thread>").
#
# The connection is acquired with dbconn before and released with dbdisconn
# after the statement.  Returns whatever do() returned (previously the result
# was discarded and the sub returned an incidental value from dbdisconn).
#
# Other changes: the empty prototype "()" was dropped (the sub takes
# arguments), and a false/undefined $thread no longer gets concatenated into
# the handle name, matching how dbconn picks the non-threaded handle.
sub sqldo {
    my ($cnf, $sql, $thread) = @_;

    # Handles are stored in package variables addressed by name.
    no strict 'refs';

    my $control = "dbh" . $cnf . ($thread ? $thread : '');

    # Keep the &-call form: it works whether or not the callee has a prototype.
    &dbconn($cnf, $thread);
    my $result = $$control->do($sql);
    &dbdisconn($cnf, $thread);

    return $result;
}
# Prepare and execute one SQL statement and return its LAST result row.
#
# Parameters:
#   $cnf    - connection name (handle variable "dbh<cnf>").
#   $sql    - statement text.
#   $conn   - connect flag.  NOTE(review): any false value is promoted to 1
#             below, so the connection is always acquired and released;
#             confirm whether passing 0 was ever meant to skip that.
#   $thread - optional thread name; appended to both the handle variable name
#             and the statement-handle variable name.
#
# Returns the columns of the last fetched row as a list (empty list when the
# statement produced no rows).  The same row is also left in the package
# variable @result_simple, and the statement handle stays in the package
# variable "exec<cnf>[<thread>]" after finish(), exactly as before.
#
# Changes: the empty prototype "()" was dropped (the sub takes arguments and
# only worked through &simple_sql(...) calls); an undefined $cnf no longer
# triggers an "uninitialized" concatenation; the per-row buffer is now a
# lexical instead of the package variable @row_simple.
sub simple_sql {
    my ($cnf, $sql, $conn, $thread) = @_;

    # Handles are stored in package variables addressed by name.
    no strict 'refs';

    my $suffix  = (defined $cnf ? $cnf : '') . ($thread ? $thread : '');
    my $control = "dbh"  . $suffix;
    my $exec    = "exec" . $suffix;

    $conn = 1 if !$conn;    # preserved: see NOTE(review) above

    @result_simple = ();
    &dbconn($cnf, $thread) if $conn;

    $$exec = $$control->prepare("$sql");
    $$exec->execute;
    # Each row overwrites the previous one, so only the last row survives.
    while ( my @row = $$exec->fetchrow_array() ) {
        @result_simple = @row;
    }
    $$exec->finish();

    &dbdisconn($cnf, $thread) if $conn;
    return @result_simple;
}
use sigtrap 'handler' => \&cleanAndExit, 'INT', 'ABRT', 'QUIT', 'TERM';

# Signal handler installed above for INT, ABRT, QUIT and TERM.
#
# Saves %db_ids_in_progress, $process_thread_running and $total_counter with
# Storable's store() under "<mydir>daemon/cache/", logs each step with warn,
# releases the PID-file object, removes /var/run/local_daemon.pid and exits
# with status 1.
#
# Changes:
#   * each store() runs inside eval and its result is checked, so a failed
#     save is logged and can no longer abort the handler before the PID file
#     is released;
#   * the PID file is removed with unlink instead of spawning a shell for
#     "rm -rf";
#   * the empty prototype "()" was dropped (the handler is invoked with the
#     signal name as an argument).
sub cleanAndExit {
    my $now = strftime("%Y-%m-%d %H:%M:%S", localtime);
    warn "\n----------------->[ $now ]<--------------------------\nApplication to be killed:\n";

    # [ label used in the log line, cache file name, reference to save ]
    my @snapshots = (
        [ 'Hash',    'db_ids_in_progress',     \%db_ids_in_progress     ],
        [ 'Scallar', 'process_thread_running', \$process_thread_running ],
        [ 'Scallar', 'total_counter',          \$total_counter          ],
    );
    for my $snapshot (@snapshots) {
        my ($label, $name, $data_ref) = @{$snapshot};
        my $file = $mydir."daemon/cache/".$name;
        if (eval { store($data_ref, $file) }) {
            warn "\t$label $name saved to $file\n";
        }
        else {
            warn "\t$label $name NOT saved to $file: " . ($@ || $!) . "\n";
        }
    }

    warn "<-----------------[ $now ]-------------------------->\n";
    $pid->release();
    # Missing file is fine here; unlink simply returns 0 in that case.
    unlink('/var/run/local_daemon.pid');
    exit(1);
}
####
# Drain %db_ids_in_progress: route every queued message through the
# sp_route_message SQL function, send it from a worker thread running
# send_sms_message, then execute the SQL statements the workers returned.
#
# Parameters:
#   $db_handler - connection name handed to simple_sql for every statement.
#
# Returns 0 immediately when $process_thread_running is already set.
#
# NOTE(review): the sub is declared with an empty prototype "()" although it
# takes an argument; it is only called with the & form in this file.
# NOTE(review): the closing brace of this sub does not appear in this excerpt.
sub process_thread() {
my ($db_handler) = @_;
# Re-entrancy guard: only one run at a time.
return 0 if $process_thread_running;
$process_thread_running = 1;
# NOTE(review): Dumper receives the flattened hash (a key/value list), not a
# reference to it -- \%db_ids_in_progress would dump it as one structure.
warn Dumper(%db_ids_in_progress)."\n\n";
# Reset the accumulator for statements returned by the worker threads.
%grand_hash = (); $gcnt = 0;
# Flag this daemon instance as busy.
&simple_sql($db_handler,"UPDATE daemon_instances SET busy=TRUE,\"version\"=1.0 WHERE id=$daemon_id RETURNING 1;");
my $internal_counter_for_speed = 0;
my $starti = [ Time::HiRes::gettimeofday( ) ];
# Keep going while messages are queued or thread names are still waiting to be joined.
while (keys( %db_ids_in_progress ) || @threads_in_use) {
foreach my $id (keys %db_ids_in_progress) {
# Stop starting threads once 20 names are queued in @threads_in_use.
last if (@threads_in_use >= 20);
# The counter used for thread names wraps around at one million.
if ($total_counter == 1000000) { $total_counter = 0; }
$total_counter++;
my $threadname = 'thr'.$total_counter;
# Destination: strip every character that is not an ASCII letter or digit.
$db_ids_in_progress{$id}{'to'} =~ s/([^a-zA-Z0-9]+)//g;
# Message: trim leading/trailing whitespace.
$db_ids_in_progress{$id}{'msg'} =~ s/^\s+|\s+$//g;
# Replace the UTF-8 byte sequences of curly double quotes (U+201C/U+201D) with ".
$db_ids_in_progress{$id}{'msg'} =~ s/\xe2\x80\x9c|\xe2\x80\x9d/\"/g;
# Replace the UTF-8 byte sequences of curly single quotes (U+2019/U+2018) with '.
$db_ids_in_progress{$id}{'msg'} =~ s/\xe2\x80\x99|\xe2\x80\x98/\'/g;
# Ask the database how to route the message; the first column of the last
# returned row becomes $processing.
# NOTE(review): uid/to/msg/dlr_track_id/from are interpolated into the SQL
# text; the $a$...$a$ dollar quoting ends early if a value itself contains
# "$a$".  Bind parameters would avoid that -- needs a change in simple_sql.
my ($processing) = &simple_sql($db_handler,"SELECT * FROM sp_route_message($db_ids_in_progress{$id}{'uid'},'$db_ids_in_progress{$id}{'to'}',\$a\$$db_ids_in_progress{$id}{'msg'}\$a\$,\$a\$$db_ids_in_progress{$id}{'dlr_track_id'}\$a\$,\$a\$$db_ids_in_progress{$id}{'from'}\$a\$);");
# Start the worker thread and remember it under its generated name.
$running_threads{$threadname} = threads->create(\&send_sms_message, $db_ids_in_progress{$id}{'uid'}, $db_ids_in_progress{$id}{'from'}, $db_ids_in_progress{$id}{'to'}, $db_ids_in_progress{$id}{'msg'}, $id, $db_ids_in_progress{$id}{'dlr_track_id'}, $processing);
$threads_in_progress++;
push(@threads_in_use,$threadname);
# The message now belongs to its thread; drop it from the queue.
delete $db_ids_in_progress{$id};
}
# Take the name of the oldest started thread; one thread is joined per pass
# of the outer while loop.
$name = shift(@threads_in_use); # GET THE OLDER THREAD
if ($running_threads{$name}) {
# join() waits for the thread; its return value is dereferenced as a hash.
my %tr_res = %{$running_threads{$name}->join()}; # FINISH THREAD
delete $running_threads{$name};
# Copy the returned statements into %grand_hash under a running number:
# 'delete' entries go to {'del'}, 'sqldo' entries to {'sqldo'}; other keys
# only advance the counter.
foreach my $key (keys %tr_res) {
foreach my $internal_key (keys %{$tr_res{$key}}) {
$gcnt++;
if ($key eq 'delete') { $grand_hash{'del'}{$gcnt} = $tr_res{$key}{$internal_key}; }
elsif ($key eq 'sqldo') { $grand_hash{'sqldo'}{$gcnt} = $tr_res{$key}{$internal_key}; }
}
}
$internal_counter_for_speed++;
}
}
# Run the collected 'sqldo' statements first, then the deletes.  Each one
# gets " RETURNING 1;" appended and goes through simple_sql (the debug line
# mentions sqldo, but sqldo is not what is called).
foreach my $key (keys %{$grand_hash{'sqldo'}}) {
warn "DEBUG:\&sqldo($db_handler,\"$grand_hash{'sqldo'}{$key}\");\n";
&simple_sql($db_handler,$grand_hash{'sqldo'}{$key}." RETURNING 1;");
}
foreach my $key (keys %{$grand_hash{'del'}}) {
warn "DEBUG:\&sqldo($db_handler,\"$grand_hash{'del'}{$key}\");\n";
&simple_sql($db_handler,$grand_hash{'del'}{$key}." RETURNING 1;");
}
# Log the number of joined threads and the elapsed wall-clock time.
my $elapsef = Time::HiRes::tv_interval( $starti );
warn "\n=========================\nTotal $internal_counter_for_speed messages processed in $elapsef seconds\n";
# Clear the busy flag and the run guard, then empty the working hashes.
&simple_sql($db_handler,"UPDATE daemon_instances SET busy=FALSE WHERE id=$daemon_id RETURNING 1;");
$process_thread_running = 0;
%grand_hash = (); %db_ids_in_progress = (); %running_threads = ();
####
# Worker for one message; process_thread starts it with threads->create.
#
# Parameters (positional):
#   $acctid       - account/user id (logged only in the visible code)
#   $from         - sender
#   $to           - destination; replaced by the 'mapped' routing setting
#                   when routing succeeded
#   $message      - message text
#   $internal_id  - id of the daemon_sending row
#   $dlr_track_id - delivery-report tracking id
#   $processing   - routing result string: fields separated by ';', the first
#                   field is the result code, the rest are "key:value"
#                   settings (a bare 'stop' field sets $stop_bulk_sending)
#
# Returns a reference to a hash of the form
#   { delete => { <n> => "DELETE FROM daemon_sending WHERE id=..." } }
# in the visible code paths.
#
# NOTE(review): part of the body is elided in this excerpt (the "....."
# lines), so other keys such as 'sqldo' may be produced there.
# NOTE(review): $res, $set and $stop_bulk_sending are not declared with "my"
# in this sub.
sub send_sms_message() {
my ($acctid,$from,$to,$message,$internal_id,$dlr_track_id,$processing) = @_;
my $icnt = 0; my %response_hash = ();
my $now = strftime "%Y-%m-%d %H:%M:%S", localtime;
# Prefix for log lines of this message.
my $exec_name = "[$now][SMS:$internal_id]";
my (@tmp,@tmp2,%sms_setup);
# Number of 160-character parts; not used in the visible part of the body.
my $msg_cnt = ceil(length($message)/160);
warn $exec_name."[START] [$acctid][$to]=>$processing\n";
# Lower-case the routing string, split it on ';' and take the first field as
# the result code.
@tmp = split(/\;/,lc($processing)); $res = shift(@tmp);
foreach $set (@tmp) {
if ($set eq 'stop') { $stop_bulk_sending = 1; }
# Split "key:value" at the first ':' only.
@tmp2 = split(/\:/,$set,2);
# NOTE(review): @tmp2[1] / @tmp2[0] are one-element slices ($tmp2[1] is the
# usual scalar form), and the truth test skips settings whose value is "0"
# or empty.
if (@tmp2[1]) {
$sms_setup{@tmp2[0]} = @tmp2[1];
}
}
# A false result code: nothing is sent, only the queue row gets deleted.
if (!$res) {
$icnt++; $response_hash{'delete'}{$icnt} = "DELETE FROM daemon_sending WHERE id=$internal_id";
return \%response_hash;
}
# Otherwise send to the destination provided by the 'mapped' setting.
else { $to = $sms_setup{'mapped'}; }
# Provider-specific sending; the actual calls are elided in this excerpt.
if ($res == 1 && $sms_setup{'provider'} == 1) {
##curl here
}
elsif ($res == 1 && $sms_setup{'provider'} == 2) {
##curl here
}
.....
.....
# Finally ask the caller to delete the queue row.
$icnt++; $response_hash{'delete'}{$icnt} = "DELETE FROM daemon_sending WHERE id=$internal_id";
return \%response_hash;
}
####
leaked GLOB(0xd02f88) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
leaked SCALAR(0xd02f70) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
leaked ARRAY(0xd02f58) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
leaked SCALAR(0xd02f28) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
leaked ARRAY(0xd02f10) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
leaked ARRAY(0xd02ef8) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
leaked ARRAY(0xd02ee0) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
leaked SCALAR(0xd02ec8) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1285.
...
...
leaked ARRAY(0xf3f580) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
leaked GLOB(0xf3f568) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
leaked GLOB(0xf3f550) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
leaked HASH(0xf3f538) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
leaked SCALAR(0xf3f520) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
leaked ARRAY(0xf3f508) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
leaked GLOB(0xf3f4f0) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
leaked HASH(0xf3f4d8) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
leaked SCALAR(0xf3f4c0) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
leaked ARRAY(0xf3f4a8) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
...
...
leaked REF(0xe6a470) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1280.
leaked HASH(0xe6a248) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1285.
leaked SCALAR(0xe69e70) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1285.
leaked REF(0xd02d78) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1285.
leaked SCALAR(0xd02c70) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1281.
leaked SCALAR(0xd01ff8) from (eval 18) line 260.
leaked SCALAR(0xeb3ec8) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1285.
leaked SCALAR(0x130b518) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1285.
leaked SCALAR(0x130b380) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1285.
leaked REF(0x130b350) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1285.
leaked SCALAR(0x130b158) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1285.
...
...
####
# Thread hook: CLONE is the sub Perl calls in a package when a new
# interpreter thread is created (see perlmod).  This one discards the value
# of $drh and returns nothing.
sub CLONE {
    undef $drh;
    return;
}
####
# Walk the drivers registered in %DBI::installed_drh and warn about each one
# whose DBD::<driver> package does not define a CLONE sub.
while ( my ($driver_name) = each %DBI::installed_drh ) {
    # The sub is looked up by its fully qualified name, i.e. symbolically.
    no strict 'refs';
    my $clone_sub = "DBD::${driver_name}::CLONE";
    unless (defined &{$clone_sub}) {
        warn("$driver_name has no driver CLONE() function so is unsafe threaded\n");
    }
}