Respected Monks.

I picked up perl almost a year back and now use it for almost all my scripting & gui applications.

Lately I am working on a project, that requires me to stream data between threads and was wondering if there are any modules that implement this. (I am looking for something similar to get/put like in scemi-2 standards)

Either way I put together some code to implement these and would really appreciate any recommendations..

Below code.. api's put & get are the important part. (Posting rest of code for reference Please bear with me :))

# _________________________ _________________________ ______________ +___________ _________________________ #/ \/ \/ + \/ \ #\_________________________/\_________________________/\______________ +___________/\_________________________/ # Author : Rohan Bhatia - (rohan.dd.bhatia@gmail.com|rohan_bhatia@m +entor.com) package communicate; #--------------------------------------------------------------------- +------# # Other STD packages .. #--------------------------------------------------------------------- +------# use strict; use base 'Exporter'; use mUtils; use threads; use threads::shared; #--------------------------------------------------------------------- +------# # VERSION & ISA #--------------------------------------------------------------------- +------# use vars qw($VERSION @ISA); $VERSION = 1.1; #@ISA = qw(); # Not derived from any class. #--------------------------------------------------------------------- +------# # Global Vars.. #--------------------------------------------------------------------- +------# my $objId = 1010; # Variable.. Assign ID to new objects automa +tically my %objTable; #--------------------------------------------------------------------- +------# # Defaults.. for object #--------------------------------------------------------------------- +------# my %Defaults = ( '_debug' => 0, 'pipeDepth' => -1, # -1 for infinite, 0 & 1 not support +ed, will be defaulted to 2. 'pid' => undef, ); ## KeyPat -> Make a pattern to search for matching keys.. my $keyPat = join('\b|\b' , sort keys %Defaults); my %pass; #--------------------------------------------------------------------- +------# # BEGIN block, Import function #--------------------------------------------------------------------- +------# BEGIN { #eval "use "; #die "Problem loading IO::Socket : \n$@\nx---------x\n" if $@; } # The import block allows to overwrite defaults for entire package. # Parameters can still be overwritten, when a new object is created +. sub import { my $package = shift; my @args = @_; while (my ($k,$v) = splice(@args, 0, 2)) { if(exists $Defaults{$k}) { $Defaults{$k} = $v if(defined $v and $v ne ''); } else { $pass{$k} = $v; } } } #--------------------------------------------------------------------- +------# # Constructor.. #--------------------------------------------------------------------- +------# sub new { my $that = shift; my @args = @_; my $class = ref($that) || $that; my $self; my @_dataArray = undef; #-------------------------# # Some more internal vars.. #-------------------------# my (%parms, $k, $v); #params.. assign these, otherwise defaults.. k,v internal proces +sing while (($k,$v) = splice(@args, 0, 2)) { if ($k =~ /^$keyPat$/) { $parms{$k} = $v; } else { $pass{$k} = $v; } } # Assign the params .. foreach (keys %parms, keys %Defaults) { my $key = $_; $self->{$key} = exists $parms{$key} && defined $parms{$key +} ? $parms{$key} : $Defaults{$key}; } # Check pipe/stack depth. If less than 2 than it is 2 if($self->{'pipeDepth'} == 0 or $self->{'pipeDepth'} == 1) { $self->{'pipeDepth'} = 2; printMsg "Unsupported pipe depth, Pipe depth changed to: $ +self->{'pipeDepth'}","Warning"; } #Set self id.. $self->{'pid'} = $objId if(!defined $self->{'pid' +}); $self->{'_phase'} = 1; $self->{'_putInProgress'} = 0; $self->{'_getInProgress'} = 0; $self->{'_stackEmpty'} = 1; $self->{'_stackFull'} = 0; $self->{'_arrPtr'} = \@_dataArray; share($self->{'_putInProgress'}); share($self->{'_getInProgress'}); share($self->{'_stackEmpty'} ); share($self->{'_stackFull'} ); share($self->{'_arrPtr'} ); share(@_dataArray ); bless $self, $class; $self->_reset; $self->_importPackages(%pass); $objTable{$objId} = $self; # push object in object +table $objId++; return $self; } #--------------------------------------------------------------------- +------# # put - (blocking, return 0 on success) #--------------------------------------------------------------------- +------# sub put { my $self = shift; my $data = shift; #1. Check phase compliancy.. if($self->{'_phase'} < 1) { printVerb("Operation denied. Object phase incorrect!","Warnin +g"); return -1; } #2. lock put call. This blocks recursive calls to put and stre +amlines them lock($self->{'_putInProgress'}); $self->{'_putInProgress'} = 1; #3. wait for any get calls to finish. while($self->{'_getInProgress'} and $self->{'_putInProgress'} +< 2 ) { $self->{'_getInProgress'} = 2; # Set this to prevent deadl +ock in put & get last if($self->{'_stackEmpty'}); # Should not be requir +ed.. } #4. check if stack has reached its size limit if(($#{$self->{'_arrPtr'}} >= eval($self->{'pipeDepth'} -1)) a +nd ($self->{'pipeDepth'} != -1)) { lock($self->{'_stackFull'}); $self->{'_stackFull'} = 1; } #5. wait till stack has some space. This makes the call blocki +ng while($self->{'_stackFull'}) { lock($self->{'_stackFull'}); last if($self->{'_stackEmpty'}); } #6. lock the appropriate vars and put the data. lock($self->{'_arrPtr'}); lock($self->{'_stackEmpty'}); push(@{$self->{'_arrPtr'}},$data); $self->{'_putInProgress'} = 0; $self->{'_stackEmpty'} = 0; #7. return the current size of stack. return eval($#{$self->{'arrPtr'}} + 1); } #--------------------------------------------------------------------- +------# # get blocking #--------------------------------------------------------------------- +------# sub get { my $self = shift; #1. Check phase compliancy.. if($self->{'_phase'} < 1) { printVerb("Operation denied. Object phase incorrect!","Warnin +g"); return -1; } #2. lock get call. This blocks recursive calls to get and stre +amlines them lock($self->{'_getInProgress'}); $self->{'_getInProgress'} = 1; #3. wait for any put calls to finish. while($self->{'_putInProgress'} and $self->{'_getInProgress'} +< 2 ) { $self->{'_putInProgress'} = 2; # Set this to prevent dead +lock in put & get last if($self->{'_stackFull'}); # Should not be required.. + } #4. check if stack is empty if($#{$self->{'_arrPtr'}} < 0 ) { lock($self->{'_stackEmpty'}); $self->{'_stackEmpty'} = 1; } #5. wait till stack has some more data. This makes the call bl +ocking while($self->{'_stackEmpty'}) { lock($self->{'_stackEmpty'}); last if($self->{'_stackFull'}); } #6. lock the appropriate vars and get the data. lock($self->{'_arrPtr'}); lock($self->{'_stackFull'}); my $data = pop(@{$self->{'_arrPtr'}}); $self->{'_getInProgress'} = 0; $self->{'_stackFull'} = 0; #7. return the data. return $data; } 1;

