stonecolddevin has asked for the wisdom of the Perl Monks concerning the following question:
I have this problem. I need to take ~70 GB of JSON data from a MySQL database and migrate it over to S3. To do so, I am attempting to fork off $n workers using Parallel::Runner, have each worker grab a chunk of records (most likely 5k at a time) from the database in staggered fashion (such that child 1 grabs 1-5000, child 2 grabs 5001-10000, and so on), and enqueue the data to SQS. This all runs fine except when I make the call to SQS.
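The staggered scheme described above can be sketched as plain arithmetic. This is only an illustration: `chunk_range` and its parameters are hypothetical names, and the ranges are row positions rather than `standard_image_id` values, which may well be sparse.

```perl
use strict;
use warnings;

# Hypothetical helper: inclusive row range for a 1-based worker in a
# 0-based round, when $workers workers take consecutive chunks of $size rows.
sub chunk_range {
    my ($worker, $workers, $round, $size) = @_;
    my $chunk = $round * $workers + ($worker - 1);   # 0-based chunk index
    my $first = $chunk * $size + 1;
    return ($first, $first + $size - 1);
}

# First round with 4 workers and 5000 rows per chunk.
for my $worker (1 .. 4) {
    my ($first, $last) = chunk_range($worker, 4, 0, 5000);
    print "worker $worker: rows $first-$last\n";
}
```

With these inputs worker 1 gets rows 1-5000 and worker 2 gets rows 5001-10000; in the next round worker 1 would continue at row 20001.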
As you can see here, without the SQS call, the script runs swimmingly (as best as I can tell, at least):
    [6516] ID: 81523, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81525, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81529, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81536, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81537, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81538, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81541, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81545, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81546, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81547, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81548, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81550, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81554, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81560, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81564, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81582, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81583, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81585, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81592, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81593, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81597, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81600, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81601, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81604, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81605, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81606, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81622, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81623, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81629, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81631, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81636, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81637, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81638, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81641, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81642, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81643, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6514] ID: 80565, last seen 15001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81750, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6514] ID: 80580, last seen 15001 at sbin/1off/enqueue-client-info.pl line 84.
    [6517] ID: 37056, last seen 15001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81758, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6514] ID: 80604, last seen 15001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81644, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81759, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81647, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6517] ID: 37060, last seen 15001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81771, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81653, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6514] ID: 80614, last seen 15001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81775, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6514] ID: 80627, last seen 15001 at sbin/1off/enqueue-client-info.pl line 84.
    [6517] ID: 37081, last seen 15001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81780, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6514] ID: 80629, last seen 15001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81654, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81781, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81655, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81788, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81664, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81789, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6516] ID: 81665, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81792, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81793, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81794, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81816, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81817, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81826, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81835, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81836, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81840, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81842, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81844, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81846, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
    [6515] ID: 81867, last seen 10001 at sbin/1off/enqueue-client-info.pl line 84.
When I add the SQS call, however, every child processes the same item. It's almost as if SQS is taking too long and the next child is created before the $last_seen variable is updated.
    [6700] 1 started in just now = 1251.65741569681/sec
    [6703] 1 started in just now = 261.571811661989/sec
    [6701] 1 started in just now = 99.9310016201277/sec
    [6702] 1 started in just now = 87.3977204059094/sec
    [6701] ID: 1185, last seen 1 at sbin/1off/enqueue-client-info.pl line 84.
    [6703] ID: 1185, last seen 1 at sbin/1off/enqueue-client-info.pl line 84.
    [6702] ID: 1185, last seen 1 at sbin/1off/enqueue-client-info.pl line 84.
    [6700] ID: 1185, last seen 1 at sbin/1off/enqueue-client-info.pl line 84.
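One possible reading of the output above, assuming Parallel::Runner runs each task in a forked child process: a forked child works on its own copy of the parent's variables, so an update to a counter such as $last_seen in one child is never seen by the parent or by sibling children. The sketch below shows that behaviour with a bare fork and a pipe; it uses no SQS or database code and is not taken from the script.

```perl
use strict;
use warnings;

my $last_seen = 1;

# A pipe lets the child report the value it ended up with.
pipe(my $reader, my $writer) or die "pipe failed: $!";

my $pid = fork();
die "fork failed: $!" unless defined $pid;

if ($pid == 0) {
    # Child: this changes the child's private copy only.
    close $reader;
    $last_seen += 5000;
    print {$writer} "$last_seen\n";
    close $writer;
    exit 0;
}

# Parent: read the child's value, then compare with its own copy.
close $writer;
chomp(my $child_value = <$reader>);
close $reader;
waitpid($pid, 0);

print "child saw $child_value, parent still has $last_seen\n";
```

If that assumption holds, every worker would start from the same initial $last_seen of 1 regardless of how long the SQS call takes, which would match all four PIDs reporting ID 1185.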
Here is the script:
    #!/usr/bin/env perl
    use strict;
    use warnings 'all';

    use Amazon::SQS::Simple;
    use Parallel::Runner;
    use Getopt::Long;
    use JSON::XS;
    use Try::Tiny;
    use Data::Dump qw(ddx);
    use Time::HiRes 'gettimeofday';
    use Time::Duration;
    use DBIx::Connector;

    GetOptions(
        'queue=s'       => \my $queue_name,
        'max-workers=i' => \my $max_workers,
        'chunk-size=i'  => \my $page_size,
    );

    die "Usage: $0 --queue=<queue-name> --max-workers=<number> --chunk-size=<items> --json=file\n"
        unless $queue_name && $max_workers; # && $json; # && $chunk_size && $mime_type;

    my $aqs = Amazon::SQS::Simple->new( );
    my $queue = $aqs->CreateQueue( $queue_name );
    my $runner = Parallel::Runner->new( $max_workers );
    my $started_chunks = 0;
    my $json_xs = JSON::XS->new->utf8;
    my $start_time = gettimeofday();
    my $started_items = 0;
    my $last_seen = 1;

    my $dsn = $ENV{'DB_DSN'} || 'dbi:mysql:bp';
    my $username = $ENV{'DB_USER'} || 'root';
    my $password = $ENV{'DB_PASS'} || '*';
    $page_size ||= 5000;

    my $conn = DBIx::Connector->new($dsn, $username, $password, {
        RaiseError => 1,
        AutoCommit => 1,
    });

    my $rows;

    for my $workerid (1..$max_workers) {
        $runner->run( sub {
            $started_items++;
            my $diff = gettimeofday() - $start_time;
            my $rate = $started_items / $diff;
            warn "[$$] $started_items started in ", concise(duration($diff)), " = $rate/sec\n";
            # warn "LAST SEEN $last_seen";
            #warn "OFFSET $offset";
            while ( $rows = $conn->run( sub {
                $_->selectall_arrayref(qq{
                    SELECT s.standard_image_id, s.signature, s.client_info, b.name
                    FROM standard_images s
                    LEFT JOIN buckets b on s.bucket_id=b.bucket_id
                    WHERE s.has_client_info = 1
                    AND s.standard_image_id > ?
                    GROUP BY s.signature
                    ORDER BY s.standard_image_id
                    LIMIT ?
                },{} , $last_seen, $page_size );
            } ) ){
                try {
                    foreach my $image ( @{ $rows } ) {
                        my $line = encode_json {
                            id          => $image->[0],
                            mime_type   => 'application/json',
                            client_info => decode_json $image->[2],
                            signature   => $image->[1],
                            bucket_name => $image->[3]
                        };
                        warn "[$$] ID: " . $image->[0] . ", last seen $last_seen";
                        sqs_do( $queue, 'SendMessage', $line );
                    }
                }
                catch {
                    warn "ERROR\t$_\t$@";
                };
                $last_seen += $page_size;
            }
        });
    }

    $runner->finish;

    sub sqs_do {
        my ($queue, $action, @params) = @_;
        my @rv;
        my $attempts = 5;
        ACTION: {
            @rv = eval { $queue->$action( @params ); };
            if ( $@ ) {
                if ( --$attempts > 0 ) {
                    select undef, undef, undef, 0.25;
                    redo ACTION;
                }
                else {
                    warn "Cannot $action( @params ) after 5 attempts. Giving up!\n$@";
                }
            }
        }
        return @rv;
    }
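One way the paging loop might be restructured, sketched against an in-memory list instead of MySQL: each worker owns a disjoint slice of IDs, advances its cursor to the last ID it actually read rather than adding the page size, and stops on an empty page. The names `fetch_page` and `worker_ids` are hypothetical, and the modulo split is an assumption for illustration, not something the script does; in SQL it would correspond to an extra condition such as `MOD(s.standard_image_id, ?) = ?`.

```perl
use strict;
use warnings;

# Stand-in for the SELECT: up to $limit IDs greater than $after that fall
# in this worker's slice (id % $workers == $slot). $ids must be ascending.
sub fetch_page {
    my ($ids, $after, $limit, $workers, $slot) = @_;
    my @page = grep { $_ > $after && $_ % $workers == $slot } @$ids;
    splice @page, $limit if @page > $limit;   # keep only the first $limit
    return \@page;
}

# Collect every ID one worker would process, page by page.
sub worker_ids {
    my ($ids, $workers, $slot, $limit) = @_;
    my $last_seen = 0;
    my @seen;
    while (1) {
        my $page = fetch_page($ids, $last_seen, $limit, $workers, $slot);
        last unless @$page;          # empty page: this slice is exhausted
        push @seen, @$page;
        $last_seen = $page->[-1];    # advance to the last ID actually read
    }
    return @seen;
}

my @ids = (3, 7, 8, 12, 15, 20, 21);
for my $slot (0, 1) {
    print "slot $slot: ", join(",", worker_ids(\@ids, 2, $slot, 2)), "\n";
}
```

The explicit empty-page check matters because an empty array reference is still true in Perl, so a `while` condition that tests only the returned reference would not end when the rows run out.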
If there's a better way to do this, I am more than willing to consider it!
Thanks in advance.
Three thousand years of beautiful tradition, from Moses to Sandy Koufax, you're god damn right I'm living in the fucking past
Replies are listed 'Best First'.

Re: Parallel::Runner and Amazon SQS Issue
by remiah (Hermit) on Sep 12, 2012 at 23:21 UTC
by stonecolddevin (Parson) on Sep 13, 2012 at 01:28 UTC
by remiah (Hermit) on Sep 13, 2012 at 06:05 UTC
by stonecolddevin (Parson) on Sep 13, 2012 at 16:15 UTC

Re: Parallel::Runner and Amazon SQS Issue
by runrig (Abbot) on Sep 13, 2012 at 17:06 UTC
by stonecolddevin (Parson) on Sep 13, 2012 at 18:45 UTC

Re: Parallel::Runner and Amazon SQS Issue
by locked_user sundialsvc4 (Abbot) on Sep 13, 2012 at 15:28 UTC
by stonecolddevin (Parson) on Sep 13, 2012 at 16:11 UTC