my $queue = QueryQueue->instance('scott','tiger','server'); ... my $sql = qq{ SELECT available FROM wms_rf_server_port WHERE server_id = ? AND device_name = ? }; my $queue = QueryQueue->instance(); my $results = $queue->submit_job($sql, $server_id, $device); ... my $sql = qq{ UPDATE wms_rf_server_port SET available = ?, last_updated_by = ?, last_updated_date = ? WHERE port_id IN ( $port_ids ) }; my @args = ( $available, $0, _timestamp(), ); my $queue = QueryQueue->instance(); $queue->submit_job($sql, @args); SOURCE: package QueryQueue; use 5.008005; use strict; use warnings; require Exporter; use base 'Class::Singleton'; use Log::Log4perl; use Lingua::EN::Numbers::Ordinate; use DBI; our @ISA = qw(Exporter Class::Singleton); # Items to export into callers namespace by default. Note: do not export # names by default without a very good reason. Use EXPORT_OK instead. # Do not simply export all your public functions/methods/constants. our @EXPORT_OK = qw( instance submit_job ); our $VERSION = '0.01'; our @queue; our $dsn; our $dbuser; our $dbpass; =head2 _new_instance Creates a new QueryQueue singleton object - only for first call of instance() =cut sub _new_instance { my $class = shift; ($dsn, $dbuser, $dbpass) = @_; my $self = bless { }, $class; my $logger = Log::Log4perl->get_logger('QueryQueue'); if( $dsn && $dbuser && $dbpass ) { $logger->debug("Created a new QueryQueue singleton object"); return $self; } $logger->error("Missing required parameters"); return undef; } =head2 _dequeue Processes the queue =cut sub _dequeue { my $class = shift; my $logger = Log::Log4perl->get_logger('QueryQueue'); my ($dbh, $sth); my $results_ref = undef; while (@queue) { # while I have things in the queue $queue[0][0]++; # increment the number of tries my ($tries, $sql, @args) = @{ $queue[0] }; eval { $logger->debug('Connecting (cached) to database ', $dsn, ' as ', $dbuser); $dbh = DBI->connect_cached($dsn, $dbuser, $dbpass, { AutoCommit => 0, RaiseError => 1, PrintError => 0, }, ); $dbh->{LongReadLen} = 1024 * 1024 * 32; # read up to 32MB $dbh->{LongTruncOk} = 0; # throw exception instead of truncating $logger->debug('Preparing statement.'); $sth = $dbh->prepare_cached($sql); $logger->debug(ordinate($tries), ' try to execute ', _sqlstr($sql, @args)); $sth->execute(@args); if( $sth->{NUM_OF_FIELDS} ) { $results_ref = $sth->fetchall_arrayref(); } $dbh->commit; }; if ($@) { $logger->error('Error executing statement: ', $@); if(! $dbh->{Active}) { $logger->warn('Not connected to database ', $dsn); sleep 5; } else { $dbh->rollback; $logger->warn('*** Discarding the statement'); shift @queue; return undef; } } else { $logger->debug('Successully executed statement - ', $sth->rows, ' rows affected.'); shift @queue; } if( $#queue >= 0 ) { $logger->debug('Still have ', $#queue + 1, ' items in the queue'); } else { $logger->debug('The queue is empty.'); } } return $results_ref; } =head2 submit_job Submit a job to the queue =cut sub submit_job { my $class = shift; my ($sql, @args) = @_; my $logger = Log::Log4perl->get_logger('QueryQueue'); $logger->debug('Enqueuing ', _sqlstr($sql, @args)); push @queue, [ 0, $sql, @args ]; _dequeue(); } =head2 _sqlstr Formats a sql string with its args for logging =cut sub _sqlstr { my ($sql, @args) = @_; my $argstr = join ', ', @args; return "\"$sql ($argstr)\""; } 1;