Corion, that is a great example for demonstrating an input iterator with MCE. A global dbh variable is not necessary. Below, the db_iter closure is called by the manager process each time a worker requests the next input item.
Pretty much everything is wrapped inside db_iter for db apps. The closure simply returns the next row.
use MCE::Loop chunk_size => 1, max_workers => 4;
use DBI;
# Build an input iterator that streams rows from the "people" table.
#
# Connects to the SQLite database sample.db, runs the query once, and
# returns a closure. Each call to the closure returns the next row as a
# flat list (fname, lname), or an empty list once the rows are exhausted.
#
# Errors: RaiseError is on, so connect/prepare/execute/fetch failures die.
sub db_iter {
    my $dbh = DBI->connect("dbi:SQLite:dbname=sample.db", "", "", {
        PrintError          => 0,
        RaiseError          => 1,
        AutoCommit          => 1,
        FetchHashKeyName    => 'NAME_lc',
        # This handle is created before the parallel loop starts, so any
        # forked worker inherits a copy of it. AutoInactiveDestroy
        # (DBI 1.614+) keeps a child's handle cleanup from tearing down
        # the connection the parent is still reading from.
        AutoInactiveDestroy => 1,
    });

    my $sth = $dbh->prepare("select fname, lname from people");
    $sth->execute();

    # The closure keeps $sth (and through it the connection) alive.
    return sub {
        if (my @row = $sth->fetchrow_array) {
            return @row;
        }
        return;    # empty list signals end of input
    };
}
# Greet each person. Every call receives one row from db_iter's closure,
# delivered as an array ref holding (fname, lname).
mce_loop {
    my ($mce, $row) = @_;
    my ($fname, $lname) = @$row;

    MCE->say("Hello, $fname $lname");
} db_iter();
See https://metacpan.org/pod/MCE::Core#SYNTAX-for-INPUT_DATA if chunking is desired. The db_iter example on that page is written using the core API. Below is the same example using MCE::Loop instead.
use MCE::Loop;
use DBI;
# Build a chunking input iterator over an Oracle query.
#
# Connects, prepares and executes the query once, then returns a closure.
# The closure takes the chunk size as its only argument and returns up to
# that many rows (each row an array ref of column values). It returns an
# empty list when no rows remain.
#
# Errors: dies with "Could not connect to database: ..." if the connection
# fails; later DBI failures (prepare/execute/fetch) die via RaiseError.
#
# NOTE(review): "desc" and "table" look like placeholders; both are SQL
# reserved words, so substitute real column/table names before running.
sub db_iter {
    my $dsn = "DBI:Oracle:host=db_server;port=db_port;sid=db_name";

    # This handle is created before the parallel loop starts, so any forked
    # worker inherits a copy of it. AutoInactiveDestroy (DBI 1.614+) keeps
    # a child's handle cleanup from disturbing the parent's connection.
    my $dbh = DBI->connect($dsn, 'db_user', 'db_passwd', {
        AutoInactiveDestroy => 1,
    }) or die "Could not connect to database: $DBI::errstr";

    # Enabled only after connect so the custom connect message above is
    # kept; from here on a failed prepare/execute/fetch dies immediately
    # instead of surfacing later as a method call on undef.
    $dbh->{RaiseError} = 1;
    $dbh->{PrintError} = 0;

    my $sth = $dbh->prepare('select color, desc from table');
    $sth->execute();

    return sub {
        my $chunk_size = shift;

        if (my $aref = $sth->fetchall_arrayref(undef, $chunk_size)) {
            return @{ $aref };
        }
        return;    # empty list signals end of input
    };
}
## Column positions within each fetched row, named for readable lookups.
my ($i_color, $i_desc) = (0, 1);

## Configure the loop: three workers, a chunk size of 1000 (handed to the
## db_iter closure as its row limit), and input pulled from that closure.
MCE::Loop::init {
    chunk_size  => 1000,
    max_workers => 3,
    input_data  => db_iter(),
};
## Format every row of the chunk as "color: desc" and emit the whole chunk
## with a single MCE->print call.
mce_loop {
    my ($mce, $rows, $chunk_id) = @_;

    my $out = join '',
        map { $_->[$i_color] . ": " . $_->[$i_desc] . "\n" } @{ $rows };

    MCE->print($out);
};

MCE::Loop::finish;
One thing is missing from the two examples above, however: workers may also need to communicate with the DB. The user_begin and user_end options are where workers obtain a db connection and disconnect. The dbh handle is stored in the MCE hash for later retrieval by the loop block, so each worker obtains its db connection only once.
## Same loop configuration, plus per-worker database connections.
MCE::Loop::init {
    chunk_size  => 1000,
    max_workers => 3,
    input_data  => db_iter(),

    ## Open a connection and stash it in the MCE hash so the loop block
    ## can pick it up later.
    user_begin => sub {
        my ($mce) = @_;
        my $dsn = "DBI:Oracle:host=db_server;port=db_port;sid=db_name";

        my $dbh = DBI->connect($dsn, 'db_user', 'db_passwd')
            or die "Could not connect to database: $DBI::errstr";

        $mce->{dbh} = $dbh;
    },

    ## Close the connection that user_begin stored.
    user_end => sub {
        my ($mce) = @_;
        $mce->{dbh}->disconnect;
    },
};
## Process one chunk of rows. The connection stored by user_begin is
## fetched here so the block can talk to the database if it needs to.
mce_loop {
    my ($mce, $rows, $chunk_id) = @_;
    my $dbh = $mce->{dbh};

    my $out = join '',
        map { $_->[$i_color] . ": " . $_->[$i_desc] . "\n" } @{ $rows };

    MCE->print($out);
};
|