#!/usr/bin/perl use threads; use Thread::Queue; #require Text::CSV_XS; my $input = 'input.csv'; my $output = 'output.csv'; my @cols = qw(a b c d); my $q; my $count = 0; while(1) { $q = Thread::Queue->new; my $inth = threads->create(\&inputthread); my $outth = threads->create(\&outputthread); $inth->join(); $q->enqueue(undef); $outth->join() or die("thread crashed?\n"); $count++; print "$count\n"; } sub inputthread { require Text::CSV_XS; $csv = Text::CSV_XS->new ({ binary => 1, eol => "\n", sep_char => ",", quote_char => '"', escape_char => '"', empty_is_undef => 0, decode_utf8 => 0 }); $csv->column_names(@cols); my $fh; open($fh,'<',$input); binmode($fh); my @rows = (); while(1) { my $data; if($csv) { $data = $csv->getline_hr($fh); unless($data) { last if($csv->eof); die("Failed to parse CSV line: ".$csv->error_diag."\n"); } } else { die("Data parsed another way\n"); } my @arr = values(%$data); push(@rows,\@arr); } close($fh); $q->enqueue(\@rows); } sub outputthread { my $fh; open($fh,'>',$output); binmode($fh); my $csv; require Text::CSV_XS; $csv = Text::CSV_XS->new ({ binary => 1, eol => "\n", sep_char => ',', quote_char => '"', escape_char => '"' }); while(1) { my $rows = $q->dequeue; last unless(defined $rows); foreach my $data(@$rows) { if($csv) { $csv->print($fh,$data); } else { die("Data output another way\n"); } } } close($fh); return 1; }