in reply to Re: Sqlite: Threads and inserts into a different database for each one. (1 thread fast, >1 slow)
in thread Sqlite: Threads and inserts into a different database for each one. (1 thread fast, >1 slow)
I have some processing to do on a particular MySQL table containing several million records.
I need to process the data in blocks, each block being defined by the composite index (v_ma_id, v_mo_id), in one shot.
I have enough memory to hold the whole table in memory, and I don't know in advance which blocks will be present in the table (that's why I fetch all the data in one shot).
Now, with the full code for the example, you will see that I use one thread per block, and that each thread creates its own database and inserts the data, committing a transaction every 50000 records.
I've attached a sample of the data so you can see what kind of data I need to insert.
The full sample is 4 MB (~87700 lines / 38 blocks); it is retrieved from MySQL and inserted in 14 s with 1 thread. With 10 threads it takes 33 seconds, etc.
Thanks for your help because this problem really make my crazy :/id c_id v_ma_id v_mo_id carr_id carbu_id ref_id p + k y m s_id type co_id dpt p_c 4 146843 92 345 13 2 65204 21900 13600 01 + 4 4 1 1 88 5 11 3941 92 335 2 2 64595 16900 19900 01 +6 4 1 2 59 5 52 108636 92 933 7 1 45978 13990 58678 00 + 4 4 1 2 38 3 73 4351 92 340 2 2 55367 24500 2 0 4 + 1 2 75 5 85 51378 92 335 2 2 43991 15900 12155 01 + 10 4 1 2 35 5 102 156539 92 340 2 2 45093 11200 115000 00 + 11 4 1 9 45 5 114 4495 92 345 13 2 65205 24990 11106 01 + 4 4 1 1 0 5 157 0 92 342 2 1 45639 4500 109700 00 0 + 4 0 4 57 5 174 3946 92 344 5 2 76868 50990 8000 01 +8 4 1 1 62 5 198 4371 92 335 2 2 43731 20490 15300 01 + 2 4 1 1 93 5 280 62391 92 331 13 2 64409 17900 15895 01 + 4 4 1 1 69 0 303 4422 92 343 13 2 53162 34500 11500 01 + 7 4 1 2 03 5 317 4454 92 369 14 2 59802 33990 13900 01 + 4 4 1 1 59 5 323 153801 92 930 9 2 43537 23390 36000 01 + 2 4 1 9 57 3 336 96704 92 336 2 14 82046 27900 100 01 + 9 4 1 2 76 5 337 148468 92 342 2 2 45882 4990 87000 00 + 10 4 1 1 13 5 402 993 92 342 2 1 45627 11900 15062 01 +9 4 1 1 92 5 456 4460 92 335 2 1 43841 29995 29242 01 + 11 4 1 2 03 5 503 4454 92 335 2 2 102205 23990 7500 01 + 9 4 1 3 59 5 538 143329 92 345 13 2 46421 11900 95000 01 + 3 4 1 1 27 5 562 62391 92 335 2 2 43981 17900 19501 01 + 10 4 1 3 69 5 563 133628 92 341 2 2 45470 15999 174636 00 + 1 4 1 2 76 5 564 52348 92 3136 7 2 64478 28500 19000 01 + 1 4 1 2 80 5 582 4414 92 342 2 2 45860 13990 15267 01 + 10 4 1 1 39 5 613 4422 92 342 2 1 45719 11450 20877 00 + 10 4 1 1 03 5 684 4381 92 335 2 2 43981 18490 14842 01 + 11 4 1 7 03 5 701 9948 92 342 2 2 45860 10690 87200 01 + 3 4 1 12 67 5 744 3941 92 335 2 2 102205 24680 100 01 +10 4 1 1 59 5 762 4371 92 933 7 2 82083 27100 2773 01 +10 4 1 2 93 3 778 52348 92 335 2 2 44274 23500 14000 01 + 3 4 1 1 80 5 794 96704 92 3120 2 1 0 14800 100 01 10 + 4 1 4 76 0 795 51731 92 341 2 2 45473 29900 51702 00 + 0 4 1 7 67 5 813 4394 92 3120 2 1 65236 11490 11334 01 + 3 4 1 
2 03 3 848 24620 92 335 2 2 55291 15990 36000 01 + 7 4 1 7 02 5 861 41708 92 345 13 2 46421 11900 92977 00 + 7 4 1 1 92 5 900 51865 92 345 13 2 0 7300 171000 00 6 + 4 1 1 39 5 907 106653 92 345 13 2 46417 7495 150000 00 + 8 4 1 7 94 5 917 144149 92 1180 22 2 43524 10790 106005 +01 2 4 1 4 91 0 922 5658 92 933 7 2 45965 25990 15700 01 + 9 4 1 3 03 3 924 137672 92 660 5 2 46197 18490 127700 01 + 1 4 1 3 78 5 925 153907 92 340 2 2 53038 16990 55000 01 + 7 4 1 1 91 5
#!/usr/bin/perl
# Load rows from a MySQL table, group them by (v_ma_id, v_mo_id), and let a
# pool of worker threads copy each group ("block") into its own SQLite
# database file for further processing.
use threads;
use Thread::Queue qw( );
use threads::shared;
use locale;
use strict;
use warnings;
use DBI;
use Data::Dumper;

