in reply to Re^4: Using Thread::Conveyor and Parallel::ForkManager
in thread Using Thread::Conveyor and Parallel::ForkManager
Okay. Here's a skeletal, completely untested, devoid-of-error-checking attempt in about 30 lines of code.
The main thread sets up a queue through which to communicate with the worker threads, then starts 4 workers that sit blocked waiting for something to do. The main thread then falls into a loop, monitoring the directory for log files and queueing the names of new ones for the workers to process.
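The blocking-queue pattern described above can be sketched in isolation. This is a minimal, hypothetical example (the queue name, worker count and work items are all made up, not from the script below): workers block in dequeue() until the main thread posts either a work item or a 'DONE' sentinel, one sentinel per worker.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $Q = Thread::Queue->new;

## Two workers, each blocking on dequeue until told they are done.
my @workers = map {
    threads->new( sub {
        my $count = 0;
        while( ( my $item = $Q->dequeue ) ne 'DONE' ) {
            $count++;    ## Stand-in for real per-file work
        }
        return $count;   ## Returned to join() in the main thread
    } );
} 1 .. 2;

## Post some (hypothetical) work items, then one sentinel per worker.
$Q->enqueue( 'a.log', 'b.log', 'c.log' );
$Q->enqueue( ('DONE') x 2 );

my $total = 0;
$total += $_->join for @workers;
print "processed $total items\n";
```

Because dequeue() blocks, the workers consume no CPU while the queue is empty; the 'DONE' tokens are what let them fall out of the loop and become joinable.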
As it's not clear from your spec what criteria would end the process, I've set up a couple of SIG handlers to do this. You'll need to adjust these to your requirements.
#! perl -slw
use strict;
use threads;
use Thread::Queue;

my $LOGDIR  = "./logs";
my $WORKERS = 4;

## The basis of the communications
my $Qwork = Thread::Queue->new;

## Minimal error handling for posting
sub compress_log {
    require Compress::Zlib;
    while( ( my $workitem = $Qwork->dequeue ) ne 'DONE' ) {
        open IN,  '<',      $workitem       or warn $! and next;
        open OUT, '> :raw', "$workitem.gz"  or warn $! and next;

        ## Slurp the file, compress it and write it out.
        print OUT Compress::Zlib::memGzip( do{ local $/ = \-s IN; <IN> } );

        close OUT;
        close IN;
        unlink $workitem;
    }
}

## Set up a method of terminating the process cleanly.
## The main loop below will terminate on ^C/^Break (on Win32).
## Set appropriate SIG values for your OS / usage.
my $monitoring = 1;
$SIG{INT} = $SIG{QUIT} = sub{ $monitoring = 0; };

## Start the compressor threads.
my @workers = map{ threads->new( \&compress_log ) } 1 .. $WORKERS;

## Monitor the directory for new .log files
my %seen;   ## Track which files we have already queued.
while( $monitoring ) {
    ## Grab the latest logs (excluding those we've already queued)
    my @newfiles = grep{ !defined $seen{ $_ }++ } glob( "$LOGDIR/*.log" );

    ## And queue them for compression
    $Qwork->enqueue( @newfiles );

    ## Sleep a while before looking again
    sleep 10;
}

## You can wait for existing work items to be processed here;
## choose a timeout appropriate to your requirements.
my $timeout = 60;
sleep 1 while --$timeout and $Qwork->pending;

## Empty any remaining items from the work queue -- do something with them?
$Qwork->dequeue while $Qwork->pending;

## Post 'DONE' messages, one per worker
$Qwork->enqueue( ('DONE') x $WORKERS );

## And wait for them to complete
$_->join for @workers;

## Bye.
exit;
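As a quick sanity check on the compression step the workers perform: Compress::Zlib::memGzip takes a string and returns a complete gzip stream in memory, which is what gets written to the .gz file. A small, self-contained round trip (assuming Compress::Zlib is installed, as the worker sub requires; the sample data is made up):

```perl
use strict;
use warnings;
use Compress::Zlib;

## Some repetitive, log-like data that should compress well.
my $data = "2004-08-09 12:00:00 some log line\n" x 1000;

## Compress in memory, then decompress to verify the round trip.
my $gz   = Compress::Zlib::memGzip( $data );
my $back = Compress::Zlib::memGunzip( $gz );

print "round trip ok\n"
    if $back eq $data and length( $gz ) < length( $data );
```

This is exactly the in-memory slurp-and-compress the worker does, minus the file handles; if your log files are large you may prefer Compress::Zlib's streaming gzopen/gzwrite interface instead of slurping the whole file.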
Re^6: Using Thread::Conveyor and Parallel::ForkManager
by Asgaroth (Novice) on Aug 09, 2004 at 11:21 UTC