package SF::DBFork;

use strict;
use warnings;
use threads;
use Data::Dumper;
use Thread::Queue;
use Time::HiRes qw(time usleep);
use threads::shared;
use POSIX ":sys_wait_h";
use IO::Handle;
use IO::Pipe;

# SF::DBFork runs a callback in a forked child process.  When communications
# are requested, parent and child are connected by two pipes (one per
# direction) and each side runs a detached reader thread (CommThread) that
# moves incoming lines from its pipe into a Thread::Queue.
#
# Messages are newline-terminated lines: SendMessage() appends "\n" and
# ReceiveMessage() returns each line with its trailing "\n" still attached.

# Constructor.
#   $function - code reference (or, because Spawn() relaxes strict refs, a
#               sub name) that the child process will call.
# Returns the new object.  The two-argument bless honours the invoking class
# so that subclasses get objects of their own class.
sub new {
    my ($class, $function) = @_;
    my $self = { func => $function };
    return bless $self, ref($class) || $class || __PACKAGE__;
}

# Thread body: detaches itself, turns $pipe into its reading end and copies
# every line read from it into the queue $q.  The sentinel line "KILL_COMMS"
# ends the thread and is not enqueued.
sub CommThread {
    my ($pipe, $q) = @_;
    threads->detach;
    $pipe->reader();
    while (my $msg = <$pipe>) {
        # SendMessage() terminates every message with "\n", so the sentinel
        # has to be compared without the line ending.  The message itself is
        # enqueued untouched.
        (my $text = $msg) =~ s/\n\z//;
        if ($text eq "KILL_COMMS") {
            threads->exit();
        }
        $q->enqueue($msg);
    }
    return;
}

# Fork a child that runs the stored callback.
#   $communications - true to set up the pipes/queues used by SendMessage()
#                     and ReceiveMessage()
#   $argnum         - number of following arguments to hand to the callback
#   ...             - the callback arguments themselves
# The callback is called as $func->(@arguments, $self).
# In the parent this returns the child's pid.  The child process exits once
# the callback returns; it never returns to the caller of Spawn().
# Dies if fork() fails.
sub Spawn {
    my $self           = shift;
    my $communications = shift;
    my $argnum         = shift;

    # NOTE(review): this loop and the "if ($communications)" line were
    # truncated in the source this was recovered from; they are reconstructed
    # from the surrounding code -- confirm against the original.
    my @arguments;
    push @arguments, shift @_ for 1 .. ($argnum || 0);

    if ($communications) {
        $self->{comms}     = 1;
        $self->{fork_pipe} = IO::Pipe->new();    # child  -> parent
        $self->{main_pipe} = IO::Pipe->new();    # parent -> child
    }
    else {
        $self->{comms} = 0;
    }

    my $fid = fork();
    # fork() returns undef on failure; without this check the parent would
    # take the child branch below and run the callback itself.
    die "fork failed: $!" unless defined $fid;

    if ($fid) {
        # Parent: read from fork_pipe, write to main_pipe.
        $self->{fork} = $fid;
        if ($self->{comms}) {
            $self->{queue} = Thread::Queue->new;
            $self->{thr}   = threads->create(\&CommThread, $self->{fork_pipe}, $self->{queue});
            $self->{main_pipe}->writer();
            $self->{main_pipe}->autoflush(1);
        }
        usleep(5000);
        return $fid;
    }

    # Child: read from main_pipe, write to fork_pipe.
    no strict 'refs';    # lets {func} be a sub name as well as a code ref
    my $func = $self->{func};
    $self->{fork} = 0;
    sleep(1);
    if ($self->{comms}) {
        $self->{queue} = Thread::Queue->new;
        $self->{thr}   = threads->create(\&CommThread, $self->{main_pipe}, $self->{queue});
        $self->{fork_pipe}->writer();
        $self->{fork_pipe}->autoflush(1);
    }
    $func->(@arguments, $self);
    if ($self->{comms}) {
        # Tell the reader thread on the other side to stop.
        $self->SendMessage("KILL_COMMS");
    }
    # Without this the child would fall out of Spawn() and carry on executing
    # the caller's code as a second copy of the parent.
    exit(0);
}

