#init code here, loading modules, settings stricts etc ....
# ....

# Daemon entry point: detach, load the runtime setup, flush any messages left
# over from a previous run, then poll for PostgreSQL notifications forever.
my $pid     = Proc::PID::File->new(name => "$who_am_i");
my $daemon  = Proc::Daemon->new(work_dir => $mydir);
my $Kid_PID = $daemon->Init;

unless ($Kid_PID) {
    # Only the detached child runs the service loop.
    $pid->touch();
    print "Starting....\n";

    # Everything written with warn() below ends up in this log. Keep running
    # if it cannot be opened, but say so on STDOUT.
    open(STDERR, '>>', '/tmp/diro_listener.log')
        or print "Cannot open /tmp/diro_listener.log: $!\n";

    # INITIATE THREAD AND PG LISTEN
    $can_use_threads = eval 'use threads \'exit\' => \'threads_only\'; 1';
    warn $@ if $@;
    warn "Threads support: $can_use_threads\n";

    my $content = `/usr/bin/curl -s 'http://domain.com/daemon_setup.cgi?auth_key=$auth_key&act=db_connections_listen&v=test'`;

    # SECURITY NOTE(review): this executes code fetched over plain HTTP; the
    # only protection is the cipher. $sth and $dbh_scan used below are not
    # created anywhere in this section -- presumably this code creates them;
    # confirm.
    eval($cipher->decrypt($content));
    warn "Daemon setup code failed: $@" if $@;
    undef $cipher;
    undef $auth_reload_url;

    # FINISH UNFINISHED SENDING
    $process_thread_running = 0;
    if (keys(%db_ids_in_progress)) {
        # Same connection name as the call inside the polling loop.
        &process_thread('pg1');
    }

    # Hold one reference on the pg1 connection for the lifetime of the loop.
    &dbconn('pg1');

    while (1) {
        # Run the prepared statement and discard whatever rows it returns.
        $sth->execute();
        while (my $res = $sth->fetchrow_arrayref) { }

        my $notify = $dbh_scan->func('pg_notifies');

        # Work is picked up when a notification's first element equals our
        # own name, or when the previous pass found rows (auto retry).
        if (($notify && $notify->[0] eq $who_am_i) || $auto_retry) {
            $sql = "SELECT id,uid,msg,\"from\",\"to\",dlr_track_id FROM daemon_sending WHERE daemon_id=$daemon_id ORDER BY id ASC LIMIT 500";
            $sth_gen = $dbhpg1->prepare($sql);
            $sth_gen->execute;
            while (my $ary_ref = $sth_gen->fetchrow_arrayref) {
                my $id = $ary_ref->[0];
                next if ($db_ids_in_progress{$id});    # already queued
                @{ $db_ids_in_progress{$id} }{qw(uid msg from to dlr_track_id)}
                    = @{$ary_ref}[1 .. 5];
            }
            $sth_gen->finish();

            my $found_db_records = scalar(keys %db_ids_in_progress);
            if ($found_db_records) {
                leaktrace{ &process_thread('pg1'); };
                $auto_retry = 1;
            }
            else {
                $auto_retry = 0;
            }
        }
        else {
            # Idle pass: refresh the keep-alive stamp and back off a second.
            $auto_retry = 0;
            &simple_sql('pg1', "UPDATE daemon_instances SET keep_alive=now() WHERE id=$daemon_id RETURNING 1;");
            sleep 1;
        }
    }

    # Not reached while the loop above has no exit.
    &dbdisconn('pg1');
    $dbh_scan->disconnect;
}

close(STDERR);
$pid->release();
# Build the suffix shared by a connection's bookkeeping key in
# %db_connections and by its global handle name ("dbh" . suffix): the
# configuration name, followed by the thread tag when a true one is given.
sub _db_conn_key {
    my ($cnf, $thread) = @_;
    $cnf = '' unless defined $cnf;
    return $thread ? $cnf . $thread : $cnf;
}

