##################################lines_dispacher.pl: #!/usr/bin/perl use IO::Socket::INET::Daemon; use Proc::Daemon; use Proc::PID::File; use IO::Handle; STDOUT->autoflush(1); my $host = new IO::Socket::INET::Daemon( host => '172.24.3.208', port => 7777, timeout => 20, callback => { data => \&data, }, ); $host->run; sub data { my ($io, $host) = @_; my $line = $io->getline; chomp($line); return 0 unless $line; print "$line\n"; return !0; } ###########################################lines_consumer_parallel2.pl: #!/usr/bin/perl use DBI; use Parallel::ForkManager; my $pm = Parallel::ForkManager->new(10); $forks =1; while(<>){ $pm->start() and next; # Parent nexts ### my ($type, $ip, $mac, $bsid, $datecode) = split(',', $_); $cpe=$ip; $mac=~s/-//g; $community='public'; $snmp_rssi = '.1.3.6.1.4.1.9885.9885.1.2.0'; $output=qx(snmpwalk -v2c -t1 -c $community $cpe $snmp_rssi 2>&1); #this is the task that delays the consumer process. if( $output eq "Timeout: No Response from $ip" ) { $rssi=0; $error='SNMP not responding. Upgrade firmware'; } else { @result=split(/:/,$output); $rssi=$result[3]; $rssi=~s/ //g; $rssi=~s/\n//g; if($rssi < -100) { $rssi=$rssi/100; } $rssi=int($rssi); } $dbh = DBI->connect("DBI:mysql:database=cpe_info;host=172.24.3.207;port=3306","account_process","neting.!"); $query = "INSERT INTO cpe_info(mac,ip,bsid,rssi) VALUES". "('$mac','$ip','$bsid','$rssi')". "ON DUPLICATE KEY UPDATE ip='$ip',bsid='$bsid',rssi='$rssi'"; $sth = $dbh->prepare($query); $sth->execute(); $dbh->disconnect(); print "we are on fork number $forks\n"; $forks++; ### $pm->finish(); }