in reply to IO::Socket::INET::Daemon and Parallel::ForkManager working togheter

Hi Leostereo,

Sometimes, a given CPAN module maybe multi-process unsafe regarding DESTROY and END blocks. From testing, that seems to be the case with IO::Socket::INET::Daemon or perhaps a dependency. You were close otherwise.

# change this statement my $pid = $pm->start and next ; # to the following my $pid = $pm->start and return !0 ; # return 1 works too

The code now looks like this.

use POSIX (); ... my $pm = Parallel::ForkManager->new(10); $pm->set_waitpid_blocking_sleep(0); my $host = new IO::Socket::INET::Daemon( host => 'localhost', port => 1000, timeout => 20, callback => { data => \&data, }, ); $host->run; $pm->wait_all_children; sub data{ my ($io, $host) = @_; my $line = $io->getline; return 0 unless $line; chomp($line); if ($line =~ m/$RE{net}{IPv4}{-keep}/) { my $ip = $1; print "I got ip $ip\n"; my $pid = $pm->start and return !0; fork_work($ip); $pm->finish; # not reached due to calling POSIX::_exit } print "not a good line:", $line; return !0; } sub fork_work { my ($ip) = @_; print "working from new child - $ip\n"; # A given CPAN module maybe multi-process unsafe regarding # DESTROY and END blocks. Fortunately, there's a way around it. POSIX::_exit(0); }

Another possibility is having background workers poll the IP from a queue, not shown here due to lack of time.

Regards, Mario

Replies are listed 'Best First'.
Re^2: IO::Socket::INET::Daemon and Parallel::ForkManager working together
by marioroy (Prior) on Jul 01, 2017 at 04:07 UTC

    I've toyed around a demonstration, mainly out of curiosity. The example spawns a server process and 20 client processes via MCE::Flow. The server process itself has 10 background helpers.

    For safety, never construct $pm inside a worker process. See Bugs and Limitations.

    use strict; use warnings; use IO::Socket::INET; use Parallel::ForkManager; use MCE::Flow; use POSIX (); # Configure 10 bg_procs max for the server process. # Never construct $pm inside a worker process. my $pm = Parallel::ForkManager->new(10); $pm->set_waitpid_blocking_sleep(0); # Configure MCE options. For this demonstration, # 1 server process and 20 client processes. # Stop the server after clients complete. MCE::Flow::init { posix_exit => 1, max_workers => [ 1 , 20 ], task_name => [ 'S', 'C' ], task_end => sub { my ($mce, $task_id, $task_name) = @_; if ($task_name eq 'C') { my ($client, $reply); $client = new IO::Socket::INET( PeerHost => 'localhost', PeerPort => 5000, ); $reply = $client->getline; $reply =~ s/\r?\n//; $client->print("stop\n"); $client->shutdown(SHUT_RDWR); $client->close; } }, }; # Run parallel 1 server process, 20 client processes. mce_flow \&server, sub { my ($client, $reply); sleep 1; # Each client establishes a connection 4 times. for my $i ( 1 .. 4 ) { $client = new IO::Socket::INET( PeerHost => 'localhost', PeerPort => 5000, ); $reply = $client->getline; $reply =~ s/\r?\n//; print "[$$:$i] got message : $reply\n"; $client->print("Test $$:$i\n"); $reply = $client->getline; $reply =~ s/\r?\n//; print "[$$:$i] got reply : $reply\n"; $client->print("quit\n"); $client->shutdown(SHUT_RDWR); $client->close; } }; exit; sub server { require IO::Socket::INET::Daemon; IO::Socket::INET::Daemon->new( port => 5000, timeout => 20, callback => { add => \&add, remove => \&remove, data => \&data, }, )->run; $pm->wait_all_children; } sub add { my $io = shift; $io->print("Welcome\n"); return 1; } sub remove { my $io = shift; } sub data { my ($io, $host) = @_; my $line = $io->getline; $line =~ s/\r?\n//; if ($line eq 'quit') { return 0; } elsif ($line eq 'stop') { $host->stop; } else { $io->print("Reply $line\n"); $pm->start and return 1; fork_work($line); $pm->finish; # not reached } } sub fork_work { my ($line) = @_; print "[$$] working from child -- $line\n"; sleep 1; # simulate work # A given CPAN module maybe multi-process unsafe regarding # DESTROY and END blocks. Fortunately, there's a way around it. POSIX::_exit(0); }

    The demonstration completes in 9 seconds. Initially, clients sleep for 1 second giving time for the server process. Although there are 20 clients, the server process allows 10 background processes max. Each client loops 4 times for a total of 80 connections. There's also a delay for simulating work.

    [24169:1] got message : Welcome [24167:1] got message : Welcome [24155:1] got message : Welcome [24169:1] got reply : Reply Test 24169:1 [24170] working from child -- Test 24169:1 [24164:1] got message : Welcome [24167:1] got reply : Reply Test 24167:1 [24155:1] got reply : Reply Test 24155:1 [24171] working from child -- Test 24167:1 [24172] working from child -- Test 24155:1 [24153:1] got message : Welcome [24164:1] got reply : Reply Test 24164:1 [24173] working from child -- Test 24164:1 ... [24151:4] got message : Welcome [24156:4] got reply : Reply Test 24156:4 [24246] working from child -- Test 24156:4 [24165:4] got message : Welcome [24151:4] got reply : Reply Test 24151:4 [24247] working from child -- Test 24151:4 [24154:4] got message : Welcome [24165:4] got reply : Reply Test 24165:4 [24154:4] got reply : Reply Test 24154:4 [24248] working from child -- Test 24165:4 [24249] working from child -- Test 24154:4

    Regards, Mario

      ... once again marioroy, a beautiful example. I'm beginning to suspect that you are an AI program. :-)

      I'm not really a human, but I play one on earth. ..... an animated JAPH