The final solution was to develop a module, QueryQueue. The source is below (minus the POD, which of course is there in the final product). Feel free to comment, as always.
Example uses...
# First call: creates the singleton and records the connection parameters.
# NOTE(review): _new_instance() unpacks its arguments as
# ($dsn, $dbuser, $dbpass); the order shown here reads like
# (user, password, server) -- confirm which one is intended.
my $queue = QueryQueue->instance('scott','tiger','server');
...
# A SELECT with two bind values.
my $sql = qq{
SELECT available
FROM wms_rf_server_port
WHERE server_id = ?
AND device_name = ?
};
# Later calls pass no arguments; presumably this hands back the instance
# created above (behaviour comes from Class::Singleton -- verify).
my $queue = QueryQueue->instance();
# submit_job() returns whatever _dequeue() returns: for a statement that
# produces columns, the array reference from fetchall_arrayref().
my $results = $queue->submit_job($sql, $server_id, $device);
...
# An UPDATE with three bind values.
# NOTE(review): $port_ids is interpolated into the SQL text instead of being
# bound -- make sure it never carries untrusted input.
my $sql = qq{
UPDATE wms_rf_server_port
SET available = ?,
last_updated_by = ?,
last_updated_date = ?
WHERE port_id IN ( $port_ids )
};
my @args = (
$available,
$0,
_timestamp(),
);
my $queue = QueryQueue->instance();
# Return value ignored here; the statement produces no result columns.
$queue->submit_job($sql, @args);
SOURCE:
package QueryQueue;
use 5.008005;
use strict;
use warnings;
require Exporter;
use base 'Class::Singleton';
use Log::Log4perl;
use Lingua::EN::Numbers::Ordinate;
use DBI;
# Inherit from both Exporter and Class::Singleton. This run-time assignment
# replaces whatever "use base" stored in @ISA, which is why Class::Singleton
# is listed again here.
our @ISA = qw(Exporter Class::Singleton);
# Items to export into callers namespace by default. Note: do not export
# names by default without a very good reason. Use EXPORT_OK instead.
# Do not simply export all your public functions/methods/constants.
# NOTE(review): submit_job shifts an invocant off @_, so importing it as a
# plain function is unlikely to work as intended; the list is kept only for
# backward compatibility.
our @EXPORT_OK = qw(
    instance
    submit_job
);
our $VERSION = '0.01';
# Module-wide state shared by every sub in this package.
our @queue;     # pending jobs, each one [ $tries, $sql, @bind_args ]
our $dsn;       # DBI data source name, set by _new_instance()
our $dbuser;    # database user, set by _new_instance()
our $dbpass;    # database password, set by _new_instance()
=head2 _new_instance

Creates a new QueryQueue singleton object - only for first call of instance()

Takes the class name followed by the DSN, the database user and the database
password, and stores the three connection values in the package variables
C<$dsn>, C<$dbuser> and C<$dbpass>. Returns the new blessed object, or undef
(after logging an error) when any of the three values is false.

=cut

sub _new_instance {
    my ($class, @connect_info) = @_;

    # The connection settings live in package variables so _dequeue() can
    # reach them without needing the object.
    ($dsn, $dbuser, $dbpass) = @connect_info;

    my $log = Log::Log4perl->get_logger('QueryQueue');

    # Guard clause: all three settings must be present (and true).
    unless ($dsn && $dbuser && $dbpass) {
        $log->error("Missing required parameters");
        return undef;
    }

    $log->debug("Created a new QueryQueue singleton object");
    return bless {}, $class;
}
=head2 _dequeue

Processes the queue

Works through C<@queue> from the front. Each job is C<[ $tries, $sql, @args ]>;
the try counter is incremented, the statement is prepared and executed on a
cached connection, and the transaction is committed.

=over 4

=item *

On success the job is removed and the next one is processed.

=item *

On failure while the handle is not active (or the connect itself failed) the
job stays queued and is retried after a short sleep.

=item *