Noticeable problems..

1. The while loops keep the cpu busy. I tried to implement using this locks, but was not unable to async unblock a lock. Also is there a way to check if a var is locked?

2. If I create try_put/try_get then may be parent script using these api's can yield threads..

Test application(parent script)..

use base 'Exporter'; use strict; use vars qw($VERSION @ISA @EXPORT); use threads; use threads::shared; use mUtils qw/callerDepth -1/; use communicate; #my @arr; #my @arr3; #share(@arr3); #share(@arr); my $pipe1 = communicate->new(pipeDepth => -1, callerDepth => 1, pid => + 'clientQueue'); my $key = $pipe1->getCid(); printMsg "Got key: $key","Info"; my $tPut1 = threads->create(\&keepPut1); my $tPut2 = threads->create(\&keepPut2); sleep 60; my $tGet1 = threads->create(\&keepGet1); my $tGet2 = threads->create(\&keepGet2); $tPut1->join(); $tPut2->join(); sleep 2; print "\n======================\n"; #print "Array size : $#arr3\n"; #print "Array size : $#arr\n"; print "\n======================\n"; $tGet2->join(); $tGet1->join(); sub keepPut1 { printMsg "In Put1"; my $count = 0; while($count<500) { printMsg "1. Putting Count => $count"; $pipe1->put($count); #lock(@arr3); #push(@arr3,$count); if($count%10==0) { sleep .1;} $count++; #sleep 0.1; } return 0; } sub keepPut2 { printMsg "In Put2"; my $count = 500; while($count<500) { printMsg "2. Putting Count => $count"; $pipe1->put($count); #lock(@arr3); #push(@arr3,$count); $count++; #sleep 0.1; } return 0; } sub keepGet1 { printMsg "In Get 1"; my $pipe = communicate::getObject($key); while(1) { my $countGet = $pipe1->get(); #printMsg "Got count: $countGet"; if(!defined($countGet)) { print "Vola\n"; sleep 10; } else { printMsg "1. Got from pipe => $countGet"; #lock(@arr); #push(@arr,$countGet); } #sleep 0.1; } print "Get1 ends.. "; } sub keepGet2 { printMsg "In Get 2"; my $pipe = communicate::getObject($key); while(1) { my $countGet = $pipe1->get(); #printMsg "Got count: $countGet"; if(!defined($countGet)) { print "Vola\n"; sleep 10; } else { printMsg "2. Got from pipe => $countGet"; #lock(@arr); #push(@arr,$countGet); } #sleep 0.1; } print "Get2 ends.. "; } printErr "---------------------------------";

In reply to any module with streaming pipe implementation for use with threads by Rohan_Bhatia

Title:
Use:  <p> text here (a paragraph) </p>
and:  <code> code here </code>
to format your post, it's "PerlMonks-approved HTML":



  • Posts are HTML formatted. Put <p> </p> tags around your paragraphs. Put <code> </code> tags around your code and data!
  • Titles consisting of a single word are discouraged, and in most cases are disallowed outright.
  • Read Where should I post X? if you're not absolutely sure you're posting in the right place.
  • Please read these before you post! —
  • Posts may use any of the Perl Monks Approved HTML tags:
    a, abbr, b, big, blockquote, br, caption, center, col, colgroup, dd, del, details, div, dl, dt, em, font, h1, h2, h3, h4, h5, h6, hr, i, ins, li, ol, p, pre, readmore, small, span, spoiler, strike, strong, sub, summary, sup, table, tbody, td, tfoot, th, thead, tr, tt, u, ul, wbr
  • You may need to use entities for some characters, as follows. (Exception: Within code tags, you can put the characters literally.)
            For:     Use:
    & &amp;
    < &lt;
    > &gt;
    [ &#91;
    ] &#93;
  • Link using PerlMonks shortcuts! What shortcuts can I use for linking?
  • See Writeup Formatting Tips and other pages linked from there for more info.