Beefy Boxes and Bandwidth Generously Provided by pair Networks
Problems? Is your data what you think it is?
 
PerlMonks  

Re^2: Fast provider feeding slow consumer

by leostereo (Beadle)
on Apr 26, 2016 at 14:08 UTC ( [id://1161556]=note: print w/replies, xml ) Need Help??


in reply to Re: Fast provider feeding slow consumer
in thread Fast provider feeding slow consumer

Dear BrowserUk , the script is almost done, After applying both changes it begun working, but there is something wrong when the walker receives the line from the listener, it receives only a number instead of the complete line.
I put them to print as following:

sub listener { my( $Qout ) = @_; require IO::Socket::INET::Daemon; ## Requiring here means other +threads don't carry the redundant weight my $host = new IO::Socket::INET::Daemon( host => '172.24.3.208', port => 7777, timeout => 20, callback => { data => sub { my ($io, $host) = @_; chomp( my $line = <$io> ); return 0 unless $line; print "here line is : $line"; $Qout->enqueue( $line ); ## send work to listener return !0; } }, ); $host->run; return; } sub walker { my( $Qin, $Qout ) = @_; while( $Qin->dequeue ) { ## receive work from listener print "here \$\_ is : $_"; my( $type, $ip, $mac, $bsid, $datecode ) = split( ',', $_ ); $mac =~ tr[-][]d;
and got the following:
here line is : 2,190.115.32.180,00-21-07-2E-DD-6A,000044000070,20160426105156,D
here $_ is : 7
Use of uninitialized value in transliteration (tr///) at ./lines_dispacher_and_consumer_threads.pl line 34.
Use of uninitialized value $ip
....
...
..
.

Im reading the Thread::Queue documentacion and the example provided looks very similar like your method ... I will keep investigating it. Thanks

Replies are listed 'Best First'.
Re^3: Fast provider feeding slow consumer
by BrowserUk (Patriarch) on Apr 26, 2016 at 15:20 UTC
    it receives only a number instead of the complete line.

    I have really no idea how that could possibly happen; nor do I see anything in the code you posted that would explain it.

    If you haven't already solved it; please post the full code that demonstrates the problem and I'll try and re-create it here.


    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority". I knew I was on the right track :)
    In the absence of evidence, opinion is indistinguishable from prejudice.
      Same reply , but this time im logged in ... sorry for that.
      Dear BrowserUk
      Following , the current running code.
      Please note that
      . Since there is still some queries that returns empty values, I must check the result before sending to the db queue.
      When querie returns empty rssi , I put in the queue back and it takes between 3-8 times to get a valid value.
      I also introduce some delay, in order to avoid a loop in this process.
      In comment at line 28 , Im asking if are they going to the walker instead ?
      That's all
      . Leandro.
      #!/usr/bin/perl -slw use strict; use threads; use Thread::Queue; our $T = 20; ## 10 walkers; adjust to suit. my $cpe; my $community; my $snmp_rssi; my $output; my $error = 'none'; my $rssi; my @result; my $line; sub listener { my( $Qout ) = @_; require IO::Socket::INET::Daemon; ## Requiring here means other +threads don't carry the redundant weight my $host = new IO::Socket::INET::Daemon( host => '172.24.3.208', port => 7777, timeout => 20, callback => { data => sub { my ($io, $host) = @_; chomp( my $line = <$io> ); return 0 unless $line; $Qout->enqueue( $line ); ## send work to listener ( +is not sending to walker ?) return !0; } }, ); $host->run; return; } sub walker { my( $Qin, $Qout ) = @_; # while( $Qin->dequeue ) { ## receive work from listener while (defined(my $item = $Qin->dequeue())) { ## receive work from + listener #print "here \$\_ is : $item"; ### my ($type, $ip, $mac, $bsid, $datecode) = split(',', $item); print "on walker we have $ip | $mac | $bsid\n"; $cpe=$ip; $mac=~s/-//g; $community='public'; $output=qx(snmpwalk -v2c -t1 -c $community $cpe $snmp_rssi + 2>&1); if( $output eq "Timeout: No Response from $ip" ) { $rssi=0; $error='SNMP not responding. Upgrade firmware'; } else { @result=split(/:/,$output); $rssi=$result[3]; $rssi=~s/ //g; $rssi=~s/\n//g; if($rssi < -100) { $rssi=$rssi/100; } $rssi=int($rssi); } if(($mac ne '') && ($rssi ne '')){ #### I must check rssi in +order to avoid introducing empty values on db. $Qout->enqueue(join(',',$mac,$ip,$bsid,$rssi));## data + items to DBI }else{ $line = join(',',$type,$ip,$mac,$bsid); print "will reenqueue $line\n"; sleep(5); $Qin->enqueue($line); } } } use enum qw[ IN DBI_ENUM ]; my @Qs = map Thread::Queue->new(), 1 .. 2; # set up two Qs ## start the listener thread my $tListener = threads->create( \&listener, $Qs[ IN ] ); ## One for t +he listener to send work to the walkers ## start 10 walkers my @walkers = map{ threads->create( \&walker, @Qs[ IN, DBI_ENUM ] ) } +1 .. $T; ## And one for the walkers to forward data for adding to the + db require DBI; ## Avoid loading DBI into threads my $dbh = DBI->connect("DBI:mysql:database=cpe_info;host=172.24.3.207; +port=3306","account_process","neting.!" ); my $sth = $dbh->prepare("INSERT INTO cpe_info(mac,ip,bsid,rssi) VALUE +S (?, ?, ?, ?) ON DUPLICATE KEY UPDATE ip = ?, bsid = ?, rssi = +?"); ## process data produced by walkers #while( $Qs[DBI_ENUM]->dequeue ) { while (defined(my $item = $Qs[DBI_ENUM]->dequeue())) { ## receive +work from listener my($mac, $ip, $bsid, $rssi) = split(',', $item); #Retrieve individ +ual data print "in db task item is $item"; $sth->execute($mac, $ip, $bsid, $rssi, $ip, $bsid, $rssi); + ## bind and execute } $dbh->disconnect();

        This is the third copy of the same code you've posted.

        I downloaded the first copy a couple of hours ago; but as I don't have your DB; nor a bunch of willing clients connecting; nor a gaggle of machines who's smnp I can walk; it's going to take me a while to set things up here to see what is going on in your code.

        Be patient.


        With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
        Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
        "Science is about questioning the status quo. Questioning authority". I knew I was on the right track :)
        In the absence of evidence, opinion is indistinguishable from prejudice.
        s going to take me a while to set things up here to see what is going on in your code.
        Since there is still some queries that returns empty values,

        Leandro, this appears to be a bug in IO::Socket::INET::Daemon, in as much as, it frequently calls the data callback when there is nothing to available read; and getline returns undef.

        I do not have the time to debug that module; but the simple workaround to it, is to move one line of code in your callback. change this:

        23 callback => { data => 24 sub { 25 my ($io, $host) = @_; 26 chomp( my $line = <$io> ); 27 return 0 unless $line; ####### MOVE THIS LINE + UP AND TEST FOR DEFINEDNESS ################### 28 $Qout->enqueue( $line ); ## send work to listen +er (is not sending to walker ?) 29 return !0; 30 } 31 },

        To become this:

        23 callback => { data => 24 sub { 25 my ($io, $host) = @_; 26 return 0 unless defined $line; ################ M +OVED LINE ##################### 27 chomp( my $line = <$io> ); 28 $Qout->enqueue( $line ); ## send work to walker 29 return !0; 30 } 31 },

        I think that should get you further.

        (And yes, the comment was wrong!).

        (And BTW: you really should get into the habit of using strict and my. Debugging would have been much simpler had your code been strict compliant.)


        With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
        Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
        "Science is about questioning the status quo. Questioning authority". I knew I was on the right track :)
        In the absence of evidence, opinion is indistinguishable from prejudice.
      Dear BrowserUk
      Following , the current running code.
      Please note that
      . Since there is still some queries that returns empty values, I must check the result before sending to the db queue.
      When querie returns empty rssi , I put in the queue back and it takes between 3-8 times to get a valid value.
      I also introduce some delay, in order to avoid a loop in this process.
      In comment at line 28 , Im asking if are they going to the walker instead ?
      That's all
      . Leandro.
      #!/usr/bin/perl -slw use strict; use threads; use Thread::Queue; our $T = 20; ## 10 walkers; adjust to suit. my $cpe; my $community; my $snmp_rssi; my $output; my $error = 'none'; my $rssi; my @result; my $line; sub listener { my( $Qout ) = @_; require IO::Socket::INET::Daemon; ## Requiring here means other +threads don't carry the redundant weight my $host = new IO::Socket::INET::Daemon( host => '172.24.3.208', port => 7777, timeout => 20, callback => { data => sub { my ($io, $host) = @_; chomp( my $line = <$io> ); return 0 unless $line; $Qout->enqueue( $line ); ## send work to listener ( +is not sending to walker ?) return !0; } }, ); $host->run; return; } sub walker { my( $Qin, $Qout ) = @_; # while( $Qin->dequeue ) { ## receive work from listener while (defined(my $item = $Qin->dequeue())) { ## receive work from + listener #print "here \$\_ is : $item"; ### my ($type, $ip, $mac, $bsid, $datecode) = split(',', $item); print "on walker we have $ip | $mac | $bsid\n"; $cpe=$ip; $mac=~s/-//g; $community='public'; $output=qx(snmpwalk -v2c -t1 -c $community $cpe $snmp_rssi + 2>&1); if( $output eq "Timeout: No Response from $ip" ) { $rssi=0; $error='SNMP not responding. Upgrade firmware'; } else { @result=split(/:/,$output); $rssi=$result[3]; $rssi=~s/ //g; $rssi=~s/\n//g; if($rssi < -100) { $rssi=$rssi/100; } $rssi=int($rssi); } if(($mac ne '') && ($rssi ne '')){ #### I must check rssi in +order to avoid introducing empty values on db. $Qout->enqueue(join(',',$mac,$ip,$bsid,$rssi));## data + items to DBI }else{ $line = join(',',$type,$ip,$mac,$bsid); print "will reenqueue $line\n"; sleep(5); $Qin->enqueue($line); } } } use enum qw[ IN DBI_ENUM ]; my @Qs = map Thread::Queue->new(), 1 .. 2; # set up two Qs ## start the listener thread my $tListener = threads->create( \&listener, $Qs[ IN ] ); ## One for t +he listener to send work to the walkers ## start 10 walkers my @walkers = map{ threads->create( \&walker, @Qs[ IN, DBI_ENUM ] ) } +1 .. $T; ## And one for the walkers to forward data for adding to the + db require DBI; ## Avoid loading DBI into threads my $dbh = DBI->connect("DBI:mysql:database=cpe_info;host=172.24.3.207; +port=3306","account_process","neting.!" ); my $sth = $dbh->prepare("INSERT INTO cpe_info(mac,ip,bsid,rssi) VALUE +S (?, ?, ?, ?) ON DUPLICATE KEY UPDATE ip = ?, bsid = ?, rssi = +?"); ## process data produced by walkers #while( $Qs[DBI_ENUM]->dequeue ) { while (defined(my $item = $Qs[DBI_ENUM]->dequeue())) { ## receive +work from listener my($mac, $ip, $bsid, $rssi) = split(',', $item); #Retrieve individ +ual data print "in db task item is $item"; $sth->execute($mac, $ip, $bsid, $rssi, $ip, $bsid, $rssi); + ## bind and execute } $dbh->disconnect();
      Dear BrowserUk , this is currently running code.
      Regards,
      Leo.
      #!/usr/bin/perl -slw use strict; use threads; use Thread::Queue; our $T = 20; ## 10 walkers; adjust to suit. my $cpe; my $community; my $snmp_rssi; my $output; my $error = 'none'; my $rssi; my @result; my $line; sub listener { my( $Qout ) = @_; require IO::Socket::INET::Daemon; ## Requiring here means other +threads don't carry the redundant weight my $host = new IO::Socket::INET::Daemon( host => '172.24.3.208', port => 7777, timeout => 20, callback => { data => sub { my ($io, $host) = @_; chomp( my $line = <$io> ); return 0 unless $line; $Qout->enqueue( $line ); ## send work to listener ( +is not sending to walker ?) return !0; } }, ); $host->run; return; } sub walker { my( $Qin, $Qout ) = @_; # while( $Qin->dequeue ) { ## receive work from listener while (defined(my $item = $Qin->dequeue())) { ## receive work from + listener #print "here \$\_ is : $item"; ### my ($type, $ip, $mac, $bsid, $datecode) = split(',', $item); print "on walker we have $ip | $mac | $bsid\n"; $cpe=$ip; $mac=~s/-//g; $community='public'; $output=qx(snmpwalk -v2c -t1 -c $community $cpe $snmp_rssi + 2>&1); if( $output eq "Timeout: No Response from $ip" ) { $rssi=0; $error='SNMP not responding. Upgrade firmware'; } else { @result=split(/:/,$output); $rssi=$result[3]; $rssi=~s/ //g; $rssi=~s/\n//g; if($rssi < -100) { $rssi=$rssi/100; } $rssi=int($rssi); } if(($mac ne '') && ($rssi ne '')){ #### I must check rssi in +order to avoid introducing empty values on db. $Qout->enqueue(join(',',$mac,$ip,$bsid,$rssi));## data + items to DBI }else{ $line = join(',',$type,$ip,$mac,$bsid); print "will reenqueue $line\n"; sleep(5); $Qin->enqueue($line); } } } use enum qw[ IN DBI_ENUM ]; my @Qs = map Thread::Queue->new(), 1 .. 2; # set up two Qs ## start the listener thread my $tListener = threads->create( \&listener, $Qs[ IN ] ); ## One for t +he listener to send work to the walkers ## start 10 walkers my @walkers = map{ threads->create( \&walker, @Qs[ IN, DBI_ENUM ] ) } +1 .. $T; ## And one for the walkers to forward data for adding to the + db require DBI; ## Avoid loading DBI into threads my $dbh = DBI->connect("DBI:mysql:database=cpe_info;host=172.24.3.207; +port=3306","account_process","neting.!" ); my $sth = $dbh->prepare("INSERT INTO cpe_info(mac,ip,bsid,rssi) VALUE +S (?, ?, ?, ?) ON DUPLICATE KEY UPDATE ip = ?, bsid = ?, rssi = +?"); ## process data produced by walkers #while( $Qs[DBI_ENUM]->dequeue ) { while (defined(my $item = $Qs[DBI_ENUM]->dequeue())) { ## receive +work from listener my($mac, $ip, $bsid, $rssi) = split(',', $item); #Retrieve individ +ual data print "in db task item is $item"; $sth->execute($mac, $ip, $bsid, $rssi, $ip, $bsid, $rssi); + ## bind and execute } $dbh->disconnect();

      Please note:
      It is still returning some empty values so Im checking and re-enqueueing , the lines for the one the query did not work, It takes between 3-8 times to get a valid value.
      Im thinking about preparing another subroutine to process those wrong lines.
      I also put a little delay just to avoid a loop in the process
      On line 28 , a ask if the comment is ok
      Ok, That is all. Thanks.

Re^3: Fast provider feeding slow consumer
by leostereo (Beadle) on Apr 26, 2016 at 15:31 UTC

    Ok , the script is working , there is one good and one bad news. The bad news is that there is 1 on 10 aprox snmp queries that returns empty rssi values.
    (I manually cheked that device is alive and can process snmp queries).
    I tryed incrementing the number of workers to 20 but it has no effect.
    I wonder if is it possible to put those empty lines back on the walkers queue.
    The good news is that the processor load is 8 times less thant the script at the begining of this post.
    I need to read/learn about why threads are much efficient than fork/prefork.
    O .... lets try.
    And ...
    Thanks fo everything!

Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Node Status?
node history
Node Type: note [id://1161556]
help
Chatterbox?
and the web crawler heard nothing...

How do I use this?Last hourOther CB clients
Other Users?
Others contemplating the Monastery: (7)
As of 2024-04-19 10:32 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    No recent polls found