# Take one reference on a database connection, connecting on first use.
#
#   $cnf    - connection name; the handle lives in the global $dbh<cnf>
#   $thread - optional tag; when true the handle is $dbh<cnf><thread>
#
# Every connection keeps its own flat counter in %db_connections, so a
# tagged and an untagged connection with the same name never share a slot.
# Dies with $DBI::errstr when the connect fails.
# Returns the reference count after the call.
sub dbconn {
    my ($cnf, $thread) = @_;
    my $key = _db_conn_key($cnf, $thread);

    if ($db_connections{$key}) {
        return ++$db_connections{$key};
    }

    warn "Connecting DB dbh$key\n" unless $thread;
    {
        no strict 'refs';    # handles are package globals addressed by name
        ${"dbh$key"} = DBI->connect("dbi:Pg:database=dirotext;host=domain.com", 'user', 'pass')
            or die $DBI::errstr;
    }
    $db_connections{$key} = 1;
    return 1;
}

# Drop one reference on a connection; the handle is disconnected when the
# last reference goes away. A connection with no references is left alone.
# Returns 1 when a reference was released, 0 otherwise.
sub dbdisconn {
    my ($cnf, $thread) = @_;
    my $key  = _db_conn_key($cnf, $thread);
    my $open = $db_connections{$key} || 0;

    if ($open == 1) {
        $db_connections{$key} = 0;
        no strict 'refs';
        ${"dbh$key"}->disconnect;
        return 1;
    }
    if ($open > 1) {
        $db_connections{$key}--;
        return 1;
    }
    return 0;
}

# Run one statement with $dbh->do() inside a connect/disconnect pair.
# Returns whatever do() returned.
sub sqldo {
    my ($cnf, $sql, $thread) = @_;
    my $handle_name = 'dbh' . _db_conn_key($cnf, $thread);

    dbconn($cnf, $thread);
    my $rows = do { no strict 'refs'; ${$handle_name}->do($sql) };
    dbdisconn($cnf, $thread);
    return $rows;
}

# Prepare and execute $sql and return the LAST row fetched (empty list when
# the statement yields no rows). The row is also left in @result_simple.
#
#   $cnf    - connection name (see dbconn)
#   $sql    - statement text
#   $conn   - accepted for call compatibility; a connection reference is
#             taken and released regardless of its value
#   $thread - optional connection tag (see dbconn)
#
# Dies when the statement cannot be prepared, after releasing the connection
# reference it took.
sub simple_sql {
    my ($cnf, $sql, $conn, $thread) = @_;
    my $handle_name = 'dbh' . _db_conn_key($cnf, $thread);

    @result_simple = ();
    dbconn($cnf, $thread);

    # The statement handle is lexical so it is released on return.
    my $sth = do { no strict 'refs'; ${$handle_name}->prepare("$sql") };
    unless ($sth) {
        my $reason = defined $DBI::errstr ? $DBI::errstr : 'unknown error';
        dbdisconn($cnf, $thread);
        die "Cannot prepare [$sql]: $reason\n";
    }

    $sth->execute;
    while (my @row = $sth->fetchrow_array()) {
        @result_simple = @row;
    }
    $sth->finish();

    dbdisconn($cnf, $thread);
    return @result_simple;
}

use sigtrap 'handler' => \&cleanAndExit, 'INT', 'ABRT', 'QUIT', 'TERM';

# Signal handler: persist the in-memory work state under
# ${mydir}daemon/cache/, release the PID file and exit with status 1.
# A snapshot that cannot be written is logged and skipped so the remaining
# cleanup still runs.
sub cleanAndExit {
    my $now = strftime("%Y-%m-%d %H:%M:%S", localtime);
    warn "\n----------------->[ $now ]<--------------------------\nApplication to be killed:\n";

    my @snapshots = (
        [ 'Hash db_ids_in_progress',        \%db_ids_in_progress,     'db_ids_in_progress' ],
        [ 'Scallar process_thread_running', \$process_thread_running, 'process_thread_running' ],
        [ 'Scallar total_counter',          \$total_counter,          'total_counter' ],
    );
    for my $snapshot (@snapshots) {
        my ($label, $data, $file) = @{$snapshot};
        my $path = $mydir . "daemon/cache/" . $file;
        if (eval { store($data, $path) }) {
            warn "\t$label saved to $path\n";
        }
        else {
            warn "\t$label NOT saved to $path: " . ($@ || $!) . "\n";
        }
    }

    warn "<-----------------[ $now ]-------------------------->\n";
    $pid->release();
    unlink '/var/run/local_daemon.pid';    # no shell needed to remove one file
    exit(1);
}
# Wrap $text in a PostgreSQL dollar-quoted literal. The tag starts as "a" and
# is lengthened until the closing delimiter cannot occur early, so the text
# can never terminate the literal itself.
sub _dollar_quote {
    my ($text) = @_;
    $text = '' unless defined $text;
    my $tag = 'a';
    $tag .= 'a' while index($text . '$' . $tag, '$' . $tag . '$') >= 0;
    return '$' . $tag . '$' . $text . '$' . $tag . '$';
}

