Beefy Boxes and Bandwidth Generously Provided by pair Networks
Perl: the Markov chain saw
 
PerlMonks  

Re^4: Fast provider feeding slow consumer

by Anonymous Monk
on Apr 27, 2016 at 16:12 UTC ( [id://1161647]=note: print w/replies, xml ) Need Help??


in reply to Re^3: Fast provider feeding slow consumer
in thread Fast provider feeding slow consumer

Dear BrowserUk , this is currently running code.
Regards,
Leo.
#!/usr/bin/perl -slw use strict; use threads; use Thread::Queue; our $T = 20; ## 10 walkers; adjust to suit. my $cpe; my $community; my $snmp_rssi; my $output; my $error = 'none'; my $rssi; my @result; my $line; sub listener { my( $Qout ) = @_; require IO::Socket::INET::Daemon; ## Requiring here means other +threads don't carry the redundant weight my $host = new IO::Socket::INET::Daemon( host => '172.24.3.208', port => 7777, timeout => 20, callback => { data => sub { my ($io, $host) = @_; chomp( my $line = <$io> ); return 0 unless $line; $Qout->enqueue( $line ); ## send work to listener ( +is not sending to walker ?) return !0; } }, ); $host->run; return; } sub walker { my( $Qin, $Qout ) = @_; # while( $Qin->dequeue ) { ## receive work from listener while (defined(my $item = $Qin->dequeue())) { ## receive work from + listener #print "here \$\_ is : $item"; ### my ($type, $ip, $mac, $bsid, $datecode) = split(',', $item); print "on walker we have $ip | $mac | $bsid\n"; $cpe=$ip; $mac=~s/-//g; $community='public'; $output=qx(snmpwalk -v2c -t1 -c $community $cpe $snmp_rssi + 2>&1); if( $output eq "Timeout: No Response from $ip" ) { $rssi=0; $error='SNMP not responding. Upgrade firmware'; } else { @result=split(/:/,$output); $rssi=$result[3]; $rssi=~s/ //g; $rssi=~s/\n//g; if($rssi < -100) { $rssi=$rssi/100; } $rssi=int($rssi); } if(($mac ne '') && ($rssi ne '')){ #### I must check rssi in +order to avoid introducing empty values on db. $Qout->enqueue(join(',',$mac,$ip,$bsid,$rssi));## data + items to DBI }else{ $line = join(',',$type,$ip,$mac,$bsid); print "will reenqueue $line\n"; sleep(5); $Qin->enqueue($line); } } } use enum qw[ IN DBI_ENUM ]; my @Qs = map Thread::Queue->new(), 1 .. 2; # set up two Qs ## start the listener thread my $tListener = threads->create( \&listener, $Qs[ IN ] ); ## One for t +he listener to send work to the walkers ## start 10 walkers my @walkers = map{ threads->create( \&walker, @Qs[ IN, DBI_ENUM ] ) } +1 .. $T; ## And one for the walkers to forward data for adding to the + db require DBI; ## Avoid loading DBI into threads my $dbh = DBI->connect("DBI:mysql:database=cpe_info;host=172.24.3.207; +port=3306","account_process","neting.!" ); my $sth = $dbh->prepare("INSERT INTO cpe_info(mac,ip,bsid,rssi) VALUE +S (?, ?, ?, ?) ON DUPLICATE KEY UPDATE ip = ?, bsid = ?, rssi = +?"); ## process data produced by walkers #while( $Qs[DBI_ENUM]->dequeue ) { while (defined(my $item = $Qs[DBI_ENUM]->dequeue())) { ## receive +work from listener my($mac, $ip, $bsid, $rssi) = split(',', $item); #Retrieve individ +ual data print "in db task item is $item"; $sth->execute($mac, $ip, $bsid, $rssi, $ip, $bsid, $rssi); + ## bind and execute } $dbh->disconnect();

Please note:
It is still returning some empty values so Im checking and re-enqueueing , the lines for the one the query did not work, It takes between 3-8 times to get a valid value.
Im thinking about preparing another subroutine to process those wrong lines.
I also put a little delay just to avoid a loop in the process
On line 28 , a ask if the comment is ok
Ok, That is all. Thanks.

Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Node Status?
node history
Node Type: note [id://1161647]
help
Chatterbox?
and the web crawler heard nothing...

How do I use this?Last hourOther CB clients
Other Users?
Others romping around the Monastery: (2)
As of 2024-04-19 21:15 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    No recent polls found