leostereo has asked for the wisdom of the Perl Monks concerning the following question:

Hi guys, I will try to summarize my problem.

After getting the monks' wisdom, I got a beautiful piece of code working with threads (my first time). The script reads an IP list from a text file and assigns each IP to a thread; each thread then runs an SNMP query against that IP and stores the result in a MySQL db.
THE PROBLEM: The script works great, but it stops working after a random number of lines; sometimes it does 100, 300, or 800 out of a total of 16k IPs.
After inserting some print lines in the code, I noticed that the workers are not dequeuing the IPs from the IP queue.
I tried modifying the queue size and the number of threads but got the same result.
I would like to know if there is some way to trigger a reset of the script, clearing the queue or resetting the threads after some blocking condition, or to do something to force the script to continue working. OK, here is the code:

#!/usr/bin/perl -slw
use strict;
use threads;
use Thread::Queue;
use Net::Ping;
use DBI;
$|=1;

our $T = 10; ## 10 walkers; adjust to suit.
my $community='public';
my $snmp_sinr;
my $snmp_rsrp;
my $snmp_rsrq;
my $snmp_rssi;
my $snmp_imsi;
my $snmp_bsid;
my $cpe;
my $snmp_brand;
my $snmp_dl;
my $snmp_ul;
my $output;
my $brand;
my $error = 'none';
my @result;
my $ulrate;
my $orig_bsid;
my $dlrate;
my $firmware;
my $model;
my $line;
my $dbh;
my $sth0;
my $sth;
my $query0;
my $query;
my $fh;
my $qsize = 10;
my $imsi;

open ($fh,'>>','/opt/cpe_history_test/dates');
my $date=localtime();
print $fh "started at ",$date,"\n";
close $fh;

sub listener {
    open(my $fh1, '<:encoding(UTF-8)', '/opt/cpe_history_test/connected')
        or die "Could not open file 'connected' $!";
    my( $Qout ) = @_;
    while(<$fh1>){
        $Qout->enqueue($_);
    }
    sleep(30);
    open ($fh,'>>','/opt/cpe_history_test/dates');
    my $date=localtime();
    print $fh "finished at ",$date,"\n";
    close $fh;
    close $fh1;
    exit(0);
}

sub walker {
    my( $Qin, $Qout ) = @_;
    # while( $Qin->dequeue ) { ## receive work from listener
    while (defined(my $line = $Qin->dequeue())) { ## receive work from listener
        my $ip = $line;
        chomp($ip);
        my $pending_jobs=$Qin->pending;
        my $ping_result = 0;
        my $p = new Net::Ping('icmp',1);
        for(my $a=0 ; $a < 3 ; $a++){
            # print "attempt $a for $ip\n";
            if ($p->ping($ip)){
                $ping_result = 1;
                last;
            }
        }
        if ($ping_result){
            #print "$ip is ALIVE\n";
            ####snmp
            my $snmp_fver = '.1.3.6.1.4.1.2700.1.1.8.0';
            $output=qx(snmpwalk -v2c -t1 -c $community $ip $snmp_fver 2>&1);
            chomp($output);
            if( $output eq "Timeout: No Response from $ip" ) {
                return;
            }
            else{
                my @result=split(/:/,$output);
                if ($result[3]){
                    $firmware=$result[3];
                    $firmware=~s/ //g;
                    $firmware=~s/"//g;
                    $firmware=~s/\n//g;
                    if(($firmware=~/SPC630/)||($firmware=~/DBG0521/)||($firmware=~/SPC610/)||($firmware=~/SP006/)){
                        $snmp_sinr = '.1.3.6.1.4.1.2700.1.1.7.0';
                        $snmp_rsrp = '.1.3.6.1.4.1.2700.1.1.4.0';
                        $snmp_rsrq = '.1.3.6.1.4.1.2700.1.1.5.0';
                        $snmp_rssi = '.1.3.6.1.4.1.2700.1.1.6.0';
                        $snmp_imsi = '.1.3.6.1.4.1.2700.1.1.2.0';
                        $snmp_bsid = '.1.3.6.1.4.1.2700.1.1.3.0';
                    }elsif(($firmware=~/SPC914/)||($firmware=~/SPC892/)||($firmware=~/SPC927/)){
                        $snmp_sinr = '.1.3.6.1.4.1.2700.1.1.7.0';
                        $snmp_rsrp = '.1.3.6.1.4.1.2700.1.1.4.0';
                        $snmp_rsrq = '.1.3.6.1.4.1.2700.1.1.5.0';
                        $snmp_rssi = '.1.3.6.1.4.1.2700.1.1.6.0';
                        $snmp_imsi = '.1.3.6.1.4.1.2700.1.1.2.0';
                        $snmp_bsid = '.1.3.6.1.4.1.2700.1.1.11.0';
                    }else{
                        $snmp_fver = '.1.3.6.1.4.1.2700.1.1.21.0';
                        $output=qx(snmpwalk -v2c -t1 -c $community $ip $snmp_fver 2>&1);
                        if ($result[3]){
                            $firmware=$result[3];
                            $firmware=~s/ //g;
                            $firmware=~s/"//g;
                            $firmware=~s/\n//g;
                            if(($firmware=~/R15-ARG-P/)||($firmware=~/QCI4NU/)||($firmware=~/C00SPC040w/)){
                                print $firmware." corresponde al set 3\n";
                            }
                        }
                        print "firmware $firmware could not be resolved\n";
                        next;
                    }
                    #############
                    $Qout->enqueue(join(',',$imsi,$ip,$firmware));
                    ##snmp_ok
                }else{
                    print "retornando\n";
                    next;
                    return
                }
            }
            ####snmp
            ##ping_result_ok
        }else{
            print "$ip is dead \n";
            next;
        }
    }
}

use enum qw[ IN DBI_ENUM ];
my @Qs = map Thread::Queue->new(), 1 .. 2; # set up two Qs
$Qs[0]->limit = $qsize;

## start the listener thread
my $tListener = threads->create( \&listener, $Qs[ IN ] ); ## One for the listener to send work to the walkers

## start 10 walkers
my @walkers = map{ threads->create( \&walker, @Qs[ IN, DBI_ENUM ] ) } 1 .. $T; ## And one for the walkers to forward data for adding to the db

require DBI; ## Avoid loading DBI into threads
$dbh = DBI->connect("DBI:mysql:database=cdba_cpe_history;host=172.31.160.207;port=3306","history_process","neTing321!" );
#$sth = $dbh->prepare("INSERT INTO ? (ip,bsid,firmware) VALUES (?, ?, ?)");

while (defined(my $item = $Qs[DBI_ENUM]->dequeue())) { ## receive work from listener
    my($imsi, $ip, $bsid, $firmware) = split(',', $item); #Retrieve individual data
    if (($rssi <= 0) && ($rssi >= -100) && ($sinr >= 0) && ($sinr <= 120) && ($rsrp <= 0) && ($rsrp >= -100) ){
        print "in db task item is $imsi -> $ip";
        $query0 = "CREATE TABLE IF NOT EXISTS `rf_$imsi` (
            `date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            `ip` varchar(16) DEFAULT NULL,
            `bsid` varchar(15) DEFAULT NULL,
            `firmware` varchar(25) DEFAULT NULL
            )";
        #print $query0,"\n";
        $sth0 = $dbh->prepare($query0);
        $sth0->execute();
        $query = "INSERT INTO rf_$imsi (ip,bsid,firmware) VALUES ('$ip', '$bsid', '$firmware')";
        # print $query,"\n";
        $sth = $dbh->prepare($query);
        $sth->execute(); ## bind and execute
    }
}
$dbh->disconnect();

Any ideas would be welcome. Regards, Leo.

Re: Some issues using Thread::Queue
by BrowserUk (Patriarch) on Dec 07, 2016 at 17:37 UTC

    What happens if you change this: $Qs[0]->limit = $qsize; to this: $Qs[0]->limit = $T * 200;?
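For readers unsure what the limit changes: the sketch below illustrates how a bounded Thread::Queue behaves, assuming Thread::Queue 3.01 or later (where `limit` is available as an lvalue method). The worker count, the item count, and the trivial worker body are made up for the demo and are not taken from the posted script.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;    # lvalue limit() assumes Thread::Queue 3.01 or later

my $T = 4;                      # hypothetical worker count for this demo
my $q = Thread::Queue->new();
$q->limit = $T * 2;             # enqueue() blocks until pending items drop below the limit

# Workers: dequeue() returns undef once the queue has been ended and drained.
my @workers = map {
    threads->create(sub {
        my $n = 0;
        while (defined(my $item = $q->dequeue())) {
            $n++;               # stand-in for the real per-IP work
        }
        return $n;
    });
} 1 .. $T;

$q->enqueue($_) for 1 .. 100;   # the producer waits here whenever the queue is full
$q->end();                      # signal that no more items are coming

my $processed = 0;
for my $thr (@workers) {
    my ($n) = $thr->join();     # each worker returns how many items it handled
    $processed += $n;
}
print "processed $processed items\n";
```

The limit only controls how far the producer may run ahead of the consumers. If the consumers stop dequeuing altogether, the producer blocks in `enqueue()` no matter what the limit is, so a larger limit would at best delay the stall.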


    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority". The enemy of (IT) success is complexity.
    In the absence of evidence, opinion is indistinguishable from prejudice.
      Did this fix your problem?

        Dear BrowserUk, it didn't work. In fact, it now processes fewer lines than before.
        Any other ideas? Perhaps you can suggest some prints for debugging?
        Regards;
        Leo.
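On the request for debugging prints, one thing that may be worth checking is whether the walker threads are still alive when the script stalls. In the posted `walker`, the snmpwalk timeout branch calls `return`, which leaves the sub and ends that thread, whereas `next` would move on to the next IP. Whether that is the actual cause here is an assumption, not something confirmed in the thread. The sketch below shows the kind of prints that would reveal it; `process_ip` is a hypothetical stand-in for the ping and snmpwalk steps, and the counts are made up.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $q = Thread::Queue->new();

# Hypothetical stand-in for ping + snmpwalk: pretends every 5th item fails.
sub process_ip {
    my ($ip) = @_;
    return $ip % 5 != 0;
}

sub walker {
    my ($Qin) = @_;
    my $tid  = threads->tid();
    my $done = 0;
    while (defined(my $ip = $Qin->dequeue())) {
        unless (process_ip($ip)) {
            warn "walker $tid: $ip failed, skipping\n";
            next;    # a `return` here would end this thread for good
        }
        $done++;
    }
    warn "walker $tid: exiting after $done items\n";
    return $done;
}

my @walkers = map { threads->create(\&walker, $q) } 1 .. 3;
$q->enqueue($_) for 1 .. 20;
$q->end();           # lets the walkers leave their loops once the queue is empty

my $total = 0;
for my $thr (@walkers) {
    my ($n) = $thr->join();
    $total += $n;
}
print "walkers still running: ", scalar(threads->list(threads::running)), "\n";
print "items handled: $total\n";
```

In the real script, printing `scalar(threads->list(threads::running))` periodically from the main thread, together with the per-walker exit message, would show whether the number of live walkers shrinks as timeouts occur.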