in reply to Solving the Long List is Long challenge, finally?
There is one missing in the mix :)
Kyoto Cabinet is the successor to Tokyo Cabinet. This new variant also creates 32 hash databases (by default) in a temp directory. Crypt::xxHash is used to determine which database each key is inserted into or updated in. Sorting is handled by Sort::Packed. Similar to the DB_File variant, this is best run on a Unix OS.
Usage:
  KEYSIZE=N NUM_THREADS=N NUM_MAPS=N perl llilkch.pl file...
  perl llilkch.pl --keysize=N --threads=N --maps=N file...
  perl llilkch.pl --keysize=N --threads=max --maps=max file...
Running:
$ perl llilkch.pl big{1,2,3}.txt | cksum
Kyoto Cabinet hash database - start
fixed string length=12, threads=8, maps=32
get properties : 3.348 secs
pack properties : 1.187 secs
sort packed data : 0.960 secs
write stdout : 0.775 secs
total time : 6.273 secs
 count lines : 10545600
 count unique : 10367603
2956888413 93308427

$ perl llilkch.pl --threads=16 --maps=64 big{1,2,3}.txt | cksum
Kyoto Cabinet hash database - start
fixed string length=12, threads=16, maps=64
get properties : 1.925 secs
pack properties : 0.723 secs
sort packed data : 0.965 secs
write stdout : 0.387 secs
total time : 4.005 secs
 count lines : 10545600
 count unique : 10367603
2956888413 93308427

$ perl llilkch.pl --threads=24 --maps=64 big{1,2,3}.txt | cksum
Kyoto Cabinet hash database - start
fixed string length=12, threads=24, maps=64
get properties : 1.420 secs
pack properties : 0.538 secs
sort packed data : 0.975 secs
write stdout : 0.286 secs
total time : 3.225 secs
 count lines : 10545600
 count unique : 10367603
2956888413 93308427

$ perl llilkch.pl --threads=48 --maps=max big{1,2,3}.txt | cksum
Kyoto Cabinet hash database - start
fixed string length=12, threads=48, maps=128
get properties : 0.908 secs
pack properties : 0.372 secs
sort packed data : 0.969 secs
write stdout : 0.205 secs
total time : 2.462 secs
 count lines : 10545600
 count unique : 10367603
2956888413 93308427
llilkch.pl
##
# This demonstration requires Kyoto Cabinet.
# homepage      http://fallabs.com/kyotocabinet/
# documentation http://fallabs.com/kyotocabinet/perldoc/
#
# Installation:
# wget http://fallabs.com/kyotocabinet/pkg/kyotocabinet-1.2.80.tar.gz
# wget http://fallabs.com/kyotocabinet/perlpkg/kyotocabinet-perl-1.20.tar.gz
#
# macos: please refer to https://perlmonks.org/?node_id=1198574 for tips
#
# tar xzf kyotocabinet-1.2.80.tar.gz && cd kyotocabinet-1.2.80
# ./configure --disable-lzo --disable-lzma # enabling requires lzo/lzma dev pkgs
# make
# make install # Note: you may need to use "sudo"
# cd ..
#
# tar xzf kyotocabinet-perl-1.20.tar.gz && cd kyotocabinet-perl-1.20
# perl Makefile.PL
# make
# make install # Note: you may need to use "sudo"
# cd ..
##

use strict;
use warnings;
no warnings 'uninitialized';

use KyotoCabinet;
use Crypt::xxHash qw(xxhash64);
use Sort::Packed qw(sort_packed);
use Time::HiRes qw(time);
use MCE::Signal qw($tmp_dir -use_dev_shm);
use MCE;

# Print the usage message and terminate (die), e.g. on a bad option.
sub usage {
    die "Usage: [KEYSIZE=N] [NUM_THREADS=N] [NUM_MAPS=N] perl $0 file...\n".
        " perl $0 [--keysize=N] [--threads=N] [--maps=N] file...\n".
        " perl $0 [--keysize=N] [--threads=max] [--maps=max] file...\n";
}

