| Category: | Databases |
| Author/Contact Info | Daniel Anderson dan@ngenllc.com |
| Description: | I've run into a problem for a script I'm writing, namely DBD::mysql isn't thread safe. (Try executing a query on a DBD::mysql object after you've done a fork or two). Now, the common solution (according to the archives) is to create a new DBD::mysql object within every fork. Unfortunately that isn't an option for me, so I have begun creating a DBD::mysql replacement that is thread and fork safe and also can be used to perform rudimentary table locking (so you don't delete something that is being selected -- still working on that). So I have created the following, threadsafe.pm, and I'd appreciate any comments on how I can improve on it (or even patches) before I post an updated version and create a module for CPAN. Thanks for looking! Vautrin |
# ATTENTION!
# These files distributed under the GNU GPL
# view it here: http://www.gnu.org/licenses/gpl.txt
#############
# BEGIN POD #
#############
=head1 NAME
threadsafe - Implements a thread safe DBD::mysql
=head1 SYNOPSIS

    use strict;
    use warnings;
    use debug_skeleton;
    use threadsafe;
    my $debugger = debug_skeleton->create;
    my $conn = threadsafe->create({
        hostname => 'hostname',
        username => 'username',
        password => 'password',
        database => 'database',
        debugger => $debugger,
    });
    my $query = "SELECT * FROM table";
    my $result = $conn->prepare($query)
        or die ("Can't prepare $query");
    $result->execute
        or die ("Can't execute $query");
    my $row = $result->fetchrow_hashref;

=head1 DESCRIPTION
The threadsafe.pm module provides a thread safe drop in
replacement for DBD::mysql. This module implements as many functions
from DBD::mysql as were necessary to allow the database_controller.pm
objects to work in a multithreaded environment. Code has also been
put in place to ensure that race conditions and deadlocks do not occur
-- a dangerous possibility with multithreaded database queries on a
database that does not support transactions.
=head1 Public Functions
The public Object Oriented Interface (OOI) to threadsafe
involves the use of the following functions:
=over 4
=item create
The C<create> function is the constructor for threadsafe.pm.
It takes one argument, a hash reference that contains the
following keys and values:
=over 4
=item debugger (Optional)
The key debugger should point to an instance of debug_skeleton
-- which is the preferred means to log all info / warnings /errors.
It is important to note that although this is optional (a default
debug_skeleton object will be created, pointing all output to STDERR),
it is much better to keep all error messages centralized.
=item username
The username to the database.
=item hostname
The hostname or IP address the DBD::mysql driver should
connect to.
=item password
The password (if any) to connect to the database. For no
password use an empty string (C<"">) and B<not> C<undef>.
=item database
The database to connect to. For security reasons this is not
optional.
=item port (Optional)
The port to connect to. Optional.
=back
=item prepare
The C<prepare> function takes one argument, the query it should
run on the database connection. It is used before the C<execute>
command. In order to maintain compatibility with DBD::mysql's
prepare function, it returns the threadsafe object itself on success
and undef if the query could not be prepared.
=item execute
The C<execute> function does not take any arguments and executes
the query which was previously prepared. It returns TRUE or FALSE
depending on whether or not the query was successfully executed.
=item fetchrow_hashref
After executing a command, returns the next row as a hash
reference, or undef if there are no more rows to fetch.
=back
=head1 Notes on Protected Functions
It is entirely possible that in the future you will want to
extend or modify the internals of the threadsafe.pm. All functions
contain comments and documentation as to their function -- they have
been left out of the POD for clarity. Documentation will be of two
forms: comments or logging directives. The logging directives are
typically of the form:

    $debug_skeleton_object->info("Comment", "Function");
    $debug_skeleton_object->warning("Warning Message");
    $debug_skeleton_object->error("Error Message");
    $debug_skeleton_object->fatal_error("Fatal Error");

They provide valuable feedback on what a function is supposed
to do and why things are happening, and should make clear the reasons
why things are done the way they are done. If logging directives are
ever unclear, comments will be added to the code to explain what is
happening and why.
Also, it is important to recognize that most functions simply
call DBD::mysql functions after first:
a. Checking that the process ID ($$ -- see perldoc perlvar for more
info) has not changed. A changed process ID is a sure sign that we
have forked and a new database connection must be created.
b. Acquiring a lock for the prepare / execute statements, if locking
is in place.
=head1 NOTES
Because I prefer an object oriented interface (and do not like
to see main filled up with too many subroutines and variables
(courtesy of Exporter)), a database_controller must be instantiated
and then all methods must be called through it.
Also, because it makes it much easier to catch errors, all
functions are designed to exit if they do not explicitly return a value
and leave a record of exactly what went wrong in the debug log.
Consult the debug_skeleton.pm POD for details on turning on logging
and its use.
=head1 Notes on Editors
The editor used to create this file was GNU Emacs 21.3.3 (CVS
version). Although an effort was made to ensure that all formatting
would be the same across other editors (all tabs were converted to 4
spaces), if you have any problems viewing the code you are encouraged
to get a copy of GNU Emacs or its cousin XEmacs and try it out for
yourself (Check out cperl mode if you do).
=head1 Author
This file was created by Dan Anderson. If you need technical
support or would like to contact him about extending the project, or
just have any questions or comments, you are encouraged to contact him
at:
Dan Anderson
(914) 466-8526
dan@ngenllc.com
http://www.ngenllc.com
=cut
###########
# END POD #
###########
#! /usr/bin/perl
use strict;
use warnings;
use DBI;
use debug_skeleton;
package threadsafe;
{
####################
# PUBLIC FUNCTIONS #
####################
sub create {
    my $class   = shift (@_);
    my $options = shift (@_);
    die ("We are not a copy constructor") if (ref $class);
    my $debugger;
    if ($options->{debugger}) {
        $debugger = $options->{debugger};
        $debugger->info("Just entered threadsafe::create", "threadsafe::create");
    }
    else {
        # Fall back to a default debug_skeleton object, built with
        # that package's constructor (create)
        $debugger = debug_skeleton->create;
        $debugger->info("Just entered threadsafe::create. Created debug_skeleton here",
                        "threadsafe::create");
    }
    # Validate the required options; fatal_error logs the message and exits
    if (not $options->{hostname}) {
        $debugger->fatal_error("You must provide a hostname");
    }
    elsif (not $options->{username}) {
        $debugger->fatal_error("You must provide a username");
    }
    elsif (not (defined $options->{password})) {
        $debugger->fatal_error("The password was not defined.");
    }
    elsif (not $options->{database}) {
        $debugger->fatal_error("You must provide a database name");
    }
    my $self = {
        pid      => $$,
        hostname => $options->{hostname},
        username => $options->{username},
        password => $options->{password},
        database => $options->{database},
        port     => $options->{port},
        debugger => $debugger,
        conn     => undef,
        result   => undef,
        query    => undef,
    };
    return bless $self, $class;
}
sub prepare {
    my $self     = shift (@_);
    my $query    = shift (@_);
    my $debugger = $self->{debugger};
    $debugger->info("Just entered threadsafe::prepare for query $query",
                    "threadsafe::prepare");
    $self->check_connection;
    my $conn = $self->{conn};
    $self->{query}  = $query;
    $self->{result} = $conn->prepare($query);
    unless ($self->{result}) {
        $debugger->info("Could not prepare the query. Returning undef",
                        "threadsafe::prepare");
        return undef;
    }
    $debugger->info("Query successfully prepared. Returning \$self",
                    "threadsafe::prepare");
    # we have to return $self because we want to support the old paradigm:
    # my $result = $conn->prepare; $result->execute;
    return $self;
}
sub execute {
    my $self     = shift (@_);
    my $debugger = $self->{debugger};
    $debugger->info("Just entered threadsafe::execute",
                    "threadsafe::execute");
    unless ($self->{result}) {
        $debugger->fatal_error("No result was prepared!");
    }
    $self->check_connection;
    # check_connection clears $self->{result} when it has to reconnect,
    # so the statement handle is read from the object only after the check
    my $result = $self->{result};
    unless ($result) {
        $debugger->fatal_error("Connection reset...");
    }
    unless ($result->execute) {
        $debugger->info("Just left threadsafe::execute. Returning FALSE",
                        "threadsafe::execute");
        return 0;
    }
    $debugger->info("Just left threadsafe::execute. Returning TRUE.",
                    "threadsafe::execute");
    return 1;
}
sub fetchrow_hashref {
    my $self     = shift (@_);
    my $debugger = $self->{debugger};
    $debugger->info("Just entered threadsafe::fetchrow_hashref",
                    "threadsafe::fetchrow_hashref");
    unless ($self->{result}) {
        $debugger->fatal_error("There was no result to fetch.");
    }
    $self->check_connection;
    unless ($self->{result}) {
        $debugger->fatal_error("Connection was reset.");
    }
    my $result = $self->{result};
    $debugger->info("Just left threadsafe::fetchrow_hashref",
                    "threadsafe::fetchrow_hashref");
    # Delegate to the DBI statement handle; calling this method on
    # $self here would recurse forever
    return $result->fetchrow_hashref;
}
########################
# END PUBLIC FUNCTIONS #
########################
#######################
# PROTECTED FUNCTIONS #
#######################
sub check_connection {
    my $self     = shift (@_);
    my $debugger = $self->{debugger};
    $debugger->info("Just entered threadsafe::check_connection",
                    "threadsafe::check_connection");
    # Reconnect if there is no connection yet or if the process ID has
    # changed since the connection was made (i.e. we have forked)
    if ((not (defined ($self->{conn}))) or (not ($$ == $self->{pid}))) {
        $self->{pid} = $$;
        my $conn_string = "DBI:mysql:database=$self->{database};host=$self->{hostname};";
        if ($self->{port}) {
            $conn_string .= "port=$self->{port};";
        }
        # DBI reports connection failures in $DBI::errstr, not $!
        $self->{conn} = DBI->connect($conn_string, $self->{username}, $self->{password})
            or $debugger->fatal_error("Could not connect to the database because: $DBI::errstr");
        # Statement handles from the old connection are no longer valid
        $self->{result} = undef;
        $self->{query}  = undef;
    }
    $debugger->info("Just left threadsafe::check_connection",
                    "threadsafe::check_connection");
}
###########################
# END PROTECTED FUNCTIONS #
###########################
1;
}
# The following is the debug_skeleton.pm file I use for
# debugging:
#! /usr/bin/perl
##############
# BEGIN POD: #
##############
############
# END POD: #
############
###############
# BEGIN CODE: #
###############
package debug_skeleton;
{
sub create {
my $class = shift (@_);
my $debug_logfile = "./debug_skeleton.log";
my $debug_info = 1;
my $debug_warning = 1;
my $debug_error = 1;
my $show_info = {};
my $all_info = 1;
while (my $option = shift (@_)) {
if ($option eq 'LOG') {
$debug_logfile = shift (@_);
}
elsif ($option eq 'INFO') {
$debug_info = shift (@_);
}
elsif ($option eq 'WARNING') {
$debug_warning = shift (@_);
}
elsif ($option eq 'ERROR') {
$debug_error = shift (@_);
}
elsif ($option eq 'SHOWINFO') {
$show_info->{shift (@_)} = 1;
}
elsif ($option eq 'ALLINFO') {
$all_info = shift (@_);
}
else {
die ("Not a valid option: $option");
}
}
my $self = {
logfile => $debug_logfile,
log_open => 0,
log_handle => undef,
info => $debug_info,
warning => $debug_warning,
error => $debug_error,
show_info => $show_info,
all_info => $all_info,
nologging => 0,
};
bless $self, $class;
}
sub info {
my $self = shift (@_);
my $message = shift (@_);
my $class = shift (@_);
unless ($self->{nologging}) {
if ($self->{all_info} or $self->{show_info}->{$class}) {
if ($self->{info}) {
$self->printInfo ($message);
}
else {
$self->logInfo ($message);
}
}
}
}
sub warning {
my $self = shift (@_);
my $message = shift (@_);
unless ($self->{nologging}) {
if ($self->{warning}) {
$self->printWarning ($message);
}
else {
$self->logWarning ($message);
}
}
}
sub error {
my $self = shift (@_);
my $message = shift (@_);
unless ($self->{nologging}) {
if ($self->{error}) {
$self->printError ($message);
}
else {
$self->logError ($message);
}
}
}
sub fatal_error {
    my $self      = shift (@_);
    my $message   = shift (@_);
    my $exit_code = shift (@_);
    if (not ($exit_code)) {
        $exit_code = 1;
    }
    unless ($self->{nologging}) {
        if ($self->{error}) {
            $self->printError ($message);
        }
        else {
            $self->logError ($message);
        }
    }
    # A fatal error always ends the program, whether or not it was logged
    exit ($exit_code);
}
sub printInfo ($) {
my $self = shift (@_);
my $message = shift (@_);
print STDERR "INFO: ", $message, "\n";
}
sub logInfo ($) {
my $self = shift (@_);
my $message = shift (@_);
my $fh = $self->checkLog;
print $fh "INFO: ", $message, "\n";
}
sub printWarning ($) {
my $self = shift (@_);
my $message = shift (@_);
print STDERR "WARNING: ", $message, "\n";
}
sub logWarning ($) {
my $self = shift (@_);
my $message = shift (@_);
my $fh = $self->checkLog;
print $fh "WARNING: ", $message, "\n";
}
sub printError ($) {
my $self = shift (@_);
my $message = shift (@_);
print STDERR "ERROR: ", $message, "\n";
}
sub logError ($) {
my $self = shift (@_);
my $message = shift (@_);
my $fh = $self->checkLog;
print $fh "ERROR: ", $message, "\n";
}
sub checkLog {
    my $self = shift (@_);
    unless ($self->{log_open}) {
        my $fh;
        # Three-argument open keeps the file name from being parsed as a mode
        open ($fh, '>', $self->{logfile})
            or die ("Cannot open the log file because $!");
        $self->{log_open}   = 1;
        $self->{log_handle} = $fh;
        return $fh;
    }
    else {
        return $self->{log_handle};
    }
}
1;
}
############
# END CODE #
############
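The fork check that check_connection relies on (compare $$ with the process ID stored when the connection was made, and reconnect on a mismatch) can be tried without a database. The following is only a minimal sketch of that idea: the ForkAwareHandle package, its conn method, and the connector code reference are hypothetical names invented for this illustration and are not part of threadsafe.pm or DBI.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the PID check in threadsafe::check_connection.
# No DBI is involved; the "connection" is whatever the connector returns.
package ForkAwareHandle;

sub new {
    my ($class, $connector) = @_;
    # $connector is any code reference that builds a fresh connection
    my $self = { pid => $$, conn => undef, connector => $connector };
    return bless $self, $class;
}

# Return the cached connection, rebuilding it if none exists yet or if
# the current process ID differs from the one that created it.
sub conn {
    my ($self) = @_;
    if (!defined $self->{conn} or $self->{pid} != $$) {
        $self->{pid}  = $$;
        $self->{conn} = $self->{connector}->();
    }
    return $self->{conn};
}

package main;

my $handle = ForkAwareHandle->new(sub { return "connection opened by PID $$" });
my $parent_conn = $handle->conn;

# A pipe lets the child report which connection it ended up with
pipe(my $reader, my $writer) or die "pipe failed: $!";
my $pid = fork();
die "fork failed: $!" unless defined $pid;

if ($pid == 0) {
    # Child: $$ has changed, so conn() calls the connector again
    close $reader;
    print {$writer} $handle->conn, "\n";
    close $writer;
    exit 0;
}

# Parent: read the child's report and reap the child
close $writer;
my $child_conn = <$reader>;
chomp $child_conn;
close $reader;
waitpid($pid, 0);

print "parent: $parent_conn\n";
print "child:  $child_conn\n";
```

The parent keeps its original connection while the child builds its own on first use. With real DBI handles there is one more thing to consider: the DBI documentation describes the InactiveDestroy attribute for handles inherited across a fork, so that destroying the inherited handle in the child does not disturb the parent's connection. Whether threadsafe.pm needs that depends on how the old handle is discarded.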
Re: Thread and Fork Safe DBI and DBD::mysql replacement
by simonm (Vicar) on Feb 06, 2004 at 22:47 UTC | |
by Vautrin (Hermit) on Feb 07, 2004 at 00:30 UTC | |
|
Re: Thread and Fork Safe DBI and DBD::mysql replacement
by jZed (Prior) on Feb 06, 2004 at 23:25 UTC |