stevehicks has asked for the wisdom of the Perl Monks concerning the following question:

Hi

I am trying to pass data between a thread and the rest of my program and it's driving me crazy because I can't get it to work. Of course it's bound to be my lack of knowledge, but I've spent days and days trying to get it to work and I'm at my wit's end. I would REALLY appreciate some help. I know the following post is quite long (even though I tried to keep it short), but if you guys could take a look and give me some ideas it would be a great help.

I am writing a program that takes feeds of financial data and interrogates them. The main part of the program is generalised and does not really know anything about the data it is consuming. You start the program and pass it the name of a "plugin" (implemented as a module), thus :

MdAgent.pl --plugin UTP --config UTP --interface ce1 --mcastAddress 239.11.25.104 --mcastPort 104
The Agent then "requires" the plugin(module) :
require "$PALIBDIR/$actualPlugin.pm";
It then calls the initialisation sub of the plugin :
$actualPlugin->moduleInit($debug, $instance);
At this point it used to enter a loop, reading packets and passing them to the sub in the plugin that understood them :
while(1) {
    $drop=1 unless $peer->recv($sock, $message, $maxLen,0);
    if($drop ==1) {
        $DEBUG && msg("recv failed : $!");
    } else {
        $actualPlugin->processMessage("$message");
    }
}
now originally in the processMessage sub of the plugin I would update a hash of hashes in the main package :
$MdAgent::TotalsHsh{$service}{$messageType}+=1;
...that was before I made the code multithreaded for performance reasons. Now, I am creating a thread that reads the packets, and then calls the processMessage sub of the plugin :
$mcastReaderThread=threads->create(\&doMcastSubscription, $interface, $mcastAddress, $mcastPort);
$mcastReaderThread->detach();
this thread is the thing that now calls the processMessage sub.

I'm aware that the area of memory used by the thread is different to the one used by the main part of the program, but I need to communicate across the boundary.

As far as I am aware I cannot use threads::shared since a hash of hashes is too complex. Now I thought I could use the old Threads model and communicate across the boundary using the same method as I did before making the program multithreaded, i.e. :

$MdAgent::TotalsHsh{$service}{$messageType}+=1;

However, that doesn't work (I'm assuming because the thread still has its own separate memory area, even though I thought this would work with the old Thread model).

I've also tried creating a reference to the hash of hashes and passing it into the $mcastReaderThread thread, and then having that thread pass it into the plugin's processMessage sub, but that doesn't work either, I'm sure because of the same reasons above.

I think I could use queues but I don't want to do that since enqueuing and dequeuing will be quite an overhead when in essence all I want to do is update a counter using the thread and have it visible to the main part of the program.

Help : (

Re: communication across threads using hash of hashes
by BrowserUk (Patriarch) on Dec 20, 2009 at 09:16 UTC
    As far as I am aware I cannot use threads::shared since a hash of hashes is too complex.

    Not so. Shared structures can be as complex as you like. You just have to learn a few relatively simple rules for setting them up and using them. By way of example, this do-nothing code demonstrates using a shared nested hash structure similar to that you describe.

    #! perl -slw
    use strict;
    use Data::Dump qw[ pp ];
    use threads;
    use threads::shared;

    my %hash :shared;
    $hash{ serviceA } = &share( {} );    ## & necessary here!
    $hash{ serviceB } = &share( {} );

    $hash{ serviceA }{ "messageType$_" } = 0 for 0 .. 9;
    $hash{ serviceB }{ "messageType$_" } = 0 for 0 .. 9;

    sub thread {
        while( 1 ) {
            select '','','', 0.01;    ## Simulate doing other things
            my $service = 'service' . ( qw[A B] )[ rand( 2 ) ];
            my $messageType = 'messageType' . int( rand 10 );
            lock %{ $hash{ $service } };
            ++$hash{ $service }{ $messageType };
        }
    }

    threads->new( \&thread )->detach for 1 .. 4;

    while( 1 ) {
        sleep 1;
        lock %hash;
        pp \%hash;
    }

    __END__
    [ 9:16:58.77] c:\test>junk2
    {
      # tied threads::shared::tie
      serviceA => {
        # tied threads::shared::tie
        messageType0 => 15,
        messageType1 => 19,
        messageType2 => 23,
        messageType3 => 26,
        messageType4 => 16,
        messageType5 => 16,
        messageType6 => 18,
        messageType7 => 24,
        messageType8 => 17,
        messageType9 => 15,
      },
      serviceB => {
        # tied threads::shared::tie
        messageType0 => 19,
        messageType1 => 15,
        messageType2 => 20,
        messageType3 => 25,
        messageType4 => 22,
        messageType5 => 30,
        messageType6 => 25,
        messageType7 => 16,
        messageType8 => 16,
        messageType9 => 24,
      },
    }
    {
      # tied threads::shared::tie
      serviceA => {
        # tied threads::shared::tie
        messageType0 => 27,
        messageType1 => 37,
        messageType2 => 44,
        messageType3 => 48,
        messageType4 => 34,
        messageType5 => 44,
        messageType6 => 33,
        messageType7 => 48,
        messageType8 => 32,
        messageType9 => 45,
      },
      serviceB => {
        # tied threads::shared::tie
        messageType0 => 43,
        messageType1 => 32,
        messageType2 => 38,
        messageType3 => 47,
        messageType4 => 47,
        messageType5 => 60,
        messageType6 => 36,
        messageType7 => 41,
        messageType8 => 38,
        messageType9 => 38,
      },
    }

    Run the code above and note that four threads are incrementing randomly chosen counters, whilst the main thread is displaying them. Note the use of lock. Note that you cannot lock an individual key/value pair within a shared hash, you have to lock the entire (sub) hash.

    I thought I could use the old Threads model

    The old threading model is dead, (and has been for some time). Thread is now just a bad alias for threads.


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
      That's great, thanks very, very much for replying : )

      Your code does indeed work but if I try to use the same methodology in my code it doesn't. I get :

      Thread 1 terminated abnormally: lock can only be used on shared values at UTP.pm line 189

      Do you think this is because of the separate package ?? I declare and initialise the hash in MdAgent.pl :

      ...
      my %Totals :shared;
      ...
      msg("initialising totals");
      foreach my $envo(sort keys %{$marketMsgHsh{$plugin}{environments}}) {
          $DEBUG && msg("sharing $envo");
          $Totals{$envo}=&share( {} );
          $DEBUG && msg("done.");
          foreach my $messageType(sort keys %{$marketMsgHsh{$plugin}{messageTypes}}) {
              $Totals{$envo}{$messageType}=0;
          }
      }
      msg("done.");
      ...
      ...and then after that I create the thread :
      ...
      $mcastReaderThread=threads->create(\&doMcastSubscription, $interface, $mcastAddress, $mcastPort);
      $mcastReaderThread->detach();
      ...
      in the thread I call the sub(in the "UTP" package)that actually handles the messages :
      ...
      while(1) {
          $drop=1 unless $peer->recv($sock, $message, $maxLen,0);
          if($drop ==1) {
              $DEBUG && msg("recv failed : $!");
          } else {
              $actualPlugin->processMessage("$message");
          }
      ...
      processMessage is defined in "UTP.pm". this is what I have it doing right now :
      ...
      if($FeedAgent::sendRateStats) {
          lock %{$Totals{$serviceID}};
          $Totals{$serviceID}{$packetType}++;
          $DEBUG && msg("set Totals{$serviceID}{$packetType} == $Totals{$serviceID}{$packetType}");
      }
      ...
      Is this going wrong because the processMessage sub is in a different package or am I doing something else wrong ??
        Do you think this is because of the separate package ??

        Kinda hard to tell from your snippets, but almost certainly yes. If you were using strict and warnings, I would expect to see some other messages that would clarify things greatly.

        You could try declaring %Totals as a global variable. Whilst they are generally to be avoided, they actually fit quite well with the concept of thread-shared data.

        our %Totals :shared;

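        You'll need to do this in every scope where you use it. Bear in mind that "our" binds the name to the current package, so from a different package you have to refer to the variable by its fully qualified name (the name of the package where it was first declared, followed by ::Totals).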


Re: communication across threads using hash of hashes
by zentara (Cardinal) on Dec 20, 2009 at 14:22 UTC
    ....the thing to remember with shared hashes, is that only the first level hash keys get shared by default.... deeper keys must be explicitly shared as BrowserUk showed

    I'm not really a human, but I play one on earth.
    Old Perl Programmer Haiku
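
As a rough sketch of the point above about deeper levels needing explicit sharing: the inner hash can be created on demand before it is used. The names %totals and bump, and the service and message-type strings, are hypothetical and not from the original program. The sketch assumes a threads-enabled perl and a threads::shared recent enough to export shared_clone; on older versions, &share( {} ) as shown earlier in the thread does the same job.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

# Hypothetical counter table (not from the original program).
my %totals :shared;

# Increment $totals{$service}{$type}, creating the shared inner hash on demand.
sub bump {
    my ( $service, $type ) = @_;
    {
        lock %totals;    # guard creation of the inner hash
        $totals{$service} = shared_clone( {} )
            unless exists $totals{$service};
    }
    lock %{ $totals{$service} };    # lock only the sub-hash being updated
    $totals{$service}{$type}++;
    return;
}

# Four worker threads each add 1000 to the same counter.
my @workers = map {
    threads->create( sub { bump( 'serviceA', 'trade' ) for 1 .. 1000 } )
} 1 .. 4;
$_->join for @workers;

print "serviceA/trade = $totals{serviceA}{trade}\n";
```

Assigning a plain (unshared) hash reference into a shared hash is an error, which is why the inner level has to go through shared_clone or share first.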

      Thanks very much for your help guys. I've spent a while messing around with this and it does appear that it is because of the separate package. To make things simpler I've used a scalar. This is the scene :

      I declare the shared var in the main program :
      use threads;
      use threads::shared;
      my $sharedTestVar :shared;
      I then "require" at runtime the plugin name (which equates to a module) that was passed as a parameter to the program :
      ...
      if(! GetOptions ('plugin=s' => \$plugin,
                       'config=s' => \$instanceName,
                       'debug' => \$debug,
                       'interface=s' => \$interface,
                       'mcastAddress=s' => \$mcastAddress,
                       'mcastPort=s' => \$mcastPort,
                       'file=s' => \$snoopf,
                       'remotePcapForwarder=s' => \$remotePCF,
                       'netprobe=s' => \$netprobe)) {
          usage();
          xit("incorrect options specification", 99);
      }
      if($debug) {
          msg("info : turning on debug");
          $ENV{TRACE}=1;
          $DEBUG=1;
      }
      if(! $plugin) {
          usage();
          xit("you must supply a plugin name", 98);
      }
      msg("info : plugin \"$plugin\" will be connected");
      eval (require "$PALIBDIR/$plugin.pm");
      I then assign a value to the test scalar :
      $sharedTestVar=6;
      $DEBUG && msg("init : set sharedTestVar == $sharedTestVar");
      I then create a thread and detach it :
      $mcastReaderThread=threads->new(\&doMcastSubscription, $interface,$mcastAddress,$mcastPort);
      $mcastReaderThread->detach;
      In the doMcastSubscription sub I increment the scalar :
      sub doMcastSubscription {
          ...
          while(1) {
              my($message,$peer);
              $drop=1 unless $peer=recv($sock,$message,1400,0);
              if($drop == 1) {
                  $DEBUG && msg("recv failed");
                  #sleep(2);
              } else {
                  $actualPlugin->processMessage("$message");
              }
              $sharedTestVar++;
              msg("in doMcastSubscription set sharedTestVar == $sharedTestVar");
              $drop=0;
          }
          $DEBUG && msg("leaving doMcastSubscription()");
      }
      meanwhile I look at the scalar in the main loop of the program :
      while(1) {
          ...
          msg("in main while loop, sharedTestVar == $sharedTestVar");
          ...
          sleep(5);
      }
      When I do the above, it works fine, i.e the thread updates the shared scalar with no issue. However, when I move the code that increments the scalar to the file that I "required" at runtime it doesn't work. Here's what the thread is doing now :
      sub doMcastSubscription {
          ...
          while(1) {
              my($message,$peer);
              $drop=1 unless $peer=recv($sock,$message,1400,0);
              if($drop == 1) {
                  $DEBUG && msg("recv failed");
                  #sleep(2);
              } else {
                  $actualPlugin->processMessage("$message");
              }
              $drop=0;
          }
      }
      and here's the processMessage sub in UTP.pm :
      sub processMessage {
          ...
          if($FeedAgent::sendRateStats) {
              $sharedTestVar++;
              msg("in processMessage set sharedTestVar == $sharedTestVar");
          }
          ...
      }

      I've tried declaring the shared scalar with "our" instead of "my" but this doesn't help either, and I'm not sure what else to do as this is way past my understanding : * (

      Any ideas ?? My only thought for a workaround is to use shared memory but I'd rather not

      Once again thanks for trying to help, I really appreciate it !!

        I've fixed it, at least in terms of the shared scalar : )

        If I declare the scalar with "our", and then update it in the processMessage sub using "$FeedAgent::sharedTestVar++;" then it works.

        not sure why this works, but it does !!

        Once again thanks very much for your help guys, I'm going to need to share the deeper levels of the shared hash of hashes as you mentioned before
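
A likely explanation for why the qualified name works: a variable declared with "my" is lexical to the file it appears in, so code in a file loaded with require cannot see it at all, whereas "our" creates a package variable that any code can reach through its fully qualified name. The sketch below imitates the two files with two blocks in one script. It assumes the main program's package is FeedAgent (inferred from $FeedAgent::sendRateStats in the posts) and a threads-enabled perl; the thread and loop counts are made up for illustration.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

{
    # Stands in for the main program (package name is an assumption).
    package FeedAgent;
    our $sharedTestVar :shared;    # package variable $FeedAgent::sharedTestVar
    $sharedTestVar = 0;
}

{
    # Stands in for the plugin file loaded with require.
    package UTP;

    sub processMessage {
        # A bare $sharedTestVar is not visible in this scope, so the
        # variable is addressed by its fully qualified name instead.
        lock $FeedAgent::sharedTestVar;
        $FeedAgent::sharedTestVar++;
        return;
    }
}

# Two reader threads each call the plugin 500 times.
my @readers = map {
    threads->create( sub { UTP->processMessage('msg') for 1 .. 500 } )
} 1 .. 2;
$_->join for @readers;

print "sharedTestVar = $FeedAgent::sharedTestVar\n";
```

The same approach should carry over to the hash of hashes: declare it with "our" and ":shared" in the main program, share each inner hash explicitly, and address it from the plugin by its qualified name.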