#!/usr/bin/perl use threads; use Thread::Queue qw( ); use threads::shared; use locale; use strict; use warnings; use DBI; use Data::Dumper; #Multithread our $THREADS = 1; my $where_query = qq{v_ma_id in (92)}; my $db_host = $ENV{host}; my $db_login = $ENV{user}; my $db_password = $ENV{password}; my $country='FR'; my $db_infos='XXXXX'; my $db_data='AAAAAA'; my $table_a='TABLE_A'; my $table_c='TABLE_C'; my $table_d='TABLE_D'; my $table_co='TABLE_CO'; my $table_a_full = "${db_data}.${table_a}"; my $table_c_full = "${db_infos}.${table_c}"; my $table_d_full = "${db_data}.${table_d}"; my $table_co_full = "VO.$table_co"; my $dbfile ='/dev/shm/test.sqlite'; my $table_a_sqlite ='shm_' . $table_a . '_' . $$; my $table_db_sqlite ='shm_doublons_' . $table_a . '_' . $$; my $table_dpt_sqlite ='shm_dpt_' . $table_a . '_' . $$; # Init connection db my $mysql_dbh = mysql_connect(); # Get Data my ($data,$data_ref,$uniq_values) = select_data(\$mysql_dbh,\$table_a_full,\$where_query); my ($data_g_id) = select_g_id(\$mysql_dbh,\$table_c_full); my ($data_co_id) = select_co_id(\$mysql_dbh,\$table_co_full); # Start multi threads my @array_share :shared =(); sub worker { my $tid = threads->tid; my( $Qwork, $Qresults ) = @_; while( my $item = $Qwork->dequeue ) { my $result; my ($M,$m) = @{$item}; my $data2process = shift @array_share; my $dbfile_connect = qq{${dbfile}_${M}_${m}}; my $sqlite_dbh = sqlite_connect($dbfile_connect,''); create_tmp_table(\$sqlite_dbh,\$table_a_sqlite,\$table_db_sqlite,\%{$uniq_values}); insert_annonces_tmp_table(\$sqlite_dbh,\$table_a_sqlite,\$table_db_sqlite,\@{$data2process},'',''); #### SOME WORK ON THE SQLITE DATA #### $sqlite_dbh->disconnect(); } } my $Qwork = new Thread::Queue; ## Create the pool of workers my @pool = map{ threads->create( \&worker, $Qwork) } 1 .. $THREADS; # Top id sorted by values foreach my $M_m (sort { $uniq_values->{v_mo_id_top}{$b} <=> $uniq_values->{v_mo_id_top}{$a} } keys(%{$uniq_values->{v_mo_id_top}})) { my ($M,$m) = split('_',$M_m); my $clone = shared_clone(\@{$data_ref->{$M}->{$m}}); push(@array_share,$clone); $Qwork->enqueue([$M,$m]); } # EOF loop ### Tell the workers there are no more work items $Qwork->enqueue( (undef) x $THREADS ); #### Clean up the threads $_->join for @pool; exit; sub create_tmp_table { my ($sqlite_dbh,$table_a_sqlite,$table_db_sqlite,$uniq_values,$M,$m) = @_; my $query=qq| CREATE TABLE $$table_a_sqlite ( id INTEGER NOT NULL PRIMARY KEY, c_id INTEGER NOT NULL, g_c_id INTEGER NOT NULL, v_ma_id INTEGER NOT NULL, v_mo_id INTEGER NOT NULL, carr_id INTEGER NOT NULL, carbu_id INTEGER NOT NULL, ref_id INTEGER NOT NULL, p INTEGER NOT NULL, k INTEGER NOT NULL, y INTEGER NOT NULL, m INTEGER NOT NULL, s_id INTEGER NOT NULL, co_id INTEGER NOT NULL, type INTEGER NOT NULL, age INTEGER NOT NULL DEFAULT 0, dpt TEXT NOT NULL DEFAULT 0, p_c TEXT NOT NULL DEFAULT 0 ) |; my $query2=qq{ CREATE TABLE $$table_db_sqlite ( id INTEGER NOT NULL PRIMARY KEY, doublon_id INTEGER NOT NULL, query INTEGER NOT NULL DEFAULT 666 ) }; $$sqlite_dbh->do($query) or die($!); $$sqlite_dbh->do($query2) or die($!); } sub insert_annonces_tmp_table { my ($sqlite_dbh,$table_a_sqlite,$table_db_sqlite,$result_array,$result_array_concessions,$result_code_co) = @_; my %g_c_id; my %code_co; my $query = qq( insert into $$table_a_sqlite ( id, c_id, g_c_id, v_ma_id, v_mo_id, carr_id, carbu_id, ref_id, p, k, y, m, s_id, co_id, type, dpt, p_c ) values ( ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ?, ?, ?) ); my $query2 = qq( insert into $$table_db_sqlite (id,doublon_id ) values (?,?); ); my $inserted = 0; my $max_commit = 50000; $$sqlite_dbh->do('BEGIN TRANSACTION'); $$sqlite_dbh->do('PRAGMA ignore_check_constraints = 1'); my $sqlite_sth = $$sqlite_dbh->prepare($query) or die ($!); my $sqlite_sth2 = $$sqlite_dbh->prepare($query2) or die ($!); foreach my $records (@{$result_array}) { $sqlite_sth->bind_param(1,$records->{id}); $sqlite_sth->bind_param(2,$records->{c_id}); $sqlite_sth->bind_param(3,$records->{c_id}); $sqlite_sth->bind_param(4,$records->{v_ma_id}); $sqlite_sth->bind_param(5,$records->{v_mo_id}); $sqlite_sth->bind_param(6,$records->{carr_id}); $sqlite_sth->bind_param(7,$records->{carbu_id}); $sqlite_sth->bind_param(8,$records->{ref_id}); $sqlite_sth->bind_param(9,$records->{p}); $sqlite_sth->bind_param(10,$records->{k}); $sqlite_sth->bind_param(11,$records->{y}); $sqlite_sth->bind_param(12,$records->{m}); $sqlite_sth->bind_param(13,$records->{s_id}); $sqlite_sth->bind_param(14,0); $sqlite_sth->bind_param(15,$records->{type}); $sqlite_sth->bind_param(16,"0"); $sqlite_sth->bind_param(17,$records->{p_c}); $inserted += $sqlite_sth->execute() or die ($!); $sqlite_sth2->bind_param(1,$records->{id}); $sqlite_sth2->bind_param(2,$records->{id}); $inserted += $sqlite_sth2->execute() or die ($!); if (($inserted % $max_commit) == 0) { $$sqlite_dbh->do('commit'); $$sqlite_dbh->do('begin'); } } $$sqlite_dbh->do('COMMIT'); } sub select_data { my ($mysql_dbh,$table,$where) = @_; my $query=qq( select id, c_id, v_ma_id, v_mo_id, carr_id, carbu_id, ref_id, p, k, y, m, s_id, if(SUBSTRING(type,1,3)='pro',1,0) as type, co_id, (CASE when dpt = '' then 0 when dpt = '-' then 0 else dpt END) as dpt, (CASE when p_c in (4,5) then 5 when p_c in (2,3) then 3 else 0 END) as p_c from $$table TA ); $query .= 'where '.$$where if $$where ne ''; my $mysql_sth=$$mysql_dbh->prepare($query); $mysql_sth->execute(); my $data = $mysql_sth->fetchall_arrayref({}); $mysql_sth->finish(); my %uniq_values; my %data_ref; foreach my $records (@{$data}) { push(@{$data_ref{${records}->{'v_ma_id'}}{${records}->{'v_mo_id'}}} , \%{$records}); my $tmp = $records->{'v_ma_id'} . '_' . $records->{'v_mo_id'}; $uniq_values{v_mo_id_top}{$tmp}++; } return (\@{$data},\%data_ref,\%uniq_values); } sub sqlite_connect { my ($dbfile) = @_; my $sqlite_dbh; $sqlite_dbh = DBI->connect("dbi:SQLite:dbname=${dbfile}", "", "", { RaiseError => 1 , FetchHashKeyName => "NAME_lc" , AutoCommit => 1 }) or die $DBI::errstr; return $sqlite_dbh; } sub mysql_connect { my $dsn = "dbi:mysql:dbname=$db_infos;host=$db_host"; my $mysql_dbh = DBI->connect( $dsn,$db_login,$db_password , {RaiseError=>1 , FetchHashKeyName => "NAME_lc" }) or die "Can't connect on $db_infos !"; return $mysql_dbh; }