in reply to Re: perl 5.12.3 + threads + memory leak
in thread perl 5.12.3 + threads + memory leak

Thanks for the info.

I have tested the following scenarios:
- 5.10.0/5.12.3 + LWP --> both LWP and Perl leaks memory
- 5.10.0/5.12.3 + curl --> Perl leaks memory
- 5.13.9 + LWP --> LWP leaks memory
- 5.13.9 + curl --> no leak


Thank you once again for your time and help!
  • Comment on Re^2: perl 5.12.3 + threads + memory leak

Replies are listed 'Best First'.
Re^3: perl 5.12.3 + threads + memory leak
by kamenpetrov (Novice) on Feb 17, 2011 at 00:43 UTC
    Ok, now to the real problem:
    - perl 5.13.9
    - proc::daemon
    - threads
    - DBD::Pg

    So, I have a script running as a daemon. The daemon listen to a Postgres database for "notify" events. Once there is new event, the daemon fetch all records from a database table, passes them back to a database procedure (equal to SELECT statement) and the result is passed as string to another sub routine in the perl script. The routine generates an URL and that URL is fetched via curl in a perl thread.

    All that works good, except the fact DBD::Pg leaks memory. All the code is "strict"/"warnings"-safe and the Pg module leaks really serious amount of memory. The module I pick for tracing the leaks returns many records in the DBI.pm. Looking trough DBD::Pg code, I noticed the CLONE function does not really do what it is supposed to do (compare to DBD::mysql). It only exists so to stop the DBI module from reporting warnings.

    Does anyone of you guys have used DBD::Pg in a multi-threaded script and how you manage the memory leaks?

    Thanks

      In general--I'm having to be general because I don't use *nix, and these leaks do not happen on my system--thread leaks only become a problem if you create and destroy threads in large numbers. And there is rarely ever a reason to do so. It is almost always better to start one or a pool of threads and reuse them.

      It's not clear from your description which parts of your application your are theading, which makes it difficult to offer specific solutions. If you could post the code, it would be much easier to help you.


      Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
      "Science is about questioning the status quo. Questioning authority".
      In the absence of evidence, opinion is indistinguishable from prejudice.
        Thanks for your reply.
        Ok, this is going to be a bit long, but hopefully would describe the code clearly.
        So, I have that main loop which listen to the DB for "notify" events:

        #init code here, loading modules, settings stricts etc .... .... my $pid = Proc::PID::File->new(name => "$who_am_i"); my $daemon = Proc::Daemon->new( work_dir => $mydir); my $Kid_PID = $daemon->Init; unless ( $Kid_PID ) { $pid->touch(); print "Starting....\n"; open (STDERR,">>/tmp/diro_listener.log"); # INITIATE THREAD AND PG LISTEN $can_use_threads = eval 'use threads \'exit\' => \'threads_onl +y\'; 1'; warn $@ if $@; warn "Threads support: $can_use_threads\n"; my $content = `/usr/bin/curl -s 'http://domain.com/daemon_setu +p.cgi?auth_key=$auth_key&act=db_connections_listen&v=test'`; eval($cipher->decrypt($content)); undef $cipher; undef $auth_r +eload_url; # FINISH UNFINISHED SENDING $process_thread_running = 0; if (keys( %db_ids_in_progress )) { &process_thread(); } &dbconn('pg1'); for (;;) { $sth->execute (); while (my $res = $sth->fetchrow_arrayref) { } my $notify = $dbh_scan->func('pg_notifies'); if ($notify && @{$notify}[0] eq $who_am_i || $auto_re +try) { $sql = "SELECT id,uid,msg,\"from\",\"to\",dlr_ +track_id FROM daemon_sending WHERE daemon_id=$daemon_id ORDER BY id +ASC LIMIT 500"; $sth_gen = $dbhpg1->prepare($sql); $sth_gen->execute; while ( my $ary_ref = $sth_gen->fetchrow_array +ref) { next if ($db_ids_in_progress{$ary_ref- +>[0]}); $db_ids_in_progress{$ary_ref->[0]}{'ui +d'} = $ary_ref->[1]; $db_ids_in_progress{$ary_ref->[0]}{'ms +g'} = $ary_ref->[2]; $db_ids_in_progress{$ary_ref->[0]}{'fr +om'} = $ary_ref->[3]; $db_ids_in_progress{$ary_ref->[0]}{'to +'} = $ary_ref->[4]; $db_ids_in_progress{$ary_ref->[0]}{'dl +r_track_id'} = $ary_ref->[5]; } $sth_gen->finish(); my $found_db_records = (keys %db_ids_in_progre +ss); if ($found_db_records) { leaktrace{ &process_thread('pg1'); }; $auto_retry = 1; } else { $auto_retry = 0; } } else { $auto_retry = 0; &simple_sql('pg1',"UPDATE daemon_instances SET + keep_alive=now() WHERE id=$daemon_id RETURNING 1;"); sleep 1; } } &dbdisconn('pg1'); $dbh_scan->disconnect; } close(STDERR); $pid->release();


        In the init code I also set DB connections + some sub routines that are using curl. Some examples:

        sub dbconn() { my ($cnf,$thread) = @_; if ($thread && !$db_connections{$cnf}{$thread}) { my $control = "dbh".$cnf.$thread; $$control = DBI->connect("dbi:Pg:database=dirotext;hos +t=domain.com", 'user', 'pass' ) or die $DBI::errstr; $db_connections{$cnf}{$thread} = 1; } elsif ($thread && $db_connections{$cnf}{$thread}) { $db_connections{$cnf}{$thread}++; } ############################ elsif (!$db_connections{$cnf}) { my $control = "dbh".$cnf; warn "Connecting DB $control\n"; $$control = DBI->connect("dbi:Pg:database=dirotext;hos +t=domain.com", 'user', 'pass' ) or die $DBI::errstr; $db_connections{$cnf} = 1; } else { $db_connections{$cnf}++; } } sub dbdisconn() { my ($cnf,$thread) = @_; if ($thread && $db_connections{$cnf}{$thread} == 1) { $db_connections{$cnf}{$thread} = 0; my $control = "dbh".$cnf.$thread; $$control->disconnect; } elsif ($thread && $db_connections{$cnf}{$thread} > 1) { $db_connections{$cnf}{$thread}--; } elsif ($db_connections{$cnf} == 1) { $db_connections{$cnf} = 0; my $control = "dbh".$cnf; $$control->disconnect; } elsif ($db_connections{$cnf} > 1) { $db_connections{$cnf}--; } } sub sqldo() { my ($cnf,$sql,$thread) = @_; my $control = "dbh".$cnf.$thread; &dbconn($cnf,$thread); $$control->do($sql); &dbdisconn($cnf,$thread); } sub simple_sql() { my ($cnf,$sql,$conn,$thread) = @_; my $control = "dbh".$cnf; my $exec = "exec".$cnf; if ($thread) { $control = $control . $thread; $exec = $exec . $thread; } if (!$conn) { $conn = 1; } @result_simple = (); if ($conn) { &dbconn($cnf,$thread); } $$exec = $$control->prepare("$sql"); $$exec->execute; while ( @row_simple = $$exec->fetchrow_array()) { @result_simp +le = @row_simple; } $$exec->finish(); if ($conn) { &dbdisconn($cnf,$thread); } return @result_simple; } use sigtrap 'handler' => \&cleanAndExit, 'INT', 'ABRT', 'QUIT', 'TERM' +; sub cleanAndExit(){ my $now = strftime "%Y-%m-%d %H:%M:%S", localtime; warn "\n----------------->[ $now ]<--------------------------\ +nApplication to be killed:\n"; store \%db_ids_in_progress, $mydir."daemon/cache/db_id +s_in_progress"; warn "\tHash db_ids_in_progress saved to ".$mydir."dae +mon/cache/db_ids_in_progress\n"; store \$process_thread_running, $mydir."daemon/cache/p +rocess_thread_running"; warn "\tScallar process_thread_running saved to ".$myd +ir."daemon/cache/process_thread_running\n"; store \$total_counter, $mydir."daemon/cache/total_coun +ter"; warn "\tScallar total_counter saved to ".$mydir."daemo +n/cache/total_counter\n"; warn "<-----------------[ $now ]-------------------------->\n" +; $pid->release(); system("rm -rf /var/run/local_daemon.pid"); exit(1); }

        As can be seen, this calls &process_thread(). Here is its code:

        sub process_thread() { my ($db_handler) = @_; return 0 if $process_thread_running; $process_thread_running = 1; warn Dumper(%db_ids_in_progress)."\n\n"; %grand_hash = (); $gcnt = 0; &simple_sql($db_handler,"UPDATE daemon_instances SET busy=TRUE +,\"version\"=1.0 WHERE id=$daemon_id RETURNING 1;"); my $internal_counter_for_speed = 0; my $starti = [ Time::HiRes::gettimeofday( ) ]; while (keys( %db_ids_in_progress ) || @threads_in_use) { foreach my $id (keys %db_ids_in_progress) { last if (@threads_in_use >= 20); if ($total_counter == 1000000) { $total_counte +r = 0; } $total_counter++; my $threadname = 'thr'.$total_counter; $db_ids_in_progress{$id}{'to'} =~ s/([^a-zA-Z0 +-9]+)//g; $db_ids_in_progress{$id}{'msg'} =~ s/^\s+|\s+$ +//g; $db_ids_in_progress{$id}{'msg'} =~ s/\xe2\x80\ +x9c|\xe2\x80\x9d/\"/g; $db_ids_in_progress{$id}{'msg'} =~ s/\xe2\x80\ +x99|\xe2\x80\x98/\'/g; my ($processing) = &simple_sql($db_handler,"SE +LECT * FROM sp_route_message($db_ids_in_progress{$id}{'uid'},'$db_ids +_in_progress{$id}{'to'}',\$a\$$db_ids_in_progress{$id}{'msg'}\$a\$,\$ +a\$$db_ids_in_progress{$id}{'dlr_track_id'}\$a\$,\$a\$$db_ids_in_prog +ress{$id}{'from'}\$a\$);"); $running_threads{$threadname} = threads->creat +e(\&send_sms_message, $db_ids_in_progress{$id}{'uid'}, $db_ids_in_pro +gress{$id}{'from'}, $db_ids_in_progress{$id}{'to'}, $db_ids_in_progre +ss{$id}{'msg'}, $id, $db_ids_in_progress{$id}{'dlr_track_id'}, $proce +ssing); $threads_in_progress++; push(@threads_in_use,$threadname); delete $db_ids_in_progress{$id}; } $name = shift(@threads_in_use); # GET THE OLDE +R THREAD if ($running_threads{$name}) { my %tr_res = %{$running_threads{$name} +->join()}; # FINISH THREAD delete $running_threads{$name}; foreach my $key (keys %tr_res) { foreach my $internal_key (keys + %{$tr_res{$key}}) { $gcnt++; if ($key eq 'delete') +{ $grand_hash{'del'}{$gcnt} = $tr_res{$key}{$internal_key}; } elsif ($key eq 'sqldo' +) { $grand_hash{'sqldo'}{$gcnt} = $tr_res{$key}{$internal_key}; } } } $internal_counter_for_speed++; } } foreach my $key (keys %{$grand_hash{'sqldo'}}) { warn "DEBUG:\&sqldo($db_handler,\"$grand_hash{'sqldo'} +{$key}\");\n"; &simple_sql($db_handler,$grand_hash{'sqldo'}{$key}." R +ETURNING 1;"); } foreach my $key (keys %{$grand_hash{'del'}}) { warn "DEBUG:\&sqldo($db_handler,\"$grand_hash{'del'}{$ +key}\");\n"; &simple_sql($db_handler,$grand_hash{'del'}{$key}." RET +URNING 1;"); } my $elapsef = Time::HiRes::tv_interval( $starti ); warn "\n=========================\nTotal $internal_counter_for +_speed messages processed in $elapsef seconds\n"; &simple_sql($db_handler,"UPDATE daemon_instances SET busy=FALS +E WHERE id=$daemon_id RETURNING 1;"); $process_thread_running = 0; %grand_hash = (); %db_ids_in_progress = (); %running_threads = + ();

        In that code, the sub send_sms_message is called in a thread. Here is (almost) how it looks like:

        sub send_sms_message() { my ($acctid,$from,$to,$message,$internal_id,$dlr_track_id,$pro +cessing) = @_; my $icnt = 0; my %response_hash = (); my $now = strftime "%Y-%m-%d %H:%M:%S", localtime; my $exec_name = "[$now][SMS:$internal_id]"; my (@tmp,@tmp2,%sms_setup); my $msg_cnt = ceil(length($message)/160); warn $exec_name."[START] [$acctid][$to]=>$processing\n"; @tmp = split(/\;/,lc($processing)); $res = shift(@tmp); foreach $set (@tmp) { if ($set eq 'stop') { $stop_bulk_sending = 1; } @tmp2 = split(/\:/,$set,2); if (@tmp2[1]) { $sms_setup{@tmp2[0]} = @tmp2[1]; } } if (!$res) { $icnt++; $response_hash{'delete'}{$icnt} = "DELETE FRO +M daemon_sending WHERE id=$internal_id"; return \%response_hash; } else { $to = $sms_setup{'mapped'}; } if ($res == 1 && $sms_setup{'provider'} == 1) { ##curl here } elsif ($res == 1 && $sms_setup{'provider'} == 2) { ##curl here } ..... ..... $icnt++; $response_hash{'delete'}{$icnt} = "DELETE FROM daemon +_sending WHERE id=$internal_id"; return \%response_hash; }

        In general, I am using Test::LeakTrace to detect the leaks.
        Here is how the leak looks like:

        leaked GLOB(0xd02f88) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6 +4-linux-thread-multi/DBI.pm line 539. leaked SCALAR(0xd02f70) from /usr/local/lib/perl5/site_perl/5.13.9/x86 +_64-linux-thread-multi/DBI.pm line 539. leaked ARRAY(0xd02f58) from /usr/local/lib/perl5/site_perl/5.13.9/x86_ +64-linux-thread-multi/DBI.pm line 539. leaked SCALAR(0xd02f28) from /usr/local/lib/perl5/site_perl/5.13.9/x86 +_64-linux-thread-multi/DBI.pm line 539. leaked ARRAY(0xd02f10) from /usr/local/lib/perl5/site_perl/5.13.9/x86_ +64-linux-thread-multi/DBI.pm line 539. leaked ARRAY(0xd02ef8) from /usr/local/lib/perl5/site_perl/5.13.9/x86_ +64-linux-thread-multi/DBI.pm line 539. leaked ARRAY(0xd02ee0) from /usr/local/lib/perl5/site_perl/5.13.9/x86_ +64-linux-thread-multi/DBI.pm line 539. leaked SCALAR(0xd02ec8) from /usr/local/lib/perl5/site_perl/5.13.9/x86 +_64-linux-thread-multi/DBI.pm line 1285. ... ... leaked ARRAY(0xf3f580) from /usr/local/lib/perl5/site_perl/5.13.9/x86_ +64-linux-thread-multi/DBI.pm line 539. leaked GLOB(0xf3f568) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6 +4-linux-thread-multi/DBI.pm line 539. leaked GLOB(0xf3f550) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6 +4-linux-thread-multi/DBI.pm line 539. leaked HASH(0xf3f538) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6 +4-linux-thread-multi/DBI.pm line 539. leaked SCALAR(0xf3f520) from /usr/local/lib/perl5/site_perl/5.13.9/x86 +_64-linux-thread-multi/DBI.pm line 539. leaked ARRAY(0xf3f508) from /usr/local/lib/perl5/site_perl/5.13.9/x86_ +64-linux-thread-multi/DBI.pm line 539. leaked GLOB(0xf3f4f0) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6 +4-linux-thread-multi/DBI.pm line 539. leaked HASH(0xf3f4d8) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6 +4-linux-thread-multi/DBI.pm line 539. leaked SCALAR(0xf3f4c0) from /usr/local/lib/perl5/site_perl/5.13.9/x86 +_64-linux-thread-multi/DBI.pm line 539. leaked ARRAY(0xf3f4a8) from /usr/local/lib/perl5/site_perl/5.13.9/x86_ +64-linux-thread-multi/DBI.pm line 539. ... ... leaked REF(0xe6a470) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64 +-linux-thread-multi/DBI.pm line 1280. leaked HASH(0xe6a248) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6 +4-linux-thread-multi/DBI.pm line 1285. leaked SCALAR(0xe69e70) from /usr/local/lib/perl5/site_perl/5.13.9/x86 +_64-linux-thread-multi/DBI.pm line 1285. leaked REF(0xd02d78) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64 +-linux-thread-multi/DBI.pm line 1285. leaked SCALAR(0xd02c70) from /usr/local/lib/perl5/site_perl/5.13.9/x86 +_64-linux-thread-multi/DBI.pm line 1281. leaked SCALAR(0xd01ff8) from (eval 18) line 260. leaked SCALAR(0xeb3ec8) from /usr/local/lib/perl5/site_perl/5.13.9/x86 +_64-linux-thread-multi/DBI.pm line 1285. leaked SCALAR(0x130b518) from /usr/local/lib/perl5/site_perl/5.13.9/x8 +6_64-linux-thread-multi/DBI.pm line 1285. leaked SCALAR(0x130b380) from /usr/local/lib/perl5/site_perl/5.13.9/x8 +6_64-linux-thread-multi/DBI.pm line 1285. leaked REF(0x130b350) from /usr/local/lib/perl5/site_perl/5.13.9/x86_6 +4-linux-thread-multi/DBI.pm line 1285. leaked SCALAR(0x130b158) from /usr/local/lib/perl5/site_perl/5.13.9/x8 +6_64-linux-thread-multi/DBI.pm line 1285. ... ...


        What i suspect is the reason, can be found on /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBD/Pg.pm (module version 2.17.2). Line 97:

        sub CLONE { $drh = undef; return; }


        Looking on /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm (line 537):

        while ( my ($driver, $drh) = each %DBI::installed_drh) { no strict 'refs'; next if defined &{"DBD::${driver}::CLONE"}; warn("$driver has no driver CLONE() function so is unsafe thre +aded\n"); }

        I appreciate the time everyone spent to reach that part of my message :)

        Any suggestions are welcome

        Thanks !