# Number of worker threads in the pool.
our $THREADS = 1;

# Restriction appended to the MySQL SELECT (empty string = no restriction).
my $where_query = qq{v_ma_id in (92)};

# MySQL connection settings are read from the environment.
my $db_host     = $ENV{host};
my $db_login    = $ENV{user};
my $db_password = $ENV{password};

my $country  = 'FR';
my $db_infos = 'XXXXX';
my $db_data  = 'AAAAAA';
my $table_a  = 'TABLE_A';
my $table_c  = 'TABLE_C';
my $table_d  = 'TABLE_D';
my $table_co = 'TABLE_CO';

my $table_a_full  = "${db_data}.${table_a}";
my $table_c_full  = "${db_infos}.${table_c}";
my $table_d_full  = "${db_data}.${table_d}";
my $table_co_full = "VO.$table_co";

# Prefix of the per-block SQLite files; "_<v_ma_id>_<v_mo_id>" is appended.
my $dbfile = '/dev/shm/test.sqlite';

# SQLite table names carry the process id ($$).
my $table_a_sqlite   = 'shm_' . $table_a . '_' . $$;
my $table_db_sqlite  = 'shm_doublons_' . $table_a . '_' . $$;
my $table_dpt_sqlite = 'shm_dpt_' . $table_a . '_' . $$;

# Init connection db
my $mysql_dbh = mysql_connect();

# Get Data
my ( $data, $data_ref, $uniq_values )
    = select_data( \$mysql_dbh, \$table_a_full, \$where_query );

# NOTE(review): select_g_id() and select_co_id() are not defined in the part
# of the script shown here; they must be provided elsewhere for this to run.
my ($data_g_id)  = select_g_id( \$mysql_dbh, \$table_c_full );
my ($data_co_id) = select_co_id( \$mysql_dbh, \$table_co_full );

# Work queue shared between the main thread and the workers.
my $Qwork = Thread::Queue->new;

## Create the pool of workers
# NOTE(review): creating a thread copies the parent's existing data into the
# new thread (see perlthrtut). The pool is started after select_data(), so
# each worker presumably gets its own copy of the full result set; consider
# starting the pool before loading the data if thread start-up is slow.
my @pool = map { threads->create( \&worker, $Qwork ) } 1 .. $THREADS;

# Enqueue the blocks, biggest first (sorted by descending row count).
my $block_sizes = $uniq_values->{v_mo_id_top} || {};
foreach my $M_m (
    sort { $block_sizes->{$b} <=> $block_sizes->{$a} } keys %{$block_sizes}
    )
{
    my ( $M, $m ) = split( '_', $M_m );

    # The rows travel in the same queue item as their key. Previously the
    # key went through the queue while the rows went through a separate
    # shared array that workers shift()ed without locking, so with several
    # threads a worker could pair the key of one block with the rows of
    # another.
    $Qwork->enqueue( shared_clone( [ $M, $m, $data_ref->{$M}{$m} ] ) );
}

