I am just wrapping up a project *very* similar to this right now, except that my environment is Ubuntu Linux.

I used forks and forks::shared, which mimic the threads and threads::shared interfaces.

The whole thing ran in the Amazon cloud, copying files from one S3 bucket to another S3 bucket. This meant that I was also able to scale this thing out across multiple machines, copying files together.

I wanted to be able to stop/restart/reset the thing at will, so I created a database with a "jobs" table that looked something like this:

create table job_lists (
  job_list_id bigint unsigned not null primary key auto_increment,
  name varchar(100) not null,
  unique(name)
) engine=innodb charset=utf8;

create table jobs (
  job_id bigint unsigned not null primary key auto_increment,
  job_list_id bigint unsigned not null,
  reference_key varchar(100) not null,
  data text,
  is_started tinyint not null default 0,
  started_on timestamp null,
  is_completed tinyint not null default 0,
  completed_on timestamp null,
  unique(job_list_id, reference_key),
  foreign key fk_jobs_to_job_lists (job_list_id)
    references job_lists (job_list_id) on delete cascade
) engine=innodb charset=utf8;

The first step was to get a list of all the files to be copied and insert them as records in the "jobs" table. I call this the "Enqueue" step.
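As a rough sketch of what the Enqueue step could look like (this is not my actual code), the helper below builds a single multi-row insert for a batch of file keys. The function name build_enqueue_statement is made up for illustration, and "insert ignore" is a MySQL-specific assumption so that re-running the step skips keys already covered by the unique(job_list_id, reference_key) index.

```perl
use strict;
use warnings;

# Hypothetical helper: build one multi-row insert for a batch of reference keys.
# Returns the SQL string followed by the bind values, or nothing for an empty batch.
sub build_enqueue_statement {
    my ( $job_list_id, @reference_keys ) = @_;
    return unless @reference_keys;

    # One "(?, ?)" placeholder pair per job row.
    my $rows = join ', ', ('(?, ?)') x @reference_keys;

    # Assumption: MySQL, where "insert ignore" silently skips duplicate keys.
    my $sql  = "insert ignore into jobs (job_list_id, reference_key) values $rows";
    my @bind = map { ( $job_list_id, $_ ) } @reference_keys;

    return ( $sql, @bind );
}

# Usage with DBI, assuming an already-connected $dbh:
#   my ( $sql, @bind ) = build_enqueue_statement( 1, @keys );
#   $dbh->do( $sql, undef, @bind );
```

Inserting in batches of a few hundred keys keeps the statement size reasonable while still avoiding one round trip per file.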

The next step was to create a web service running under Plack with two functions: get_jobs (returns a list of new job data as JSON) and update_jobs (given a JSON array of job ids, marks them all as "is_completed=1, completed_on=now()").
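To give an idea of the shape of such a service, here is a minimal PSGI sketch. It is not the service I ran: the @jobs array is an in-memory stand-in for the "jobs" table, and the /get_jobs and /update_jobs paths, the $batch_size value and the json_response helper are all assumptions made for illustration.

```perl
use strict;
use warnings;
use IO::Handle;
use JSON::PP;

# In-memory stand-in for the "jobs" table (assumption: the real thing queries MySQL).
my @jobs = map {
    +{ job_id => $_, reference_key => "file$_.jpg", is_started => 0, is_completed => 0 }
} 1 .. 5;

my $batch_size = 2;                          # hypothetical chunk size
my $json       = JSON::PP->new->canonical;

# Hypothetical helper: wrap a data structure in a PSGI response triplet.
sub json_response {
    my ( $status, $data ) = @_;
    return [ $status, [ 'Content-Type' => 'application/json' ], [ $json->encode($data) ] ];
}

my $app = sub {
    my ($env) = @_;
    my $path = $env->{PATH_INFO} // '';

    if ( $path eq '/get_jobs' ) {
        # Hand out up to $batch_size unstarted jobs and flag them as started.
        my @batch = grep { !$_->{is_started} } @jobs;
        splice( @batch, $batch_size ) if @batch > $batch_size;
        $_->{is_started} = 1 for @batch;
        return json_response( 200, \@batch );
    }

    if ( $path eq '/update_jobs' ) {
        # The request body is a JSON array of job ids.
        my $len = $env->{CONTENT_LENGTH} || 0;
        $env->{'psgi.input'}->read( my $body, $len );
        my %done = map { $_ => 1 } @{ $json->decode( $body || '[]' ) };
        for my $job (@jobs) {
            next unless $done{ $job->{job_id} };
            @$job{qw(is_completed completed_on)} = ( 1, time );
        }
        return json_response( 200, { updated => scalar keys %done } );
    }

    return json_response( 404, { error => 'not found' } );
};

# A .psgi file returns the application code reference as its last value.
$app;
```

Saved as, say, app.psgi, this can be started with plackup. A real version would do the select and the update of is_started in one transaction so that two machines never receive the same job.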

Finally, the "Dequeue" step: a simple Perl script that loops through the following:

#!/usr/bin/perl -w

# Basically:
use strict;
use warnings 'all';
use forks;
use forks::shared;
use Time::HiRes 'gettimeofday';

# Not shown: get_next_chunk_of_jobs() and mark_as_completed(), which talk
# to the web service (get_jobs / update_jobs).

my $max_workers = 10;
my $IS_RUNNING : shared = 1;
my @new_jobs : shared = ( );
my @finished_jobs : shared = ( );
my $finished_count : shared = 0;
my $job_chunk_size = 10;
my $start_time;

# Handle signals:
$SIG{INT} = $SIG{TERM} = sub {
  SCOPE: {
    lock($IS_RUNNING);
    $IS_RUNNING = 0;
  };
};

# Main loop:
while( $IS_RUNNING ) {
  # Create some workers if we don't have enough:
  my $running_procs = scalar( grep { eval { $_->is_running } } threads->list );
  for( $running_procs + 1 .. $max_workers ) {
    threads->create( \&worker );
  }# end for()

  # Get some jobs if we don't have enough:
  if( scalar(@new_jobs) < $max_workers * $job_chunk_size ) {
    if( my @unfiled_jobs = get_next_chunk_of_jobs() ) {
      SCOPE: {
        lock(@new_jobs);
        push @new_jobs, @unfiled_jobs;
      };
    }# end if()
  }# end if()

  $start_time ||= gettimeofday();

  # Mark-as-completed any jobs which are finished:
  if( @finished_jobs ) {
    my @out = ( );
    SCOPE: {
      lock(@finished_jobs);
      @out = splice(@finished_jobs, 0, scalar(@finished_jobs));
    };
    mark_as_completed( @out );
  }# end if()

  # Print the rate at which we are finishing work:
  my $diff = gettimeofday() - $start_time;
  my $rate = $diff ? $finished_count / $diff : 0;
  warn "[Main] $finished_count jobs done in $diff sec: $rate/sec\n";

  # And wait a moment...:
  sleep(5);
}# end while()

# We are shutting down...:
warn "\nShutting down...\n";
eval { $_->join } for threads->list;


# This is what the worker processes do:
sub worker {
  my $tid = threads->tid;

  CHUNK: while( $IS_RUNNING ) {
    # Take a chunk of jobs off the shared queue:
    my @jobs = (sub{
      SCOPE: {
        lock(@new_jobs);
        return splice(@new_jobs, 0, $job_chunk_size);
      };
    })->();

    # Did we get any jobs?:
    unless( @jobs ) {
      sleep(4);
      next CHUNK;
    }# end unless()

    # Process the jobs:
    process_job( $_ ) for @jobs;

    # Finished with this chunk of jobs:
    SCOPE: {
      lock(@finished_jobs);
      push @finished_jobs, map { $_->{job_id} } @jobs;
      lock($finished_count);
      $finished_count += scalar(@jobs);
    };
  }# end while()
}# end worker()


# Your code here:
sub process_job {
  my ($job) = @_;

  # Do your thing here...
}# end process_job()
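The script above calls get_next_chunk_of_jobs() and mark_as_completed() without showing them. A minimal sketch of how they might talk to the web service, using the core modules HTTP::Tiny and JSON::PP, could look like this. The base URL, the /get_jobs and /update_jobs paths and the $HTTP and $BASE_URL variables are assumptions for illustration, not what I actually deployed.

```perl
use strict;
use warnings;
use HTTP::Tiny;
use JSON::PP qw( encode_json decode_json );

# Hypothetical location of the job web service.
our $BASE_URL = 'http://localhost:5000';
our $HTTP     = HTTP::Tiny->new( timeout => 30 );

# Fetch the next chunk of jobs: returns a list of job hashrefs,
# or an empty list if the request failed.
sub get_next_chunk_of_jobs {
    my $res = $HTTP->get("$BASE_URL/get_jobs");
    return unless $res->{success};
    return @{ decode_json( $res->{content} ) };
}

# Report finished job ids back to the service as a JSON array.
# Returns true if the service accepted the update.
sub mark_as_completed {
    my @job_ids = @_;
    my $res = $HTTP->post(
        "$BASE_URL/update_jobs",
        {
            headers => { 'Content-Type' => 'application/json' },
            content => encode_json( \@job_ids ),
        }
    );
    return $res->{success};
}
```

If mark_as_completed() fails, the ids are lost in this sketch. Since the jobs stay flagged as incomplete in the database, one option is to let them be handed out again on the next run, which is harmless as long as process_job() is safe to repeat.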

Over the weekend, I processed some 4 million images (several terabytes) in about 4 hours using a script very similar to this.


In reply to Re: Threads slurping a directory and processing before conclusion by jdrago999
in thread Threads slurping a directory and processing before conclusion by TRoderic
