leostereo has asked for the wisdom of the Perl Monks concerning the following question:
Hi guys , I will try to sumarize my problem
After getting the monks wisdom, I got working a beatifull piece of code (my first time) working with threds.
The script will read an ip list from a text file and will asign each ip to a thread, then each thread will will do snmp query to this ip and store the result on a mysql db.
THE PROBLEM:
The script works great but it stops working after a random number of lines, sometimes it do 100 , 300 , 800 from a total of 16k ips.
After insert some prints lines on the code I noticed that the the workers are not dequeueing the ips from the ips queue.
I tryed modifing the queue size and the thread numbers but got same result.
I would like to know if is there some way to trigger a reset on the script clearing the queue or reseting the threads after some blocking condition ... or --- do someting to force the script to continue working.
Ok , following the code:
#!/usr/bin/perl -slw use strict; use threads; use Thread::Queue; use Net::Ping; use DBI; $|=1; our $T = 10; ## 10 walkers; adjust to suit. my $community='public'; my $snmp_sinr; my $snmp_rsrp; my $snmp_rsrq; my $snmp_rssi; my $snmp_imsi; my $snmp_bsid; my $cpe; my $snmp_brand; my $snmp_dl; my $snmp_ul; my $output; my $brand; my $error = 'none'; my @result; my $ulrate; my $orig_bsid; my $dlrate; my $firmware; my $model; my $line; my $dbh; my $sth0; my $sth; my $query0; my $query; my $fh; my $qsize = 10; my $imsi; open ($fh,'>>','/opt/cpe_history_test/dates'); my $date=localtime(); print $fh "started at ",$date,"\n"; close $fh; sub listener { open(my $fh1, '<:encoding(UTF-8)', '/opt/cpe_history_test/connected') or die "Could not open file 'connected' $!"; my( $Qout ) = @_; while(<$fh1>){ $Qout->enqueue($_); } sleep(30); open ($fh,'>>','/opt/cpe_history_test/dates'); my $date=localtime(); print $fh "finished at ",$date,"\n"; close $fh; close $fh1; exit(0); } sub walker { my( $Qin, $Qout ) = @_; # while( $Qin->dequeue ) { ## receive work from listener while (defined(my $line = $Qin->dequeue())) { ## receive work from + listener my $ip = $line; chomp($ip); my $pending_jobs=$Qin->pending; my $ping_result = 0; my $p = new Net::Ping('icmp',1); for(my $a=0 ; $a < 3 ; $a++){ # print "attempt $a for $ip\n"; if ($p->ping($ip)){ $ping_result = 1; last; } } if ($ping_result){ #print "$ip is ALIVE\n"; ####snmp my $snmp_fver = '.1.3.6.1.4.1.2700.1.1.8.0'; $output=qx(snmpwalk -v2c -t1 -c $community $ip $snmp_fver 2>&1); chomp($output); if( $output eq "Timeout: No Response from $ip" ) { return; } else{ my @result=split(/:/,$output); if ($result[3]){ $firmware=$result[3]; $firmware=~s/ //g; $firmware=~s/"//g; $firmware=~s/\n//g; if(($firmware=~/SPC630/)||($firmware=~/DBG0521/)||($firmwa +re=~/SPC610/)||($firmware=~/SP006/)){ $snmp_sinr = '.1.3.6.1.4.1.2700.1.1.7.0'; $snmp_rsrp = '.1.3.6.1.4.1.2700.1.1.4.0'; $snmp_rsrq = '.1.3.6.1.4.1.2700.1.1.5.0'; $snmp_rssi = '.1.3.6.1.4.1.2700.1.1.6.0'; $snmp_imsi = '.1.3.6.1.4.1.2700.1.1.2.0'; $snmp_bsid = '.1.3.6.1.4.1.2700.1.1.3.0'; }elsif(($firmware=~/SPC914/)||($firmware=~/SPC892/)||($fir +mware=~/SPC927/)){ $snmp_sinr = '.1.3.6.1.4.1.2700.1.1.7.0'; $snmp_rsrp = '.1.3.6.1.4.1.2700.1.1.4.0'; $snmp_rsrq = '.1.3.6.1.4.1.2700.1.1.5.0'; $snmp_rssi = '.1.3.6.1.4.1.2700.1.1.6.0'; $snmp_imsi = '.1.3.6.1.4.1.2700.1.1.2.0'; $snmp_bsid = '.1.3.6.1.4.1.2700.1.1.11.0'; }else{ $snmp_fver = '.1.3.6.1.4.1.2700.1.1.21.0'; $output=qx(snmpwalk -v2c -t1 -c $community $ip $snmp_fver +2>&1); if ($result[3]){ $firmware=$result[3]; $firmware=~s/ //g; $firmware=~s/"//g; $firmware=~s/\n//g; if(($firmware=~/R15-ARG-P/)||($firmware=~/QCI4NU/) +||($firmware=~/C00SPC040w/)){ print $firmware." corresponde al set 3\n"; + } } print "firmware $firmware could not be resolved\n" +; next; } ############# $Qout->enqueue(join(',',$imsi,$ip,$firmware)); ##snmp_ok }else{ print "retornando\n"; next; return } } ####snmp ##ping_result_ok }else{ print "$ip is dead \n"; next; } } } use enum qw[ IN DBI_ENUM ]; my @Qs = map Thread::Queue->new(), 1 .. 2; # set up two Qs $Qs[0]->limit = $qsize; ## start the listener thread my $tListener = threads->create( \&listener, $Qs[ IN ] ); ## One for t +he listener to send work to the walkers ## start 10 walkers my @walkers = map{ threads->create( \&walker, @Qs[ IN, DBI_ENUM ] ) } +1 .. $T; ## And one for the walkers to forward data for adding to the + db require DBI; ## Avoid loading DBI into threads $dbh = DBI->connect("DBI:mysql:database=cdba_cpe_history;host=172.31.1 +60.207;port=3306","history_process","neTing321!" ); #$sth = $dbh->prepare("INSERT INTO ? (ip,bsid,firmware) VALUES (?, ?, +?)"); while (defined(my $item = $Qs[DBI_ENUM]->dequeue())) { ## receive +work from listener my($imsi, $ip, $bsid, $firmware) = split(',', $item); #Retrieve in +dividual data if (($rssi <= 0) && ($rssi >= -100) && ($sinr >= 0) && ($sinr +<= 120) && ($rsrp <= 0) && ($rsrp >= -100) ){ print "in db task item is $imsi -> $ip"; $query0 = "CREATE TABLE IF NOT EXISTS `rf_$imsi` ( `date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURREN +T_TIMESTAMP, `ip` varchar(16) DEFAULT NULL, `bsid` varchar(15) DEFAULT NULL, `firmware` varchar(25) DEFAULT NULL )"; #print $query0,"\n"; $sth0 = $dbh->prepare($query0); $sth0->execute(); $query = "INSERT INTO rf_$imsi (ip,bsid,firmware) VALUES ('$ip', '$bsi +d', '$firmware')"; # print $query,"\n"; $sth = $dbh->prepare($query); $sth->execute(); ## bind and execute } } $dbh->disconnect();
Any ideas would be weelcome. Regards, Leo.
|
|---|
| Replies are listed 'Best First'. | |
|---|---|
|
Re: Some issues using Thread::Queue
by BrowserUk (Patriarch) on Dec 07, 2016 at 17:37 UTC | |
by BrowserUk (Patriarch) on Dec 09, 2016 at 01:41 UTC | |
by leostereo (Beadle) on Dec 12, 2016 at 17:36 UTC | |
by marioroy (Prior) on Jul 01, 2017 at 05:32 UTC |