in reply to Inline::Java with MCE::Hobo

While I'm not familiar with MCE::Hobo (or Inline::Java, for that matter), it seems to use fork() to create new workers. That works well in general, but there are a few caveats you have to keep in mind and check for.

For one, open file handles are shared with child processes. That means you might need to close all file handles and open new ones, including any temporary files Inline::Java might have created. For example, Inline::Java might create external temporary files in the parent process while your module loads, so that it can run them with the Java engine. When the first child process exits, it might clean up (delete) those files, preventing the other child processes from working correctly. The children might also end up sharing a single network socket to the JVM.
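To make the "shared file handles" point concrete, here is a minimal sketch (not taken from Inline::Java or MCE::Hobo; the helper name parent_read_after_child and the sample data are made up for illustration). It shows that a handle inherited across fork() shares its file offset with the parent, so a read in the child moves the position the parent sees. It uses sysread() to sidestep Perl's own buffering, and POSIX::_exit() so the child does not run cleanup code that belongs to the parent.

```perl
#!/usr/bin/env perl
use strict;
use warnings;
use File::Temp qw(tempfile);
use POSIX ();

# Hypothetical helper: returns the 4 bytes the parent reads after a forked
# child has already read 4 bytes through the same inherited handle.
sub parent_read_after_child {
    my ($fh, $path) = tempfile();
    syswrite($fh, "AAAABBBB") or die "write failed: $!";
    sysseek($fh, 0, 0)        or die "seek failed: $!";

    my $pid = fork();
    die "fork failed: $!" unless defined $pid;

    if ($pid == 0) {
        # child: read through the inherited handle, then leave without
        # running END blocks or destructors set up by the parent
        sysread($fh, my $child_buf, 4);
        POSIX::_exit(0);
    }

    # parent: wait for the child, then read from "its own" handle
    waitpid($pid, 0);
    sysread($fh, my $parent_buf, 4);   # continues where the child stopped
    close $fh;
    unlink $path;
    return $parent_buf;
}

print "parent read: ", parent_read_after_child(), "\n";   # prints "parent read: BBBB"
```

The same sharing applies to sockets, which is why a connection opened before the fork (for example to a JVM) is better reopened in each child.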

I'm not sure how to solve this in Inline::Java, or even whether it's solvable in its current form. There is a thread on Stack Overflow discussing a similar problem.

One way to work around this would be to have a master process that forks and then starts a completely new instance of the worker script. Something along the lines of (untested, written from memory):

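#!/usr/bin/env perl
use strict;
use warnings;
use Time::HiRes qw(sleep);
use POSIX qw(WNOHANG);

my $workercount = 0; # Counts active workers

$SIG{CHLD} = sub {
    # Child exit detected. Reap every finished child, because several
    # exits can be folded into a single SIGCHLD.
    while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
        $workercount--; # remove child from "active" count
    }
    return;
};

while (1) {
    if ($workercount < 20) {
        my $pid = fork();
        if (!defined $pid) {
            warn "fork failed: $!";
            sleep(0.5);
        } elsif ($pid) {
            # parent
            $workercount++; # forked a new child
        } else {
            # child: replace this process with a fresh worker instance
            exec('perl', 'myworkerscript.pl') or die "exec failed: $!";
        }
    } else {
        sleep(0.5);
    }
}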

There is also the funny matter of things like the pseudo-random number generator. If the generator has already been seeded in the parent (by an earlier rand() or srand() call) when you fork(), all child processes will generate the same sequence of random numbers. Some protocols (and other stuff) use randomly generated unique IDs to identify, for example, a transaction or a data block. If you launch multiple child processes that all generate the same "unique" IDs, this can lead to all sorts of strange problems. Calling srand() in the child immediately after the fork (in whatever function you call from MCE::Hobo) might be a good idea.
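A small sketch of the random-number issue, using plain fork() rather than MCE::Hobo (the helper name child_randoms is made up for illustration). The parent draws one number first, which seeds the generator; children that inherit that state all draw the same next value, while children that call srand() first do not.

```perl
#!/usr/bin/env perl
use strict;
use warnings;

$| = 1;   # flush STDOUT at once so children do not inherit buffered output

# Hypothetical helper: fork $count children and return the first random
# integer each one draws. If $reseed is true, each child calls srand() first.
sub child_randoms {
    my ($count, $reseed) = @_;
    my @values;
    for (1 .. $count) {
        pipe(my $reader, my $writer) or die "pipe failed: $!";
        my $pid = fork();
        die "fork failed: $!" unless defined $pid;

        if ($pid == 0) {
            # child
            close $reader;
            srand() if $reseed;   # pick a fresh seed for this process
            print {$writer} int(rand(1_000_000_000)), "\n";
            close $writer;
            exit 0;
        }

        # parent
        close $writer;
        chomp(my $value = <$reader>);
        close $reader;
        waitpid($pid, 0);
        push @values, $value;
    }
    return @values;
}

rand();   # seeds the generator in the parent, as some loaded module might

print "inherited seed: @{[ child_randoms(3, 0) ]}\n";   # three identical numbers
print "after srand():  @{[ child_randoms(3, 1) ]}\n";   # three independent numbers
```

In an MCE::Hobo setup the equivalent would be a srand() call at the top of the worker sub, before anything in the worker asks for random numbers.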

perl -e 'use Crypt::Digest::SHA256 qw[sha256_hex]; print substr(sha256_hex("the Answer To Life, The Universe And Everything"), 6, 2), "\n";'

Re^2: Inline::Java with MCE::Hobo
by eraskin (Novice) on May 27, 2021 at 14:33 UTC

    Thanks. I wish the children were independent enough to do this, but they are really just an internal part of a much bigger program.

    We are in a business that receives hundreds of files a day, all in different formats. To process them, we convert them to a standard format. I had a tool that I purchased 20 years ago written in COBOL(!). Rather than keeping that up to date, I wrote a perl script to take on the functions we needed.

    The code is basically just a read-process-write loop. Read the input, parse the data, generate the output format, write the output file, track counts and statistics about the data.

    Part of "parse the data" is a call to Data Services to parse the input name and generate titles and gender codes. That's where my bottleneck is. I have taken the internals of the "parse the data" and put them into a sub. This sub reads from a queue and processes the data until the queue is empty. Each worker sub writes to an output queue. An output loop reads each record from the queue, generates the output format and writes the file. Here are some relevant parts:

    my $num_workers = 5;

    use SexCoder;
    use MCE::Hobo;
    use MCE::Shared;

    my $input_q  = MCE::Shared->queue( fast => 1 );
    my $output_q = MCE::Shared->queue( fast => 1 );

    mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
    mce_open my $ERR, ">>", \*STDERR or die "open error: $!";

    MCE::Hobo->create({ posix_exit => 1 }, 'input_task', $_) for 1 .. $num_workers;

    # open the files, etc... - not shown here

    ######### MAIN INPUT LOOP
    while (my $row = readdata($fhin, \$dbfrownum, \$inbuffer, @sortedfields)) {
        $input_q->enqueue($row);
    }
    ######### END OF INPUT LOOP

    # exit worker threads (flag end of input queue)
    $input_q->end();

    ######### MAIN OUTPUT LOOP
    while(my $result = $output_q->dequeue()) {
        if (exists $result->{finished}) {
            MCE::Hobo->waitone;
            $output_q->end unless MCE::Hobo->pending;
            next;
        }
        output_row($result); # THIS OUTPUTS A ROW
    }
    ######### END OF MAIN OUTPUT LOOP

    exit;

    sub input_task {
        # THIS CALLS SOME JAVA CODE TO CREATE THE CONNECTION TO DATA SERVICES
        $sexcoder=SexCoder->new("$ds_server", 4000);

        while (my $row = $input_q->dequeue()) {
            my $outrow = mapdata($row);
            my @result = ($reject, $row, $outrow);
            $output_q->enqueue(\@result);
        }

        # THIS DESTROYS THE BLESSED OBJECT, CLOSING THE CONNECTION
        $sexcoder = undef;
        $output_q->enqueue({ finished => $$ });
    }

    The code that actually calls the service is deep inside the mapdata() function. The relevant piece of code is:

    sub parsename() {
        my $dsparse = 0;

        if ($dsparsename == 1) {
            eval {
                my $result = $sexcoder->getsexcode($countrycode, $name);
                $parsedname{'TITLE'}   = $result->{title};
                $parsedname{'FIRST'}   = $result->{first};
                $parsedname{'MIDDLE'}  = $result->{middle};
                $parsedname{'LAST'}    = $result->{last};
                $parsedname{'SUFFIX'}  = $result->{suffix};
                $parsedname{'PROFSFX'} = $result->{profsfx};
                $dsname = $result->{name};
                $data->{'out$gender'}  = $result->{gender};
                $data->{'out$sexcode'} = $result->{sexcode};
                $dsparse = 1;
            };
        }
        return $dsname if $dsparse == 1;

        # Fall back to simpler parser if $dsparse != 1 or $dsparsename != 1
    }

    Finally, here is the complete code of the SexCoder module:

    package SexCoder;

    use strict;
    use warnings;

    use Thread::Bless;   # NOT SURE IF I NEED THIS
    use XML::LibXML;
    use XML::Compile::Schema;
    use XML::Compile::Util qw(pack_type);
    use Encode;
    use Data::HexDump;
    use Inline::Java qw(caught);

    require Exporter;

    our @ISA = qw(Exporter);

    # Items to export into callers namespace by default. Note: do not export
    # names by default without a very good reason. Use EXPORT_OK instead.
    # Do not simply export all your public functions/methods/constants.

    # This allows declaration use SexCoder ':all';
    # If you do not need this, moving things directly into @EXPORT or @EXPORT_OK
    # will save memory.
    our %EXPORT_TAGS = ( 'all' => [ qw(
        new getsexcode
    ) ] );

    our @EXPORT_OK = ( @{ $EXPORT_TAGS{'all'} } );

    our @EXPORT = qw(
        new getsexcode
    );

    our $VERSION = '1.00';

    BEGIN {
        $ENV{'CLASSPATH'} = "/pas/src/perl/SexCoder/lib/rtsClient.jar";
        $ENV{'LINK_DIR'} = "/pas/src/perl/SexCoder/lib";
    }

    use Inline Java => <<'END_OF_JAVA_CODE', CLASSPATH=>$ENV{CLASSPATH},
    import com.businessobjects.rtsclient.*;
    import java.security.Security;

    public class dsclient {
        RTServiceClient rts;
        String keystorePasswd = "dskeystore";
        String keystorePath = "/pas/src/perl/SexCoder/keystore/dskeystore.jks";

        public dsclient() throws RTServiceException {
            System.setProperty("javax.net.ssl.trustStore", keystorePath);
            Security.setProperty("javax.net.ssl.trustStorePassword", keystorePasswd);
            rts = new RTServiceClient();
        }

        public void connect(String server, int port) throws RTServiceException {
            rts.connect(server, port, true);
        }

        public String invoke(String servicename, String xmlstring) throws RTServiceException {
            return rts.invoke(servicename, xmlstring);
        }

        public void disconnect() throws RTServiceException {
            rts.disconnect();
        }
    }
    END_OF_JAVA_CODE
    ;

    use Inline Java => 'STUDY',
        STUDY => [
            'com.businessobjects.rtsclient.RTServiceClientX',
            'com.businessobjects.rtsclient.RTServiceException',
            'java.security.Security'
        ],
        AUTOSTUDY => 1;

    my $servicename = "SexCoder";
    my $server;
    my $port;
    my $rts;
    my $sexcode_msg_schema;
    my $sexcode_reply_schema;

    sub new {
        my $self = bless {}, shift;

        # Service and Port identify the Data Services Server (ex: ds421.paslists.com:4200)
        my $ds_server = "ds.paslists.com";
        $ds_server = $ENV{DS_SERVER} if defined($ENV{DS_SERVER});
        $server = exists $_[0] ? $_[0] : $ds_server;
        $port = exists $_[1] ? $_[1] : 4200;

        # Compile the XML Schema and create a reader and writer object
        $sexcode_msg_schema = XML::Compile::Schema->new('/pas/src/perl/SexCoder/lib/sexcoder_msg.xsd');
        $sexcode_reply_schema = XML::Compile::Schema->new('/pas/src/perl/SexCoder/lib/sexcoder_reply.xsd');
        $self->{WRITER} = $sexcode_msg_schema->compile(WRITER => '{http://paslists.com/sexcoder}sexcode_msg');
        $self->{READER} = $sexcode_reply_schema->compile(READER => '{http://paslists.com/sexcoder}sexcode_reply');

        # Create a connection to Data Services
        eval {
            $self->{RTS} = new SexCoder::dsclient();
            $self->{RTS}->connect($server, $port);
        };
        if ($@) {
            if (caught("java.lang.Exception")) {
                die $@;
            }
        }

        # Mark the object as INIT'd
        $self->{INIT} = 1;

        return $self;
    }

    sub getsexcode {
        my $self = shift;

        # Arguments are countrycode and name
        my ($countrycode, $name) = @_;

        (my $lang=$ENV{LANG}) =~ s/^(.*)\.(.*)$/$2/;
        $name = decode($lang,$name) if $lang !~ /UTF-8/;

        # basic error checking
        die "SexCoder not Initialized\n" if $self->{INIT} != 1;
        if (!defined($self->{RTS})) {
            eval {
                $self->{RTS} = new SexCoder::dsclient();
                $self->{RTS}->connect($server, $port);
            };
            if ($@) {
                if (caught("java.lang.Exception")) {
                    die $@;
                }
            }
        }
        die "No Connection to Data Services defined\n" if !defined($self->{RTS});

        # build a new XML document to send the parameters in
        my $doc = XML::LibXML::Document->new('1.0','UTF-8');
        my $data;
        $data->{countrycode} = $countrycode;
        $data->{name} = $name;
        my $sexcode_msg_xml = $self->{WRITER}->($doc, $data);

        # ask data services for the sex code and fielded name
        my $sexcode_reply_xml = "";
        eval {
            $sexcode_reply_xml = $self->{RTS}->invoke($servicename, $sexcode_msg_xml->toString);
        };
        if ($@) {
            if (caught("java.lang.Exception")) {
                eval {
                    $self->{RTS}->connect($server, $port);
                };
                if ($@) {
                    if (caught("java.lang.Exception")) {
                        die $@;
                    }
                } else {
                    eval {
                        $sexcode_reply_xml = $self->{RTS}->invoke($servicename, $sexcode_msg_xml->toString);
                    };
                    if ($@) {
                        if (caught("java.lang.Exception")) {
                            warn $@;
                            unset $@;
                        }
                    }
                }
            }
        }

        # return a hash version of the XML result
        $sexcode_reply_xml = decode("iso-8859-1",$sexcode_reply_xml) if $lang !~ /UTF-8/;
        my $result = $self->{READER}->($sexcode_reply_xml);

        return $result;
    }

    # Disconnect from the server at the end
    DESTROY {
        my $self = shift;
        eval {
            $self->{RTS}->disconnect() if defined($self->{RTS});
            if (caught("java.lang.Exception")) {
                die $@;
            }
        }
    }

    1;

    Sorry for the long post