### Tell the workers there are no more work items
$Qwork->enqueue( (undef) x $THREADS );

#### Clean up the threads
$_->join for @pool;
exit;

# worker($Qwork)
#
# Thread body. Dequeues items of the form [v_ma_id, v_mo_id, rows] until an
# undef item is received. For each item it opens the SQLite file
# "<$dbfile>_<v_ma_id>_<v_mo_id>", creates the two tables and inserts the rows.
sub worker {
    my ($Qwork) = @_;

    while ( defined( my $item = $Qwork->dequeue ) ) {
        my ( $M, $m, $data2process ) = @{$item};

        my $sqlite_dbh = sqlite_connect(qq{${dbfile}_${M}_${m}});

        create_tmp_table(
            \$sqlite_dbh, \$table_a_sqlite, \$table_db_sqlite,
            \%{$uniq_values}
        );
        insert_annonces_tmp_table(
            \$sqlite_dbh, \$table_a_sqlite, \$table_db_sqlite,
            $data2process, '', ''
        );

        #### SOME WORK ON THE SQLITE DATA ####

        $sqlite_dbh->disconnect();
    }
    return;
}

# create_tmp_table(\$dbh, \$table_a_name, \$table_db_name, \%uniq_values, $M, $m)
#
# Creates the main table and the "doublons" table in the given SQLite
# database. The handle and the table names are passed as references to
# scalars. $uniq_values, $M and $m are accepted but not used here.
sub create_tmp_table {
    my ( $sqlite_dbh, $table_a_sqlite, $table_db_sqlite, $uniq_values, $M, $m )
        = @_;

    my $query = qq|
        CREATE TABLE $$table_a_sqlite (
            id       INTEGER NOT NULL PRIMARY KEY,
            c_id     INTEGER NOT NULL,
            g_c_id   INTEGER NOT NULL,
            v_ma_id  INTEGER NOT NULL,
            v_mo_id  INTEGER NOT NULL,
            carr_id  INTEGER NOT NULL,
            carbu_id INTEGER NOT NULL,
            ref_id   INTEGER NOT NULL,
            p        INTEGER NOT NULL,
            k        INTEGER NOT NULL,
            y        INTEGER NOT NULL,
            m        INTEGER NOT NULL,
            s_id     INTEGER NOT NULL,
            co_id    INTEGER NOT NULL,
            type     INTEGER NOT NULL,
            age      INTEGER NOT NULL DEFAULT 0,
            dpt      TEXT NOT NULL DEFAULT 0,
            p_c      TEXT NOT NULL DEFAULT 0
        )
    |;

    my $query2 = qq{
        CREATE TABLE $$table_db_sqlite (
            id         INTEGER NOT NULL PRIMARY KEY,
            doublon_id INTEGER NOT NULL,
            query      INTEGER NOT NULL DEFAULT 666
        )
    };

    # Report the DBI error text; $! (errno) says nothing about SQL failures.
    $$sqlite_dbh->do($query)  or die $$sqlite_dbh->errstr;
    $$sqlite_dbh->do($query2) or die $$sqlite_dbh->errstr;
    return;
}

