leostereo has asked for the wisdom of the Perl Monks concerning the following question:

I cannot get the following code to work.
It works only when it receives the first line.
Any ideas?
Regards.
Leandro.

    #!/usr/bin/perl -slw
    use strict;
    use Regexp::Common qw/ net number /;
    use IO::Socket::INET::Daemon;
    use Parallel::ForkManager;
    use Net::Ping;
    use DBI;
    use YAML::Tiny;

    ########################################################################################
    my $pm = Parallel::ForkManager->new(10);

    my $host = new IO::Socket::INET::Daemon(
        host => 'localhost',
        port => 1000,
        timeout => 20,
        callback => {
            data => \&data,
        },
    );
    $host->run;

    sub data{
        my ($io, $host) = @_;
        my $line = $io->getline;
        return 0 unless $line;
        chomp($line);
        if($line =~ m/$RE{net}{IPv4}{-keep}/){
            print "I got ip $1\n";
            my $pid = $pm->start and next ;
            fork_work($1);
            $pm->finish; # Terminates the child process
            return !0;
        }
        print "not a good line:",$line;
        return !0;
    }
    $pm->wait_all_children;
    ########################################################################################
    sub fork_work {
        print "working from new child!!!"
    }

Replies are listed 'Best First'.
Re: IO::Socket::INET::Daemon and Parallel::ForkManager working together
by roboticus (Chancellor) on Jul 01, 2017 at 00:04 UTC

    Leostereo:

    I've not tried your code, but I looked over the Parallel::ForkManager documentation and noticed one thing: just as with the normal fork() system call, Parallel::ForkManager returns a PID (process ID) to the parent process. P::FM uses that to tell you whether you're in the master/parent process or in the child process. In the P::FM example, they use that fact to get the parent process to skip the child task's work, which (as you'll notice) contains the $pm->finish call. Otherwise (and this is the hint!) the parent would *also* execute the $pm->finish call, and quit. (Mea culpa: you *did* have a PID check in there, I managed to miss it!)
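The parent/child distinction described above can be sketched with the core fork() call alone. This is only an illustration, not Parallel::ForkManager's implementation: run_in_child is a hypothetical helper, and it uses POSIX::_exit so that the child never falls through into the parent's code.

```perl
use strict;
use warnings;
use POSIX ();

$| = 1;    # flush STDOUT immediately so forked children never inherit buffered output

# Hypothetical helper: run $job in a forked child and return the child's PID
# to the parent. The parent never executes $job.
sub run_in_child {
    my ($job) = @_;
    my $pid = fork();
    die "fork failed: $!" unless defined $pid;

    # Parent: fork returned the child's PID, so skip the child's work.
    return $pid if $pid;

    # Child: fork returned 0. Do the work, then leave without returning
    # into the caller's code.
    my $ok = eval { $job->(); 1 };
    POSIX::_exit($ok ? 0 : 1);
}

my $pid = run_in_child(sub { print "child $$ working\n" });
waitpid($pid, 0);
print "parent $$ reaped child $pid, exit status ", $? >> 8, "\n";
```

The `$pm->start and next` idiom plays the role of the `return $pid if $pid` line: it is true only in the parent, which is why the code after it runs only in the child.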

    So I'd suggest you do more like:

        . . .
        callback => {
            data => \&data,
            add => \&start_new_cnx,
        },
        . . .

        # Let the connection start up, and spawn a thread to handle it.
        # Returns 1 for both parent and child task, so the connection isn't closed.
        sub start_new_cnx {
            $pm->start;
            return 1;
        }

        sub data {
            # the same as you had it, but get rid of the $pm->start call, as it's
            # already been started. Just call $pm->finish when you want to end
            # the connection and terminate the task.
        }

    I've not tried this out yet, I just read the docs of both P::FM and IO::Socket::INET::Daemon, neither of which I've used (AFAIRemember).

    Update: That wasn't your problem. I installed the packages and tried my suggestion of forking in the add callback, and had no better luck than you did. I'll try playing with it a little more and see if I can get it going.

    Update 2: OK, I'm not going to mess around with this any further. I suspected that BrowserUk probably had something multithreaded with sockets somewhere around here, and found this thread in which BrowserUk assists Random_Walk getting a similar program going. You may want to review that thread and/or search for other BrowserUk threading-related threads.

    ...roboticus

    When your only tool is a hammer, all problems look like your thumb.

Re: IO::Socket::INET::Daemon and Parallel::ForkManager working together
by marioroy (Prior) on Jul 01, 2017 at 03:57 UTC

    Hi Leostereo,

    Sometimes a given CPAN module may be multi-process unsafe regarding DESTROY and END blocks. From testing, that seems to be the case with IO::Socket::INET::Daemon or perhaps a dependency. You were close otherwise.

        # change this statement
        my $pid = $pm->start and next ;

        # to the following
        my $pid = $pm->start and return !0 ; # return 1 works too
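A possible reason `return` behaves better than `next` here: inside a subroutine that has no loop of its own, `next` jumps to the next iteration of whatever loop is running in the caller, so the callback never hands a value back. The sketch below uses a made-up event_loop to stand in for the daemon; how IO::Socket::INET::Daemon actually invokes its callbacks is an assumption, not something taken from its source.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a dispatcher that calls a callback once per event.
sub event_loop {
    my ($callback) = @_;
    my @handled;
    for my $n (1 .. 4) {
        my $keep_open = $callback->($n);
        push @handled, $n;    # bookkeeping the loop does after each callback
    }
    return @handled;
}

# "next" in a sub without a loop leaves the sub and skips the rest of the
# caller's iteration (Perl warns "Exiting subroutine via next").
sub with_next {
    my ($n) = @_;
    no warnings 'exiting';
    next if $n % 2;
    return 1;
}

# "return" hands control and a value back to the caller as intended.
sub with_return {
    my ($n) = @_;
    return 1 if $n % 2;
    return 1;
}

print "next:   @{[ event_loop(\&with_next) ]}\n";      # next:   2 4
print "return: @{[ event_loop(\&with_return) ]}\n";    # return: 1 2 3 4
```

With `next`, the loop's own bookkeeping is skipped for every event where the branch is taken, which would be consistent with the server misbehaving after the first matching line.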

    The code now looks like this.

        use POSIX ();
        ...

        my $pm = Parallel::ForkManager->new(10);

        $pm->set_waitpid_blocking_sleep(0);

        my $host = new IO::Socket::INET::Daemon(
            host => 'localhost',
            port => 1000,
            timeout => 20,
            callback => {
                data => \&data,
            },
        );

        $host->run;
        $pm->wait_all_children;

        sub data{
            my ($io, $host) = @_;
            my $line = $io->getline;
            return 0 unless $line;
            chomp($line);
            if ($line =~ m/$RE{net}{IPv4}{-keep}/) {
                my $ip = $1;
                print "I got ip $ip\n";
                my $pid = $pm->start and return !0;
                fork_work($ip);
                $pm->finish; # not reached due to calling POSIX::_exit
            }
            print "not a good line:", $line;
            return !0;
        }

        sub fork_work {
            my ($ip) = @_;
            print "working from new child - $ip\n";

            # A given CPAN module may be multi-process unsafe regarding
            # DESTROY and END blocks. Fortunately, there's a way around it.
            POSIX::_exit(0);
        }
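To see what POSIX::_exit changes, here is a small core-only sketch. It assumes nothing about IO::Socket::INET::Daemon itself; child_exit_output and $report_fh are names invented for the illustration. A child that leaves through `exit` runs the END block, while a child that leaves through POSIX::_exit skips it.

```perl
use strict;
use warnings;
use POSIX ();

$| = 1;    # avoid children re-flushing output buffered by the parent

our $report_fh;    # set only inside a forked child

END {
    # Runs on a normal exit; skipped entirely by POSIX::_exit.
    syswrite($report_fh, "END block ran\n") if $report_fh;
}

# Hypothetical helper: fork a child, let it exit one way or the other,
# and return whatever its END block wrote to the pipe.
sub child_exit_output {
    my ($use_posix_exit) = @_;
    pipe(my $reader, my $writer) or die "pipe failed: $!";
    my $pid = fork();
    die "fork failed: $!" unless defined $pid;

    if ($pid == 0) {
        close $reader;
        $report_fh = $writer;
        POSIX::_exit(0) if $use_posix_exit;    # no END blocks, no destructors
        exit 0;                                # END blocks and destructors run
    }

    close $writer;
    my $output = do { local $/; <$reader> };
    waitpid($pid, 0);
    return defined $output ? $output : '';
}

print "exit:         ", child_exit_output(0) || "(nothing)\n";
print "POSIX::_exit: ", child_exit_output(1) || "(nothing)\n";
```

If a module's END or DESTROY code tears down something the parent still needs (a listening socket, for example), skipping that cleanup in the child is what keeps the parent working.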

    Another possibility is having background workers poll the IP from a queue, not shown here due to lack of time.

    Regards, Mario

      I've toyed around with a demonstration, mainly out of curiosity. The example spawns a server process and 20 client processes via MCE::Flow. The server process itself has 10 background helpers.

      For safety, never construct $pm inside a worker process. See Bugs and Limitations.

          use strict;
          use warnings;

          use IO::Socket::INET;
          use Parallel::ForkManager;
          use MCE::Flow;
          use POSIX ();

          # Configure 10 bg_procs max for the server process.
          # Never construct $pm inside a worker process.

          my $pm = Parallel::ForkManager->new(10);

          $pm->set_waitpid_blocking_sleep(0);

          # Configure MCE options. For this demonstration,
          # 1 server process and 20 client processes.
          # Stop the server after clients complete.

          MCE::Flow::init {
              posix_exit => 1,
              max_workers => [ 1 , 20 ],
              task_name => [ 'S', 'C' ],
              task_end => sub {
                  my ($mce, $task_id, $task_name) = @_;
                  if ($task_name eq 'C') {
                      my ($client, $reply);
                      $client = new IO::Socket::INET(
                          PeerHost => 'localhost',
                          PeerPort => 5000,
                      );
                      $reply = $client->getline;
                      $reply =~ s/\r?\n//;
                      $client->print("stop\n");
                      $client->shutdown(SHUT_RDWR);
                      $client->close;
                  }
              },
          };

          # Run parallel 1 server process, 20 client processes.

          mce_flow \&server, sub {
              my ($client, $reply);
              sleep 1;

              # Each client establishes a connection 4 times.
              for my $i ( 1 .. 4 ) {
                  $client = new IO::Socket::INET(
                      PeerHost => 'localhost',
                      PeerPort => 5000,
                  );
                  $reply = $client->getline;
                  $reply =~ s/\r?\n//;
                  print "[$$:$i] got message : $reply\n";

                  $client->print("Test $$:$i\n");
                  $reply = $client->getline;
                  $reply =~ s/\r?\n//;
                  print "[$$:$i] got reply : $reply\n";

                  $client->print("quit\n");
                  $client->shutdown(SHUT_RDWR);
                  $client->close;
              }
          };

          exit;

          sub server {
              require IO::Socket::INET::Daemon;

              IO::Socket::INET::Daemon->new(
                  port => 5000,
                  timeout => 20,
                  callback => {
                      add => \&add,
                      remove => \&remove,
                      data => \&data,
                  },
              )->run;

              $pm->wait_all_children;
          }

          sub add {
              my $io = shift;
              $io->print("Welcome\n");
              return 1;
          }

          sub remove {
              my $io = shift;
          }

          sub data {
              my ($io, $host) = @_;
              my $line = $io->getline;
              $line =~ s/\r?\n//;

              if ($line eq 'quit') {
                  return 0;
              }
              elsif ($line eq 'stop') {
                  $host->stop;
              }
              else {
                  $io->print("Reply $line\n");
                  $pm->start and return 1;
                  fork_work($line);
                  $pm->finish; # not reached
              }
          }

          sub fork_work {
              my ($line) = @_;
              print "[$$] working from child -- $line\n";
              sleep 1; # simulate work

              # A given CPAN module may be multi-process unsafe regarding
              # DESTROY and END blocks. Fortunately, there's a way around it.
              POSIX::_exit(0);
          }

      The demonstration completes in 9 seconds. Initially, clients sleep for 1 second giving time for the server process. Although there are 20 clients, the server process allows 10 background processes max. Each client loops 4 times for a total of 80 connections. There's also a delay for simulating work.
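One way to read the 9-second figure, as a rough back-of-the-envelope check rather than a measurement: 80 connections handled at most 10 at a time, each costing about 1 second of simulated work, plus the 1-second startup sleep. The helper name and its parameters below are made up for the illustration, and the estimate ignores connection and fork overhead.

```perl
use strict;
use warnings;
use POSIX qw(ceil);

# Hypothetical estimate: startup delay plus one "work" interval per batch
# of at most max_children concurrent background processes.
sub estimate_seconds {
    my (%arg) = @_;
    my $connections = $arg{clients} * $arg{loops};
    my $batches     = ceil($connections / $arg{max_children});
    return $arg{startup_sleep} + $batches * $arg{work_seconds};
}

print estimate_seconds(
    clients       => 20,
    loops         => 4,
    max_children  => 10,
    work_seconds  => 1,
    startup_sleep => 1,
), "\n";    # prints 9
```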

          [24169:1] got message : Welcome
          [24167:1] got message : Welcome
          [24155:1] got message : Welcome
          [24169:1] got reply : Reply Test 24169:1
          [24170] working from child -- Test 24169:1
          [24164:1] got message : Welcome
          [24167:1] got reply : Reply Test 24167:1
          [24155:1] got reply : Reply Test 24155:1
          [24171] working from child -- Test 24167:1
          [24172] working from child -- Test 24155:1
          [24153:1] got message : Welcome
          [24164:1] got reply : Reply Test 24164:1
          [24173] working from child -- Test 24164:1
          ...
          [24151:4] got message : Welcome
          [24156:4] got reply : Reply Test 24156:4
          [24246] working from child -- Test 24156:4
          [24165:4] got message : Welcome
          [24151:4] got reply : Reply Test 24151:4
          [24247] working from child -- Test 24151:4
          [24154:4] got message : Welcome
          [24165:4] got reply : Reply Test 24165:4
          [24154:4] got reply : Reply Test 24154:4
          [24248] working from child -- Test 24165:4
          [24249] working from child -- Test 24154:4

      Regards, Mario

        ... once again marioroy, a beautiful example. I'm beginning to suspect that you are an AI program. :-)

        I'm not really a human, but I play one on earth. ..... an animated JAPH
Re: IO::Socket::INET::Daemon and Parallel::ForkManager working together
by leostereo (Beadle) on Jul 04, 2017 at 18:30 UTC

    Dear all, thanks for the suggestions ... I finally got a workaround.
    I will take IO::Socket out and use Linux ncat and a pipe instead.
    Currently I'm running the script like this:
    ncat -l 1000 | ./accounting_processor_ver2.pl

    And the accounting processor is:

        #!/usr/bin/perl -slw
        use strict;
        use Regexp::Common qw/ net number /;
        use IO::Socket::INET::Daemon;
        use Parallel::ForkManager;
        use Net::Ping;
        use DBI;
        use YAML::Tiny;

        my $pm = Parallel::ForkManager->new(10);

        while(<>){
            my $pid = $pm->start() and next;
            print "starting fork $$\n";
            fork_work($_);
            print "deleting fork $$\n";
            $pm->finish; # Terminates the child process
        }

        sub fork_work {
            my $line = shift @_;
            my $ip; # declared so the assignment below compiles under "use strict"
            if($line =~ m/$RE{net}{IPv4}{-keep}/){
                $ip = $1;
            }else{
                print "not a good line:",$line;
                return 0;
            }
            my $ping_result = 0;
            my $p = new Net::Ping('icmp',1);
            for(my $a=0 ; $a < 1 ; $a++){
                # print "attempt $a for $ip\n";
                if ($p->ping($ip)){
                    $ping_result = 1;
                    last;
                }
            }
            if ($ping_result){
                #print "$ip is ALIVE\n";
                return 0;
            }