# Fetch the next message from the other side.
#   $timeout - despite the name this is only a flag: when true the call does
#              not block and returns undef if nothing is queued; when false
#              it blocks until a message arrives.
# Returns the message (trailing "\n" included), or undef with a warning when
# the object was spawned without communications.
sub ReceiveMessage {
    my $self    = shift;
    my $timeout = shift;
    if (!$self->{comms}) {
        warn "Trying to receive a message on a fork with no communications...";
        return undef;
    }
    if ($timeout) {
        return ($self->{queue}->dequeue_nb());
    }
    return ($self->{queue}->dequeue());
}

# Send $msg, followed by a newline, to the other side.  The pipe is chosen
# from {fork}: non-zero (the child's pid) means we are the parent.  Does
# nothing when the object was spawned without communications.
sub SendMessage {
    my $self = shift;
    my $msg  = shift;
    return unless $self->{comms};
    my $pipe = $self->{fork} ? $self->{main_pipe} : $self->{fork_pipe};
    print {$pipe} $msg, "\n";
    return;
}

1;

#### #!
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Error qw(:try);
use Data::Dumper;
use Time::HiRes qw(time sleep usleep);
use Thread::Queue;
use threads::shared;
use POSIX ":sys_wait_h";
use IO::Handle;
use SF::DBFork;

# Driver for SF::DBFork.  A long-lived "persistent" child decides when work
# should start and sends job ids to the main process; the main process forks
# one short-lived "ephemeral" child per job id and, when that child is
# reaped, sends the id back to the persistent child.

my $DEAD_KIDS = 0;    # set by the SIGCHLD handler, cleared by Main()
my %forks;            # pid of ephemeral child => job id it was started with
my $maxKids = 5;      # upper bound on jobs outstanding at the same time

# Body of the persistent child.
#   $parent - the SF::DBFork object, used to talk to the main process
# Hands out job ids (1, 2, 3, ...) and waits for them to come back.
sub Persistent {
    my $parent  = shift;
    my $count   = 0;
    my $numkids = 0;
    while (1) {
        # Top up to the limit before blocking below; sending only one id per
        # pass would block after the first and never reach $maxKids.
        while ($numkids < $maxKids) {
            $parent->SendMessage(++$count);
            $numkids++;
        }
        my $q = $parent->ReceiveMessage();    # blocks until a job comes back
        if (defined $q) {
            chomp $q;
            $numkids--;
            warn "[P $$] Received message from parent:";
            warn Dumper($q);
        }
        else {
            last;
        }
    }
}

# Body of an ephemeral child: report the job id it was given, then exit.
sub Ephemeral {
    my $q = shift;
    chomp $q;
    warn "[E $$] Received message from parent:";
    warn Dumper($q);
    exit();
}

$SIG{CHLD} = \&childHandler;

# SIGCHLD handler: only records that at least one child needs reaping; the
# waitpid() calls happen in Main().
sub childHandler {
    $DEAD_KIDS++;
    warn "-Caught SIGCHLD ($DEAD_KIDS) [$$]\n";
    $SIG{CHLD} = \&childHandler;
}

# Main loop: reap dead children, report finished jobs to the persistent
# child, and start an ephemeral child for every job id it sends.
# Dies if the persistent child itself exits.
sub Main {
    my $persistent = SF::DBFork->new(\&Persistent);
    my $fid        = $persistent->Spawn(1, 0);
    my $ephemeral  = SF::DBFork->new(\&Ephemeral);

    while (1) {
        if ($DEAD_KIDS) {
            $DEAD_KIDS = 0;
            while ((my $dead_kid = waitpid(-1, WNOHANG)) > 0) {
                if ($dead_kid == $fid) {
                    die "The persistent thread has died! I'm gonna quit!";
                }
                my $q = $forks{$dead_kid};
                warn "[M $$] Child $dead_kid ended.";
                if (defined $q) {
                    warn "[M] Sending dead kid message.";
                    $persistent->SendMessage($q);
                    warn "[M] Message sent.";
                    delete $forks{$dead_kid};
                    warn "[M] dead kid deleted from hash.";
                }
            }
        }

        warn "[M] Going to ReceiveMessage";
        my $q = $persistent->ReceiveMessage(1);    # non-blocking
        warn "[M] Exiting ReceiveMessage";
        if (defined $q) {
            # Messages arrive with their trailing newline and SendMessage()
            # appends another one.  Strip it here so that the id sent back
            # above is a single line, not the id plus an empty message.
            chomp $q;
            $forks{ $ephemeral->Spawn(0, 1, $q) } = $q;
        }
        usleep(200 * 1000);
    }
}

Main();