in reply to Can't get multiple Parallel::ForkManager threads connecting to DBD::Pg database

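Usually, database drivers don't survive fork(): a connection opened in the parent is shared by the child's copy of the handle. You should therefore create the database connection after forking, inside each child.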

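If you move the creation of the database connection into the child's part of your fork loop (after `start` returns in the child), maybe things start working? Note that `run_on_start` callbacks run in the parent process, so they are not a suitable place to open the child's connection.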

Replies are listed 'Best First'.
Re^2: Can't get multiple Parallel::ForkManager threads connecting to DBD::Pg database
by perlygapes (Beadle) on Jul 22, 2021 at 13:56 UTC
    I got it working using the following code.

    In short, this code uses `Parallel::ForkManager` to fork multiple child processes (not threads), and each child opens its own database handle and associated statement handles.
    It moves `dbh` and statement-handle creation into a sub, and likewise puts writing to the database and closing the handles into their own subs.
    use 5.24.0;
    use strict;
    use warnings;
    use Parallel::ForkManager;
    use SQL::Abstract;
    use DBI ':sql_types';
    use DBD::Pg qw/:pg_types/;

    # Fork one child process per code. Each child opens its OWN database
    # connection after the fork; nothing below connects in the parent,
    # because a DBI handle must not be shared across fork().

    #my @codes = ("A");                      # testing single process
    my @codes = ("A", "B", "F", "M", "S");   # testing multiple processes

    ################################################################
    # database connection info
    ################################################################
    my $dsn    = 'DBI:Pg:dbname=mt4_test';
    my $userid = $ENV{DBI_USER};
    my $sesame = $ENV{DBI_PASS};

    ################################################################
    # column order for the INSERT, plus column name -> position lookup
    ################################################################
    my @columns = qw(field1 field2 field3 field4 field5);
    my %columns = map { $columns[$_] => $_ } 0 .. $#columns;
    my $placeholders = join(', ', ('?') x @columns);

    ################################################################
    # SQL statements (prepared separately in each child, see create_dbh)
    ################################################################
    my $sql_select_statement = 'SELECT count(*) FROM mytable WHERE field1 = ?';
    my $sql_insert_statement = 'INSERT INTO mytable ('
        . join(', ', @columns)
        . ") VALUES ($placeholders)";

    ################################################################
    # one child process per code
    ################################################################
    my $optimization = Parallel::ForkManager->new(scalar @codes);

    # Both callbacks run in the PARENT process, so neither of them is a
    # place to open a database connection for a child.
    $optimization->run_on_start(sub {
        my ($pid, $ident) = @_;
        print "Starting $ident under process id $pid\n";
    });
    $optimization->run_on_finish(sub {
        my ($pid, $exit_code, $ident) = @_;
        warn "Child $ident (pid $pid) exited with code $exit_code\n"
            if $exit_code;
    });

    my $thread_count = 0;
    OPTIMIZATION:
    for my $code (@codes) {
        $thread_count++;
        print "Thread $thread_count running for $code\n";

        # start() returns the child's pid in the parent (which moves on to
        # the next code) and 0 in the child. This must be unconditional:
        # without the `next`, the parent would run the child's work too.
        $optimization->start($code) and next OPTIMIZATION;

        # --- child process from here on ---
        my $ok = eval { launch_sub($code); 1 };
        warn "Child $code failed: $@" unless $ok;
        print "\$optimization->finish on child $code\n";
        $optimization->finish($ok ? 0 : 1);    # exits the child
    }
    print "\$optimization->wait_all_children() is waiting...\n";
    $optimization->wait_all_children();

    ################################################################
    # THE END
    ################################################################
    exit;

    ################################################################
    # launch_sub($code)
    # Runs the work for one code inside a child process. Every code is
    # handled the same way; only the prefix of the generated values
    # differs (code "A" -> 'a_f1' .. 'a_f4'). The code is upper-cased
    # before use, so "a" and "A" behave identically.
    # Dies if no code is given.
    ################################################################
    sub launch_sub {
        my ($code) = @_;
        die "launch_sub: no code given\n"
            unless defined $code && length $code;
        return process_code(uc $code);
    }

    ################################################################
    # process_code($code)
    # Connects, writes the row for $code if it is missing, and always
    # disconnects, even when the write dies (RaiseError is on).
    # Re-throws the write error after cleanup.
    ################################################################
    sub process_code {
        my ($code) = @_;
        my ($dbh, $sel_sth, $ins_sth) = create_dbh();

        my $ok = eval {
            write_to_db($sel_sth, $ins_sth, $code, build_values($code));
            1;
        };
        my $err = $@;    # save before disconnect can clobber $@

        disconnect_dbh($dbh, $sel_sth, $ins_sth);
        die $err unless $ok;
        return;
    }

    ################################################################
    # build_values($code)
    # Returns the INSERT values for $code, ordered to match @columns.
    # field1 holds the code itself; the rest are illustrative values.
    ################################################################
    sub build_values {
        my ($code) = @_;
        my $prefix = lc $code;

        my %varset = (
            field1 => $code,
            field2 => "${prefix}_f1",    # for illustrative purposes
            field3 => "${prefix}_f2",
            field4 => "${prefix}_f3",
            field5 => "${prefix}_f4",
        );

        # Place each value at its column's position.
        my @values;
        $values[ $columns{$_} ] = $varset{$_} for keys %varset;
        return @values;
    }

    ################################################################
    # create_dbh()
    # Opens a new connection (call it in the child, after fork) and
    # prepares the SELECT and INSERT statements on it.
    # Returns ($dbh, $select_sth, $insert_sth).
    ################################################################
    sub create_dbh {
        # RaiseError makes connect/execute die on failure; PrintError is
        # off so that each error is not reported twice.
        my $dbh = DBI->connect($dsn, $userid, $sesame,
            { AutoCommit => 1, RaiseError => 1, PrintError => 0 })
            or die "Connection failed!\n" . $DBI::errstr;

        # did it work? are we there yet?
        my $me       = $dbh->{Driver}{Name};
        my $sversion = $dbh->{pg_server_version};
        print "DBI is version $DBI::VERSION, "
            . "I am $me, "
            . "version of DBD::Pg is $DBD::Pg::VERSION, "
            . "server is $sversion\n";
        print "Name: $dbh->{Name}\n";

        my $sel_sth = $dbh->prepare($sql_select_statement);
        my $ins_sth = $dbh->prepare($sql_insert_statement);
        return ($dbh, $sel_sth, $ins_sth);
    }

    ################################################################
    # write_to_db($select_sth, $insert_sth, $code, @values)
    # Inserts @values unless a row with field1 = $code already exists.
    ################################################################
    sub write_to_db {
        my ($sel_sth, $ins_sth, $code, @values) = @_;

        # With RaiseError on, execute dies on failure, so there is no
        # error return value to check here.
        my $rv_code = $sel_sth->execute($code);
        say "SQL SELECT for $code: rv_code = $rv_code";
        my ($existing) = $sel_sth->fetchrow_array();
        $sel_sth->finish();    # single-row result; release it now

        # NOTE(review): this check-then-insert is not atomic. Within one
        # run each child owns a distinct code, but two concurrent runs
        # could both insert. A unique constraint on field1 plus
        # INSERT ... ON CONFLICT DO NOTHING would close that gap.
        return if ($existing // 0) > 0;

        # INSERT settings into 'mytable'
        $ins_sth->execute(@values);
        say "SQL INSERT for $code";
        return;
    }

    ################################################################
    # disconnect_dbh($dbh, @statement_handles)
    # Finishes any given statement handles, then disconnects $dbh.
    ################################################################
    sub disconnect_dbh {
        my ($dbh, @sths) = @_;
        $_->finish() for grep { defined } @sths;
        $dbh->disconnect();
        say "disconnected dbh: $dbh";
        return;
    }