CREATE TABLE cloverleaf_ingestion(
pk NUMBER(20),
File_name VARCHAR2(200),
Cl_file_size_in_bytes NUMBER(22),
Cl_checksum VARCHAR2(32),
CL_created_date_time DATE,
Status VARCHAR2(20),
Mn_file_size_in_bytes NUMBER(22),
Mn_checksum VARCHAR2(32),
Failed_retries NUMBER(11),
Processed_date_time DATE,
ftp_acknowledgement VARCHAR2(20),
ohf_processing_status VARCHAR2(20),
ohf_processing_datetime DATE,
ohf_total_rows NUMBER(22),
ohf_failed_rows NUMBER(22),
ohf_processed_rows NUMBER(22),
hdfs_processing_status VARCHAR2(20),
hdfs_processing_datetime DATE,
hdfs_checksum VARCHAR2(60),
hdfs_file_size_in_bytes NUMBER(22),
CONSTRAINT cloverleaf_ingestion_pk PRIMARY KEY (pk)
);
####
sub getOracleRowHahs{
my $odbh = DBI->connect($oracledsn, $oracleuser, $oraclepass) || die "Can't connect to oracle db:$DBI::errstr\n";
my $osth = $odbh->prepare("select * from emp");
$osth->execute();
my $tableref;
while(my $row = $osth->fetchrow_hashref()){
no warnings;
$tableref->{$row->{'PK'}} = $row;
}
$odbh->disconnect();
return $tableref;
}
##
##
my $sdbh = DBI->connect($sqldsn, $sqluser, $sqlpass) || die "Can't connect to mysql db:$DBI::errstr\n";
my $odbh = DBI->connect($oracledsn, $oracleuser, $oraclepass) || die "Can't connect to oracle db:$DBI::errstr\n";
my $oracleTableHash = &getOracleRowHahs();
no warnings;
my $query = "select * from $table";
my $ssth = $sdbh->prepare($query) || die "Error in prepare to mysql: $DBI::errstr\n";;
$ssth->execute() || die "Error in execute to mysql: $DBI::errstr\n";;
while(my @row = $ssth->fetchrow_array()){
# say join("\t",@row);
if(!(exists($oracleTableHash->{$row[0]}))){
my $columns = join(",",@row);
say $columns;
my $insertquery = "insert into $table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
#say $insertquery;
#exit();
my $osth = $odbh->prepare($insertquery) || die "Error in prepare: $DBI::errstr";
$osth->execute(@row);
}
}
$sdbh->disconnect() || die "Error in disconnect to mysql: $DBI::errstr\n";
$odbh->disconnect() || die "Error in disconnect to oracle: $DBI::errstr\n";