in reply to Re: Fast provider feeding slow consumer in thread Fast provider feeding slow consumer
Dear BrowserUk , the script is almost done, After applying both changes it begun working, but there is something wrong when the walker receives the line from the listener, it receives only a number instead of the complete line.
I put them to print as following:
sub listener {
my( $Qout ) = @_;
require IO::Socket::INET::Daemon; ## Requiring here means other
+threads don't carry the redundant weight
my $host = new IO::Socket::INET::Daemon(
host => '172.24.3.208', port => 7777, timeout => 20,
callback => { data =>
sub {
my ($io, $host) = @_;
chomp( my $line = <$io> );
return 0 unless $line;
print "here line is : $line";
$Qout->enqueue( $line ); ## send work to listener
return !0;
}
},
);
$host->run;
return;
}
sub walker {
my( $Qin, $Qout ) = @_;
while( $Qin->dequeue ) { ## receive work from listener
print "here \$\_ is : $_";
my( $type, $ip, $mac, $bsid, $datecode ) = split( ',', $_ );
$mac =~ tr[-][]d;
and got the following:
here line is : 2,190.115.32.180,00-21-07-2E-DD-6A,000044000070,20160426105156,D
here $_ is : 7
Use of uninitialized value in transliteration (tr///) at ./lines_dispacher_and_consumer_threads.pl line 34.
Use of uninitialized value $ip
....
...
..
.
Im reading the Thread::Queue documentacion and the example provided looks very similar like your method ... I will keep investigating it.
Thanks
Re^3: Fast provider feeding slow consumer
by BrowserUk (Patriarch) on Apr 26, 2016 at 15:20 UTC
|
it receives only a number instead of the complete line.
I have really no idea how that could possibly happen; nor do I see anything in the code you posted that would explain it.
If you haven't already solved it; please post the full code that demonstrates the problem and I'll try and re-create it here.
With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
In the absence of evidence, opinion is indistinguishable from prejudice.
| [reply] |
|
Same reply , but this time im logged in ... sorry for that.
Dear BrowserUk
Following , the current running code.
Please note that .
Since there is still some queries that returns empty values, I must check the result before sending to the db queue.
When querie returns empty rssi , I put in the queue back and it takes between 3-8 times to get a valid value.
I also introduce some delay, in order to avoid a loop in this process.
In comment at line 28 , Im asking if are they going to the walker instead ?
That's all .
Leandro.
#!/usr/bin/perl -slw
use strict;
use threads;
use Thread::Queue;
our $T = 20; ## 10 walkers; adjust to suit.
my $cpe;
my $community;
my $snmp_rssi;
my $output;
my $error = 'none';
my $rssi;
my @result;
my $line;
sub listener {
my( $Qout ) = @_;
require IO::Socket::INET::Daemon; ## Requiring here means other
+threads don't carry the redundant weight
my $host = new IO::Socket::INET::Daemon(
host => '172.24.3.208', port => 7777, timeout => 20,
callback => { data =>
sub {
my ($io, $host) = @_;
chomp( my $line = <$io> );
return 0 unless $line;
$Qout->enqueue( $line ); ## send work to listener (
+is not sending to walker ?)
return !0;
}
},
);
$host->run;
return;
}
sub walker {
my( $Qin, $Qout ) = @_;
# while( $Qin->dequeue ) { ## receive work from listener
while (defined(my $item = $Qin->dequeue())) { ## receive work from
+ listener
#print "here \$\_ is : $item";
###
my ($type, $ip, $mac, $bsid, $datecode) = split(',', $item);
print "on walker we have $ip | $mac | $bsid\n";
$cpe=$ip;
$mac=~s/-//g;
$community='public';
$output=qx(snmpwalk -v2c -t1 -c $community $cpe $snmp_rssi
+ 2>&1);
if( $output eq "Timeout: No Response from $ip" ) {
$rssi=0;
$error='SNMP not responding. Upgrade firmware';
}
else
{
@result=split(/:/,$output);
$rssi=$result[3];
$rssi=~s/ //g;
$rssi=~s/\n//g;
if($rssi < -100) {
$rssi=$rssi/100;
}
$rssi=int($rssi);
}
if(($mac ne '') && ($rssi ne '')){ #### I must check rssi in
+order to avoid introducing empty values on db.
$Qout->enqueue(join(',',$mac,$ip,$bsid,$rssi));## data
+ items to DBI
}else{
$line = join(',',$type,$ip,$mac,$bsid);
print "will reenqueue $line\n";
sleep(5);
$Qin->enqueue($line);
}
}
}
use enum qw[ IN DBI_ENUM ];
my @Qs = map Thread::Queue->new(), 1 .. 2; # set up two Qs
## start the listener thread
my $tListener = threads->create( \&listener, $Qs[ IN ] ); ## One for t
+he listener to send work to the walkers
## start 10 walkers
my @walkers = map{ threads->create( \&walker, @Qs[ IN, DBI_ENUM ] ) }
+1 .. $T; ## And one for the walkers to forward data for adding to the
+ db
require DBI; ## Avoid loading DBI into threads
my $dbh = DBI->connect("DBI:mysql:database=cpe_info;host=172.24.3.207;
+port=3306","account_process","neting.!" );
my $sth = $dbh->prepare("INSERT INTO cpe_info(mac,ip,bsid,rssi) VALUE
+S
(?, ?, ?, ?) ON DUPLICATE KEY UPDATE ip = ?, bsid = ?, rssi =
+?");
## process data produced by walkers
#while( $Qs[DBI_ENUM]->dequeue ) {
while (defined(my $item = $Qs[DBI_ENUM]->dequeue())) { ## receive
+work from listener
my($mac, $ip, $bsid, $rssi) = split(',', $item); #Retrieve individ
+ual data
print "in db task item is $item";
$sth->execute($mac, $ip, $bsid, $rssi, $ip, $bsid, $rssi);
+ ## bind and execute
}
$dbh->disconnect();
| [reply] [d/l] |
|
This is the third copy of the same code you've posted.
I downloaded the first copy a couple of hours ago; but as I don't have your DB; nor a bunch of willing clients connecting; nor a gaggle of machines who's smnp I can walk; it's going to take me a while to set things up here to see what is going on in your code.
Be patient.
With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
In the absence of evidence, opinion is indistinguishable from prejudice.
s going to take me a while to set things up here to see what is going on in your code.
| [reply] |
|
Since there is still some queries that returns empty values,
Leandro, this appears to be a bug in IO::Socket::INET::Daemon, in as much as, it frequently calls the data callback when there is nothing to available read; and getline returns undef.
I do not have the time to debug that module; but the simple workaround to it, is to move one line of code in your callback. change this:
23 callback => { data =>
24 sub {
25 my ($io, $host) = @_;
26 chomp( my $line = <$io> );
27 return 0 unless $line; ####### MOVE THIS LINE
+ UP AND TEST FOR DEFINEDNESS ###################
28 $Qout->enqueue( $line ); ## send work to listen
+er (is not sending to walker ?)
29 return !0;
30 }
31 },
To become this: 23 callback => { data =>
24 sub {
25 my ($io, $host) = @_;
26 return 0 unless defined $line; ################ M
+OVED LINE #####################
27 chomp( my $line = <$io> );
28 $Qout->enqueue( $line ); ## send work to walker
29 return !0;
30 }
31 },
I think that should get you further.
(And yes, the comment was wrong!).
(And BTW: you really should get into the habit of using strict and my. Debugging would have been much simpler had your code been strict compliant.)
With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
In the absence of evidence, opinion is indistinguishable from prejudice.
| [reply] [d/l] [select] |
|
Dear BrowserUk
Following , the current running code.
Please note that .
Since there is still some queries that returns empty values, I must check the result before sending to the db queue.
When querie returns empty rssi , I put in the queue back and it takes between 3-8 times to get a valid value.
I also introduce some delay, in order to avoid a loop in this process.
In comment at line 28 , Im asking if are they going to the walker instead ?
That's all .
Leandro.
#!/usr/bin/perl -slw
use strict;
use threads;
use Thread::Queue;
our $T = 20; ## 10 walkers; adjust to suit.
my $cpe;
my $community;
my $snmp_rssi;
my $output;
my $error = 'none';
my $rssi;
my @result;
my $line;
sub listener {
my( $Qout ) = @_;
require IO::Socket::INET::Daemon; ## Requiring here means other
+threads don't carry the redundant weight
my $host = new IO::Socket::INET::Daemon(
host => '172.24.3.208', port => 7777, timeout => 20,
callback => { data =>
sub {
my ($io, $host) = @_;
chomp( my $line = <$io> );
return 0 unless $line;
$Qout->enqueue( $line ); ## send work to listener (
+is not sending to walker ?)
return !0;
}
},
);
$host->run;
return;
}
sub walker {
my( $Qin, $Qout ) = @_;
# while( $Qin->dequeue ) { ## receive work from listener
while (defined(my $item = $Qin->dequeue())) { ## receive work from
+ listener
#print "here \$\_ is : $item";
###
my ($type, $ip, $mac, $bsid, $datecode) = split(',', $item);
print "on walker we have $ip | $mac | $bsid\n";
$cpe=$ip;
$mac=~s/-//g;
$community='public';
$output=qx(snmpwalk -v2c -t1 -c $community $cpe $snmp_rssi
+ 2>&1);
if( $output eq "Timeout: No Response from $ip" ) {
$rssi=0;
$error='SNMP not responding. Upgrade firmware';
}
else
{
@result=split(/:/,$output);
$rssi=$result[3];
$rssi=~s/ //g;
$rssi=~s/\n//g;
if($rssi < -100) {
$rssi=$rssi/100;
}
$rssi=int($rssi);
}
if(($mac ne '') && ($rssi ne '')){ #### I must check rssi in
+order to avoid introducing empty values on db.
$Qout->enqueue(join(',',$mac,$ip,$bsid,$rssi));## data
+ items to DBI
}else{
$line = join(',',$type,$ip,$mac,$bsid);
print "will reenqueue $line\n";
sleep(5);
$Qin->enqueue($line);
}
}
}
use enum qw[ IN DBI_ENUM ];
my @Qs = map Thread::Queue->new(), 1 .. 2; # set up two Qs
## start the listener thread
my $tListener = threads->create( \&listener, $Qs[ IN ] ); ## One for t
+he listener to send work to the walkers
## start 10 walkers
my @walkers = map{ threads->create( \&walker, @Qs[ IN, DBI_ENUM ] ) }
+1 .. $T; ## And one for the walkers to forward data for adding to the
+ db
require DBI; ## Avoid loading DBI into threads
my $dbh = DBI->connect("DBI:mysql:database=cpe_info;host=172.24.3.207;
+port=3306","account_process","neting.!" );
my $sth = $dbh->prepare("INSERT INTO cpe_info(mac,ip,bsid,rssi) VALUE
+S
(?, ?, ?, ?) ON DUPLICATE KEY UPDATE ip = ?, bsid = ?, rssi =
+?");
## process data produced by walkers
#while( $Qs[DBI_ENUM]->dequeue ) {
while (defined(my $item = $Qs[DBI_ENUM]->dequeue())) { ## receive
+work from listener
my($mac, $ip, $bsid, $rssi) = split(',', $item); #Retrieve individ
+ual data
print "in db task item is $item";
$sth->execute($mac, $ip, $bsid, $rssi, $ip, $bsid, $rssi);
+ ## bind and execute
}
$dbh->disconnect();
| [reply] [d/l] |
|
Dear BrowserUk , this is currently running code.
Regards,
Leo.
#!/usr/bin/perl -slw
use strict;
use threads;
use Thread::Queue;
our $T = 20; ## 10 walkers; adjust to suit.
my $cpe;
my $community;
my $snmp_rssi;
my $output;
my $error = 'none';
my $rssi;
my @result;
my $line;
sub listener {
my( $Qout ) = @_;
require IO::Socket::INET::Daemon; ## Requiring here means other
+threads don't carry the redundant weight
my $host = new IO::Socket::INET::Daemon(
host => '172.24.3.208', port => 7777, timeout => 20,
callback => { data =>
sub {
my ($io, $host) = @_;
chomp( my $line = <$io> );
return 0 unless $line;
$Qout->enqueue( $line ); ## send work to listener (
+is not sending to walker ?)
return !0;
}
},
);
$host->run;
return;
}
sub walker {
my( $Qin, $Qout ) = @_;
# while( $Qin->dequeue ) { ## receive work from listener
while (defined(my $item = $Qin->dequeue())) { ## receive work from
+ listener
#print "here \$\_ is : $item";
###
my ($type, $ip, $mac, $bsid, $datecode) = split(',', $item);
print "on walker we have $ip | $mac | $bsid\n";
$cpe=$ip;
$mac=~s/-//g;
$community='public';
$output=qx(snmpwalk -v2c -t1 -c $community $cpe $snmp_rssi
+ 2>&1);
if( $output eq "Timeout: No Response from $ip" ) {
$rssi=0;
$error='SNMP not responding. Upgrade firmware';
}
else
{
@result=split(/:/,$output);
$rssi=$result[3];
$rssi=~s/ //g;
$rssi=~s/\n//g;
if($rssi < -100) {
$rssi=$rssi/100;
}
$rssi=int($rssi);
}
if(($mac ne '') && ($rssi ne '')){ #### I must check rssi in
+order to avoid introducing empty values on db.
$Qout->enqueue(join(',',$mac,$ip,$bsid,$rssi));## data
+ items to DBI
}else{
$line = join(',',$type,$ip,$mac,$bsid);
print "will reenqueue $line\n";
sleep(5);
$Qin->enqueue($line);
}
}
}
use enum qw[ IN DBI_ENUM ];
my @Qs = map Thread::Queue->new(), 1 .. 2; # set up two Qs
## start the listener thread
my $tListener = threads->create( \&listener, $Qs[ IN ] ); ## One for t
+he listener to send work to the walkers
## start 10 walkers
my @walkers = map{ threads->create( \&walker, @Qs[ IN, DBI_ENUM ] ) }
+1 .. $T; ## And one for the walkers to forward data for adding to the
+ db
require DBI; ## Avoid loading DBI into threads
my $dbh = DBI->connect("DBI:mysql:database=cpe_info;host=172.24.3.207;
+port=3306","account_process","neting.!" );
my $sth = $dbh->prepare("INSERT INTO cpe_info(mac,ip,bsid,rssi) VALUE
+S
(?, ?, ?, ?) ON DUPLICATE KEY UPDATE ip = ?, bsid = ?, rssi =
+?");
## process data produced by walkers
#while( $Qs[DBI_ENUM]->dequeue ) {
while (defined(my $item = $Qs[DBI_ENUM]->dequeue())) { ## receive
+work from listener
my($mac, $ip, $bsid, $rssi) = split(',', $item); #Retrieve individ
+ual data
print "in db task item is $item";
$sth->execute($mac, $ip, $bsid, $rssi, $ip, $bsid, $rssi);
+ ## bind and execute
}
$dbh->disconnect();
Please note:
It is still returning some empty values so Im checking and re-enqueueing , the lines for the one the query did not work, It takes between 3-8 times to get a valid value.
Im thinking about preparing another subroutine to process those wrong lines.
I also put a little delay just to avoid a loop in the process
On line 28 , a ask if the comment is ok
Ok,
That is all.
Thanks. | [reply] [d/l] |
Re^3: Fast provider feeding slow consumer
by leostereo (Beadle) on Apr 26, 2016 at 15:31 UTC
|
Ok , the script is working , there is one good and one bad news.
The bad news is that there is 1 on 10 aprox snmp queries that returns empty rssi values.
(I manually cheked that device is alive and can process snmp queries).
I tryed incrementing the number of workers to 20 but it has no effect.
I wonder if is it possible to put those empty lines back on the walkers queue.
The good news is that the processor load is 8 times less thant the script at the begining of this post.
I need to read/learn about why threads are much efficient than fork/prefork.
O .... lets try.
And ...
Thanks fo everything!
| [reply] |
|
|