Beefy Boxes and Bandwidth Generously Provided by pair Networks
Come for the quick hacks, stay for the epiphanies.
 
PerlMonks  

Re: Forking Multiple Regex's on a Single String (use threads)

by BrowserUk (Patriarch)
on Aug 19, 2006 at 23:26 UTC ( [id://568405]=note: print w/replies, xml ) Need Help??


in reply to Forking Multiple Regex's on a Single String

Instead of speculating about what might work, here's a threaded solution that does.

I've C&P's most of the code from your OP, but I've made various changes to adapt it to my environment. I only had one chromosome file kicking around ( Chr_22_14-09-2001 Release 3, [nucleotoides 13100001-47848585] ), so I used that multiple times. And for the motifes I just clipped out a few short (80 character) sequences at random.

The output goes to STDOUT. You could redirect this to a file via the command line or open STDOUT to a file before spawning the threads. On my system which has only 1 cpu, there is no conflict with writing to a file from multiple threads. Most threaded C-runtimes would serialise this for you, but if yours does not, it would only require the addition of 1 shared variable and two locks to do so.

The code creates a separate thread for each chromosome file in the list and passes the chromosome name as a paramater to each thread. Depending upon the number of chromosome files you are processing, it might be better to limit the number of concurrent threads--but given that each thread is also doing IO to the same file, it might be ok as is for upto 2 or 3 times the number of chromosomes as you have cpus. It will require a substantial amount of memory as written if the chromosome files are large, but with better knowledge of the task, the code can be easily adapted.

The motifs file is read and loaded into a shared hash and re-used by all the threads.

#! perl -slw use strict; use Carp; use threads; use threads::shared; my $chr_file = $ARGV[0]; my $seq_file = $ARGV[1]; if ( 2 != scalar(@ARGV) ) { croak 'Invalid parameter number'; } elsif ( ! -e $chr_file || ! -T $chr_file ) { croak 'Missing or invalid chromsome-listing file'; } elsif ( ! -e $seq_file || ! -T $seq_file ) { croak 'Missing or invalid sequence-listing file'; } my %motifs : shared; sub thread { my $chromosome = shift; my $directory = 'fasta/'; my $file = 'chr_'.$chromosome.'.fa'; my $path = $directory.$file; # my $seqio = Bio::SeqIO->new( # -file => "<$path", # -format => 'largefasta' # ); # my $seq = $seqio->next_seq(); # my $sequence = $seq->seq(); ## Crude fasta load--Expects 1 sequence per file open my $fh, '<', $path or croak "$path : $!\n"; <$fh>; ## discard header ( my $sequence = do{ local $/; <$fh> } ) =~ s[\s+][]g; close $fh; foreach my $motif ( keys(%motifs) ) { my $str = $motifs{$motif}; my $len = length($str); my $pos = 0; while ( ($pos = index( $sequence, $str, $pos)) >= 0 ) { print join "\t", $chromosome, $pos, $motif; $pos += $len; } } } open my $fh_seq, '<', $seq_file or croak "Unable to open motif file: $seq_file"; while (<$fh_seq>) { s/^\s+//; s/\s+$//; my @row = split("\t"); next if 2 != scalar @row; $motifs{ $row[0] } = $row[1]; } close($fh_seq); my @chromosomes; open my $fh_chr, '<', $chr_file or croak "Unable to open chromsome list: $chr_file"; while (<$fh_chr>) { s/^\s+//; s/\s+$//; my $row = $_; next if !$row; push @chromosomes, $row; } close($fh_chr); my @threads = map{ threads->new( \&thread, $_ ) } @chromosomes; $_->join for @threads; __END__ C:\test>568393 fasta\chromosomes.lst fasta\motifs.lst 22_14-09-2001 6666540 6 22_14-09-2001 6666540 6 22_14-09-2001 6666540 6 22_14-09-2001 6666540 6 22_14-09-2001 6666540 6 22_14-09-2001 6666540 6 22_14-09-2001 6666540 6 22_14-09-2001 6666540 6 22_14-09-2001 6666540 6 22_14-09-2001 6666540 6 22_14-09-2001 6666540 6 22_14-09-2001 6540 3 22_14-09-2001 6540 3 22_14-09-2001 6540 3 22_14-09-2001 6540 3 22_14-09-2001 6540 3 22_14-09-2001 6540 3 22_14-09-2001 6540 3 22_14-09-2001 6540 3 22_14-09-2001 6540 3 22_14-09-2001 6540 3 22_14-09-2001 6540 3 22_14-09-2001 12666540 7 22_14-09-2001 12666540 7 22_14-09-2001 12666540 7 22_14-09-2001 12666540 7 22_14-09-2001 12666540 7 22_14-09-2001 24666540 9 22_14-09-2001 12666540 7 22_14-09-2001 12666540 7 22_14-09-2001 12666540 7 22_14-09-2001 12666540 7 22_14-09-2001 12666540 7 22_14-09-2001 12666540 7 22_14-09-2001 540 2 22_14-09-2001 24666540 9 22_14-09-2001 540 2 22_14-09-2001 24666540 9 22_14-09-2001 24666540 9 22_14-09-2001 24666540 9 22_14-09-2001 540 2 22_14-09-2001 540 2 22_14-09-2001 24666540 9 22_14-09-2001 540 2 22_14-09-2001 24666540 9 22_14-09-2001 24666540 9 22_14-09-2001 540 2 22_14-09-2001 24666540 9 22_14-09-2001 24666540 9 22_14-09-2001 540 2

It's easier to respond to questions if there are any than to try guess, and comment on what you might not understand.


Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
Lingua non convalesco, consenesco et abolesco. -- Rule 1 has a caveat! -- Who broke the cabal?
"Science is about questioning the status quo. Questioning authority".
In the absence of evidence, opinion is indistinguishable from prejudice.

Replies are listed 'Best First'.
Re^2: Forking Multiple Regex's on a Single String (use threads)
by bernanke01 (Beadle) on Aug 21, 2006 at 21:46 UTC

    Wow ++BrowserUK, this is so cool. I've never done threaded perl before and in fact had to build a threaded perl in my test environment to work with this. I think after some reflection I get how this works, but there are two things I'm not getting.

    The first (and biggest) is: how is the printing working? I don't see anything to ensure that writes from two different threads don't "collide" and corrupt the output file. Is that somehow taken care of automatically?

    The second (and minor) thing is how to get a limit of, say $limit concurrent threads running. Would something simple like this be appropriate? I just keep a counter of the number of active threads. I do realize I could probably do this smarter by storing the threads in a hash keyed by chromosome or something like that -- just checking if the general approach of run, check for joins, add when joining is complete -- is appropriate.

    Update: the code I had before was fatally flawed in so many ways I'm embarrassed. Here's a corrected version (with the original tarnished one at the bottom)

    my @waiting = @chromosomes; my %threads; my $max_threads = 10; while ( scalar(@waiting) || scalar( keys(%threads) ) ) { if ( scalar( keys(%threads) ) < $max_threads ) { my $chr = pop(@waiting); $threads{$chr} = threads->new(\&threading_function, $c +hr); } else { foreach my $chr ( keys(%threads) ) { if ( $threads{$chr}->is_joinable() ) { $threads{$chr}->join(); delete $threads{$chr}; } } } }



    my $limit = 10; my $processes = 0; my $processed = scalar(@chromosomes); my @threads; while ($processed) { if ( scalar(@threads) < $limit ) { push @threads, threads->new( \&thread, $_ ); ++$processes; } else { foreach my $thread (@threads) { if ( $thread->is_joinable() ) { $thread->join(); --$processes; } } } }

    Many thanks again -- you've opened up my eyes/mind to threaded programming!

      As I mentioned above, if the threaded C-runtime your copy of perl is built against is any good, it will take care of serialising access to global resources (like STDOUT) for you. However, if in practice you find that the output from the threads is getting intermingled, then the following two additional lines to my code above should prevent that (with the caveat that I don't have a multi-cpu machine to test this on):

      ... my %motifs : shared; my $semaphore : shared; ################# Added. ... while ( ($pos = index( $sequence, $str, $pos)) >= 0 ) { lock $semaphore; ################################# Added. print join "\t", $chromosome, $pos, $motif; $pos += $len; }

      There are several ways to approach limiting the number of concurrent threads. I would normally advocate using a pool of threads, but given the size of the data sets being loaded in each thread, discarding them after each use and spawning a new one is probably the safest way of ensuring most of the memory is returned to the heap for re-use.

      Your revised code looks good, but I would suggest a couple of changes.

      1. Continue to loop and join joinable threads even after there are no more files to process.
      2. Check for joinable threads even if it isn't time to start a new one yet.
      3. Insert a sleep into the loop to prevent this thread from running away with the cpu.

      Something like this (untested) code:

      my @waiting = @chromosomes; my %threads; my $max_threads = 10; ## Continue to join threads as the become joinable ## whilst there are still threads running while( @waiting or threads->list( threads::running ) ) { if( keys %threads < $max_threads ) { my $chr = pop @waiting; $threads{ $chr } = threads->new( \&threading_function, $chr ); } ## Do this regardless of whether it's time to start a new thread. foreach my $chr ( keys %threads ) { if( $threads{ $chr }->is_joinable ) { $threads{ $chr }->join; delete $threads{ $chr }; } } sleep 1; ## Prevent this thread consuming all spare cpu }

      Don't get hung up on my changes to your formatting. When you're used to seeing things a certain way, it's easier to follow the logic when the code looks that way. I'm not advocating my preferences here.


      Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
      Lingua non convalesco, consenesco et abolesco. -- Rule 1 has a caveat! -- Who broke the cabal?
      "Science is about questioning the status quo. Questioning authority".
      In the absence of evidence, opinion is indistinguishable from prejudice.

Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Node Status?
node history
Node Type: note [id://568405]
help
Chatterbox?
and the web crawler heard nothing...

How do I use this?Last hourOther CB clients
Other Users?
Others lurking in the Monastery: (6)
As of 2024-04-26 09:22 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    No recent polls found