@ARGV or usage();

# Defaults may come from the environment; command-line options override them.
my $NUM_CPUS = MCE::Util->get_ncpu();
my $KEY_SIZE = $ENV{KEYSIZE}     || 12;
my $NUM_THDS = $ENV{NUM_THREADS} || 8;
my $NUM_MAPS = $ENV{NUM_MAPS}    || 32;

# Options precede the file list and accept a "-" or "--" prefix.
# Numeric values must be positive: a zero map count would later cause a
# modulus-by-zero in the workers, and zero threads/key size is meaningless,
# so such values fall through to usage().
while (@ARGV && $ARGV[0] =~ /^--?/) {
    my $arg = shift;
    $KEY_SIZE = $1,        next if $arg =~ /^--?keysize=([1-9]\d*)$/;
    $NUM_THDS = $1,        next if $arg =~ /^--?threads=([1-9]\d*)$/;
    $NUM_THDS = $NUM_CPUS, next if $arg =~ /^--?threads=max$/;
    $NUM_MAPS = $1,        next if $arg =~ /^--?maps=([1-9]\d*)$/;
    $NUM_MAPS = 128,       next if $arg =~ /^--?maps=max$/;
    usage();
}

# Options alone are not enough; at least one input file is required.
@ARGV or usage();

# Never use more workers than CPUs, nor more than 128 databases.
$NUM_THDS = $NUM_CPUS if $NUM_THDS > $NUM_CPUS;
$NUM_MAPS = 128       if $NUM_MAPS > 128;

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Setup.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

print {*STDERR} "Kyoto Cabinet hash database - start\n";
print {*STDERR} "fixed string length=${KEY_SIZE}, threads=${NUM_THDS}, maps=${NUM_MAPS}\n";

our @MM;

# Let's have Kyoto Cabinet handle locking because we're not doing FETCH/STORE.
# Instead, we're calling "increment" to increment the value (single call).

# if ($^O =~ /cygwin|MSWin32/) {
#     # On Cygwin, use Channel instead for better performance.
#     $MM[$_] = MCE::Mutex->new(impl => "Channel")
#         for (0 .. $NUM_MAPS - 1);
# } else {
#     $MM[$_] = MCE::Mutex->new(impl => "Flock", path => "$tmp_dir/$_.sem")
#         for (0 .. $NUM_MAPS - 1);
# }

# Open DB number $idx (a file under $tmp_dir) with the given Kyoto Cabinet
# open mode and return the handle; dies if the open fails.
# Each child must open the DB file separately.
sub open_db {
    my ($idx, $omode) = @_;

    # hash (*.kch db) is faster than tree (*.kct db) for this demonstration
    my $db   = KyotoCabinet::DB->new();
    my $path = "$tmp_dir/$idx.kch#bnum=500000";   # Hash database
   #my $path = "$tmp_dir/$idx.kct#pccap=128m";    # B+ Tree database

    # Kyoto Cabinet reports failures through $db->error, not $!.
    $db->open($path, $omode)
        or die "Open error '$path': " . $db->error;

    return $db;
}

# Create the databases, closing each one explicitly (and checking the
# result) so the workers can reopen them later.
for my $idx (0 .. $NUM_MAPS - 1) {
    my $db = open_db($idx, KyotoCabinet::DB::OWRITER | KyotoCabinet::DB::OCREATE);
    $db->close or die "Close error (db $idx): " . $db->error;
}

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Get properties.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

my ($start1, $process_count, $num_lines) = (time, 0, 0);
our ($NUM_LINES, %TEMP) = (0); # child vars

