Beloved Brewing Brethren Beholding Bitwise Borders,
This follows on from my previous questions regarding Parallel::ForkManager and DBD::Pg.
I have posted this to Stackoverflow, but haven't got a working answer...
I am trying to create DB connections from multiple child threads, each of which is spawned as a child using Parallel::ForkManager.
I cannot get this to work. I basically keep having the same sharing error - DBH cannot be shared between threads.
Here is my example. I have included both a create_dbh and a clone_dbh sub-routine as options. Neither work.
SQL TABLE SCRIPT:
CREATE TABLE mytable (
field1 varchar(24) NULL,
field2 varchar(24) NULL,
field3 varchar(24) NULL,
field4 varchar(24) NULL,
field5 varchar(24) NULL,
pk serial NOT NULL,
CONSTRAINT mytable_pk PRIMARY KEY (pk)
);
PERL SCRIPT
use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use SQL::Abstract;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;
#@codes = ("A"); # testing single thread
my @codes = ("A","B","F","M","S"); # testing multi-thread
my %varset;
################################################################
# connect to db
################################################################
my $dsn = 'DBI:Pg:dbname=$ENV{DB_NAME}';
my $userid = $ENV{DBI_USER};
my $sesame = $ENV{DBI_PASS};
my %dbh; # hash for storing dbh handles
my $dbh = DBI->connect($dsn, $userid, $sesame, {
AutoCommit => 1,
RaiseError => 1,
PrintError => 1
}) or die "Connection failed!\n" . $DBI::errstr;
################################################################
# test db connection
################################################################
my $me = $dbh->{Driver}{Name};
my $sversion = $dbh->{pg_server_version};
print "DBI is version $DBI::VERSION, "
. "I am $me, "
. "version of DBD::Pg is $DBD::Pg::VERSION, "
. "server is $sversion\n";
print "Name: $dbh->{Name}\n";
################################################################
# prepare array and hash for matching db columns
################################################################
my %columns; # hash for persistent mapping of column-values
my @columns; # deterministic - $values[$columns{$column}];
set_columns(); # define column<->value mapping for db table
my $placeholders = join(", ", map { '?' } @columns);
################################################################
# Build the SELECT SQL statement
################################################################
my $sql_select_statement =
qq(SELECT count(*) FROM mytable WHERE id = ?;);
################################################################
# Build the INSERT SQL statement
################################################################
my $sql_insert_statement =
"INSERT INTO mytable ("
. join(", ", @columns) # column names
. ") VALUES ($placeholders)";
################################################################
# create clones of database handle and SQL statements for threads
################################################################
my %sth_select_code;
my %sth_insert_code;
#for my $code (@codes) {
# $dbh{$code} = $dbh->clone();
# # prepare the SELECT statement handle
# $sth_select_code{$code} = $dbh{$code}->prepare_cached($sql_select
+_statement);
# # prepare the INSERT statement handle
# $sth_insert_code{$code} = $dbh{$code}->prepare_cached($sql_insert
+_statement);
#}
################################################################
# create Parallel::ForkManager object for @codes
################################################################
my $optimization = Parallel::ForkManager->new(scalar @codes);
$optimization->run_on_start(sub{
my ($pid,$ident) = @_;
print "Starting $ident under process id $pid\n";
});
$optimization->run_on_finish(sub{
my ($pid,
$exit_code,
$ident,
$exit_signal,
$core_dump,
$data_structure_reference) = @_;
});
my $thread_count = 0;
OPTIMIZATION:
for my $code (@codes) {
$thread_count++;
print "Thread $thread_count running for $code\n";
# fork optimization threads - per code
$optimization->start($code) and next OPTIMIZATION;
if ($code =~ m/A/i) {
sub_a("A");
} elsif ($code =~ m/B/i) {
sub_b("B");
} elsif ($code =~ m/F/i) {
sub_f("F");
} elsif ($code =~ m/M/i) {
sub_m("M");
} elsif ($code =~ m/S/i) {
sub_s("S");
}
print "\$optimization->finish on child $code\n";
$optimization->finish(0);
}
print "\$optimization->wait_all_children() is waiting...\n";
$optimization->wait_all_children();
################################################################
# disconnect from database
################################################################
for my $code (@codes) {
$sth_select_code{$code}->finish();
$sth_insert_code{$code}->finish();
$dbh{$code}->disconnect;
}
$dbh->disconnect;
################################################################
# end
################################################################
exit;
################################################################
sub sub_a {
################################################################
my $code = shift; # Code
my @values;
# clone_dbh($code);
create_dbh($code);
# my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
# generate values specific to A
$varset{field2} = 'a_f1'; # for illustrative purposes
$varset{field3} = 'a_f2';
$varset{field4} = 'a_f3';
$varset{field5} = 'a_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable
+)
$values[$columns{$column}] = $value; # add to list of column v
+alues
}
$values[0] = $code;
write_to_db($code, @values);
}
################################################################
sub sub_b {
################################################################
my $code = shift; # Code
my @values;
# clone_dbh($code);
create_dbh($code);
# my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
# generate values specific to B
$varset{field2} = 'b_f1'; # for illustrative purposes
$varset{field3} = 'b_f2';
$varset{field4} = 'b_f3';
$varset{field5} = 'b_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable
+)
$values[$columns{$column}] = $value; # add to list of column v
+alues
}
$values[0] = $code;
write_to_db($code, @values);
}
################################################################
sub sub_f {
################################################################
my $code = shift; # Code
my @values;
# clone_dbh($code);
create_dbh($code);
# my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
# generate values specific to F
$varset{field2} = 'f_f1'; # for illustrative purposes
$varset{field3} = 'f_f2';
$varset{field4} = 'f_f3';
$varset{field5} = 'f_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable
+)
$values[$columns{$column}] = $value; # add to list of column v
+alues
}
$values[0] = $code;
write_to_db($code, @values);
}
################################################################
sub sub_m {
################################################################
my $code = shift; # Code
my @values;
# clone_dbh($code);
create_dbh($code);
# my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
# generate values specific to M
$varset{field2} = 'm_f1'; # for illustrative purposes
$varset{field3} = 'm_f2';
$varset{field4} = 'm_f3';
$varset{field5} = 'm_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable
+)
$values[$columns{$column}] = $value; # add to list of column v
+alues
}
$values[0] = $code;
write_to_db($code, @values);
}
################################################################
sub sub_s {
################################################################
my $code = shift; # Code
my @values;
# clone_dbh($code);
create_dbh($code);
# my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
# generate values specific to S
$varset{field2} = 's_f1'; # for illustrative purposes
$varset{field3} = 's_f2';
$varset{field4} = 's_f3';
$varset{field5} = 's_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable
+)
$values[$columns{$column}] = $value; # add to list of column v
+alues
}
$values[0] = $code;
write_to_db($code, @values);
}
################################################################
sub create_dbh {
################################################################
my $code = shift;
$dbh{$code} = DBI->connect($dsn, $userid, $sesame, {
AutoCommit => 1,
RaiseError => 1,
PrintError => 1
}) or die "Connection failed!\n" . $DBI::errstr;
# did it work? are we there yet?
my $me = $dbh{$code}->{Driver}{Name};
my $sversion = $dbh{$code}->{pg_server_version};
print "DBI is version $DBI::VERSION, "
. "I am $me, "
. "version of DBD::Pg is $DBD::Pg::VERSION, "
. "server is $sversion\n";
print "Name: $dbh->{Name}\n";
# prepare the SELECT statement handle
$sth_select_code{$code} = $dbh{$code}->prepare_cached($sql_select_
+statement);
# prepare the INSERT statement handle
$sth_insert_code{$code} = $dbh{$code}->prepare_cached($sql_insert_
+statement);
}
################################################################
sub clone_dbh {
################################################################
my $code = shift;
$dbh{$code} = $dbh->clone();
# prepare the SELECT statement handle
$sth_select_code{$code} = $dbh{$code}->prepare_cached($sql_select_
+statement);
# prepare the INSERT statement handle
$sth_insert_code{$code} = $dbh{$code}->prepare_cached($sql_insert_
+statement);
}
################################################################
sub write_to_db {
################################################################
my ($code, @values) = shift @_;
my $rv_code = $sth_select_code{$code}->execute($code);
if($rv_code < 0) {
print $DBI::errstr;
}
my @row = $sth_select_code{$code}->fetchrow_array();
# if the SELECT found no existing records for this strategy, then
+INSERT it
unless ($row[0] > 0) {
# INSERT settings into 'mytable'
$sth_insert_code{$code}->execute(@values);
}
}
################################################################
sub set_columns {
################################################################
$columns{field1} = 0;
$columns{field2} = 1;
$columns{field3} = 2;
$columns{field4} = 3;
$columns{field5} = 4;
$columns[0] = 'field1';
$columns[1] = 'field2';
$columns[2] = 'field3';
$columns[3] = 'field4';
$columns[4] = 'field5';
}
Do I have to pass dbh and statement handles to the create_dbh sub explicitly from within the calling sub_x thread?
Do I have to return the handles or handle_refs from create_dbh back to calling sub_x thread?
I don't know how to get around this, but it seems like a lexical scope or object/memory access issue.
Any advice?