5haun has asked for the wisdom of the Perl Monks concerning the following question:
Oh wise and gracious monks, I really need your help understanding what is happening and what I might be able to do.
I have successfully used IPC::MPS to create a parent <==> broker(1) <==> workers(many) forked message passing architecture. The broker is forked at INIT time, and the workers are forked when the broker decides they are needed. After a worker is created, the broker just forwards messages between the parent and the workers.
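For readers who have not used IPC::MPS, the shape of that architecture can be sketched with nothing but core `socketpair` and `fork`. This is an illustration only, not how IPC::MPS is implemented: the helper names (`make_pair`, `run_worker`, `run_broker`, `start_broker`) and the one-line-per-message protocol are assumptions made for the example, and the broker here creates a single worker on the first message.

```perl
use strict;
use warnings;
use Socket;
use IO::Handle;
use POSIX ();

# Hypothetical helper: a connected, autoflushed socket pair.
sub make_pair {
    socketpair(my $left, my $right, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
        or die "socketpair: $!";
    $_->autoflush(1) for $left, $right;
    return ($left, $right);
}

# Hypothetical worker: answers every line with its PID and the text received.
sub run_worker {
    my ($to_broker) = @_;
    while (defined(my $line = <$to_broker>)) {
        chomp $line;
        print {$to_broker} "worker $$ got '$line'\n";
    }
    POSIX::_exit(0);    # leave without running the parent's END blocks
}

# Hypothetical broker: forks a worker on the first message, then only relays.
sub run_broker {
    my ($to_parent) = @_;
    my $to_worker;
    while (defined(my $line = <$to_parent>)) {
        if (!$to_worker) {
            ($to_worker, my $worker_end) = make_pair();
            my $pid = fork() // die "fork: $!";
            if ($pid == 0) {
                close $to_worker;
                close $to_parent;
                run_worker($worker_end);
            }
            close $worker_end;
        }
        print {$to_worker} $line;          # parent -> worker
        my $reply = <$to_worker>;
        last unless defined $reply;
        print {$to_parent} $reply;         # worker -> parent
    }
    POSIX::_exit(0);
}

# Fork the broker and hand the parent its end of the socket pair.
sub start_broker {
    my ($parent_end, $broker_end) = make_pair();
    my $pid = fork() // die "fork: $!";
    if ($pid == 0) {
        close $parent_end;
        run_broker($broker_end);
    }
    close $broker_end;
    return ($parent_end, $pid);
}

my ($broker, $broker_pid) = start_broker();
print {$broker} "hello\n";
my $reply = <$broker>;
print "parent $$ received: $reply";
close $broker;                 # EOF lets the broker, then the worker, exit
waitpid $broker_pid, 0;
```

The parent only ever talks to the broker's socket; the worker is reachable solely through the relay, which is the property the real setup depends on.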
I am using forks instead of threads, because I am trying to isolate some non-thread-safe code which will successfully run within the forked workers.
All was GREAT and worked SURPRISINGLY WELL until a couple of users introduced a requirement to use threads in the parent script and then pass messages to the broker. That is when I discovered that threads in the parent hang when they try to send a message to the broker. I have reduced the issue to the following example code:
    #!/usr/bin/perl
    #
    # Create parent <==> child message relationship with socketpair()
    # Then thread the parent and try to contact the child
    use threads;
    use IPC::MPS;
    use warnings;
    use v5.14;

    my $child;

    INIT {
        $child = spawn {
            receive {
                msg hi => sub {
                    my ($from) = @_;
                    say "$$ - Hi there!";
                    snd( $from, 'hi', $$ );
                };
            };
        };
        receive { quit; };
        say "My child vpid: $child";
    }

    sub say_hi {
        my $ans = snd_wt( $child, 'hi');
        say "$$ - My child $ans said hi";
    };

    say 'Single-threaded execution';
    say_hi;

    say 'Threaded execution';
    my $thr = threads->create( \&say_hi );
    $thr->join;

    __END__
    My child vpid: 18275192
    Single-threaded execution
    7533 - Hi there!
    7532 - My child 7533 said hi
    Threaded execution
    <_HANGS_HERE_>
Note that the communication works fine when not threaded and hangs when threaded. I suspect it may have something to do with the state of the dup'd socket file descriptors within the thread. I am open to patching IPC::MPS to support this scenario, or to using a similar fork module that will work.
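One way to test that suspicion separately from IPC::MPS is to repeat the experiment with a bare `socketpair` and `fork`, so that only the descriptor and Perl's own handle cloning are involved. The sketch below assumes a perl built with ithreads; the line-based protocol is an assumption for illustration and is not the IPC::MPS wire format. If a script like this completes while the IPC::MPS version hangs, the descriptor itself is probably usable from a thread and the module's internal state would be the next place to look.

```perl
use strict;
use warnings;
use threads;
use Socket;
use IO::Handle;
use POSIX ();

socketpair(my $parent_end, my $child_end, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";
$_->autoflush(1) for $parent_end, $child_end;

# Fork the child before any thread exists, as the INIT block does.
my $pid = fork() // die "fork: $!";
if ($pid == 0) {
    close $parent_end;
    # Answer every incoming line with the child's PID.
    while (defined(my $line = <$child_end>)) {
        print {$child_end} "$$\n";
    }
    POSIX::_exit(0);
}
close $child_end;

# Same request/response exchange, callable from any thread.
# In a new thread, $parent_end is the cloned copy of the parent's handle.
sub say_hi {
    print {$parent_end} "hi\n";
    my $answer = <$parent_end>;
    die "no answer from child" unless defined $answer;
    chomp $answer;
    return 'thread ' . threads->tid . ": child $answer said hi";
}

my $direct = say_hi();                    # main thread (tid 0)
print "$direct\n";

my $thr      = threads->create(\&say_hi); # scalar context, one return value
my $threaded = $thr->join;
print "$threaded\n";

close $parent_end;                        # EOF ends the child's loop
waitpid $pid, 0;
```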
Any suggestions?
Here is the end of the strace output:
    ...
    clone(child_stack=0, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f0235ade9d0) = 8680
    close(5) = 0
    rt_sigprocmask(SIG_BLOCK, [PIPE], [], 8) = 0
    rt_sigaction(SIGPIPE, {SIG_DFL, [], SA_RESTORER, 0x7f02351d2150}, {SIG_IGN, [], SA_RESTORER, 0x7f02351d2150}, 8) = 0
    rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
    rt_sigprocmask(SIG_BLOCK, [CHLD], [], 8) = 0
    rt_sigaction(SIGCHLD, {SIG_DFL, [], SA_RESTORER, 0x7f02351d2150}, {SIG_IGN, [], SA_RESTORER|SA_NOCLDWAIT, 0x7f02351d2150}, 8) = 0
    rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
    write(1, "My child vpid: 20032376\n", 24My child vpid: 20032376
    ) = 24
    write(1, "Single-threaded execution\n", 26Single-threaded execution
    ) = 26
    brk(0x15f4000) = 0x15f4000
    brk(0x15ea000) = 0x15ea000
    select(8, [4], [4], [4], NULL) = 1 (out [4])
    select(8, [4], [4], [4], NULL) = 1 (out [4])
    write(4, "\0\0\0*\4\t\01012345678\4\10\10\10\2\4\0\0\0\10\200\n\0102003"..., 46) = 46
    select(8, [4], NULL, [4], NULL8680 - Hi there!
    ) = 1 (in [4])
    read(4, "\0\0\0000\4\t\01012345678\4\10\10\10\2\4\0\0\0\6x\2531\1\0\0\0"..., 16384) = 52
    write(1, "8679 - My child 8680 said hi\n", 298679 - My child 8680 said hi
    ) = 29
    write(1, "Threaded execution\n", 19Threaded execution
    ) = 19
    rt_sigprocmask(SIG_BLOCK, ~[ILL BUS SEGV RTMIN RT_1], [], 8) = 0
    ioctl(0, SNDCTL_TMR_TIMEBASE or TCGETS, {B38400 opost isig icanon echo ...}) = 0
    lseek(0, 0, SEEK_CUR) = -1 ESPIPE (Illegal seek)
    ioctl(1, SNDCTL_TMR_TIMEBASE or TCGETS, {B38400 opost isig icanon echo ...}) = 0
    lseek(1, 0, SEEK_CUR) = -1 ESPIPE (Illegal seek)
    ioctl(2, SNDCTL_TMR_TIMEBASE or TCGETS, {B38400 opost isig icanon echo ...}) = 0
    lseek(2, 0, SEEK_CUR) = -1 ESPIPE (Illegal seek)
    ioctl(3, SNDCTL_TMR_TIMEBASE or TCGETS, 0x7fff55890df8) = -1 ENOTTY (Inappropriate ioctl for device)
    lseek(3, 0, SEEK_CUR) = 700
    ioctl(4, SNDCTL_TMR_TIMEBASE or TCGETS, 0x7fff55890df8) = -1 ENOTTY (Inappropriate ioctl for device)
    lseek(4, 0, SEEK_CUR) = -1 ESPIPE (Illegal seek)
    ioctl(4, SNDCTL_TMR_TIMEBASE or TCGETS, 0x7fff55890df8) = -1 ENOTTY (Inappropriate ioctl for device)
    lseek(4, 0, SEEK_CUR) = -1 ESPIPE (Illegal seek)
    brk(0x160b000) = 0x160b000
    brk(0x162c000) = 0x162c000
    brk(0x164d000) = 0x164d000
    brk(0x166e000) = 0x166e000
    brk(0x168f000) = 0x168f000
    brk(0x16b1000) = 0x16b1000
    brk(0x16d2000) = 0x16d2000
    mmap(NULL, 135168, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f0235933000
    brk(0x16f3000) = 0x16f3000
    brk(0x1714000) = 0x1714000
    brk(0x1735000) = 0x1735000
    brk(0x1756000) = 0x1756000
    brk(0x1777000) = 0x1777000
    brk(0x1798000) = 0x1798000
    munmap(0x7f0235933000, 135168) = 0
    mmap(NULL, 8392704, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_STACK, -1, 0) = 0x7f0233404000
    mprotect(0x7f0233404000, 4096, PROT_NONE) = 0
    clone(child_stack=0x7f0233c03ff0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f0233c049d0, tls=0x7f0233c04700, child_tidptr=0x7f0233c049d0) = 8681
    rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
    futex(0x7f0233c049d0, FUTEX_WAIT, 8681, NULL<_HANGS_HERE_>
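A small observation about the single-threaded exchange in that trace: the `write(4, ...)` and `read(4, ...)` buffers both start with four bytes that look like a big-endian length, followed by exactly that many bytes of payload. This is an inference from the trace alone, not something confirmed against the IPC::MPS source, and `payload_length` is a name made up for this check. The arithmetic can be verified like this:

```perl
use strict;
use warnings;

# Hypothetical helper: read a 4-byte big-endian ("network order") length.
sub payload_length {
    my ($prefix) = @_;
    return unpack 'N', $prefix;
}

# First four bytes of each buffer as strace printed them:
#   write(4, "\0\0\0*...",   46)    -> three NULs, then "*"
#   read(4,  "\0\0\0000...", 16384) -> three NULs, then "0"; 52 bytes returned
my @calls = (
    [ 'write', "\0\0\0" . '*', 46 ],
    [ 'read',  "\0\0\0" . '0', 52 ],
);

for my $call (@calls) {
    my ($name, $prefix, $reported) = @$call;
    my $len = payload_length($prefix);
    printf "%-5s prefix = %d, prefix + 4 header bytes = %d, strace reported %d\n",
        $name, $len, $len + 4, $reported;
}
```

If that reading is right, a thread that blocks while waiting for a reply would be waiting for a complete length-prefixed message that never arrives on descriptor 4.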
Please don't suggest converting the user code to use forks or a similar user-level rewrite. The requirement for the parent to be able to thread and then pass a message to the broker from a thread is non-negotiable.
I highly appreciate any wisdom and/or direction anyone can provide, including any details that might help explain what is going wrong.
Thank you,
-Shaun
Replies are listed 'Best First'.

Re: Desperately need help with threaded socket
by BrowserUk (Patriarch) on Jan 21, 2015 at 00:25 UTC
by 5haun (Scribe) on Jan 21, 2015 at 00:57 UTC
by 5haun (Scribe) on Jan 21, 2015 at 08:41 UTC

Re: Desperately need help with threaded socket
by Mr. Muskrat (Canon) on Jan 21, 2015 at 20:16 UTC

Re: Desperately need help with threaded socket
by flexvault (Monsignor) on Jan 21, 2015 at 16:33 UTC