Category: Databases
Author/Contact Info Daniel Anderson dan@ngenllc.com
Description:

I've run into a problem in a script I'm writing: DBD::mysql isn't thread safe. (Try executing a query on a DBD::mysql object after you've done a fork or two.) The common solution, according to the archives, is to create a new DBD::mysql object within every fork. Unfortunately that isn't an option for me, so I have begun creating a DBD::mysql replacement that is thread and fork safe and can also be used to perform rudimentary table locking (so you don't delete something that is being selected -- still working on that).
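
The check at the heart of the replacement (compare the stored process ID with $$ and rebuild the handle when they differ) can be sketched without a database. This is only an illustration, not the module itself: the PerProcess package and its factory callback are hypothetical names, and a plain string stands in for the real DBI handle.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of threadsafe.pm): caches one resource
# per process and rebuilds it when the process ID changes.
package PerProcess;

sub new {
    my ($class, $factory) = @_;
    return bless { factory => $factory, pid => 0, handle => undef }, $class;
}

sub handle {
    my ($self) = @_;
    # A different $$ means we have forked since the handle was built.
    if (!defined $self->{handle} or $self->{pid} != $$) {
        $self->{pid}    = $$;
        $self->{handle} = $self->{factory}->();
    }
    return $self->{handle};
}

package main;

my $built = 0;
my $cache = PerProcess->new(sub { $built++; return "handle built in pid $$" });

my $first  = $cache->handle;    # builds the handle
my $second = $cache->handle;    # same process: reuses the cached one

# Fork and let the child report which handle it ends up with.
pipe(my $reader, my $writer) or die "pipe failed: $!";
my $pid = fork;
die "fork failed: $!" unless defined $pid;
if ($pid == 0) {
    close $reader;
    print {$writer} $cache->handle, "\n";   # the pid changed, so this rebuilds
    close $writer;
    exit 0;
}
close $writer;
my $child_handle = <$reader>;
chomp $child_handle;
close $reader;
waitpid($pid, 0);
```

With a real database the factory would be something along the lines of sub { DBI->connect(...) }. In the child it is also worth setting DBI's InactiveDestroy attribute on the inherited handle before dropping it, so that its destructor does not close the connection the parent is still using.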

So I have created the following, threadsafe.pm, and I'd appreciate any comments on how I can improve on it (or even patches) before I post an updated version and create a module for CPAN.

Thanks for looking!

Vautrin

# ATTENTION!
# These files distributed under the GNU GPL
# view it here: http://www.gnu.org/licenses/gpl.txt

#############
# BEGIN POD #
#############

=head1 NAME

threadsafe - Implements a thread safe DBD::mysql

=head1 SYNOPSIS

    use strict;
    use warnings;
    use debug_skeleton;
    use threadsafe;

    my $debugger = debug_skeleton->create;
    my $conn = threadsafe->create({
                                   hostname => 'hostname',
                                   username => 'username',
                                   password => 'password',
                                   database => 'database',
                                   debugger => $debugger,
                                  });
    my $query = "SELECT * FROM table";
    my $result = $conn->prepare($query)
      or die ("Can't prepare $query");
    $result->execute
      or die ("Can't execute $query");
    my $row = $result->fetchrow_hashref;

=head1 DESCRIPTION

    The threadsafe.pm module provides a thread safe drop in
replacement for DBD::mysql.  This module implements as many functions
from DBD::mysql as were necessary to allow the database_controller.pm
objects to work in a multithreaded environment.  Code has also been
put in place to ensure that race conditions and deadlocks do not occur
-- a dangerous possibility with multithreaded database queries on a
database that does not support transactions.

=head1 Public Functions

        The  public  Object  Oriented  Interface (OOI)  to  threadsafe
involves the use of the following functions:

=over 4

=item create

        The C<create>  function is the  constructor for threadsafe.pm.
It  must contain  one argument,  a  hash reference  that contains  the
following keys and values:

=over 4

=item debugger (Optional)

        The key debugger should point to an instance of debug_skeleton
-- which is  the preferred means to  log all info  / warnings / errors.
It  is important to  note that  although this  is optional  (a default
debug_skeleton object will be created, pointing all output to STDERR),
it is much better to keep all error messages centralized.

=item username

        The username to the database. 

=item hostname

        The  hostname  or  IP  address the  DBD::mysql  driver  should
connect to.

=item password

        The  password (if  any) to  connect to  the database.   For no
password use an empty string (C<"">) and B<not> C<undef>. 

=item database

        The database to connect to.   For security reasons this is not
optional. 

=item port (Optional)

        The port to connect to.  Optional. 

=back

=item prepare

      The C<prepare> function takes one argument, the query it should
run on the database connection.  It is used before the C<execute>
command and, in order to stay compatible with the calling style of
DBD::mysql's prepare function, returns the threadsafe object itself
(or undef if the query could not be prepared).

=item execute

      The C<execute> function does not take any arguments and executes
the query which was previously prepared.  It returns TRUE or FALSE
depending on whether or not the query was successfully executed.

=item fetchrow_hashref

      After executing a command, returns the next row as a hash
reference, or undef if there are no more rows to fetch.

=back


=head1 Notes on Protected Functions

       It is  entirely possible  that in the  future you will  want to
extend or  modify the internals  of the threadsafe.pm.   All functions
contain comments and  documentation as to their function  -- they have
been left  out of the POD  for clarity.  Documentation will  be of two
forms: comments  or logging  directives.  The logging directives are
typically of the form:

    $debug_skeleton_object->info("Comment", "Function");
    $debug_skeleton_object->warning("Warning Message");
    $debug_skeleton_object->error("Error Message");
    $debug_skeleton_object->fatal_error("Fatal Error");

    They provide valuable feedback of what a function is supposed
to do, why things are happening, and should make clear the reasons why
things are done the way they are done.  If logging directives are ever
unclear comments will be added to the code to make everything clear as
to what is happening and why.

        Also, it is important to recognize that most functions simply
call DBD::mysql functions after double checking that:

     a. The process ID ($$ -- see perldoc perlvar for more info) has
     not changed.  A changed process ID is a sure sign that we have
     forked and a new database connection must be created.

     b.  If locking is in place, acquire a lock for the prepare /
     execute statements.

=head1 NOTES

       Because I prefer an object  oriented interface (and do not like
to  see  main  filled  up  with too  many  subroutines  and  variables
(courtesy  of Exporter)), a  database_controller must  be instantiated
and then all methods must be called through it.

    Also, because  it makes  it much easier  to catch  errors, all
functions are designed to exit if they do not explicitly return a value
and  leave a  record of  exactly  what went  wrong in  the debug  log.
Consult the  debug_skeleton.pm POD for  details on turning  on logging
and its use.

=head1 Notes on Editors

       The editor used  to create this file was  GNU Emacs 21.3.3 (CVS
version).  Although an  effort was made to ensure  that all formatting
would be the  same across other editors (all tabs  were converted to 4
spaces), if you have any  problems viewing the code you are encouraged
to get  a copy of GNU  Emacs or its cousin  XEmacs and try  it out for
yourself (Check out cperl mode if you do). 

=head1 Author

       This file was created by Dan Anderson.  If you need technical
support or would  like to contact him about  extending the project, or
just have any questions or comments, you are encouraged to contact him
at:

Dan Anderson
(914) 466-8526
dan@ngenllc.com
http://www.ngenllc.com

=cut

###########
# END POD #
###########


#! /usr/bin/perl

use strict;
use warnings;
use DBI;
use debug_skeleton;

package threadsafe;
{

  ####################
  # PUBLIC FUNCTIONS #
  ####################

  sub create {
    my $class = shift(@_);
    my $options = shift (@_);
    die ("We are not a copy constructor") if (ref $class);

    my $debugger;
    if ($options->{debugger}) {
      $debugger = $options->{debugger};
      $debugger->info("Just entered threadsafe::create", "threadsafe::create");
    }
    else {
      # debug_skeleton's constructor is called create, not new
      $debugger = debug_skeleton->create;
      $debugger->info("Just entered threadsafe::create.  Created debug_skeleton here",
              "threadsafe::create");
    }

    # debug_skeleton provides fatal_error (there is no fatal method)
    if (not $options->{hostname}) {
      $debugger->fatal_error("You must provide a hostname");
    }
    elsif (not $options->{username}) {
      $debugger->fatal_error("You must provide a username");
    }
    elsif (not (defined $options->{password})) {
      $debugger->fatal_error("The password was not defined.");
    }
    elsif (not $options->{database}) {
      $debugger->fatal_error("You must provide a database name");
    }

    my $self = {
        pid      => $$,
        hostname => $options->{hostname},
        username => $options->{username},
        password => $options->{password},
        database => $options->{database},
        port     => $options->{port},
        debugger => $debugger,
        conn     => undef,
        result   => undef,
        query    => undef,
           };
    bless $self, $class;
  }

  sub prepare {
    my $self = shift (@_);
    my $query = shift (@_);
    my $debugger = $self->{debugger};
    $debugger->info("Just entered threadsafe::prepare for query $query",
            "threadsafe::prepare");
    $self->check_connection;
    my $conn = $self->{conn};
    $self->{query} = $query;
    $self->{result} = $conn->prepare($query);
    unless ($self->{result}) {
      $debugger->info("Could not prepare the query.  Returning undef",
              "threadsafe::prepare");
      return undef;
    }
    $debugger->info("Query successfully prepared.  Returning \$self",
            "threadsafe::prepare");
    # we have to return $self because we want to support the old paradigm:
    # my $result = $conn->prepare($query); $result->execute;
    return $self;
  }

  sub execute {
    my $self = shift (@_);
    my $conn = $self->{conn};
    my $debugger = $self->{debugger};
    $debugger->info("Just entered threadsafe::execute",
            "threadsafe::execute");
    unless ($self->{result}) {
      $debugger->fatal_error("No result was prepared!");
    }
    $self->check_connection;
    # check_connection clears $self->{result} when it reconnects (for
    # example after a fork), so read it only now.
    my $result = $self->{result};
    unless ($result) {
      $debugger->fatal_error("Connection reset...");
    }

    unless ($result->execute) {
      $debugger->info("Just left threadsafe::execute.  Returning FALSE",
              "threadsafe::execute");
      return 0;
    }

    $debugger->info("Just left threadsafe::execute.  Returning TRUE.",
            "threadsafe::execute");
    return 1;
  }

  sub fetchrow_hashref {
    my $self = shift (@_);
    my $conn = $self->{conn};
    my $debugger = $self->{debugger};
    $debugger->info("Just entered threadsafe::fetchrow_hashref",
            "threadsafe::fetchrow_hashref");
    unless ($self->{result}) {
      $debugger->fatal_error("There was no result to fetch.");
    }

    $self->check_connection;

    unless ($self->{result}) {
      $debugger->fatal_error("Connection was reset.");
    }

    my $result = $self->{result};

    $debugger->info("Just left threadsafe::fetchrow_hashref",
            "threadsafe::fetchrow_hashref");
    # fetch from the statement handle; calling $self->fetchrow_hashref
    # here would recurse forever
    return $result->fetchrow_hashref;
  }

  ########################
  # END PUBLIC FUNCTIONS #
  ########################

  #######################
  # PROTECTED FUNCTIONS #
  #######################

  sub check_connection {
    my $self = shift (@_);
    my $debugger = $self->{debugger};
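    $debugger->info("Just entered threadsafe::check_connection",
            "threadsafe::check_connection");

    if ((not (defined ($self->{conn}))) or (not ($$ == $self->{pid}))) {
      if (defined ($self->{conn})) {
        # The pid changed, so we are in a forked child and the old handles
        # are shared with the parent.  InactiveDestroy keeps their DESTROY
        # from closing the parent's connection when we drop them below.
        $self->{result}->{InactiveDestroy} = 1 if ($self->{result});
        $self->{conn}->{InactiveDestroy} = 1;
      }
      $self->{pid} = $$;
      my $conn_string = "DBI:mysql:database=$self->{database};host=$self->{hostname};";
      if ($self->{port}) {
        $conn_string .= "port=$self->{port};";
      }
      # DBI reports connection failures in $DBI::errstr, not $!
      $self->{conn} = DBI->connect($conn_string, $self->{username}, $self->{password})
        or $debugger->fatal_error("Could not connect to the database because: $DBI::errstr");
      $self->{result} = undef;
      $self->{query} = undef;
    }

    $debugger->info("Just left threadsafe::check_connection",
            "threadsafe::check_connection");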
  }

  ###########################
  # END PROTECTED FUNCTIONS #
  ###########################

  1;
}

# The following is the debug_skeleton.pm file I use for
# debugging:

#! /usr/bin/perl

##############
# BEGIN POD: #
##############

############
# END POD: #
############

###############
# BEGIN CODE: #
###############

package debug_skeleton;
{
  sub create {
    my $class = shift (@_);
    my $debug_logfile = "./debug_skeleton.log";
    my $debug_info = 1;
    my $debug_warning = 1;
    my $debug_error = 1;
    my $show_info = {};
    my $all_info = 1;

    while (my $option = shift (@_)) {
      if ($option eq 'LOG') {
    $debug_logfile = shift (@_);
      }
      elsif ($option eq 'INFO') {
    $debug_info = shift (@_);
      }
      elsif ($option eq 'WARNING') {
    $debug_warning = shift (@_);
      }
      elsif ($option eq 'ERROR') {
    $debug_error = shift (@_);
      }
      elsif ($option eq 'SHOWINFO') {
    $show_info->{shift (@_)} = 1;
      }
      elsif ($option eq 'ALLINFO') {
    $all_info = shift (@_);
      }
      else {
    die ("Not a valid option: $option");
      }
    }

    my $self = {
        logfile => $debug_logfile,
        log_open => 0,
        log_handle => undef,
        info => $debug_info,
        warning => $debug_warning,
        error => $debug_error,
        show_info => $show_info,
        all_info => $all_info,
        nologging => 0,
           };
    bless $self, $class;
  }

  sub info {
    my $self = shift (@_);
    my $message = shift (@_);
    my $class = shift (@_);

    unless ($self->{nologging}) {
      if ($self->{all_info} or $self->{show_info}->{$class}) {
    if ($self->{info}) {
      $self->printInfo ($message);
    }
    else {
      $self->logInfo ($message);
    }
      }
    }
  }

  sub warning {
    my $self = shift (@_);
    my $message = shift (@_);

    unless ($self->{nologging}) {
      if ($self->{warning}) {
    $self->printWarning ($message);
      }
      else {
    $self->logWarning ($message);
      }
    }
  }

  sub error {
    my $self = shift (@_);
    my $message = shift (@_);

    unless ($self->{nologging}) {
      if ($self->{error}) {
    $self->printError ($message);
      }
      else {
    $self->logError ($message);
      }
    }
  }

  sub fatal_error {
    my $self = shift (@_);
    my $message = shift (@_);
    my $exit_code = shift (@_);
    if (not ($exit_code)) {
      $exit_code = 1;
    }

    unless ($self->{nologging}) {
      if ($self->{error}) {
        $self->printError ($message);
      }
      else {
        $self->logError ($message);
      }
    }
    # a fatal error must end the program even when logging is switched off
    exit ($exit_code);
  }


  sub printInfo {
    my $self = shift (@_);
    my $message = shift (@_);
    print STDERR "INFO: ", $message, "\n";
  }

  sub logInfo {
    my $self = shift (@_);
    my $message = shift (@_);
    my $fh = $self->checkLog;
    print $fh "INFO: ", $message, "\n";
  }

  sub printWarning {
    my $self = shift (@_);
    my $message = shift (@_);
    print STDERR "WARNING: ", $message, "\n";
  }

  sub logWarning {
    my $self = shift (@_);
    my $message = shift (@_);
    my $fh = $self->checkLog;
    print $fh "WARNING: ", $message, "\n";
  }

  sub printError {
    my $self = shift (@_);
    my $message = shift (@_);
    print STDERR "ERROR: ", $message, "\n";
  }

  sub logError {
    my $self = shift (@_);
    my $message = shift (@_);
    my $fh = $self->checkLog;
    print $fh "ERROR: ", $message, "\n";
  }


  sub checkLog {
    my $self = shift (@_);
    unless ($self->{log_open}) {
      my $fh;
      # three-argument open keeps odd characters in the file name from
      # being read as part of the mode
      open ($fh, '>', $self->{logfile})
        or die ("Cannot open the log file because $!");
      $self->{log_open} = 1;
      $self->{log_handle} = $fh;
      return $fh;
    }
    else {
      return $self->{log_handle};
    }
  }


  1;
}

############
# END CODE #
############
Re: Thread and Fork Safe DBI and DBD::mysql replacement
by simonm (Vicar) on Feb 06, 2004 at 22:47 UTC

    I have a few quick comments:

    It's not clear how this is to be used. Is it a DBD class, which can be accessed from within the normal DBI interface, a wrapper around an existing DBI handle, or is it a parallel mechanism that should be used separately?

    Also, the name is much too general. Consider naming it DBD::MySQL::Threadsafe if it is a legitimate DBD subclass, or DBIx::ThreadSafe::MySQL if it's a wrapper.

      It's a wrapper. I'm updating and putting in a POD now. I'm also going to add an example of when this fails -- it seems MySQL clients < 4.00 are thread safe.

      Update: Thanks to gmax for pointing out (in this node) why DBD::mysql clients can't fork with MySQL version 4+. For what it's worth, the following code replicates the problem:

      #! /usr/bin/perl
      use strict;
      use warnings;
      use DBI;

      my $conn = DBI->connect("DBI:mysql:database=db;host=localhost;", "user", "password");
      my $pid = fork;
      die ("No more resources") unless (defined $pid);
      # from here on both parent and child use the same connection
      my $query = "SELECT * FROM table";
      my $result = $conn->prepare($query)
        or die ("Cannot prepare query. $DBI::errstr");
      $result->execute
        or die ("Cannot execute query. $DBI::errstr");

Re: Thread and Fork Safe DBI and DBD::mysql replacement
by jZed (Prior) on Feb 06, 2004 at 23:25 UTC
    You might want to forward your code to the dbi-dev@perl.org mailing list to see what other DBD authors think of it.