Thanks for your reply.
OK, this is going to be a bit long, but hopefully it will describe the code clearly.
So, I have this main loop, which listens to the DB for "notify" events:
# init code here: loading modules, enabling strict, etc.
....
....
# Daemon entry point: create the PID-file object, detach via Proc::Daemon and
# run the listener loop in the process where Init() returned a false value.
my $pid = Proc::PID::File->new(name => "$who_am_i");
my $daemon = Proc::Daemon->new( work_dir => $mydir);
my $Kid_PID = $daemon->Init;
unless ( $Kid_PID ) {
    $pid->touch();
    print "Starting....\n";

    # From here on every warn() is appended to the log file.  The open is
    # checked because a failed re-open would leave STDERR closed and all
    # later diagnostics would be lost silently.
    open(STDERR, '>>', '/tmp/diro_listener.log')
        or die "Cannot append to /tmp/diro_listener.log: $!";

    # INITIATE THREAD AND PG LISTEN
    # String eval so that a perl without threads support still starts.
    $can_use_threads = eval q{use threads 'exit' => 'threads_only'; 1};
    warn $@ if $@;
    warn "Threads support: $can_use_threads\n";

    my $content = `/usr/bin/curl -s 'http://domain.com/daemon_setup.cgi?auth_key=$auth_key&act=db_connections_listen&v=test'`;

    # SECURITY NOTE(review): this executes Perl code downloaded over plain
    # HTTP after decryption.  Presumably it is what defines $sth and
    # $dbh_scan used below - confirm.  At least report a failure instead of
    # ignoring it.
    eval($cipher->decrypt($content));
    warn "Remote setup code failed: $@" if $@;
    undef $cipher;
    undef $auth_reload_url;

    # FINISH UNFINISHED SENDING
    $process_thread_running = 0;
    if (keys( %db_ids_in_progress )) { &process_thread(); }

    &dbconn('pg1');
    while (1) {
        # Run the scan statement and drain its rows; only the notifications
        # collected afterwards are of interest.
        $sth->execute ();
        while (my $row = $sth->fetchrow_arrayref) { }
        my $notify = $dbh_scan->func('pg_notifies');

        # Work when a notification addressed to this daemon arrived, or when
        # the previous pass found rows and asked for another pass.
        if ( ($notify && $notify->[0] eq $who_am_i) || $auto_retry ) {
            $sql = "SELECT id,uid,msg,\"from\",\"to\",dlr_track_id FROM daemon_sending WHERE daemon_id=$daemon_id ORDER BY id ASC LIMIT 500";
            $sth_gen = $dbhpg1->prepare($sql);
            $sth_gen->execute;
            while ( my $ary_ref = $sth_gen->fetchrow_arrayref ) {
                my ($id, @fields) = @{$ary_ref};
                next if ($db_ids_in_progress{$id});    # already queued
                @{ $db_ids_in_progress{$id} }{qw(uid msg from to dlr_track_id)} = @fields;
            }
            $sth_gen->finish();

            my $found_db_records = (keys %db_ids_in_progress);
            if ($found_db_records) {
                leaktrace{
                    &process_thread('pg1');
                };
                $auto_retry = 1;
            }
            else { $auto_retry = 0; }
        }
        else {
            # Idle pass: refresh the keep-alive stamp and wait a second.
            $auto_retry = 0;
            &simple_sql('pg1',"UPDATE daemon_instances SET keep_alive=now() WHERE id=$daemon_id RETURNING 1;");
            sleep 1;
        }
    }

    # Not reached while the loop above has no exit; kept for the day it
    # gets one.
    &dbdisconn('pg1');
    $dbh_scan->disconnect;
}
close(STDERR);
$pid->release();
In the init code I also set up the DB connections, plus some subroutines that use curl. Some examples:
# Open the database handle for connection $cnf, or add one more reference
# to it when it is already open.
#
# Parameters:
#   $cnf    - connection name; the handle is stored in the package global
#             whose name is "dbh" . $cnf (e.g. $dbhpg1 for 'pg1').
#   $thread - optional suffix; when true, a separate handle named
#             "dbh" . $cnf . $thread is used and its reference count is kept
#             in $db_connections{$cnf}{$thread} instead of $db_connections{$cnf}.
#
# Dies with $DBI::errstr when the connection cannot be established.
#
# NOTE(review): $db_connections{$cnf} holds either a plain counter or a hash
# of per-thread counters, so the two modes cannot be mixed for the same $cnf.
#
# The former empty prototype "()" was dropped: it declared "takes no
# arguments", which only worked because every caller used the &-form.
sub dbconn {
    my ($cnf, $thread) = @_;
    my @connect_args = ("dbi:Pg:database=dirotext;host=domain.com", 'user', 'pass');

    if ($thread) {
        if (!$db_connections{$cnf}{$thread}) {
            my $control = "dbh".$cnf.$thread;
            {
                # The handle lives in a package global selected by name.
                no strict 'refs';
                $$control = DBI->connect(@connect_args) or die $DBI::errstr;
            }
            $db_connections{$cnf}{$thread} = 1;
        }
        else {
            $db_connections{$cnf}{$thread}++;
        }
    }
    ############################
    elsif (!$db_connections{$cnf}) {
        my $control = "dbh".$cnf;
        warn "Connecting DB $control\n";
        {
            no strict 'refs';
            $$control = DBI->connect(@connect_args) or die $DBI::errstr;
        }
        $db_connections{$cnf} = 1;
    }
    else { $db_connections{$cnf}++; }
}
# Drop one reference to the handle for connection $cnf and disconnect it
# when the last reference goes away.
#
# Parameters:
#   $cnf    - connection name; the handle is the package global named
#             "dbh" . $cnf.
#   $thread - optional suffix; when true, the handle "dbh" . $cnf . $thread
#             and the counter $db_connections{$cnf}{$thread} are used.
#
# An unbalanced call (counter already zero or unset) is a no-op.  Previously
# a thread-mode call with a zero counter fell through to the non-thread
# branches, where the hash reference stored in $db_connections{$cnf} compared
# as "> 1" and was decremented, destroying every per-thread counter.
#
# The former empty prototype "()" was dropped; it contradicted the arguments
# the sub actually takes.
sub dbdisconn {
    my ($cnf, $thread) = @_;

    if ($thread) {
        my $count = $db_connections{$cnf}{$thread} || 0;
        if ($count == 1) {
            $db_connections{$cnf}{$thread} = 0;
            my $control = "dbh".$cnf.$thread;
            no strict 'refs';    # handle is a package global chosen by name
            $$control->disconnect;
        }
        elsif ($count > 1) {
            $db_connections{$cnf}{$thread}--;
        }
        return;
    }

    # Never do arithmetic on a slot that holds per-thread counters.
    return if ref $db_connections{$cnf};

    my $count = $db_connections{$cnf} || 0;
    if ($count == 1) {
        $db_connections{$cnf} = 0;
        my $control = "dbh".$cnf;
        no strict 'refs';
        $$control->disconnect;
    }
    elsif ($count > 1) {
        $db_connections{$cnf}--;
    }
    return;
}
# Run one statement that returns no rows on connection $cnf.
#
# Parameters:
#   $cnf    - connection name (see dbconn).
#   $sql    - statement text, handed to the handle's do() unchanged.
#   $thread - optional handle suffix (see dbconn).
#
# Returns whatever do() returned.
#
# The handle name gets the suffix only when $thread is true, which is the
# same rule dbconn applies; before, a false-but-defined $thread produced a
# name that dbconn never connected.  The empty prototype "()" was dropped.
# The helpers are still called with the &-form so that this sub works no
# matter how they are declared.
sub sqldo {
    my ($cnf, $sql, $thread) = @_;
    my $control = "dbh" . $cnf . ($thread ? $thread : '');

    &dbconn($cnf, $thread);
    my $rv = do {
        no strict 'refs';    # handle is a package global chosen by name
        $$control->do($sql);
    };
    &dbdisconn($cnf, $thread);

    return $rv;
}
# Prepare and execute $sql on connection $cnf and return the LAST row
# fetched (as a list); an empty list when the statement yields no rows.
#
# Parameters:
#   $cnf    - connection name (see dbconn).
#   $sql    - statement text.
#   $conn   - accepted for compatibility only.  The original code forced it
#             to 1 whenever it was false, so the connection was always
#             opened and closed around the statement; that behaviour is kept.
#   $thread - optional suffix for the handle and statement-handle names.
#
# Side effects: the statement handle is stored in the package global named
# "exec" . $cnf [. $thread], and the result is also left in @result_simple.
#
# The former empty prototype "()" was dropped; helpers are still called with
# the &-form so this sub works however they are declared.
sub simple_sql {
    my ($cnf, $sql, $conn, $thread) = @_;
    my $suffix  = $thread ? $thread : '';
    my $control = "dbh"  . $cnf . $suffix;
    my $exec    = "exec" . $cnf . $suffix;

    @result_simple = ();
    &dbconn($cnf, $thread);
    {
        # Both handles are package globals selected by name.
        no strict 'refs';
        $$exec = $$control->prepare("$sql");
        $$exec->execute;
        while ( my @row = $$exec->fetchrow_array() ) {
            @result_simple = @row;    # keep only the most recent row
        }
        $$exec->finish();
    }
    &dbdisconn($cnf, $thread);

    return @result_simple;
}
use sigtrap 'handler' => \&cleanAndExit, 'INT', 'ABRT', 'QUIT', 'TERM';