# Drain %db_ids_in_progress: route every queued message through
# sp_route_message, hand it to a send_sms_message worker thread (at most 20
# in flight), then run the SQL statements the workers returned.
#
#   $db_handler - connection name passed on to simple_sql
#
# Returns 0 immediately when a run is already in progress
# ($process_thread_running), 1 after a completed run.
sub process_thread {
    my ($db_handler) = @_;
    return 0 if $process_thread_running;
    $process_thread_running = 1;

    my $max_threads = 20;

    warn Dumper(\%db_ids_in_progress)."\n\n";
    %grand_hash = ();
    $gcnt = 0;

    # Helpers are called in & form, which bypasses any prototype on them.
    &simple_sql($db_handler,"UPDATE daemon_instances SET busy=TRUE,\"version\"=1.0 WHERE id=$daemon_id RETURNING 1;");

    my $internal_counter_for_speed = 0;
    my $starti = [ Time::HiRes::gettimeofday() ];

    while (keys(%db_ids_in_progress) || @threads_in_use) {
        # Start workers until the queue is empty or the thread cap is hit.
        foreach my $id (keys %db_ids_in_progress) {
            last if (@threads_in_use >= $max_threads);

            if ($total_counter == 1000000) { $total_counter = 0; }
            $total_counter++;
            my $threadname = 'thr'.$total_counter;

            my $job = $db_ids_in_progress{$id};
            $job->{'to'}  =~ s/([^a-zA-Z0-9]+)//g;                # keep letters and digits only
            $job->{'msg'} =~ s/^\s+|\s+$//g;                      # trim both ends
            $job->{'msg'} =~ s/\xe2\x80\x9c|\xe2\x80\x9d/\"/g;    # UTF-8 curly double quotes
            $job->{'msg'} =~ s/\xe2\x80\x99|\xe2\x80\x98/\'/g;    # UTF-8 curly single quotes

            # 'to' has been reduced to [a-zA-Z0-9] above; the free-text values
            # are dollar-quoted with a collision-free tag.
            # NOTE(review): uid is interpolated unquoted -- assumed numeric; confirm.
            my ($processing) = &simple_sql($db_handler,
                "SELECT * FROM sp_route_message($job->{'uid'},'$job->{'to'}',"
                . _dollar_quote($job->{'msg'}) . ','
                . _dollar_quote($job->{'dlr_track_id'}) . ','
                . _dollar_quote($job->{'from'}) . ');');

            $running_threads{$threadname} = threads->create(
                \&send_sms_message,
                $job->{'uid'}, $job->{'from'}, $job->{'to'}, $job->{'msg'},
                $id, $job->{'dlr_track_id'}, $processing,
            );
            $threads_in_progress++;
            push(@threads_in_use, $threadname);
            delete $db_ids_in_progress{$id};
        }

        my $name = shift(@threads_in_use);    # GET THE OLDER THREAD
        next unless defined $name && $running_threads{$name};

        my $thread_result = $running_threads{$name}->join();    # FINISH THREAD
        delete $running_threads{$name};

        # A worker that died yields no hash; treat that as "nothing to run".
        $thread_result = {} unless ref($thread_result) eq 'HASH';

        foreach my $key (keys %{$thread_result}) {
            foreach my $internal_key (keys %{ $thread_result->{$key} }) {
                $gcnt++;
                if ($key eq 'delete') {
                    $grand_hash{'del'}{$gcnt} = $thread_result->{$key}{$internal_key};
                }
                elsif ($key eq 'sqldo') {
                    $grand_hash{'sqldo'}{$gcnt} = $thread_result->{$key}{$internal_key};
                }
            }
        }
        $internal_counter_for_speed++;
    }

    # Run the collected statements: all 'sqldo' entries first, then the
    # deletes, each group in the order the statements were collected.
    foreach my $bucket ('sqldo', 'del') {
        my $statements = $grand_hash{$bucket} || {};
        foreach my $key (sort { $a <=> $b } keys %{$statements}) {
            my $statement = $statements->{$key};
            warn "DEBUG:\&sqldo($db_handler,\"$statement\");\n";
            &simple_sql($db_handler, $statement." RETURNING 1;");
        }
    }

    my $elapsef = Time::HiRes::tv_interval($starti);
    warn "\n=========================\nTotal $internal_counter_for_speed messages processed in $elapsef seconds\n";
    &simple_sql($db_handler,"UPDATE daemon_instances SET busy=FALSE WHERE id=$daemon_id RETURNING 1;");

    $process_thread_running = 0;
    %grand_hash = ();
    %db_ids_in_progress = ();
    %running_threads = ();
    return 1;
}
# Worker run in its own thread for one queued message.
#
#   $acctid, $from, $to, $message - account id, sender, recipient, text
#   $internal_id                  - daemon_sending.id of the queued row
#   $dlr_track_id                 - delivery-report tracking id
#   $processing                   - routing result: "<res>;key:value;..."
#                                   (lower-cased before parsing); the bare
#                                   word "stop" sets $stop_bulk_sending
#
# Returns a hash ref { delete => { <n> => <SQL> } } holding the statement
# that removes the row from daemon_sending.
sub send_sms_message {
    my ($acctid,$from,$to,$message,$internal_id,$dlr_track_id,$processing) = @_;

    my $icnt = 0;
    my %response_hash = ();
    my $now = strftime("%Y-%m-%d %H:%M:%S", localtime);
    my $exec_name = "[$now][SMS:$internal_id]";
    my %sms_setup;
    my $msg_cnt = ceil(length($message)/160);    # number of 160-character parts

    warn $exec_name."[START] [$acctid][$to]=>$processing\n";

    # The first field is the routing result; the rest are key:value settings.
    my ($res, @settings) = split(/\;/, lc($processing));
    foreach my $set (@settings) {
        # NOTE(review): set inside a worker thread -- whether the launching
        # thread ever sees this flag depends on how the variable is shared;
        # confirm.
        if ($set eq 'stop') { $stop_bulk_sending = 1; }

        my ($setting, $value) = split(/\:/, $set, 2);
        if ($value) { $sms_setup{$setting} = $value; }
    }

    if (!$res) {
        # No usable routing result: just drop the queued row.
        $icnt++;
        $response_hash{'delete'}{$icnt} = "DELETE FROM daemon_sending WHERE id=$internal_id";
        return \%response_hash;
    }
    else {
        $to = $sms_setup{'mapped'};
    }

    my $provider = $sms_setup{'provider'} || 0;
    if ($res == 1 && $provider == 1) {
        ##curl here
    }
    elsif ($res == 1 && $provider == 2) {
        ##curl here
    }
    # .....
    # .....

    $icnt++;
    $response_hash{'delete'}{$icnt} = "DELETE FROM daemon_sending WHERE id=$internal_id";
    return \%response_hash;
}
leaked GLOB(0xd02f88) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6 +4-linux-thread-multi/DBI.pm line 539. leaked SCALAR(0xd02f70) from /usr/local/lib/perl5/site_perl/5.13.9/x86 +_64-linux-thread-multi/DBI.pm line 539. leaked ARRAY(0xd02f58) from /usr/local/lib/perl5/site_perl/5.13.9/x86_ +64-linux-thread-multi/DBI.pm line 539. leaked SCALAR(0xd02f28) from /usr/local/lib/perl5/site_perl/5.13.9/x86 +_64-linux-thread-multi/DBI.pm line 539. leaked ARRAY(0xd02f10) from /usr/local/lib/perl5/site_perl/5.13.9/x86_ +64-linux-thread-multi/DBI.pm line 539. leaked ARRAY(0xd02ef8) from /usr/local/lib/perl5/site_perl/5.13.9/x86_ +64-linux-thread-multi/DBI.pm line 539. leaked ARRAY(0xd02ee0) from /usr/local/lib/perl5/site_perl/5.13.9/x86_ +64-linux-thread-multi/DBI.pm line 539. leaked SCALAR(0xd02ec8) from /usr/local/lib/perl5/site_perl/5.13.9/x86 +_64-linux-thread-multi/DBI.pm line 1285. ... ... leaked ARRAY(0xf3f580) from /usr/local/lib/perl5/site_perl/5.13.9/x86_ +64-linux-thread-multi/DBI.pm line 539. leaked GLOB(0xf3f568) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6 +4-linux-thread-multi/DBI.pm line 539. leaked GLOB(0xf3f550) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6 +4-linux-thread-multi/DBI.pm line 539. leaked HASH(0xf3f538) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6 +4-linux-thread-multi/DBI.pm line 539. leaked SCALAR(0xf3f520) from /usr/local/lib/perl5/site_perl/5.13.9/x86 +_64-linux-thread-multi/DBI.pm line 539. leaked ARRAY(0xf3f508) from /usr/local/lib/perl5/site_perl/5.13.9/x86_ +64-linux-thread-multi/DBI.pm line 539. leaked GLOB(0xf3f4f0) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6 +4-linux-thread-multi/DBI.pm line 539. leaked HASH(0xf3f4d8) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6 +4-linux-thread-multi/DBI.pm line 539. leaked SCALAR(0xf3f4c0) from /usr/local/lib/perl5/site_perl/5.13.9/x86 +_64-linux-thread-multi/DBI.pm line 539. 
leaked ARRAY(0xf3f4a8) from /usr/local/lib/perl5/site_perl/5.13.9/x86_ +64-linux-thread-multi/DBI.pm line 539. ... ... leaked REF(0xe6a470) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64 +-linux-thread-multi/DBI.pm line 1280. leaked HASH(0xe6a248) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6 +4-linux-thread-multi/DBI.pm line 1285. leaked SCALAR(0xe69e70) from /usr/local/lib/perl5/site_perl/5.13.9/x86 +_64-linux-thread-multi/DBI.pm line 1285. leaked REF(0xd02d78) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64 +-linux-thread-multi/DBI.pm line 1285. leaked SCALAR(0xd02c70) from /usr/local/lib/perl5/site_perl/5.13.9/x86 +_64-linux-thread-multi/DBI.pm line 1281. leaked SCALAR(0xd01ff8) from (eval 18) line 260. leaked SCALAR(0xeb3ec8) from /usr/local/lib/perl5/site_perl/5.13.9/x86 +_64-linux-thread-multi/DBI.pm line 1285. leaked SCALAR(0x130b518) from /usr/local/lib/perl5/site_perl/5.13.9/x8 +6_64-linux-thread-multi/DBI.pm line 1285. leaked SCALAR(0x130b380) from /usr/local/lib/perl5/site_perl/5.13.9/x8 +6_64-linux-thread-multi/DBI.pm line 1285. leaked REF(0x130b350) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6 +4-linux-thread-multi/DBI.pm line 1285. leaked SCALAR(0x130b158) from /usr/local/lib/perl5/site_perl/5.13.9/x8 +6_64-linux-thread-multi/DBI.pm line 1285. ... ...
# Thread hook: Perl calls a package's CLONE when a new interpreter thread is
# created. Here it clears $drh (presumably this package's cached driver
# handle -- confirm against the enclosing package).
sub CLONE {
    undef $drh;
    return;
}
# Warn about every driver listed in %DBI::installed_drh whose DBD::<driver>
# package does not define a CLONE() function.
for my $driver (keys %DBI::installed_drh) {
    no strict 'refs';    # the sub is looked up by its computed name
    next if defined &{"DBD::${driver}::CLONE"};
    warn("$driver has no driver CLONE() function so is unsafe threaded\n");
}
In reply to Re^5: perl 5.12.3 + threads + memory leak
by kamenpetrov
in thread perl 5.12.3 + threads + memory leak
by kamenpetrov
| For: | Use: | ||
| & | &amp; | ||
| < | &lt; | ||
| > | &gt; | ||
| [ | &#91; | ||
| ] | &#93; |