in reply to Threads slurping a directory and processing before conclusion
I am just wrapping up a project *very* similar to this right now, except that the environment is Ubuntu Linux.
I used forks and forks::shared, which mimic the threads interface.
The whole thing ran in the Amazon cloud, copying files from one S3 bucket to another S3 bucket. This meant that I was also able to scale it out across multiple machines, all copying files in parallel.
I wanted to be able to stop/restart/reset the thing at will, so I created a database with "job_lists" and "jobs" tables that looked something like this:
create table job_lists (
  job_list_id bigint unsigned not null primary key auto_increment,
  name varchar(100) not null,
  unique(name)
) engine=innodb charset=utf8;

create table jobs (
  job_id bigint unsigned not null primary key auto_increment,
  job_list_id bigint unsigned not null,
  reference_key varchar(100) not null,
  data text,
  is_started tinyint not null default 0,
  started_on timestamp null,
  is_completed tinyint not null default 0,
  completed_on timestamp null,
  unique(job_list_id, reference_key),
  foreign key fk_jobs_to_job_lists (job_list_id)
    references job_lists (job_list_id) on delete cascade
) engine=innodb charset=utf8;
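The is_started/is_completed flags (and their timestamps) are what make the stop/restart/reset part work: a killed run leaves claimed-but-unfinished rows behind, and requeueing them is a single statement. For example (my statement, not from the original writeup):

update jobs set is_started = 0, started_on = null where is_completed = 0;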
The first step was to get a list of all the files to be copied, and insert them as records in the "jobs" table. I call this the "Enqueue" step.
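A minimal sketch of that Enqueue step -- the connection details are placeholders, list_source_files() is a hypothetical stand-in for the S3 bucket listing, and I'm assuming DBD::mysql for mysql_insertid:

#!/usr/bin/perl -w
use strict;
use warnings 'all';
use DBI;

# Hypothetical connection details -- replace with your own:
my $dbh = DBI->connect('dbi:mysql:database=queue', 'user', 'pass',
  { RaiseError => 1 });

# Create the job list:
$dbh->do('insert into job_lists (name) values (?)', undef, 's3-copy-run');
my $job_list_id = $dbh->{mysql_insertid};

# One job per file to be copied; "insert ignore" plus the
# unique(job_list_id, reference_key) index makes re-running this idempotent:
my $sth = $dbh->prepare(
  'insert ignore into jobs (job_list_id, reference_key, data) values (?, ?, ?)'
);
foreach my $key ( list_source_files() ) {
  $sth->execute( $job_list_id, $key, $key );
}

sub list_source_files { ... }  # stand-in for the S3 bucket listing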
The next step was to create a webservice running under Plack which has two functions: get_jobs (returns a list of new job data as JSON) and update_jobs (given a JSON array of job ids, marks them all as is_completed=1, completed_on=now()).
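Roughly what that could look like as a raw PSGI app -- a sketch under my own assumptions (same database, the JSON module, and single-process serialization of get_jobs; a real deployment would want proper row locking so two workers can't claim the same rows), not the author's actual service:

#!/usr/bin/perl -w
use strict;
use warnings 'all';
use DBI;
use JSON;
use Plack::Request;

my $dbh = DBI->connect('dbi:mysql:database=queue', 'user', 'pass',
  { RaiseError => 1 });

my $app = sub {
  my $req = Plack::Request->new( shift );

  if( $req->path_info eq '/get_jobs' ) {
    # Claim a chunk of unstarted jobs, then hand them back as JSON:
    my $jobs = $dbh->selectall_arrayref(
      'select job_id, reference_key, data from jobs
        where is_started = 0 limit 100',
      { Slice => {} }
    );
    if( @$jobs ) {
      my $ids = join ',', map { int $_->{job_id} } @$jobs;
      $dbh->do("update jobs set is_started = 1, started_on = now()
                 where job_id in ($ids)");
    }
    return [ 200, ['Content-Type' => 'application/json'],
             [ encode_json($jobs) ] ];
  }

  if( $req->path_info eq '/update_jobs' ) {
    # Body is a JSON array of finished job ids:
    my $ids = decode_json( $req->content );
    my $in = join ',', map { int $_ } @$ids;
    $dbh->do("update jobs set is_completed = 1, completed_on = now()
               where job_id in ($in)") if @$ids;
    return [ 200, ['Content-Type' => 'application/json'], ['{"ok":1}'] ];
  }

  return [ 404, ['Content-Type' => 'text/plain'], ['not found'] ];
};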
Finally, the "Dequeue" step: a simple Perl script that loops through the following:
#!/usr/bin/perl -w
# Basically:
use strict;
use warnings 'all';
use forks;
use forks::shared;
use Time::HiRes 'gettimeofday';

my $max_workers = 10;
my $IS_RUNNING : shared = 1;
my @new_jobs : shared = ( );
my @finished_jobs : shared = ( );
my $finished_count : shared = 0;
my $job_chunk_size = 10;
my $start_time;

# Handle signals:
$SIG{INT} = $SIG{TERM} = sub {
  SCOPE: {
    lock($IS_RUNNING);
    $IS_RUNNING = 0;
  };
};

# Main loop:
while( $IS_RUNNING )
{
  # Count the live workers and top back up to $max_workers:
  my $running_procs = scalar( grep { $_->is_running } threads->list );
  for( $running_procs .. $max_workers - 1 )
  {
    threads->create( \&worker );
  }# end for()

  # Get some jobs if we don't have enough:
  if( scalar(@new_jobs) < $max_workers * $job_chunk_size )
  {
    if( my @unfiled_jobs = get_next_chunk_of_jobs() )
    {
      SCOPE: {
        lock(@new_jobs);
        # Refs pushed onto a shared array must themselves be shared --
        # see get_next_chunk_of_jobs() below:
        push @new_jobs, @unfiled_jobs;
      };
    }# end if()
  }# end if()

  $start_time ||= gettimeofday();

  # Mark-as-completed any jobs which are finished:
  if( @finished_jobs )
  {
    my @out = ( );
    SCOPE: {
      lock(@finished_jobs);
      @out = splice(@finished_jobs, 0, scalar(@finished_jobs));
    };
    mark_as_completed( @out );
  }# end if()

  # Print the rate at which we are finishing work
  # (guarding against a zero interval on the first pass):
  my $diff = gettimeofday() - $start_time;
  my $rate = $diff ? $finished_count / $diff : 0;
  warn "[Main] $finished_count jobs done in $diff sec: $rate/sec\n";

  # And wait a moment...:
  sleep(5);
}# end while()

# We are shutting down...:
warn "\nShutting down...\n";
map { eval { $_->join } } threads->list;

# This is what the worker processes do:
sub worker
{
  my $tid = threads->tid;

  CHUNK: while( $IS_RUNNING )
  {
    # Atomically grab our next chunk of jobs:
    my @jobs = (sub{
      SCOPE: {
        lock(@new_jobs);
        return splice(@new_jobs, 0, $job_chunk_size);
      };
    })->();

    # Did we get any jobs?:
    unless( @jobs )
    {
      sleep(4);
      next CHUNK;
    }# end unless()

    # Process the jobs:
    map { process_job( $_ ) } @jobs;

    # Finished with this chunk of jobs:
    SCOPE: {
      lock(@finished_jobs);
      push @finished_jobs, map { $_->{job_id} } @jobs;
      lock($finished_count);
      $finished_count += scalar(@jobs);
    };
  }# end while()
}# end worker()

# Your code here:
sub process_job
{
  my ($job) = @_;

  # Do your thing here...
}# end process_job()
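The two subs the script calls but doesn't define -- get_next_chunk_of_jobs() and mark_as_completed() -- are just thin HTTP clients for the webservice above. A sketch of what they might look like, assuming LWP::UserAgent, the JSON module, a $service_url pointing at the Plack app, and a forks::shared new enough to provide shared_clone (all names here are mine, not from the original):

use LWP::UserAgent;
use JSON;
use forks::shared;   # for shared_clone()

my $service_url = 'http://localhost:5000';   # hypothetical
my $ua = LWP::UserAgent->new;

# Fetch a batch of claimed jobs from the webservice. Anything pushed
# onto a shared array must itself be shared, hence shared_clone():
sub get_next_chunk_of_jobs
{
  my $res = $ua->get("$service_url/get_jobs");
  return unless $res->is_success;
  return map { shared_clone($_) } @{ decode_json( $res->decoded_content ) };
}# end get_next_chunk_of_jobs()

# Tell the webservice these job ids are done:
sub mark_as_completed
{
  my (@job_ids) = @_;
  $ua->post( "$service_url/update_jobs", Content => encode_json(\@job_ids) );
}# end mark_as_completed()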
Over the weekend, I processed some 4 million images (several terabytes) in about 4 hours using a script very similar to this.
Re^2: Threads slurping a directory and processing before conclusion
by TRoderic (Novice) on Aug 26, 2011 at 22:04 UTC