sub process_thread() { my ($db_handler) = @_; return 0 if $process_thread_running; $process_thread_running = 1; warn Dumper(%db_ids_in_progress)."\n\n"; %grand_hash = (); $gcnt = 0; &simple_sql($db_handler,"UPDATE daemon_instances SET busy=TRUE,\"version\"=1.0 WHERE id=$daemon_id RETURNING 1;"); my $internal_counter_for_speed = 0; my $starti = [ Time::HiRes::gettimeofday( ) ]; while (keys( %db_ids_in_progress ) || @threads_in_use) { foreach my $id (keys %db_ids_in_progress) { last if (@threads_in_use >= 20); if ($total_counter == 1000000) { $total_counter = 0; } $total_counter++; my $threadname = 'thr'.$total_counter; $db_ids_in_progress{$id}{'to'} =~ s/([^a-zA-Z0-9]+)//g; $db_ids_in_progress{$id}{'msg'} =~ s/^\s+|\s+$//g; $db_ids_in_progress{$id}{'msg'} =~ s/\xe2\x80\x9c|\xe2\x80\x9d/\"/g; $db_ids_in_progress{$id}{'msg'} =~ s/\xe2\x80\x99|\xe2\x80\x98/\'/g; my ($processing) = &simple_sql($db_handler,"SELECT * FROM sp_route_message($db_ids_in_progress{$id}{'uid'},'$db_ids_in_progress{$id}{'to'}',\$a\$$db_ids_in_progress{$id}{'msg'}\$a\$,\$a\$$db_ids_in_progress{$id}{'dlr_track_id'}\$a\$,\$a\$$db_ids_in_progress{$id}{'from'}\$a\$);"); $running_threads{$threadname} = threads->create(\&send_sms_message, $db_ids_in_progress{$id}{'uid'}, $db_ids_in_progress{$id}{'from'}, $db_ids_in_progress{$id}{'to'}, $db_ids_in_progress{$id}{'msg'}, $id, $db_ids_in_progress{$id}{'dlr_track_id'}, $processing); $threads_in_progress++; push(@threads_in_use,$threadname); delete $db_ids_in_progress{$id}; } $name = shift(@threads_in_use); # GET THE OLDER THREAD if ($running_threads{$name}) { my %tr_res = %{$running_threads{$name}->join()}; # FINISH THREAD delete $running_threads{$name}; foreach my $key (keys %tr_res) { foreach my $internal_key (keys %{$tr_res{$key}}) { $gcnt++; if ($key eq 'delete') { $grand_hash{'del'}{$gcnt} = $tr_res{$key}{$internal_key}; } elsif ($key eq 'sqldo') { $grand_hash{'sqldo'}{$gcnt} = $tr_res{$key}{$internal_key}; } } } $internal_counter_for_speed++; } } foreach my $key (keys %{$grand_hash{'sqldo'}}) { warn "DEBUG:\&sqldo($db_handler,\"$grand_hash{'sqldo'}{$key}\");\n"; &simple_sql($db_handler,$grand_hash{'sqldo'}{$key}." RETURNING 1;"); } foreach my $key (keys %{$grand_hash{'del'}}) { warn "DEBUG:\&sqldo($db_handler,\"$grand_hash{'del'}{$key}\");\n"; &simple_sql($db_handler,$grand_hash{'del'}{$key}." RETURNING 1;"); } my $elapsef = Time::HiRes::tv_interval( $starti ); warn "\n=========================\nTotal $internal_counter_for_speed messages processed in $elapsef seconds\n"; &simple_sql($db_handler,"UPDATE daemon_instances SET busy=FALSE WHERE id=$daemon_id RETURNING 1;"); $process_thread_running = 0; %grand_hash = (); %db_ids_in_progress = (); %running_threads = ();