# insert_annonces_tmp_table(\$dbh, \$table_a_name, \$table_db_name, \@rows, ...)
#
# Inserts every row (a hash ref keyed by column name) into the main table and
# a matching (id, id) row into the "doublons" table. The two trailing
# parameters are accepted but not used.
sub insert_annonces_tmp_table {
    my ( $sqlite_dbh, $table_a_sqlite, $table_db_sqlite, $result_array,
        $result_array_concessions, $result_code_co )
        = @_;

    my $query = qq{
        insert into $$table_a_sqlite (
            id, c_id, g_c_id, v_ma_id, v_mo_id, carr_id, carbu_id, ref_id,
            p, k, y, m, s_id, co_id, type, dpt, p_c
        ) values ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )
    };
    my $query2 = qq{
        insert into $$table_db_sqlite (id, doublon_id) values (?, ?)
    };

    # Counts inserted rows across BOTH tables (two per source record), so a
    # commit happens every $max_commit / 2 source records.
    my $inserted   = 0;
    my $max_commit = 50000;

    $$sqlite_dbh->do('BEGIN TRANSACTION');
    $$sqlite_dbh->do('PRAGMA ignore_check_constraints = 1');

    my $sqlite_sth = $$sqlite_dbh->prepare($query)
        or die $$sqlite_dbh->errstr;
    my $sqlite_sth2 = $$sqlite_dbh->prepare($query2)
        or die $$sqlite_dbh->errstr;

    foreach my $records ( @{$result_array} ) {

        # Values in placeholder order. g_c_id is filled with c_id, and
        # co_id / dpt are inserted as constants.
        my @values = (
            @{$records}{qw(id c_id c_id v_ma_id v_mo_id carr_id carbu_id
                    ref_id p k y m s_id)},
            0,
            $records->{type},
            "0",
            $records->{p_c},
        );

        # execute(@values) binds all placeholders in one call instead of
        # seventeen separate bind_param() calls per row.
        my $rows = $sqlite_sth->execute(@values)
            or die $sqlite_sth->errstr;
        $inserted += $rows;

        $rows = $sqlite_sth2->execute( $records->{id}, $records->{id} )
            or die $sqlite_sth2->errstr;
        $inserted += $rows;

        if ( ( $inserted % $max_commit ) == 0 ) {
            $$sqlite_dbh->do('commit');
            $$sqlite_dbh->do('begin');
        }
    }

    $$sqlite_dbh->do('COMMIT');
    return;
}

# select_data(\$dbh, \$table, \$where)
#
# Fetches all matching rows from MySQL in one go and returns three refs:
#   - the array of rows (each row a hash ref),
#   - a hash {v_ma_id}{v_mo_id} => [rows of that block],
#   - a hash whose key v_mo_id_top maps "<v_ma_id>_<v_mo_id>" to its row count.
sub select_data {
    my ( $mysql_dbh, $table, $where ) = @_;

    my $query = qq{
        select id, c_id, v_ma_id, v_mo_id, carr_id, carbu_id, ref_id,
               p, k, y, m, s_id,
               if(SUBSTRING(type,1,3)='pro',1,0) as type,
               co_id,
               (CASE when dpt = '' then 0
                     when dpt = '-' then 0
                     else dpt END) as dpt,
               (CASE when p_c in (4,5) then 5
                     when p_c in (2,3) then 3
                     else 0 END) as p_c
        from $$table TA
    };
    $query .= 'where ' . $$where if $$where ne '';

    my $mysql_sth = $$mysql_dbh->prepare($query);
    $mysql_sth->execute();
    my $data = $mysql_sth->fetchall_arrayref( {} );
    $mysql_sth->finish();

    my %uniq_values;
    my %data_ref;
    foreach my $records ( @{$data} ) {
        my ( $M, $m ) = @{$records}{qw(v_ma_id v_mo_id)};
        push @{ $data_ref{$M}{$m} }, $records;
        $uniq_values{v_mo_id_top}{ $M . '_' . $m }++;
    }

    return ( $data, \%data_ref, \%uniq_values );
}

# sqlite_connect($dbfile)
#
# Opens (creating it if needed) the SQLite database stored in $dbfile and
# returns the handle. RaiseError is on, so later DBI failures die.
sub sqlite_connect {
    my ($dbfile) = @_;

    my $sqlite_dbh = DBI->connect(
        "dbi:SQLite:dbname=${dbfile}",
        "", "",
        { RaiseError => 1, FetchHashKeyName => "NAME_lc", AutoCommit => 1 }
    ) or die $DBI::errstr;

    return $sqlite_dbh;
}

# mysql_connect()
#
# Connects to the $db_infos database on $db_host with the credentials read
# from the environment and returns the handle.
sub mysql_connect {
    my $dsn = "dbi:mysql:dbname=$db_infos;host=$db_host";

    my $mysql_dbh = DBI->connect(
        $dsn, $db_login, $db_password,
        { RaiseError => 1, FetchHashKeyName => "NAME_lc" }
    ) or die "Can't connect on $db_infos !";

    return $mysql_dbh;
}
|
|---|