Re: PerlIO file handle dup
by BrowserUk (Patriarch) on Mar 07, 2017 at 07:31 UTC
#! perl -slw
use strict;
use threads;
use threads::shared;

our $T //= 8;

# Share only the individual elements, not @buffers itself.
my @buffers; share( $buffers[ $_ ] ) for 0 .. $T-1;
my $stdoutSem :shared;

sub reader {
    my $fname = shift;
    open my $fh, '<', $fname or die $!;
    my $next = 0;
    while( <$fh> ) {
        chomp;
        lock $buffers[ $next ];
        $buffers[ $next ] = $_;
        $next = ( $next + 1 ) % $T;
    }
    close $fh;
    # Signal end-of-input to each worker.
    for( 0 .. $T-1 ) {
        lock $buffers[ $_ ];
        $buffers[ $_ ] = undef;
    }
}

sub worker {
    my $tid  = threads->tid;
    my $bufn = shift;
    my $localbuf;
    while( 1 ) {
        {
            lock $buffers[ $bufn ];
            last unless defined( $buffers[ $bufn ] );
            $localbuf = $buffers[ $bufn ];
        }
        ## process localbuf here.
        {
            lock $stdoutSem;
            print "[$tid] processed record: '", $localbuf, "'";
        }
    }
}

my $reader  = threads->new( \&reader, $ARGV[ 0 ] );
my @workers = map threads->new( \&worker, $_ ), 0 .. $T-1;
$reader->join;
$_->join for @workers;
I'm not sure I tried passing data between threads with a shared array, but I think performance would be similar to a queue, which was too slow.
but I think performance would be similar to a queue,
The problem with a queue is that all locking is applied to the entire shared array, thus every lock blocks all contenders, even if they are after different elements of the array.
If you look carefully at my example, you'll see that the @buffers array itself isn't (explicitly) shared and is never locked; only the per-thread scalar elements are.
And as only the reader thread and 1 worker thread per buffer are competing for any given lock, all worker threads are free to continue independently of each other.
And finally, the lock on any given buffer is only held for the brief time it takes to copy its contents to a local buffer, thus the reader thread can be repopulating it with the next record whilst the worker thread is processing the previous one.
The upshot is that in my use of this technique, it beats Thread::Queue by a wide margin for applications where processing a record takes 3x or more the time required to read it.
Test it. You might be pleasantly surprised.
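For reference, the Thread::Queue arrangement being compared against might look something like the sketch below (my sketch, not BrowserUk's code): a single queue feeding $T workers, so every enqueue and dequeue contends for the same internal lock.
#! perl -slw
# Sketch for comparison only: one shared queue is the single point of
# contention that the per-element locking scheme above avoids.
use strict;
use threads;
use Thread::Queue;

our $T //= 8;
my $q = Thread::Queue->new;

my @workers = map {
    threads->create( sub {
        my $tid = threads->tid;
        while( defined( my $rec = $q->dequeue ) ) {
            ## process $rec here.
            # Output may interleave; ordering is not the point here.
            print "[$tid] processed record: '$rec'";
        }
    } );
} 1 .. $T;

open my $fh, '<', $ARGV[ 0 ] or die $!;
while( <$fh> ) {
    chomp;
    $q->enqueue( $_ );
}
close $fh;

$q->enqueue( ( undef ) x $T );   # one terminator per worker
$_->join for @workers;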
You lose sequence. I had to autoflush output to keep it from getting jumbled. I didn't realize you can share elements of an array without sharing the whole array. Can you do the same with hashes?
You lose sequence.
Sorry, I hadn't realised that was a requirement. I'll think about that.
Can you do the same with hashes?
Yes. But be aware that if you don't lock the hash and insertions and deletions are going on, things can get a little strange. (E.g. iterating the hash keys might return a key, but by the time you try to access the associated value, the key might have been deleted.)
If the hash is essentially static -- only the values changing -- then only sharing the value scalars and applying locking to them individually can be much more efficient than locking the entire hash for every update.
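For illustration, sharing just the value slots of a fixed-key hash might look like this sketch (mine, not from the post; the key names, thread count, and loop counts are placeholders):
use strict;
use warnings;
use threads;
use threads::shared;

# Hypothetical fixed key set; only the value slots are shared.
my %counts;
share( $counts{ $_ } ) for qw( apples pears plums );
$counts{ $_ } = 0 for keys %counts;

my @thrs = map {
    threads->create( sub {
        for ( 1 .. 1000 ) {
            for my $k ( keys %counts ) {
                lock $counts{ $k };   # contend only on this one value
                $counts{ $k }++;
            }
        }
    } );
} 1 .. 4;

$_->join for @thrs;
print "$_: $counts{ $_ }\n" for sort keys %counts;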
Re: PerlIO file handle dup
by marioroy (Prior) on Mar 07, 2017 at 08:45 UTC
Greetings,
Welcome to the world of multiple threads, or cores for that matter. It's one thing to make this work on the UNIX platform and altogether something else on Windows. And let's not forget Cygwin.
For me, Perl is a box of crayons. MCE and MCE::Shared are my paintings. I imagine folks around the world joined together in a relay race. I have no idea what we're aiming for. It just happens to be my turn at this moment in time. The tools given me are a box of crayons named Perl, a laptop, and an agent named Perseverance. Perseverance brought along a long-time friend named Grace. Grace invited Randomness, an Uncle.
The thing is that MCE and MCE::Shared may not be perfect. They are paintings, after all. Paintings take time to paint.
Regarding any slowness, IMHO, let the script fly. Oh, please do. For this use case, a semaphore is not necessary; nor is yield. Upon starting, a thread begins interacting with the shared-manager immediately, which is why the same thread ID is shown repeatedly for many lines in the output. Eventually, the 2nd thread finishes spawning and joins in with the 1st. Thread 3 joins in later, having been spawned last.
I upped the count to 100k. The OP's script with semaphore + yield takes 3.6 seconds to run on a laptop (2.6 GHz - Core i7 Haswell). Removing the semaphore + yield allows the script to complete in 1.1 seconds. The latter includes threads writing to a shared output handle. In case it was missed, I removed the line to autoflush STDOUT; e.g. $| = 1. There's no reason to slow down IO. Let Perl fly. Ditto for MCE::Shared and workers.
use strict;
use threads;
use MCE::Shared;

{
    open my $fh, '|-', 'gzip > test.txt.gz';
    foreach (1..100000) {
        print {$fh} sprintf('%04d',$_).('abc123' x 10)."\n";
    }
    close $fh;
}

{
    mce_open my $fh, '-|', 'gzip -cd test.txt.gz' or die "open error: $!\n";
    mce_open my $out, '>', \*STDOUT or die "open error: $!\n";

    my @thrs;

    foreach (1..3) {
        push @thrs, threads->create('test');
    }

    $_->join() foreach @thrs;
    close($fh);

    sub test {
        my $tid = threads->tid();
        # using shared output to not garble among threads
        while ( my $line = <$fh> ) {
            print {$out} "thread: $tid, line: $., ".$line;
        }
    }
}
It can run faster. To be continued in the next post.
Regards, Mario.
Greetings,
To decrease the number of trips to and from the shared-manager, one can provide a suffix (k * 1024) or (m * 1024 * 1024) for the 3rd argument to read. That enables chunk IO. Not to worry: the shared-manager keeps reading until it reaches the end of a line or record. Notice $.; it is the chunk_id, not the actual line number. The chunk_id value is important when output order is desired.
OP's script involving semaphore + yield: 3.6 seconds. Shared handle (non-chunking): 1.1 seconds.
Below, chunking completes in 0.240 seconds, which is the total running time including the initial gzip.
use strict;
use threads;
use MCE::Shared;

{
    open my $fh, '|-', 'gzip > test.txt.gz';
    foreach (1..100000) {
        print {$fh} sprintf('%04d',$_).('abc123' x 10)."\n";
    }
    close $fh;
}

{
    mce_open my $fh, '-|', 'gzip -cd test.txt.gz' or die "open error: $!\n";
    mce_open my $out, '>', \*STDOUT or die "open error: $!\n";

    my @thrs;

    foreach (1..3) {
        push @thrs, threads->create('test');
    }

    $_->join() foreach @thrs;
    close($fh);

    sub test {
        my $tid = threads->tid();
        # using shared output to not garble among threads
        while (1) {
            my $n_chars = read $fh, my($buf), '4k';
            last if (!defined $n_chars || $n_chars <= 0);
            print {$out} "## thread: $tid, chunkid: $.\n".$buf;
        }
    }
}
Regards, Mario.
If we read one record at a time, the input semaphore isn't needed. However, I'm reading 500 records at a time, and they need to be in sequence. I suppose if I read in and processed one record at a time, I could eliminate the input semaphore when MCE::Shared is being used (probably not for regular file handles). However, I think that would make output slower since each thread needs to block until its processed data is the next to be written.
I only put the yield in there because the first thread seemed to be hogging all the input before the other threads even started. In my actual script I'm not using MCE::Shared for the output file, and autoflush is needed to keep the output in order.
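For what it's worth, that "block until its processed data is the next to be written" behaviour could be sketched with a shared turn counter (this is just an illustration, not part of the thread; chunk IDs are assumed to start at 1):
use threads;
use threads::shared;

my $turn :shared = 1;   # chunk_id whose output goes next

# Each thread calls this with its chunk_id once its batch is processed.
sub write_in_order {
    my ( $chunk_id, $text ) = @_;
    lock $turn;
    cond_wait( $turn ) until $turn == $chunk_id;
    print $text;
    $turn++;
    cond_broadcast( $turn );
}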
So this
read $fh, my($buf), '4k';
is the same but faster than this?
my $buf = <$fh>;
If it always reads exactly one entire record regardless of "chunk size", what does the chunk size do exactly? Or is the chunk size a minimum, then it continues reading until EOL? It is confusing that MCE's read works fundamentally differently from Perl's read.
I don't suppose there is a "readlines" function for MCE file handles? I assume if I could read all 500 lines at a time, that would minimize overhead related to MCE. For delimited input, I'm currently letting Text::CSV_XS read from the file handle, though.
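For delimited input, a batched pull with Text::CSV_XS might look roughly like the sketch below (mine, not chris212's actual script; the batch size is a placeholder, and the caller would hold whatever lock guards the input handle for the duration of the call):
use strict;
use warnings;
use Text::CSV_XS;

my $csv = Text::CSV_XS->new( { binary => 1, auto_diag => 1 } );

# Pull up to $size parsed rows in one turn so each batch stays contiguous.
sub next_batch {
    my ( $fh, $size ) = @_;
    my @rows;
    while ( @rows < $size ) {
        my $row = $csv->getline( $fh ) or last;
        push @rows, $row;
    }
    return \@rows;
}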
read $fh, my($buf), '4k';
is most definitely Perl's read. HTH.
In this context, a record is one line; e.g. $/ = "\n". When the 3rd argument to read contains a suffix 'k' or 'm', it slurps up that amount (e.g. '4k') plus whatever remains of the current line, so a chunk never ends mid-record. This read behavior applies to MCE::Shared::Handle only. Without the 'k' or 'm' suffix, read behaves exactly like the native read.
Yes, I had thought about adding readlines at the time, but decided against it after writing the following.
my @lines = tied(*{$fh})->readlines(10);
In the end, I settled on having the file-handle specifics feel like native Perl, and it does. The 'k' or 'm' suffix (extra behavior) provides chunk IO. Likewise, $. gives you the chunk_id. One can get an estimate with "cat csv_file | head -500 | wc -c". Take that byte count, divide by 1024, and append the k suffix for use with read. IMHO, there's no reason for workers to receive the same number of lines. Some will get a little less, some a little more.
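As a rough illustration of that estimate (my own sketch, not from the post; the file name is a placeholder):
# Estimate how many bytes ~500 lines occupy, then round to a 'k' suffix
# suitable for the 3rd argument of read on an mce_open'ed handle.
my $bytes = qx{head -500 data.csv | wc -c};        # hypothetical file
my $chunk = ( int( $bytes / 1024 ) || 1 ) . 'k';   # e.g. '32k'
# later, inside a worker:
#   my $n_chars = read $fh, my($buf), $chunk;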
A possibility that comes to mind is having MCE::Shared export "mce_read" to provide full MCE-like chunk IO capabilities. A value greater than 8192 would mean reading that number of bytes, continuing to the end of the line. If doing so, the following would only work for handles constructed with mce_open.
# same as chunk_size => 1 in MCE
$n_lines = mce_read $fh, \@lines, 1;
# read max 500 lines
$n_lines = mce_read $fh, \@lines, 500;
# read 1m, including till the end of line
$n_lines = mce_read $fh, \@lines, '1m';
# read 16k, ditto regarding till the end of line
$n_lines = mce_read $fh, \@lines, '16k';
# same thing as above, but slurp into $buf
$n_chars = mce_read $fh, $buf, 500;
$n_chars = mce_read $fh, $buf, '1m';
$n_chars = mce_read $fh, $buf, '16k';
$. gives chunk_id
Regards, Mario.
Re: PerlIO file handle dup
by marioroy (Prior) on Sep 03, 2018 at 00:16 UTC
Hi chris212,
At the time of my last reply, I didn't realize you had made an update. Disclaimer: Oh, btw, I'm not here to push MCE. Please use the module of your liking. Depending on the OS and/or number of workers, MCE::Mutex may run faster than Thread::Semaphore.
Thread::Semaphore
use strict; use warnings;
use threads;
use Thread::Semaphore;
use MCE::Shared;
use Time::HiRes 'time';

my $condvar = MCE::Shared->condvar;
my $sem     = Thread::Semaphore->new;

# Start the shared server. Not necessary if Perl has IO::FDPass.
MCE::Shared->start;

sub test {
    $condvar->wait;
    for (1..10000) {
        threads->yield;
        $sem->down;
        $sem->up;
    }
}

threads->create('test') for 1..3;
$condvar->broadcast(0.5);

my $start = time;
$_->join for threads->list;
printf "duration: %0.03f secs\n", time - $start;
MCE::Mutex::Channel
use strict; use warnings;
use threads;
use MCE::Mutex;
use MCE::Shared;
use Time::HiRes 'time';

my $condvar = MCE::Shared->condvar;
my $mutex   = MCE::Mutex->new;

# Start the shared server. Not necessary if Perl has IO::FDPass.
MCE::Shared->start;

sub test {
    $condvar->wait;
    for (1..10000) {
        threads->yield;
        $mutex->lock;
        $mutex->unlock;
    }
}

threads->create('test') for 1..3;
$condvar->broadcast(0.5);

my $start = time;
$_->join for threads->list;
printf "duration: %0.03f secs\n", time - $start;
MCE::Mutex::Flock
use strict; use warnings;
use threads;
use MCE::Mutex;
use MCE::Shared;
use Time::HiRes 'time';

my $condvar = MCE::Shared->condvar;
my $mutex   = MCE::Mutex->new( impl => 'Flock' );

# Start the shared server. Not necessary if Perl has IO::FDPass.
MCE::Shared->start;

sub test {
    $condvar->wait;
    for (1..10000) {
        threads->yield;
        $mutex->lock;
        $mutex->unlock;
    }
}

threads->create('test') for 1..3;
$condvar->broadcast(0.5);

my $start = time;
$_->join for threads->list;
printf "duration: %0.03f secs\n", time - $start;
Results
My laptop is a late 2013 MacBook Pro, 2.6 GHz i7 quad CPU. Each virtual machine is configured with 4 cores.
* 3 threads: CentOS Linux 7.3
Thread::Semaphore 0.386 secs
MCE::Mutex::Channel 0.162 secs
MCE::Mutex::Flock 0.144 secs
* 3 threads: Windows 7
Thread::Semaphore 0.293 secs
MCE::Mutex::Channel 0.499 secs
MCE::Mutex::Flock 0.498 secs
* 20 threads: CentOS Linux 7.3
Thread::Semaphore 41.897 secs
MCE::Mutex::Channel 0.980 secs
MCE::Mutex::Flock 0.702 secs
* 20 threads: Windows 7
Thread::Semaphore 35.521 secs
MCE::Mutex::Channel 2.994 secs
MCE::Mutex::Flock 3.322 secs
Regards, Mario
Using the OP's example, I modified it to do locking via MCE::Mutex. Afterwards, I toyed with a couple of MCE demonstrations. All demonstrations produce ordered output.
OP's example, locking via MCE::Mutex
#!/opt/perl/bin/perl
use strict;
use threads;
use MCE::Mutex;
use MCE::Shared;

open my $fh, '|-', 'gzip > test.txt.gz';
foreach (1..10000) {
    print {$fh} sprintf("%05d%s\n", $_, ('abc123' x 10));
}
close $fh;

mce_open $fh, '-|', 'gzip -cd test.txt.gz'
    or die "Failed to uncompress: $!\n";

$| = 1;

my @threads = ();
my $mutex   = MCE::Mutex->new();

foreach (1..3) {
    push @threads, threads->create(\&test);
}

$_->join() foreach @threads;
close $fh;
print "\n";

sub test {
    my $tid = threads->tid();
    my $line;
    while (1) {
        threads->yield();
        $mutex->lock();
        $line = <$fh> or last;
        print "Thread $tid ".$line;
        $mutex->unlock();
    }
    $mutex->unlock;
}
MCE, no chunking
#!/opt/perl/bin/perl
use strict;
use threads;
use MCE;

open my $fh, '|-', 'gzip > test.txt.gz';
foreach (1..10000) {
    print {$fh} sprintf("%05d%s\n", $_, ('abc123' x 10));
}
close $fh;

open $fh, '-|', 'gzip -cd test.txt.gz'
    or die "Failed to uncompress: $!\n";

$| = 1;

# MCE spawns threads when threads is present
MCE->new(
    chunk_size => 1, max_workers => 3,
    input_data => $fh,
    init_relay => 1,

    user_func => sub {
        my ($mce, $chunk_ref, $chunk_id) = @_;
        my $tid = threads->tid();

        MCE::relay sub {
            print "Thread $tid ".$chunk_ref->[0];
        };
    }
)->run;

close $fh;
print "\n";
MCE, chunking enabled
#!/opt/perl/bin/perl
use strict;
use threads;
use MCE;

open my $fh, '|-', 'gzip > test.txt.gz';
foreach (1..10000) {
    print {$fh} sprintf("%05d%s\n", $_, ('abc123' x 10));
}
close $fh;

open $fh, '-|', 'gzip -cd test.txt.gz'
    or die "Failed to uncompress: $!\n";

$| = 1;

# MCE spawns threads when threads is present
MCE->new(
    chunk_size => 500, max_workers => 3,
    input_data => $fh,
    init_relay => 1,

    user_func => sub {
        my ($mce, $chunk_ref, $chunk_id) = @_;
        my $tid = threads->tid();
        my $buf = '';

        foreach my $line ( @{ $chunk_ref } ) {
            $buf .= "Thread $tid ".$line;
        }

        MCE::relay sub {
            print $buf;
        };
    }
)->run;

close $fh;
print "\n";
Results taken from a Linux CentOS 7.3 VM
My laptop is a late 2013 MacBook Pro, 2.6 GHz i7 quad CPU. The Linux virtual machine is configured with 4 cores.
* 3 workers
time perl script_sem.pl | wc -l # 0.543s OP's example
time perl script_mutex.pl | wc -l # 0.663s Ditto, MCE::Mutex
time perl script_mce.pl | wc -l # 0.225s MCE, no chunking
time perl script_chunk.pl | wc -l # 0.077s MCE, chunking enabled
* 10 workers
time perl script_sem.pl | wc -l # 1.072s OP's example
time perl script_mutex.pl | wc -l # 0.726s Ditto, MCE::Mutex
time perl script_mce.pl | wc -l # 0.255s MCE, no chunking
time perl script_chunk.pl | wc -l # 0.112s MCE, chunking enabled
* 20 workers
time perl script_sem.pl | wc -l # 1.849s OP's example
time perl script_mutex.pl | wc -l # 0.803s Ditto, MCE::Mutex
time perl script_mce.pl | wc -l # 0.339s MCE, no chunking
time perl script_chunk.pl | wc -l # 0.179s MCE, chunking enabled
Regards, Mario