# Phase 1: workers read slurped chunks of "key<TAB>count" lines, aggregate
# counts per target DB in %TEMP, then flush them into Kyoto Cabinet once,
# in user_end.
my $mce = MCE->new(
    max_workers => $NUM_THDS,
    chunk_size  => 65536,
    gather      => sub { $num_lines += $_[0] },
    posix_exit  => 1,
    use_slurpio => 1,

    user_func => sub {
        my ($mce, $slurp_ref, $chunk_id) = @_;

        open my $input_fh, '<', $slurp_ref
            or die "Cannot open in-memory chunk $chunk_id: $!";

        while (<$input_fh>) {
            my ($key, $count) = split /\t/;
            # xxhash64 picks which of the $NUM_MAPS databases owns this key.
            my $idx = xxhash64($key, 0) % $NUM_MAPS;
            $TEMP{$idx}{$key} += $count;
            $NUM_LINES++;
        }

        close $input_fh;
    },

    user_end => sub {
        my $omode = KyotoCabinet::DB::OREADER | KyotoCabinet::DB::OWRITER;

        for my $idx (keys %TEMP) {
            # Acquire the lock before opening the DB file. Must also close.
            # $MM[$idx]->lock_exclusive;
            # my $db = open_db($idx, $omode);
            # while (my ($key, $count) = each %{ $TEMP{$idx} }) {
            #     my $val = $db->get($key);
            #     $db->set($key, $val + $count);
            # }
            # $db->close;
            # $MM[$idx]->unlock;

            my $db = open_db($idx, $omode);

            # increment() returns undef on failure; don't lose counts silently.
            while (my ($key, $count) = each %{ $TEMP{$idx} }) {
                defined $db->increment($key, $count)
                    or die "Increment error (db $idx): " . $db->error;
            }

            $db->close or die "Close error (db $idx): " . $db->error;
        }

        MCE->gather($NUM_LINES);

        # Reset per-worker state.
        $NUM_LINES = 0;
        %TEMP      = ();
    },
);

for my $fname (@ARGV) {
    warn("'$fname': Is a directory, skipping\n"), next if (-d $fname);
    warn("'$fname': No such file, skipping\n"), next unless (-f $fname);
    warn("'$fname': Permission denied, skipping\n"), next unless (-r $fname);
    ++$process_count, $mce->process($fname) if (-s $fname);
}

$mce->shutdown; # reap workers

printf {*STDERR} "get properties : %9.3f secs\n", time - $start1;

exit unless $process_count;

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Pack data for sorting.
# Each worker handles a unique DB.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

# Fixed-width record: 4-byte encoded count followed by the NUL-padded key.
#
# sort_packed "C$PACK_SIZE" compares whole records as unsigned byte strings,
# so the count must be encoded so that bytewise order == desired order.
# A native-endian 'l' of -count does not satisfy this on little-endian hosts
# (e.g. -257 = FF FE FF FF sorts after -2 = FE FF FF FF). Instead, store
# VAL_BIAS - count as big-endian unsigned 'N': larger counts become smaller
# unsigned numbers and sort first (count descending), and ties are broken
# by the NUL-padded key in ascending byte order.
# Counts are limited to 32 bits, as with the original 'l' encoding.
my $VAL_BIAS   = 0x7FFFFFFF;
my $VAL_SIZE   = length pack('N', 0);
my $STR_SIZE   = $KEY_SIZE + 1;            # null-terminated
my $PACK_SIZE  = $STR_SIZE + $VAL_SIZE;
my $PACK_FMT   = "NZ${STR_SIZE}";          # shared by pack_task and disp_task
my $FETCH_SIZE = $PACK_SIZE * 12000;