# Signal handler for INT/ABRT/QUIT/TERM: persist the in-flight state with
# Storable under ${mydir}daemon/cache/, release the PID file and exit with
# status 1.
#
# Each store() is guarded: Storable croaks on serious errors, and an
# unguarded croak would abort the handler before the PID file is released.
# The PID file is removed with unlink() instead of spawning "rm -rf".
# The former empty prototype "()" was dropped (the handler receives the
# signal name as an argument).
sub cleanAndExit {
    my $now = strftime("%Y-%m-%d %H:%M:%S", localtime);
    warn "\n----------------->[ $now ]<--------------------------\nApplication to be killed:\n";

    # [ label used in the log line, cache file name, reference to store ]
    my @snapshots = (
        [ 'Hash',    'db_ids_in_progress',     \%db_ids_in_progress ],
        [ 'Scallar', 'process_thread_running', \$process_thread_running ],
        [ 'Scallar', 'total_counter',          \$total_counter ],
    );
    for my $snapshot (@snapshots) {
        my ($label, $name, $ref) = @{$snapshot};
        my $file = $mydir."daemon/cache/".$name;
        if (eval { store($ref, $file) }) {
            warn "\t$label $name saved to $file\n";
        }
        else {
            my $why = $@ || $!;
            warn "\t$label $name could NOT be saved to $file: $why\n";
        }
    }

    warn "<-----------------[ $now ]-------------------------->\n";
    $pid->release();
    unlink('/var/run/local_daemon.pid');
    exit(1);
}
As can be seen, the main loop calls &process_thread(). Here is its code:
# Send every message queued in %db_ids_in_progress, using at most 20 worker
# threads at a time, then run the SQL statements the workers handed back.
#
# Parameter:
#   $db_handler - connection name passed on to simple_sql.
#
# Returns 0 immediately when a run is already in progress
# ($process_thread_running is set).
#
# Changes against the posted version:
#   * the sub's closing brace, missing from the post, is restored;
#   * a worker whose join() does not return a hash reference (for example
#     because the thread died) is logged and skipped instead of being
#     dereferenced, which was fatal;
#   * the thread name taken from the queue is a lexical, not a global;
#   * the empty prototype "()" was dropped.
sub process_thread {
    my ($db_handler) = @_;
    return 0 if $process_thread_running;
    $process_thread_running = 1;
    warn Dumper(%db_ids_in_progress)."\n\n";
    %grand_hash = (); $gcnt = 0;
    &simple_sql($db_handler,"UPDATE daemon_instances SET busy=TRUE,\"version\"=1.0 WHERE id=$daemon_id RETURNING 1;");
    my $internal_counter_for_speed = 0;
    my $starti = [ Time::HiRes::gettimeofday( ) ];
    my $max_threads = 20;

    while (keys( %db_ids_in_progress ) || @threads_in_use) {
        foreach my $id (keys %db_ids_in_progress) {
            last if (@threads_in_use >= $max_threads);

            # Thread names cycle through thr1 .. thr1000000.
            if ($total_counter == 1000000) { $total_counter = 0; }
            $total_counter++;
            my $threadname = 'thr'.$total_counter;

            my $rec = $db_ids_in_progress{$id};
            $rec->{'to'}  =~ s/([^a-zA-Z0-9]+)//g;                  # letters and digits only
            $rec->{'msg'} =~ s/^\s+|\s+$//g;                        # trim
            $rec->{'msg'} =~ s/\xe2\x80\x9c|\xe2\x80\x9d/\"/g;      # curly double quotes
            $rec->{'msg'} =~ s/\xe2\x80\x99|\xe2\x80\x98/\'/g;      # curly single quotes

            # SECURITY NOTE(review): msg, dlr_track_id and from are embedded
            # in $a$...$a$ dollar quotes; a value that itself contains "$a$"
            # ends the literal early.  simple_sql takes no bind values, so
            # this cannot be parameterised from inside this sub.
            my $route_sql = sprintf(
                q{SELECT * FROM sp_route_message(%s,'%s',$a$%s$a$,$a$%s$a$,$a$%s$a$);},
                @{$rec}{qw(uid to msg dlr_track_id from)},
            );
            my ($processing) = &simple_sql($db_handler, $route_sql);

            $running_threads{$threadname} = threads->create(
                \&send_sms_message,
                $rec->{'uid'}, $rec->{'from'}, $rec->{'to'}, $rec->{'msg'},
                $id, $rec->{'dlr_track_id'}, $processing,
            );
            $threads_in_progress++;
            push(@threads_in_use,$threadname);
            delete $db_ids_in_progress{$id};
        }

        my $name = shift(@threads_in_use); # GET THE OLDER THREAD
        if (defined $name && $running_threads{$name}) {
            my $worker = delete $running_threads{$name};
            my $tr_res = $worker->join(); # FINISH THREAD
            if (ref($tr_res) eq 'HASH') {
                foreach my $key (keys %{$tr_res}) {
                    foreach my $internal_key (keys %{ $tr_res->{$key} }) {
                        $gcnt++;
                        if ($key eq 'delete') { $grand_hash{'del'}{$gcnt} = $tr_res->{$key}{$internal_key}; }
                        elsif ($key eq 'sqldo') { $grand_hash{'sqldo'}{$gcnt} = $tr_res->{$key}{$internal_key}; }
                    }
                }
                $internal_counter_for_speed++;
            }
            else {
                warn "Thread $name returned no result hash; its message is not counted as processed\n";
            }
        }
    }

    # Replay the statements collected from the workers: the "sqldo" ones
    # first, the deletes afterwards.
    foreach my $key (keys %{$grand_hash{'sqldo'}}) {
        warn "DEBUG:\&sqldo($db_handler,\"$grand_hash{'sqldo'}{$key}\");\n";
        &simple_sql($db_handler,$grand_hash{'sqldo'}{$key}." RETURNING 1;");
    }
    foreach my $key (keys %{$grand_hash{'del'}}) {
        warn "DEBUG:\&sqldo($db_handler,\"$grand_hash{'del'}{$key}\");\n";
        &simple_sql($db_handler,$grand_hash{'del'}{$key}." RETURNING 1;");
    }

    my $elapsef = Time::HiRes::tv_interval( $starti );
    warn "\n=========================\nTotal $internal_counter_for_speed messages processed in $elapsef seconds\n";
    &simple_sql($db_handler,"UPDATE daemon_instances SET busy=FALSE WHERE id=$daemon_id RETURNING 1;");
    $process_thread_running = 0;
    %grand_hash = (); %db_ids_in_progress = (); %running_threads = ();
}
In that code, the sub send_sms_message is called in a thread. Here is (almost) what it looks like:
# Worker run in its own thread for one queued message.
#
# Parameters (positional):
#   $acctid, $from, $to, $message - message data as read from daemon_sending
#   $internal_id                  - daemon_sending.id of the row
#   $dlr_track_id                 - delivery-report tracking id
#   $processing                   - routing verdict, a ';'-separated string:
#                                   the first field is the result code, the
#                                   rest are "key:value" settings or the
#                                   bare word "stop"
#
# Returns a hash reference of SQL statements for the caller to execute,
# keyed by kind and then by a running number, e.g.
#   { delete => { 1 => "DELETE FROM daemon_sending WHERE id=..." } }
#
# Changes against the posted version: the empty prototype "()" was dropped;
# $res and the loop variable are lexicals; array elements are read as
# $pair[n] rather than as slices; an undefined $processing is treated as an
# empty string instead of raising warnings.
sub send_sms_message {
    my ($acctid,$from,$to,$message,$internal_id,$dlr_track_id,$processing) = @_;
    $processing = '' unless defined $processing;

    my $icnt = 0;
    my %response_hash = ();
    my $now = strftime("%Y-%m-%d %H:%M:%S", localtime);
    my $exec_name = "[$now][SMS:$internal_id]";
    my %sms_setup;
    my $msg_cnt = ceil(length($message)/160);    # number of 160-character parts
    warn $exec_name."[START] [$acctid][$to]=>$processing\n";

    my @settings = split(/\;/, lc($processing));
    my $res = shift(@settings);
    foreach my $set (@settings) {
        if ($set eq 'stop') { $stop_bulk_sending = 1; }
        my ($key, $value) = split(/\:/, $set, 2);
        if ($value) {
            $sms_setup{$key} = $value;
        }
    }

    # A false result code: nothing to send, just have the row removed.
    if (!$res) {
        $icnt++; $response_hash{'delete'}{$icnt} = "DELETE FROM daemon_sending WHERE id=$internal_id";
        return \%response_hash;
    }
    else { $to = $sms_setup{'mapped'}; }

    if ($res == 1 && $sms_setup{'provider'} == 1) {
        ##curl here
    }
    elsif ($res == 1 && $sms_setup{'provider'} == 2) {
        ##curl here
    }
    # ..... (the remaining provider branches were elided in the original post)

    $icnt++; $response_hash{'delete'}{$icnt} = "DELETE FROM daemon_sending WHERE id=$internal_id";
    return \%response_hash;
}
In general, I am using Test::LeakTrace to detect the leaks.
Here is what the leak report looks like:
leaked GLOB(0xd02f88) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6
+4-linux-thread-multi/DBI.pm line 539.
leaked SCALAR(0xd02f70) from /usr/local/lib/perl5/site_perl/5.13.9/x86
+_64-linux-thread-multi/DBI.pm line 539.
leaked ARRAY(0xd02f58) from /usr/local/lib/perl5/site_perl/5.13.9/x86_
+64-linux-thread-multi/DBI.pm line 539.
leaked SCALAR(0xd02f28) from /usr/local/lib/perl5/site_perl/5.13.9/x86
+_64-linux-thread-multi/DBI.pm line 539.
leaked ARRAY(0xd02f10) from /usr/local/lib/perl5/site_perl/5.13.9/x86_
+64-linux-thread-multi/DBI.pm line 539.
leaked ARRAY(0xd02ef8) from /usr/local/lib/perl5/site_perl/5.13.9/x86_
+64-linux-thread-multi/DBI.pm line 539.
leaked ARRAY(0xd02ee0) from /usr/local/lib/perl5/site_perl/5.13.9/x86_
+64-linux-thread-multi/DBI.pm line 539.
leaked SCALAR(0xd02ec8) from /usr/local/lib/perl5/site_perl/5.13.9/x86
+_64-linux-thread-multi/DBI.pm line 1285.
...
...
leaked ARRAY(0xf3f580) from /usr/local/lib/perl5/site_perl/5.13.9/x86_
+64-linux-thread-multi/DBI.pm line 539.
leaked GLOB(0xf3f568) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6
+4-linux-thread-multi/DBI.pm line 539.
leaked GLOB(0xf3f550) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6
+4-linux-thread-multi/DBI.pm line 539.
leaked HASH(0xf3f538) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6
+4-linux-thread-multi/DBI.pm line 539.
leaked SCALAR(0xf3f520) from /usr/local/lib/perl5/site_perl/5.13.9/x86
+_64-linux-thread-multi/DBI.pm line 539.
leaked ARRAY(0xf3f508) from /usr/local/lib/perl5/site_perl/5.13.9/x86_
+64-linux-thread-multi/DBI.pm line 539.
leaked GLOB(0xf3f4f0) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6
+4-linux-thread-multi/DBI.pm line 539.
leaked HASH(0xf3f4d8) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6
+4-linux-thread-multi/DBI.pm line 539.
leaked SCALAR(0xf3f4c0) from /usr/local/lib/perl5/site_perl/5.13.9/x86
+_64-linux-thread-multi/DBI.pm line 539.
leaked ARRAY(0xf3f4a8) from /usr/local/lib/perl5/site_perl/5.13.9/x86_
+64-linux-thread-multi/DBI.pm line 539.
...
...
leaked REF(0xe6a470) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64
+-linux-thread-multi/DBI.pm line 1280.
leaked HASH(0xe6a248) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6
+4-linux-thread-multi/DBI.pm line 1285.
leaked SCALAR(0xe69e70) from /usr/local/lib/perl5/site_perl/5.13.9/x86
+_64-linux-thread-multi/DBI.pm line 1285.
leaked REF(0xd02d78) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64
+-linux-thread-multi/DBI.pm line 1285.
leaked SCALAR(0xd02c70) from /usr/local/lib/perl5/site_perl/5.13.9/x86
+_64-linux-thread-multi/DBI.pm line 1281.
leaked SCALAR(0xd01ff8) from (eval 18) line 260.
leaked SCALAR(0xeb3ec8) from /usr/local/lib/perl5/site_perl/5.13.9/x86
+_64-linux-thread-multi/DBI.pm line 1285.
leaked SCALAR(0x130b518) from /usr/local/lib/perl5/site_perl/5.13.9/x8
+6_64-linux-thread-multi/DBI.pm line 1285.
leaked SCALAR(0x130b380) from /usr/local/lib/perl5/site_perl/5.13.9/x8
+6_64-linux-thread-multi/DBI.pm line 1285.
leaked REF(0x130b350) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6
+4-linux-thread-multi/DBI.pm line 1285.
leaked SCALAR(0x130b158) from /usr/local/lib/perl5/site_perl/5.13.9/x8
+6_64-linux-thread-multi/DBI.pm line 1285.
...
...
What I suspect is the cause can be found in /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBD/Pg.pm (module version 2.17.2), line 97:
# Excerpt quoted from DBD::Pg.  The sub's only action is to reset the
# package variable $drh to undef and return nothing.
# NOTE(review): CLONE is the per-package hook that perlmod documents as being
# called when a new thread is created; presumably the reset makes each new
# thread build its own driver handle - confirm against the DBD::Pg source.
sub CLONE {
$drh = undef;
return;
}
Looking at /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm (line 537):
# Excerpt quoted from DBI.pm: visit every key/value pair of
# %DBI::installed_drh, bound here to $driver and $drh.
while ( my ($driver, $drh) = each %DBI::installed_drh) {
# The sub lookup below goes through a name built at run time, which needs
# strict refs switched off.
no strict 'refs';
# A driver whose DBD::<driver> package defines a CLONE sub is skipped ...
next if defined &{"DBD::${driver}::CLONE"};
# ... any other driver triggers this warning.
warn("$driver has no driver CLONE() function so is unsafe thre
+aded\n");
}
I appreciate the time everyone spent getting to this part of my message :)
Any suggestions are welcome.
Thanks!