expresspotato has asked for the wisdom of the Perl Monks concerning the following question:

Hi, I'm using DBI across multiple threads and/or forks. Apparently DBI still shares handles even across forked processes. Adding InactiveDestroy=>1 to the connect call is supposed to fix this. There is no way I can set up and tear down a DBI instance directly in my program whenever it is needed. Thus, I use a single SQL.pl script and call its subroutines: row_sql, get_sql, put_sql or hash_sql. Now the problem is that when one of the forked children terminates, it seems to take other handles from the parent along with it. I've tried various things to get this working, some of them no doubt stupid (like constantly creating new handles if the old one has been used). Anyone who is able to help me solve this problem gets a free $25 PayPal donation. Below is the SQL.pl code. Making it crash is fairly hard in a single-threaded environment. But if you thread or fork a child process, then when the thread or fork is done, so are some of the handles, until the "until" loop kicks in to save the query from certain death! The until loop provides some level of protection, but I would prefer an error-free solution.
#!/usr/bin/perl -T
use DBI;

&sql_setup;
$sqlw = 1; #Don't print SQL warnings if 0

sub sql_setup{
    if ($_ eq ""){
        $db="tr";
    }else{
        $db = $_[0];
    }
    $user="user";
    $passwd="password";
    $host="blogsphere.com:3306";
    $connectionInfo="dbi:mysql:$db;$host";
    $sql = 1;
    $sql_abort = 0;
}

sub del_sql{
    if ($_[0] eq ""){print "Empty Delete";}
    return &sql($_[0]);
}

sub mod_sql{
    if ($_[0] eq ""){print "Empty Modify";}
    return &sql($_[0]);
}

sub put_sql{
    if ($_[0] eq ""){print "Empty Put";}
    return &sql($_[0]);
}

sub row_sql(){
    $sql_r++;
    my $select = $_[0];
    my (@row1,@dbh1,@sth1,$retry_count1);
    if ($select eq ""){print "Empty Select."; exit;}
    until (
        eval {
            if ($sql_abort){&log("Aborted failed SQL.",-2); return 1;}
            $z1++;
            @dbh1[$z1]=DBI->connect_cached($connectionInfo,$user,$passwd, {PrintError=>0,InactiveDestroy=>1,mysql_auto_reconnect=>1} );
            @sth1[$z1]=@dbh1[$z1]->prepare($select);
            if (@sth1[$z1]->execute()) {
                @row1=@sth1[$z1]->fetchrow_array();
                @sth1[$z1]->finish;
            }
        }
    ) {
        if ($sqlw){print "Retrying SQL Row ($DBI::errstr,$select)..."; }
        &sql_check_err($DBI::err,$select);
        sleep (1);
        $retry_count1++;
    }
    @dbh1[$z1]->disconnect();
    #print "Used Row Handle ($z1)";
    return @row1;
}

sub get_sql(){
    $sql_g++;
    my $select = $_[0];
    my $c = 0;
    my (@results,@row,@dbh2,@sth2,$retry_count2);
    if ($select eq ""){print "Empty Select."; exit;}
    until (
        eval {
            if ($sql_abort){&log("Aborted failed SQL.",-2); return 1;}
            $z2++;
            @dbh2[$z2]=DBI->connect_cached($connectionInfo,$user,$passwd, {PrintError=>0,InactiveDestroy=>1} );
            @sth2[$z2] = @dbh2[$z2]->prepare($select);
            if (@sth2[$z2]->execute()) {
                while (@row=@sth2[$z2]->fetchrow_array()) {
                    @results[$c] = @row[0];
                    $c++;
                }
                @sth2[$z2]->finish;
            }
        }
    ) {
        if ($sqlw){print "Retrying SQL Get ($DBI::errstr)...";}
        &sql_check_err($DBI::err,$select);
        sleep (1);
        $retry_count2++;
    }
    @dbh2[$z2]->disconnect();
    #print "Used Get Handle ($z2)";
    return (@results);
}

sub hash_sql(){
    $sql_h++;
    my $select = $_[0];
    my (@dbh3,@sth3,$retry_count3,$rows);
    if ($select eq ""){print "Empty Select."; exit;}
    until (
        eval {
            if ($sql_abort){&log("Aborted failed SQL.",-2); return 1;}
            $z3++;
            @dbh3[$z3]=DBI->connect_cached($connectionInfo,$user,$passwd, {PrintError=>0,InactiveDestroy=>1} );
            $rows = @dbh3[$z3]->selectall_arrayref(
                $select,
                { Slice => {} }
            );
        }
    ) {
        if ($sqlw){print "Retrying SQL Hash Select ($DBI::errstr,$DBI::err)...";}
        &sql_check_err($DBI::err,$select);
        sleep (1);
        $retry_count3++;
    }
    @dbh3[$z3]->disconnect();
    #print "Used Hash Handle ($z3)";
    return ($rows);
}

sub sql{
    $sql_a++;
    my (@row,@dbh4,@sth4,$retry_count4);
    my $insert = $_[0];
    if ($insert eq ""){print "Empty insert."; exit;}
    until (
        eval {
            if ($sql_abort){&log("Aborted failed SQL.",-2); return 1;}
            $z4++;
            @dbh4[$z4]=DBI->connect_cached($connectionInfo,$user,$passwd, {PrintError=>0,InactiveDestroy=>1} );
            @sth4[$z4]=@dbh4[$z4]->prepare($insert);
            if (@sth4[$z4]->execute()) {
                return 1;
            }
        }
    ) {
        if ($sqlw){print "Retrying SQL Insert ($insert) ($DBI::errstr)...";}
        &sql_check_err($DBI::err,$insert);
        sleep (1);
        $retry_count4++;
    }
    @dbh4[$z4]->disconnect();
    #print "Used SQL Handle ($z4)";
    return 1;
}

sub sql_check_err{
    #print "chkerr ($_[0])" . substr($_[0],0,19);
    #$_[1] =~ s/\"//g;
    if ($_[0] eq "1064"){
        print "WARNING: SQL SYNTAX ERROR<br>($_[1])!!!\n";
        #&log("WARNING SQL SYNTAX ERROR ($_[1])",-3);
        $sql_abort = 1;
    }
    if ($_[0] eq "1054"){
        print "WARNING: SQL UNKNOWN COLUMN!!!\n";
        #&log("WARNING SQL COLUMN ERROR ($_[1])",-3);
        $sql_abort = 1;
    }
    if ($sql_abort){
        exit;
    }
}

Replies are listed 'Best First'.
Re: The pain of DBI !!! Lost connection from forked or threaded children
by Corion (Patriarch) on Dec 28, 2009 at 14:23 UTC

    Note what the DBI documentation has to say about InactiveDestroy (emphasis mine):

    This attribute is specifically designed for use in Unix applications that "fork" child processes. Either the parent or the child process, but not both, should set InactiveDestroy true on all their shared handles. (Note that some databases, including Oracle, don't support passing a database connection across a fork.)

    You set InactiveDestroy unconditionally. Also, you don't show where you call fork() and you don't tell us what database you use. I think that properly using InactiveDestroy, as per the documentation, should fix this problem, but if you continue having problems, a small, self-contained example will likely elicit better answers than mine.
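As a rough illustration of the "one side only" rule quoted from the DBI documentation, the sketch below sets InactiveDestroy in the child and leaves the parent's handle alone. The helper name fork_keeping_parent_handle is hypothetical (it is not part of DBI), and the commented usage assumes placeholder connection variables.

```perl
use strict;
use warnings;

# Hypothetical helper: fork a child that inherits $dbh without letting the
# child's cleanup close the connection the parent is still using.
sub fork_keeping_parent_handle {
    my ($dbh, $child_work) = @_;

    my $pid = fork();
    die "fork failed: $!" unless defined $pid;

    if ($pid == 0) {
        # Child only: when this inherited handle is destroyed (at the latest
        # when the child exits), DBI skips the implicit disconnect.
        $dbh->{InactiveDestroy} = 1;
        my $ok = eval { $child_work->(); 1 };
        warn "child failed: $@" unless $ok;
        exit($ok ? 0 : 1);
    }

    # Parent: the flag is not set here, so its handle behaves normally.
    return $pid;
}

# Usage sketch (placeholder DSN and credentials):
# my $dbh = DBI->connect($dsn, $user, $passwd, { RaiseError => 1 });
# my $pid = fork_keeping_parent_handle($dbh, sub { print "child $$ working\n" });
# waitpid($pid, 0);
# $dbh->do('SELECT 1');   # the parent's connection should still be usable
```

The child should not issue queries on the inherited handle; if it needs the database, it is safer for it to open a connection of its own.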

      Hi, Thank you for your prompt reply. Wouldn't constantly creating new handles through
      $z1++ @dbh1[$z1]->DBI:...
      Make them independent from each other? I'm just uncertain why DBI would trash another connection from a completely different thread or fork even when the DBH isn't the same. I will try setting InactiveDestroy just in the child processes and post back in an hour.

        First of all, you're not creating new connections there, because you're using connect_cached, which will try to reuse an existing connection. Second, even though you might be getting separate Perl objects, your database driver may or may not understand how to handle a fork situation. This is mainly an issue within the database driver (DBD), which likely is written in C and thus needs to take special care when fork()-ing or using multiple threads.
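To see the reuse described above without a database server, the following sketch uses DBD::ExampleP, the demo driver bundled with DBI. The attribute name private_owner_pid is made up for this example; DBI only requires that such names start with private_. Keying the cache on the process id is one possible way to keep a forked child from being handed the parent's cached handle, not something the original code does.

```perl
use strict;
use warnings;
use DBI;
use Scalar::Util qw(refaddr);

# DBD::ExampleP ships with DBI, so this runs without MySQL.
my @args = ('dbi:ExampleP:', '', '');

my $first  = DBI->connect_cached(@args, { PrintError => 0 });
my $second = DBI->connect_cached(@args, { PrintError => 0 });

# Identical arguments: the cached handle is returned instead of a new one.
my $reused = refaddr($first) == refaddr($second);

# Any difference in the attributes changes the cache key. Including the
# current process id means a child process gets a different key.
my $scoped = DBI->connect_cached(@args, {
    PrintError        => 0,
    private_owner_pid => $$,   # hypothetical attribute name
});
my $distinct = refaddr($scoped) != refaddr($first);

print "reused: ",   ($reused   ? "yes" : "no"), "\n";
print "distinct: ", ($distinct ? "yes" : "no"), "\n";
```

Incrementing an index such as $z1 before each call does not change the arguments passed to connect_cached, so it does not by itself produce independent connections.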

        So far, you haven't shown any code relevant to fork(), and you haven't told us the platform you're running on, nor have you told us which database/driver you're using. There may still be many more interesting problems lurking, for example if you're on Windows, where the fork system call is emulated through threads. But telling us these details would possibly cut short the chase, so don't spoil the fun.

Re: The pain of DBI !!! Lost connection from forked or threaded children
by Arunbear (Prior) on Dec 28, 2009 at 23:05 UTC
      DBIx::Connector did the trick... Although I can't be too sure yet, but the results do look promising. Thank you very much Arunbear ;)
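For readers who have not used DBIx::Connector, here is a minimal sketch of how a query from the original script might be routed through it. The helper name next_pending_job is hypothetical, and the DSN and credentials in the commented usage are placeholders, not the poster's real settings.

```perl
use strict;
use warnings;

# Hypothetical helper: fetch one pending row via a DBIx::Connector object.
sub next_pending_job {
    my ($conn) = @_;

    # run() passes a database handle to the block in $_. In "fixup" mode the
    # block is retried once after a reconnect if the connection turns out
    # to be dead.
    return $conn->run(fixup => sub {
        $_->selectrow_arrayref(
            q{SELECT * FROM pool_start WHERE status = '' LIMIT 1}
        );
    });
}

# Usage sketch:
# require DBIx::Connector;
# my $conn = DBIx::Connector->new(
#     'dbi:mysql:database=tr;host=localhost', 'user', 'password',
#     { RaiseError => 1, AutoCommit => 1 },
# );
# my $row = next_pending_job($conn);
```

The connector object, rather than a raw handle, is what gets kept around; it reconnects when it notices that the process or thread has changed.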
Re: The pain of DBI !!! Lost connection from forked or threaded children
by JavaFan (Canon) on Dec 28, 2009 at 16:47 UTC
    I didn't spot the fork in your code, but could your problem be fixed if you'd just connect to the database after you fork()ed?
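A minimal sketch of that suggestion, assuming the child does its own database work: the connection is opened inside the child, so nothing is shared with the parent. The helper name run_in_child is hypothetical, and the commented usage assumes placeholder connection variables.

```perl
use strict;
use warnings;

# Hypothetical helper: run $work in a forked child using a handle that is
# created after the fork. $connect is a code ref returning a fresh handle.
sub run_in_child {
    my ($connect, $work) = @_;

    my $pid = fork();
    die "fork failed: $!" unless defined $pid;

    if ($pid == 0) {
        my $ok = eval {
            my $dbh = $connect->();   # opened in the child, never shared
            $work->($dbh);
            $dbh->disconnect;
            1;
        };
        warn "child failed: $@" unless $ok;
        exit($ok ? 0 : 1);
    }

    waitpid($pid, 0);
    return $? >> 8;   # exit status of the child
}

# Usage sketch:
# require DBI;
# my $status = run_in_child(
#     sub { DBI->connect($dsn, $user, $passwd, { RaiseError => 1 }) },
#     sub { my ($dbh) = @_; $dbh->do(q{DELETE FROM pool_start WHERE id = ?}, undef, $id) },
# );
```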
Re: The pain of DBI !!! Lost connection from forked or threaded children
by expresspotato (Beadle) on Dec 28, 2009 at 17:44 UTC
    The system is vanilla Fedora 12. The database is MySQL. Processes are forked using the code below. Because I thought forking with Parallel::ForkManager might be the problem, I tried threads again, but still had problems. What I can't believe is that even after the fork, a whole new perl is run using system in an attempt to isolate problems like this from occurring. Some of the spawned processes do video processing which, if it fails, takes the entire parent process (this script) with it. In short, the script currently uses threads, but with a few moved comments it will just as easily fork. The problem starts when dl_start finishes.
    #! /usr/bin/perl -T
    print "Starting...\n";
    use DBI;
    use threads qw(stringify);
    use threads::shared;
    use Time::HiRes qw (sleep);
    do("./sql.pl");
    &sql_setup;
    #use Parallel::ForkManager;
    #$MAX_PROCESSES = 10000;
    #$pm = new Parallel::ForkManager($MAX_PROCESSES);
    $server_id = "A";
    share($server_id);
    $sys_ok = 1;
    share($sys_ok);
    $|++;
    # $pm->run_on_finish(
    #     sub { my ($pid, $exit_code, $ident) = @_;
    #         #print "** $ident just got out of the pool ".
    #         #    "with PID $pid and exit code: $exit_code\n";
    #     }
    # );
    # $pm->run_on_start(
    #     sub { my ($pid,$ident)=@_;
    #         #print "** $ident started, pid: $pid\n";
    #     }
    # );
    print "Ready ($server_id).\n";
    $thr3 = threads->new(\&monitor);
    $thr3->detach;
    if ($server_id =~ "proc"){
        print "Processing Node Active\n";
        $thr4 = threads->new(\&processor,1);
        $thr4->detach;
    }
    $thr1 = threads->new(\&starter);
    $thr1->detach;
    $thr2 = threads->new(\&stopper);
    $thr2->detach;
    $thr4 = threads->new(\&processor,0);
    $thr4->detach;
    while (1){
        sleep(1);
        #$pm->wait_all_children;
    }

    sub starter{
        my @row;
        while (1){
            if ($sys_ok){
                sleep(2);
                @row = &row_sql(qq~ select * from pool_start where status = "" limit 1; ~,-1);
                if (!(@row)){
                    #print "Nothing to do...\n";
                }else{
                    &sql(qq~ update pool_start set status = "RUNNING" where id="@row[0]"; ~,-1);
                    #print "\nStarting (@row[1],@row[2])\n";
                    #my $thr = threads->create(\&bt_start,@row[1],@row[2],@row[3]);
                    #$thr->detach;
                    #$child_starter++;
                    #my $pid = $pm->start($child_starter) and next;
                    if (@row[4] eq ""){
                        # &dl_start(@row[1],@row[2],@row[3],"","",@row[5],@row[6],@row[0]);
                        my $thr = threads->create(\&bt_start,@row[1],@row[2],@row[3],"","",@row[5],@row[6],@row[0]);
                        $thr->detach;
                    }
                    if (@row[4] eq "URL"){
                        print @row;
                        &url_start(@row[2],@row[5],@row[6],@row[0]);
                    }
                    #$pm->finish($child_starter); # Terminates the child process
                    #print("Thread $thr started...\n");
                }
                #unless (@row){sleep(2);}
            }else{
                sleep(30);
            }
            #$pm->wait_all_children;
        }
    }

    sub stopper{
        my @row;
        while (1){
            @row = &row_sql("select b from pool_stop where srv='$server_id';",-1);
            if (!(@row)){
                #print "Nothing to do...\n";
            }else{
                #print "\nShould End (@row[0])";
                mkdir("./b_stop/@row[0]");
                $delete = qq~ delete from pool_stop where b="@row[0]"; ~;
                &del_sql($delete,-1);
            }
            sleep(2);
        }
    }

    sub monitor{
        while (1){
            sleep(5);
            $disk =~ s/\%//g;
            if (($avg > $max_avg) || ($disk > 80)){$sys_ok = 0;}else{$sys_ok = 1;}
            print "($sys_ok)\n";
        }
    }

    sub processor{
        my @row;
        while (1){
            if ($sys_ok){
                if ($_[0] eq "0"){
                    $select = qq~ select * from pool_process where srv="$server_id" and any != "1" and status=""; ~;
                }
                if ($_[0] eq "1"){
                    $select = qq~ select * from pool_process where any="1" and status=""; ~;
                }
                @row = &row_sql($select,-1);
                if (!(@row)){
                    #print "Nothing to do...\n";
                }else{
                    &del_sql(qq~ update pool_process set status = "RUNNING" where id="@row[0]"; ~,-1);
                    print "Processing (@row[1],@row[2],@row[3],@row[4])\n";
                    #my $thr = threads->create(\&process_start,@row[1],@row[2],@row[3],@row[4],@row[5]);
                    #$child_processor++;
                    #my $pid = $pm->start($child_processor) and next;
                    # &process_start(@row[1],@row[2],@row[3],@row[4],@row[5],@row[6],@row[7],@row[0]);
                    my $thr=threads->new(\&process_start,@row[1],@row[2],@row[3],@row[4],@row[5],@row[6],@row[7],@row[0]);
                    $thr->detach;
                    #print "($pid)($rand)Child about to get out...\n";
                    #print "$names[$child], Child $child is about to get out...\n";
                    #print "Child about to get out ($$)\n";
                    #system("kill $$");
                    #$pm->finish($child_processor); # Terminates the child process
                    #print "Finished Child!";
                }
                sleep(2);
            }else{
                sleep(30);
            }
            #$pm->wait_all_children;
        }
    }

    sub process_start{
        system(qq~ perl ./thread_processor.pl "$_[0]" "$_[1]" "$_[2]" "$_[3]" "$_[4]" ~);
        #&tp_remove_process($_[7]);
    }

    sub dl_start{
        #print $_[2], $_[2];
        system(qq~ perl ./fork_sc.pl "$_[0]" "$_[1]" "$_[2]" "$_[3]" "$_[4]" "$_[5]" "$_[6]" ~);
        #&tp_remove_start($_[7]);
    }

    sub url_start{
        #print "$_[0], $_[1], $_[2]";
        system(qq~ perl ./fork_url.pl "$_[0]" "$_[1]" "$_[2]" ~);
        #&tp_remove_start($_[3]);
    }

    sub tp_remove_start(){
        &sql(qq~ delete from pool_start where id="$_[0]"; ~);
    }

    sub tp_remove_process(){
        &sql(qq~ delete from pool_process where id="$_[0]"; ~);
    }