# Worker task: read every key/value pair from DB number $seq_id and gather
# the row count plus the concatenated packed records back to the manager.
sub pack_task {
    my ($mce, $seq_id, $chunk_id) = @_;
    my $db = open_db($seq_id, KyotoCabinet::DB::OREADER);
    my ($num_rows, $kv_pairs) = (0, '');

    # Values were written with increment(); Kyoto Cabinet serializes them as
    # 8-byte big-endian integers, so decode with unpack 'q>' after retrieval.
    my $cur = $db->cursor;
    $cur->jump;

    # get(1) returns (key, value) and steps forward; empty list at the end.
    while (my ($key, $val) = $cur->get(1)) {
        $num_rows += 1;
        $kv_pairs .= pack($PACK_FMT, $VAL_BIAS - unpack('q>', $val), $key);
    }

    $cur->disable;
    $db->close;

    $mce->gather($num_rows, $kv_pairs);
}

my ($start2, $unique, $data) = (time, 0, '');

# Spin up MCE workers to handle packing and output.
# user_func dispatches to the task named in user_args (by symbolic name).
$mce = MCE->new(
    max_workers => $NUM_THDS,
    chunk_size  => 1,
    init_relay  => 1,
    posix_exit  => 1,
    user_func   => sub {
        my $task = MCE->user_args->[0];
        no strict 'refs';
        $task->(@_);
    },
);

# Pack data for sorting.
$mce->process({
    user_args  => [ 'pack_task' ],
    sequence   => [ 0, $NUM_MAPS - 1 ],
    chunk_size => 1,
    gather     => sub {
        $unique += $_[0];
        $data   .= $_[1];
    },
});

printf {*STDERR} "pack properties : %9.3f secs\n", time - $start2;

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Output data by value descending, word ascending.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

# Get the next down value for integer division: ceil($dividend/$divisor) - 1,
# i.e. the index of the last $divisor-sized chunk. Intended for positive
# $dividend (a zero dividend yields -1).
sub divide_down {
    my ($dividend, $divisor) = @_;
    return int($dividend / $divisor) if $dividend % $divisor;
    return int($dividend / $divisor) - 1;
}

# Return a chunk of $data: manager responding to worker request.
sub fetch_chunk {
    my ($seq_id) = @_;
    return substr($data, $seq_id * $FETCH_SIZE, $FETCH_SIZE);
}

# Worker task: unpack chunk and write directly to standard output.
# MCE::relay serializes the prints so chunks appear in sequence order.
sub disp_task {
    my ($mce, $seq_id, $chunk_id) = @_;
    my ($output, $chunk) = ('', $mce->do('fetch_chunk', $seq_id));

    while (length $chunk) {
        my ($enc, $key) = unpack($PACK_FMT, substr($chunk, 0, $PACK_SIZE, ''));
        # Undo the VAL_BIAS - count encoding applied in pack_task.
        $output .= $key . "\t" . ($VAL_BIAS - $enc) . "\n";
    }

    MCE::relay { print $output; };
}

if (length $data) {
    my $start3 = time;
    sort_packed "C$PACK_SIZE", $data;
    printf {*STDERR} "sort packed data : %9.3f secs\n", time - $start3;

    my $start4 = time;
    $mce->process({
        user_args  => [ 'disp_task' ],
        sequence   => [ 0, divide_down(length($data), $FETCH_SIZE) ],
        chunk_size => 1,
    });
    printf {*STDERR} "write stdout : %9.3f secs\n", time - $start4;
}

$mce->shutdown; # reap workers
@MM = ();

printf {*STDERR} "total time : %9.3f secs\n", time - $start1;
printf {*STDERR} " count lines : %lu\n", $num_lines;
printf {*STDERR} " count unique : %lu\n", $unique;
|
---|
Replies are listed 'Best First'. | |
---|---|
Re^2: Solving the Long List is Long challenge - Kyoto Cabinet
by hippo (Archbishop) on Jul 14, 2023 at 08:52 UTC | |
by marioroy (Prior) on Jul 15, 2023 at 07:21 UTC | |
by marioroy (Prior) on Jul 17, 2023 at 05:27 UTC | |
by marioroy (Prior) on Jul 17, 2023 at 05:33 UTC | |
by marioroy (Prior) on Jul 14, 2023 at 14:47 UTC |