PerlMonks  

Re^2: Fast provider feeding slow consumer

by marioroy (Prior)
on Apr 25, 2016 at 15:02 UTC


in reply to Re: Fast provider feeding slow consumer
in thread Fast provider feeding slow consumer

Update: Renamed an enum constant from DBI to DB. Thank you, NetWallah.

When Perl lacks threads support, e.g. Perl on Solaris or AIX, one may use MCE::Hobo in place of threads for BrowserUk's demonstration.

A Hobo is a migratory worker inside the machine that carries the asynchronous gene. Hobos are equipped with threads-like capabilities for running code asynchronously. Unlike threads, each hobo is a unique process to the underlying OS. The IPC is managed by MCE::Shared, which runs on all the major platforms including Windows and Cygwin.
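To make the threads-like interface concrete, here is a minimal sketch, separate from the demonstration below. The square task is hypothetical and only there to give the workers something to return; create and join are used the same way one would use them with threads.

```perl
use strict;
use warnings;
use MCE::Hobo;

# Hypothetical task: square a number in a separate process.
sub square {
    my ($n) = @_;
    return $n * $n;
}

# create() spawns a worker process and returns immediately,
# much like threads->create.
my @hobos = map { MCE::Hobo->create( \&square, $_ ) } 1 .. 4;

# join() waits for each worker and hands back its return value.
my @squares = map { $_->join } @hobos;

print "@squares\n";
```

Because the results are joined in the order the hobos were created, the output order does not depend on which process finishes first.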

#! perl -slw
use strict;
use MCE::Hobo;
use MCE::Shared;

our $T //= 10;    ## 10 walkers; adjust to suit.

sub listener {
    my( $Qout ) = @_;
    require IO::Socket::INET::Daemon;    ## Requiring here means other workers don't carry the redundant weight
    my $host = IO::Socket::INET::Daemon->new(
        host     => '172.24.3.208',
        port     => 7777,
        timeout  => 20,
        callback => {
            data => sub {
                my( $io, $host ) = @_;
                chomp( my $line = <$io> );
                return 0 unless $line;
                $Qout->enqueue( $line );    ## send work to the walkers
                return !0;
            }
        },
    );
    $host->run;
    return;
}

sub walker {
    my( $Qin, $Qout ) = @_;
    ## dequeue returns the item; it does not set $_, so capture it explicitly
    while( defined( my $line = $Qin->dequeue ) ) {    ## receive work from listener
        my( $type, $ip, $mac, $bsid, $datecode ) = split( ',', $line );
        $mac =~ tr[-][]d;
        my $output = qx( snmpwalk -v2c -t1 -c public $ip .1.3.6.1.4.1.9885.9885.1.2.0 2>&1 );
        my( $rssi, $error ) = 0;
        ## qx output keeps its trailing newline, so match rather than compare with eq
        if( $output =~ /^Timeout: No Response from \Q$ip\E/ ) {
            $error = 'SNMP not responding. Upgrade firmware';
        }
        else {
            my @result = split( /:/, $output );
            $rssi = $result[ 3 ];
            $rssi =~ tr[ \n][]d;
            if( $rssi < -100 ) {
                $rssi = $rssi / 100;
            }
            $rssi = int( $rssi );
        }
        $Qout->enqueue( join $;, $mac, $ip, $bsid, $rssi );    ## Send data items to DBI as one scalar
    }
}

use enum qw[ IN DB ];
my @Qs = map MCE::Shared->queue(), 1 .. 2;    ## set up two Qs

## start the listener; IN is the queue it uses to send work to the walkers
my $tListener = MCE::Hobo->create( \&listener, $Qs[ IN ] );

## start the walkers; DB is the queue they use to forward data for adding to the db
my @walkers = map{ MCE::Hobo->create( \&walker, @Qs[ IN, DB ] ) } 1 .. $T;

require DBI;    ## Avoid loading DBI into the workers
my $dbh = DBI->connect(
    "DBI:mysql:database=cpe_info;host=172.24.3.207;port=3306",
    "account_process", "neting.!"
);
my $sth = $dbh->prepare(
    "INSERT INTO cpe_info(mac,ip,bsid,rssi) VALUES( ?, ?, ?, ? ) ON DUPLICATE KEY UPDATE ip = ?, bsid = ?, rssi = ?"
);

## process data produced by walkers
while( defined( my $item = $Qs[ DB ]->dequeue ) ) {
    my( $mac, $ip, $bsid, $rssi ) = split $;, $item;    ## Retrieve individual data
    $sth->execute( $mac, $ip, $bsid, $rssi, $ip, $bsid, $rssi );    ## bind and execute
}
$dbh->disconnect();

__END__
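The demonstration runs until it is killed, since the listener never stops. For anyone wanting to try the two-queue layout without a socket, snmpwalk or a database, here is a stripped-down sketch. The worker body and the sample items are hypothetical stand-ins, and it assumes an MCE::Shared recent enough to provide the queue end method, which makes a blocked dequeue return undef once the queue is drained.

```perl
use strict;
use warnings;
use MCE::Hobo;
use MCE::Shared;

my $work    = MCE::Shared->queue();    # producer -> workers
my $results = MCE::Shared->queue();    # workers  -> consumer

# Hypothetical stand-in for the walker: upper-case each item.
sub worker {
    my ( $in, $out ) = @_;
    while ( defined( my $item = $in->dequeue ) ) {
        $out->enqueue( uc $item );
    }
    return;
}

my @workers = map { MCE::Hobo->create( \&worker, $work, $results ) } 1 .. 3;

$work->enqueue( qw( alpha beta gamma delta ) );
$work->end;               # no more work; idle workers fall out of their loop

$_->join for @workers;    # every result is queued once the workers have exited
$results->end;

my @seen;
while ( defined( my $item = $results->dequeue ) ) {
    push @seen, $item;
}
print join( ' ', sort @seen ), "\n";
```

The results are sorted before printing because the order in which the three workers pick up items is not deterministic.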