On failure while still connected the transaction is rolled back, the job is
discarded and undef is returned immediately.

=back

Returns the C<fetchall_arrayref> result of the last job when that job produced
columns, otherwise undef. Any arguments are ignored.

=cut

sub _dequeue {
    # Arguments (including an invocant) are deliberately ignored: submit_job()
    # calls this as a plain function and all state is in package variables.
    my $logger           = Log::Log4perl->get_logger('QueryQueue');
    my $retry_delay_secs = 5;
    my $results_ref;

    while (@queue) {    # while I have things in the queue
        my $job = $queue[0];
        $job->[0]++;    # increment the number of tries
        my ($tries, $sql, @args) = @{$job};

        # Start every attempt clean so a result set from an earlier job can
        # never be returned on behalf of a later one.
        $results_ref = undef;
        my ($dbh, $sth);

        # The trailing 1 lets us detect failure from eval's return value
        # instead of relying on $@ alone.
        my $ok = eval {
            $logger->debug('Connecting (cached) to database ', $dsn, ' as ', $dbuser);
            $dbh = DBI->connect_cached(
                $dsn, $dbuser, $dbpass,
                { AutoCommit => 0, RaiseError => 1, PrintError => 0, },
            );
            $dbh->{LongReadLen} = 1024 * 1024 * 32;    # read up to 32MB
            $dbh->{LongTruncOk} = 0;    # throw exception instead of truncating
            $logger->debug('Preparing statement.');
            $sth = $dbh->prepare_cached($sql);
            $logger->debug(ordinate($tries), ' try to execute ', _sqlstr($sql, @args));
            $sth->execute(@args);
            if ($sth->{NUM_OF_FIELDS}) {    # only statements that return columns
                $results_ref = $sth->fetchall_arrayref();
            }
            $dbh->commit;
            1;
        };
        my $error = $@;

        if ($ok) {
            $logger->debug('Successully executed statement - ', $sth->rows,
                ' rows affected.');
            shift @queue;
        }
        else {
            $error = 'unknown error' unless defined $error && length $error;
            $logger->error('Error executing statement: ', $error);

            if (!$dbh || !$dbh->{Active}) {
                # No usable connection: keep the job queued and try again.
                $logger->warn('Not connected to database ', $dsn);
                sleep $retry_delay_secs;
            }
            else {
                # Connected, so the statement itself is at fault. With
                # RaiseError on, rollback can die too; do not let that escape
                # with the bad job still at the head of the queue.
                eval { $dbh->rollback; 1 }
                    or $logger->warn('Rollback failed: ', $@);
                $logger->warn('*** Discarding the statement');
                shift @queue;
                return undef;
            }
        }

        if (@queue) {
            $logger->debug('Still have ', scalar(@queue), ' items in the queue');
        }
        else {
            $logger->debug('The queue is empty.');
        }
    }

    return $results_ref;
}
=head2 submit_job

Submit a job to the queue

Takes the SQL text followed by its bind values, appends the job to the queue
with a try counter of zero, then processes the queue straight away. Returns
whatever C<_dequeue> returns.

=cut

sub submit_job {
    my ($self, $sql, @args) = @_;

    # Queue entries are [ tries, sql, bind values... ].
    my $job = [ 0, $sql, @args ];

    Log::Log4perl->get_logger('QueryQueue')
        ->debug('Enqueuing ', _sqlstr($sql, @args));

    push @queue, $job;
    return _dequeue();
}
=head2 _sqlstr

Formats a sql string with its args for logging

Takes the SQL text followed by its bind values and returns a single
double-quoted string of the form C<"SQL (arg1, arg2)">. An undefined bind
value is shown as C<NULL>.

=cut

sub _sqlstr {
    my ($sql, @args) = @_;

    # Undef is a legitimate bind value (SQL NULL); spell it out instead of
    # triggering an "uninitialized value" warning in join.
    my $argstr = join ', ', map { defined $_ ? $_ : 'NULL' } @args;

    return "\"$sql ($argstr)\"";
}
1;
|