
Ok, now to the real problem:
- perl 5.13.9
- Proc::Daemon
- threads
- DBD::Pg

So, I have a script running as a daemon. The daemon listens to a Postgres database for "notify" events. When a new event arrives, the daemon fetches all records from a database table and passes them to a database procedure (called via a SELECT statement). The result is passed as a string to another subroutine in the Perl script. That routine generates a URL, and the URL is fetched via curl in a Perl thread.

All of that works well, except that DBD::Pg leaks memory. All the code is "strict"/"warnings"-safe, and the Pg module leaks a really serious amount of memory. The module I use to trace the leaks reports many leaked records in DBI.pm. Looking through the DBD::Pg code, I noticed that its CLONE function does not really do what it is supposed to do (compare it with DBD::mysql). It only exists to stop the DBI module from emitting warnings.

Have any of you used DBD::Pg in a multi-threaded script, and if so, how did you deal with the memory leaks?

Thanks

Re^4: perl 5.12.3 + threads + memory leak
by BrowserUk (Patriarch) on Feb 17, 2011 at 02:00 UTC

    In general--I'm having to be general because I don't use *nix, and these leaks do not happen on my system--thread leaks only become a problem if you create and destroy threads in large numbers. And there is rarely ever a reason to do so. It is almost always better to start one or a pool of threads and reuse them.
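    A minimal sketch of that pool pattern, using the core threads and Thread::Queue modules: a fixed number of workers is started once and fed jobs through a queue, so no thread is created or destroyed per message. The pool size of 4 and the "done $job" placeholder work are assumptions for illustration; in a daemon like the one described above, a job would be one message and the work would be the URL fetch.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $WORKERS = 4;                       # fixed pool size (an assumption; tune as needed)
my $work    = Thread::Queue->new();    # jobs going in
my $results = Thread::Queue->new();    # results coming out

# Each worker is created once and loops, pulling jobs until it
# dequeues undef, which is used here as the stop signal.
my @pool = map {
    threads->create(sub {
        while (defined(my $job = $work->dequeue())) {
            # Placeholder for the real per-message work.
            $results->enqueue("done $job");
        }
        return;
    });
} 1 .. $WORKERS;

$work->enqueue($_) for 1 .. 10;        # ten jobs, but still only four threads
$work->enqueue(undef) for @pool;       # one stop signal per worker
$_->join() for @pool;                  # all workers have drained the queue

# Collect everything the workers produced without blocking.
my @done;
while (defined(my $r = $results->dequeue_nb())) {
    push @done, $r;
}
print scalar(@done), " jobs processed by $WORKERS reused threads\n";
```

    The threads live for the lifetime of the pool, so whatever per-thread cost creation has is paid $WORKERS times rather than once per message.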

    It's not clear from your description which parts of your application you are threading, which makes it difficult to offer specific solutions. If you could post the code, it would be much easier to help you.


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
      Thanks for your reply.
      Ok, this is going to be a bit long, but hopefully it describes the code clearly.
      So, I have this main loop, which listens to the DB for "notify" events:

      #init code here, loading modules, settings stricts etc
      ....
      ....
      my $pid = Proc::PID::File->new(name => "$who_am_i");
      my $daemon = Proc::Daemon->new( work_dir => $mydir);
      my $Kid_PID = $daemon->Init;
      unless ( $Kid_PID ) {
          $pid->touch();
          print "Starting....\n";
          open (STDERR,">>/tmp/diro_listener.log");

          # INITIATE THREAD AND PG LISTEN
          $can_use_threads = eval 'use threads \'exit\' => \'threads_only\'; 1';
          warn $@ if $@;
          warn "Threads support: $can_use_threads\n";
          my $content = `/usr/bin/curl -s 'http://domain.com/daemon_setup.cgi?auth_key=$auth_key&act=db_connections_listen&v=test'`;
          eval($cipher->decrypt($content));
          undef $cipher;
          undef $auth_reload_url;

          # FINISH UNFINISHED SENDING
          $process_thread_running = 0;
          if (keys( %db_ids_in_progress )) {
              &process_thread();
          }

          &dbconn('pg1');
          for (;;) {
              $sth->execute ();
              while (my $res = $sth->fetchrow_arrayref) { }
              my $notify = $dbh_scan->func('pg_notifies');
              if ($notify && @{$notify}[0] eq $who_am_i || $auto_retry) {
                  $sql = "SELECT id,uid,msg,\"from\",\"to\",dlr_track_id FROM daemon_sending WHERE daemon_id=$daemon_id ORDER BY id ASC LIMIT 500";
                  $sth_gen = $dbhpg1->prepare($sql);
                  $sth_gen->execute;
                  while ( my $ary_ref = $sth_gen->fetchrow_arrayref) {
                      next if ($db_ids_in_progress{$ary_ref->[0]});
                      $db_ids_in_progress{$ary_ref->[0]}{'uid'} = $ary_ref->[1];
                      $db_ids_in_progress{$ary_ref->[0]}{'msg'} = $ary_ref->[2];
                      $db_ids_in_progress{$ary_ref->[0]}{'from'} = $ary_ref->[3];
                      $db_ids_in_progress{$ary_ref->[0]}{'to'} = $ary_ref->[4];
                      $db_ids_in_progress{$ary_ref->[0]}{'dlr_track_id'} = $ary_ref->[5];
                  }
                  $sth_gen->finish();
                  my $found_db_records = (keys %db_ids_in_progress);
                  if ($found_db_records) {
                      leaktrace{
                          &process_thread('pg1');
                      };
                      $auto_retry = 1;
                  }
                  else {
                      $auto_retry = 0;
                  }
              }
              else {
                  $auto_retry = 0;
                  &simple_sql('pg1',"UPDATE daemon_instances SET keep_alive=now() WHERE id=$daemon_id RETURNING 1;");
                  sleep 1;
              }
          }
          &dbdisconn('pg1');
          $dbh_scan->disconnect;
      }
      close(STDERR);
      $pid->release();


      In the init code I also set up the DB connections and some subroutines that use curl. Some examples:

      sub dbconn() {
          my ($cnf,$thread) = @_;
          if ($thread && !$db_connections{$cnf}{$thread}) {
              my $control = "dbh".$cnf.$thread;
              $$control = DBI->connect("dbi:Pg:database=dirotext;host=domain.com", 'user', 'pass' ) or die $DBI::errstr;
              $db_connections{$cnf}{$thread} = 1;
          }
          elsif ($thread && $db_connections{$cnf}{$thread}) {
              $db_connections{$cnf}{$thread}++;
          }
          ############################
          elsif (!$db_connections{$cnf}) {
              my $control = "dbh".$cnf;
              warn "Connecting DB $control\n";
              $$control = DBI->connect("dbi:Pg:database=dirotext;host=domain.com", 'user', 'pass' ) or die $DBI::errstr;
              $db_connections{$cnf} = 1;
          }
          else {
              $db_connections{$cnf}++;
          }
      }

      sub dbdisconn() {
          my ($cnf,$thread) = @_;
          if ($thread && $db_connections{$cnf}{$thread} == 1) {
              $db_connections{$cnf}{$thread} = 0;
              my $control = "dbh".$cnf.$thread;
              $$control->disconnect;
          }
          elsif ($thread && $db_connections{$cnf}{$thread} > 1) {
              $db_connections{$cnf}{$thread}--;
          }
          elsif ($db_connections{$cnf} == 1) {
              $db_connections{$cnf} = 0;
              my $control = "dbh".$cnf;
              $$control->disconnect;
          }
          elsif ($db_connections{$cnf} > 1) {
              $db_connections{$cnf}--;
          }
      }

      sub sqldo() {
          my ($cnf,$sql,$thread) = @_;
          my $control = "dbh".$cnf.$thread;
          &dbconn($cnf,$thread);
          $$control->do($sql);
          &dbdisconn($cnf,$thread);
      }

      sub simple_sql() {
          my ($cnf,$sql,$conn,$thread) = @_;
          my $control = "dbh".$cnf;
          my $exec = "exec".$cnf;
          if ($thread) {
              $control = $control . $thread;
              $exec = $exec . $thread;
          }
          if (!$conn) { $conn = 1; }
          @result_simple = ();
          if ($conn) { &dbconn($cnf,$thread); }
          $$exec = $$control->prepare("$sql");
          $$exec->execute;
          while ( @row_simple = $$exec->fetchrow_array()) {
              @result_simple = @row_simple;
          }
          $$exec->finish();
          if ($conn) { &dbdisconn($cnf,$thread); }
          return @result_simple;
      }

      use sigtrap 'handler' => \&cleanAndExit, 'INT', 'ABRT', 'QUIT', 'TERM';

      sub cleanAndExit(){
          my $now = strftime "%Y-%m-%d %H:%M:%S", localtime;
          warn "\n----------------->[ $now ]<--------------------------\nApplication to be killed:\n";
          store \%db_ids_in_progress, $mydir."daemon/cache/db_ids_in_progress";
          warn "\tHash db_ids_in_progress saved to ".$mydir."daemon/cache/db_ids_in_progress\n";
          store \$process_thread_running, $mydir."daemon/cache/process_thread_running";
          warn "\tScallar process_thread_running saved to ".$mydir."daemon/cache/process_thread_running\n";
          store \$total_counter, $mydir."daemon/cache/total_counter";
          warn "\tScallar total_counter saved to ".$mydir."daemon/cache/total_counter\n";
          warn "<-----------------[ $now ]-------------------------->\n";
          $pid->release();
          system("rm -rf /var/run/local_daemon.pid");
          exit(1);
      }

      As can be seen, this calls &process_thread(). Here is its code:

      sub process_thread() {
          my ($db_handler) = @_;
          return 0 if $process_thread_running;
          $process_thread_running = 1;
          warn Dumper(%db_ids_in_progress)."\n\n";
          %grand_hash = ();
          $gcnt = 0;
          &simple_sql($db_handler,"UPDATE daemon_instances SET busy=TRUE,\"version\"=1.0 WHERE id=$daemon_id RETURNING 1;");
          my $internal_counter_for_speed = 0;
          my $starti = [ Time::HiRes::gettimeofday( ) ];
          while (keys( %db_ids_in_progress ) || @threads_in_use) {
              foreach my $id (keys %db_ids_in_progress) {
                  last if (@threads_in_use >= 20);
                  if ($total_counter == 1000000) { $total_counter = 0; }
                  $total_counter++;
                  my $threadname = 'thr'.$total_counter;
                  $db_ids_in_progress{$id}{'to'} =~ s/([^a-zA-Z0-9]+)//g;
                  $db_ids_in_progress{$id}{'msg'} =~ s/^\s+|\s+$//g;
                  $db_ids_in_progress{$id}{'msg'} =~ s/\xe2\x80\x9c|\xe2\x80\x9d/\"/g;
                  $db_ids_in_progress{$id}{'msg'} =~ s/\xe2\x80\x99|\xe2\x80\x98/\'/g;
                  my ($processing) = &simple_sql($db_handler,"SELECT * FROM sp_route_message($db_ids_in_progress{$id}{'uid'},'$db_ids_in_progress{$id}{'to'}',\$a\$$db_ids_in_progress{$id}{'msg'}\$a\$,\$a\$$db_ids_in_progress{$id}{'dlr_track_id'}\$a\$,\$a\$$db_ids_in_progress{$id}{'from'}\$a\$);");
                  $running_threads{$threadname} = threads->create(\&send_sms_message, $db_ids_in_progress{$id}{'uid'}, $db_ids_in_progress{$id}{'from'}, $db_ids_in_progress{$id}{'to'}, $db_ids_in_progress{$id}{'msg'}, $id, $db_ids_in_progress{$id}{'dlr_track_id'}, $processing);
                  $threads_in_progress++;
                  push(@threads_in_use,$threadname);
                  delete $db_ids_in_progress{$id};
              }
              $name = shift(@threads_in_use); # GET THE OLDER THREAD
              if ($running_threads{$name}) {
                  my %tr_res = %{$running_threads{$name}->join()}; # FINISH THREAD
                  delete $running_threads{$name};
                  foreach my $key (keys %tr_res) {
                      foreach my $internal_key (keys %{$tr_res{$key}}) {
                          $gcnt++;
                          if ($key eq 'delete') {
                              $grand_hash{'del'}{$gcnt} = $tr_res{$key}{$internal_key};
                          }
                          elsif ($key eq 'sqldo') {
                              $grand_hash{'sqldo'}{$gcnt} = $tr_res{$key}{$internal_key};
                          }
                      }
                  }
                  $internal_counter_for_speed++;
              }
          }
          foreach my $key (keys %{$grand_hash{'sqldo'}}) {
              warn "DEBUG:\&sqldo($db_handler,\"$grand_hash{'sqldo'}{$key}\");\n";
              &simple_sql($db_handler,$grand_hash{'sqldo'}{$key}." RETURNING 1;");
          }
          foreach my $key (keys %{$grand_hash{'del'}}) {
              warn "DEBUG:\&sqldo($db_handler,\"$grand_hash{'del'}{$key}\");\n";
              &simple_sql($db_handler,$grand_hash{'del'}{$key}." RETURNING 1;");
          }
          my $elapsef = Time::HiRes::tv_interval( $starti );
          warn "\n=========================\nTotal $internal_counter_for_speed messages processed in $elapsef seconds\n";
          &simple_sql($db_handler,"UPDATE daemon_instances SET busy=FALSE WHERE id=$daemon_id RETURNING 1;");
          $process_thread_running = 0;
          %grand_hash = ();
          %db_ids_in_progress = ();
          %running_threads = ();
      }

      In that code, the sub send_sms_message is called in a thread. Here is (roughly) what it looks like:

      sub send_sms_message() {
          my ($acctid,$from,$to,$message,$internal_id,$dlr_track_id,$processing) = @_;
          my $icnt = 0;
          my %response_hash = ();
          my $now = strftime "%Y-%m-%d %H:%M:%S", localtime;
          my $exec_name = "[$now][SMS:$internal_id]";
          my (@tmp,@tmp2,%sms_setup);
          my $msg_cnt = ceil(length($message)/160);
          warn $exec_name."[START] [$acctid][$to]=>$processing\n";
          @tmp = split(/\;/,lc($processing));
          $res = shift(@tmp);
          foreach $set (@tmp) {
              if ($set eq 'stop') { $stop_bulk_sending = 1; }
              @tmp2 = split(/\:/,$set,2);
              if (@tmp2[1]) { $sms_setup{@tmp2[0]} = @tmp2[1]; }
          }
          if (!$res) {
              $icnt++;
              $response_hash{'delete'}{$icnt} = "DELETE FROM daemon_sending WHERE id=$internal_id";
              return \%response_hash;
          }
          else {
              $to = $sms_setup{'mapped'};
          }
          if ($res == 1 && $sms_setup{'provider'} == 1) {
              ##curl here
          }
          elsif ($res == 1 && $sms_setup{'provider'} == 2) {
              ##curl here
          }
          .....
          .....
          $icnt++;
          $response_hash{'delete'}{$icnt} = "DELETE FROM daemon_sending WHERE id=$internal_id";
          return \%response_hash;
      }

      In general, I am using Test::LeakTrace to detect the leaks.
      Here is what the leak report looks like:

      leaked GLOB(0xd02f88) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
      leaked SCALAR(0xd02f70) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
      leaked ARRAY(0xd02f58) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
      leaked SCALAR(0xd02f28) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
      leaked ARRAY(0xd02f10) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
      leaked ARRAY(0xd02ef8) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
      leaked ARRAY(0xd02ee0) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
      leaked SCALAR(0xd02ec8) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1285.
      ...
      ...
      leaked ARRAY(0xf3f580) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
      leaked GLOB(0xf3f568) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
      leaked GLOB(0xf3f550) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
      leaked HASH(0xf3f538) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
      leaked SCALAR(0xf3f520) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
      leaked ARRAY(0xf3f508) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
      leaked GLOB(0xf3f4f0) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
      leaked HASH(0xf3f4d8) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
      leaked SCALAR(0xf3f4c0) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
      leaked ARRAY(0xf3f4a8) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 539.
      ...
      ...
      leaked REF(0xe6a470) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1280.
      leaked HASH(0xe6a248) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1285.
      leaked SCALAR(0xe69e70) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1285.
      leaked REF(0xd02d78) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1285.
      leaked SCALAR(0xd02c70) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1281.
      leaked SCALAR(0xd01ff8) from (eval 18) line 260.
      leaked SCALAR(0xeb3ec8) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1285.
      leaked SCALAR(0x130b518) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1285.
      leaked SCALAR(0x130b380) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1285.
      leaked REF(0x130b350) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1285.
      leaked SCALAR(0x130b158) from /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm line 1285.
      ...
      ...


      What I suspect is the cause can be found in /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBD/Pg.pm (module version 2.17.2), line 97:

      sub CLONE { $drh = undef; return; }


      Looking at /usr/local/lib/perl5/site_perl/5.13.9/x86_64-linux-thread-multi/DBI.pm (line 537):

      while ( my ($driver, $drh) = each %DBI::installed_drh) {
          no strict 'refs';
          next if defined &{"DBD::${driver}::CLONE"};
          warn("$driver has no driver CLONE() function so is unsafe threaded\n");
      }
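      For readers less familiar with the hook being discussed: perl calls a package's CLONE method inside every newly created thread, once per package that defines one, after the parent's data has been copied. A driver can use it to discard per-process state such as a cached driver handle. The following is a minimal sketch of that mechanism only; the package My::Driver and its driver() accessor are hypothetical, and it says nothing about whether DBD::Pg's own CLONE is sufficient.

```perl
use strict;
use warnings;
use threads;

package My::Driver;

# Package-level cached "driver handle", standing in for a driver's $drh.
our $drh;

# Returns the cached handle, creating it on first use and tagging it
# with the id of the thread that created it.
sub driver {
    $drh //= { created_in_tid => threads->tid() };
    return $drh;
}

# Called by perl in each new thread after the parent's data is copied.
# Clearing the cache makes the child build its own handle instead of
# reusing its copy of the parent's.
sub CLONE {
    $drh = undef;
    return;
}

package main;

my $parent_tid = My::Driver::driver()->{created_in_tid};
my $child_tid  = threads->create(
    sub { return My::Driver::driver()->{created_in_tid} }
)->join();

print "parent handle made in thread $parent_tid, child handle made in thread $child_tid\n";
```

      Without the CLONE method, the child would report thread 0, because it would still be holding the copy of the parent's cached handle.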

      I appreciate the time everyone has spent reading this far :)

      Any suggestions are welcome

      Thanks !

        Sorry, but I don't think I am going to be able to help you.

        Despite the amount of code you've posted, you've omitted a lot of the stuff that is likely to be the source of leaks.

        E.g. the declarations (and the relative positioning in the source file(s?) of those declarations) for all the mysterious hashes that you use and maintain. If these hashes (e.g. %db_connections, %db_ids_in_progress, %grand_hash, %running_threads) are closed over by your thread procedure(s?), they are very likely sources of leaks, because:

        • they will be cloned into those threads if they are unshared lexical variables;
        • they will be cloned into those threads if they are unshared global variables;
        • they will cause problems if they are shared variables because you do no locking;
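        A small self-contained sketch of the first and third points, with hypothetical hash names: unshared data, whether global or lexical, is copied into each thread, so changes made there never reach the parent (and every copy costs memory). A :shared hash exists once and needs lock() around read-modify-write updates.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

our %unshared_global  = (count => 0);    # package variable, not shared
my  %unshared_lexical = (count => 0);    # file-scoped lexical, not shared
my  %shared :shared   = (count => 0);    # explicitly shared

my @workers = map {
    threads->create(sub {
        # Each thread increments its own private copies of these two,
        # so the parent never sees the changes.
        $unshared_global{count}++;
        $unshared_lexical{count}++;
        # There is only one copy of the shared hash; lock it while updating.
        {
            lock(%shared);
            $shared{count}++;
        }
        return;
    });
} 1 .. 4;
$_->join() for @workers;

print "global: $unshared_global{count}, lexical: $unshared_lexical{count}, shared: $shared{count}\n";
```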

        Your code is full of anomalies and dubious coding practices:

        1. You use symbolic references:
          my ($cnf,$thread) = @_;
          if ($thread && !$db_connections{$cnf}{$thread}) {
              my $control = "dbh".$cnf.$thread;
              #  ^^^^^^^^
              $$control = DBI->connect(
              #^^^^^^^^
                  "dbi:Pg:database=dirotext;host=domain.com", 'user', 'pass'
              ) or die $DBI::errstr;

          Which makes the "settings stricts etc" in your init comment dubious.

          The fact that you are including $thread into those symbolic names suggests that you are pooling dbi handles across threads which is a definite no-no.

        2. You add 'no parameters' prototypes to sub definitions that clearly take parameters:
          sub dbconn() {
              my ($cnf,$thread) = @_;

          And get away with it by overriding the prototypes by using &dbconn('pg1');

        3. You return a reference to a lexical hash, created in a thread, from that thread, and then dereference it to get at its contents:
          my %response_hash = ();
          ...
          return \%response_hash;
          ...
          my %tr_res = %{$running_threads{$name}->join()};

          Whilst there is nothing wrong with that insofar as it works, the mechanics of what goes on under the covers to make it possible make it unnecessarily prone to leakage.

          The reference you receive in the parent is not (cannot be) the same reference that you take within the thread. In order to give the parent thread a hash ref that it can dereference, the entire hash first has to be cloned into shared memory space.

          And then the first thing you do is clone it again into your local memory space.

          Far better to do:

          my %response_hash;      ## No need to initialise lexicals.
          ...
          return %response_hash;  ## Return a simple list of values
          ...
          my %tr_res = $running_threads{$name}->join();  ## Assign the list to a local hash.

          One caveat of this is that you need to ensure that the thread procedure is given a list context (so that it can return a list). That means that this (mess):

          $running_threads{$threadname} = threads->create(\&send_sms_message, $db_ids_in_progress{$id}{'uid'}, $db_ids_in_progress{$id}{'from'}, $db_ids_in_progress{$id}{'to'}, $db_ids_in_progress{$id}{'msg'}, $id, $db_ids_in_progress{$id}{'dlr_track_id'}, $processing);

          Has to become:

          ( $running_threads{$threadname} ) = threads->create(
              \&send_sms_message,
              $db_ids_in_progress{$id}{'uid'},
              $db_ids_in_progress{$id}{'from'},
              $db_ids_in_progress{$id}{'to'},
              $db_ids_in_progress{$id}{'msg'},
              $id,
              $db_ids_in_progress{$id}{'dlr_track_id'},
              $processing
          );

          Notice that the variable that will receive the thread handle is in parens, thus giving threads->create() a list context, which it duly passes on to the thread procedure.
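        A minimal, runnable sketch of point 3, with a hypothetical worker() standing in for send_sms_message: the thread builds a plain lexical hash and returns it as a flat list, the parens around the receiving variable give threads->create() list context, and join() hands the list back to be assigned to an ordinary local hash.

```perl
use strict;
use warnings;
use threads;

# Hypothetical thread procedure: builds a lexical hash and returns it
# as a flat key/value list rather than as a reference.
sub worker {
    my ($internal_id) = @_;
    my %response_hash;
    $response_hash{"delete_$internal_id"} =
        "DELETE FROM daemon_sending WHERE id=$internal_id";
    return %response_hash;
}

# Parens around $thr give threads->create() list context, which it
# passes on to worker(), so the whole key/value list survives.
my ($thr) = threads->create(\&worker, 42);

# join() returns the values in the context the thread was created with.
my %tr_res = $thr->join();

print "$_ => $tr_res{$_}\n" for sort keys %tr_res;
```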

        The upshot is that the style of the code you've posted along with what you've chosen to omit, strongly suggest that you are one of those that eschew the use of strict (and that usually means you eschew warnings also; as confirmed by your earlier code). The fact that you feel the need to initialise hashes at various points indicates that these are almost certainly global variables with extended scopes.
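        To make the alternative concrete, here is a sketch (not the poster's code) of the reference-counted dbconn/dbdisconn pair rewritten around a lexical hash instead of symbolic $$control names, and without prototypes. open_handle() is a hypothetical stand-in for DBI->connect(...) so the sketch runs without a database. The per-thread name suffix is dropped on the assumption that each thread fills its own cache rather than sharing handles; anything already in the cache when threads->create() runs would still be copied into the new thread.

```perl
use strict;
use warnings;

# Lexical cache replacing the symbolic "dbh$cnf$thread" variables:
# $handles{$cnf} = { dbh => $handle, refs => $count }.
# Fill it only inside the thread that uses the handles.
my %handles;

# Hypothetical stand-in for DBI->connect("dbi:Pg:...", $user, $pass, {...}).
sub open_handle {
    my ($cnf) = @_;
    return { name => $cnf };
}

# No prototypes: these take arguments and are called without &.
sub dbconn {
    my ($cnf) = @_;
    my $slot = $handles{$cnf} //= { dbh => undef, refs => 0 };
    $slot->{dbh} //= open_handle($cnf);
    $slot->{refs}++;
    return $slot->{dbh};
}

sub dbdisconn {
    my ($cnf) = @_;
    my $slot = $handles{$cnf} or return;
    if (--$slot->{refs} <= 0) {
        # With a real DBI handle: $slot->{dbh}->disconnect;
        delete $handles{$cnf};
    }
    return;
}

my $first  = dbconn('pg1');
my $second = dbconn('pg1');      # reuses the cached handle; refs is now 2
dbdisconn('pg1');
my $still_open = exists $handles{pg1};
dbdisconn('pg1');                # last user releases it; entry removed
my $closed = !exists $handles{pg1};
```

        Because the handle is returned to the caller, there is no need to reconstruct a variable name from strings to find it again.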

        Whilst you can get away with these archaic (perl4-ish) practices in single threaded code, mixing them with threads is a recipe for mysterious errors and leaks if not crashes.
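        Point 2 above is one of those habits. A small self-contained demonstration with hypothetical sub names: an empty () prototype makes an ordinary call with arguments a compile-time error, and only the &name(...) form, which skips prototype checking, lets the argument through. Simply dropping the prototype is the idiomatic fix.

```perl
use strict;
use warnings;

# Empty "()" prototype: declares that the sub takes NO arguments.
sub with_empty_proto() {
    my ($cnf) = @_;
    return 'got ' . (defined $cnf ? $cnf : 'nothing');
}

# No prototype: the normal way to write a sub that takes arguments.
sub without_proto {
    my ($cnf) = @_;
    return "got $cnf";
}

# The &name(...) form bypasses prototype checking, so this "works".
my $bypassed = &with_empty_proto('pg1');

# An ordinary call is rejected at compile time ("Too many arguments").
# A string eval is used here only so that the compile error can be caught.
my $compiled = eval q{ with_empty_proto('pg1'); 1 };
my $error    = $@;

my $plain = without_proto('pg1');

print "bypassed: $bypassed\nplain: $plain\n";
print $compiled ? "compiled\n" : "rejected: $error";
```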

        Whilst it seems that threads currently leak on *nix, I'm guessing that the vast majority of your problems are self-inflicted. The bottom line is this: unless you can post a full, strict and warnings clean, globals-free version of your program that I can at least syntax-check with Perl after making changes, even if I cannot run it, wading through a mass of code like this and trying to guess where and how half the variables are declared is just too much work to consider.

