my $dbh = MCE::Shared->share({ module => 'DBI', new => 'connect' }, $dsn, $user, $password, $params );
####
create table mytable(
field1 integer,
field2 varchar(24),
field3 varchar(24),
field4 varchar(24),
field5 varchar(24)
);
##
##
use strict;
use warnings;
use Data::GUID;
use DBD::Pg;
use SQL::Abstract;
use Tie::Cycle;
use MCE::Loop max_workers => 4;
use MCE::Shared;
my $sqla = SQL::Abstract->new;
my @cols = map {"field$_"} 1..5;
# https://www.cattlegrid.info/2006/06/13/write-no-more-sql-abstract-it.html
my $ins_sql = $sqla->insert('mytable', { map { $_ => '' } @cols });
my $sel_sql = $sqla->select('mytable', 'count(*)', { field2 => '' });
my $upd_sql = $sqla->update('mytable', { field2 => '' }, { field2 => '' });
#--------------------------------------------------------------------#
package My::DBI {
use DBI;
sub new {
my ( $class, $dsn, $user, $password, $params ) = @_;
my $self = {};
# MCE::Shared will emit the error and exit if fail to connect
$self->{DBH} = DBI->connect($dsn, $user, $password, $params);
$self->{STMT} = {};
bless $self, $class;
}
sub prepare_cached {
my ( $self, $key, $sql ) = @_;
$self->{STMT}{$key} = $self->{DBH}->prepare_cached($sql);
1;
}
sub do {
my $self = shift;
$self->{DBH}->do(@_);
}
sub execute {
my ( $self, $key ) = ( shift, shift );
if ( my $stmt = $self->{STMT}{$key} ) {
$stmt->execute(@_);
}
}
sub fetchrow_array {
my ( $self, $key ) = ( shift, shift );
if ( my $stmt = $self->{STMT}{$key} ) {
$stmt->execute(@_);
$stmt->fetchrow_array;
}
}
sub finish {
my ( $self, $key ) = @_;
if ( $key ) {
$self->{STMT}{$key}->finish if $self->{STMT}{$key};
} else {
$self->{STMT}{$_}->finish for keys %{ $self->{STMT} };
}
return 1;
}
sub disconnect {
my ( $self ) = @_;
$self->finish;
$self->{DBH}->disconnect;
1;
}
};
#--------------------------------------------------------------------#
my $dsn = 'DBI:Pg:dbname=test_db;host=localhost;port=5432';
my $sdb = MCE::Shared->share(
{ module => 'My::DBI' }, $dsn, $ENV{USER}, undef, {
AutoCommit => 1,
RaiseError => 1,
PrintError => 1
},
);
$sdb->prepare_cached('ins_sth', $ins_sql);
$sdb->prepare_cached('sel_sth', $sel_sql);
$sdb->prepare_cached('upd_sth', $upd_sql);
mce_loop {
my ($mce, $chunk, $chunk_id) = @_;
for my $record( @{$chunk} ) {
$sdb->execute('ins_sth', @{$record});
my $field2_old = $record->[1];
my $field2_new1 = Data::GUID->new->as_base64;
my $field2_new2 = Data::GUID->new->as_base64;
# update using a prepared statement
$sdb->execute('upd_sth', $field2_new1, $field2_old);
# update using the dbh handle inside the shared class
my ($query, @bind) = $sqla->update(
'mytable',
{ field2 => $field2_new2 },
{ field2 => $field2_new1 },
);
$sdb->do($query, undef, @bind);
# pass any arguments for execute inside the shared class
my ($count) = $sdb->fetchrow_array('sel_sth', $field2_new2);
# count is 1 due to selecting field2 = $field2_new2
my $msg = sprintf 'wid %s; chnk %s; ins %s; cnt %s',
$mce->wid, $chunk_id, $record->[0], $count;
MCE->say($msg);
}
} get_sample_data();
# ^^ do not pass @{ get_sample_data() } to mce_loop
# it will not work if @{ [ has 1 element ] }
# pass the array ref instead, MCE accepts it
MCE::Loop->finish;
$sdb->disconnect;
#--------------------------------------------------------------------#
sub get_sample_data {
tie my $value1, 'Tie::Cycle', [ 40 .. 49 ];
return [ map {
[ $value1, map { Data::GUID->new->as_base64 } 0..3]
} 1..1000 ];
}