#!/usr/bin/perl use strict; use warnings; use 5.010; use POSIX qw(_exit); use DBI; use File::Temp; use Time::HiRes qw(usleep); $| = 1; my $dbfile = "/tmp/test.db"; unlink $dbfile; my $dbh = DBI->connect("dbi:SQLite:$dbfile", "", "", {RaiseError => 1}); sub lazy { usleep rand 10000 } sub DBI::db::prepare_retry { my $dbh = shift; while (1) { my $sth = eval { $dbh->prepare(@_) }; return $sth if defined $sth; if ($dbh->err == 5) { print "!"; sleep 1; redo; } die ($@ || $dbh->errstr); } } $dbh->do("create table queue (id integer primary key, child, n, take)"); for my $w ('00'..'20') { fork or do { print "<$w enter>"; eval { my $dbh = DBI->connect("dbi:SQLite:$dbfile", "", "", {RaiseError => 1}); # $dbh->do("PRAGMA journal_mode = TRUNCATE"); $dbh->do("PRAGMA synchronous = OFF"); my $sth = $dbh->prepare_retry("insert into queue values (NULL, ?, ?, NULL)"); print "<$w with sth>"; for (1..1000) { lazy; local $@; eval { $sth->execute($w, $_); print $w; }; if ($@) { if ($dbh->err == 5) { print "!"; sleep 1; redo; } print "<$w error: $@>" } }; }; $@ and print "<$w error: $@>"; print "<$w exit>"; _exit(0); } } # sleep 1; for my $w ('a'..'h') { fork or do { print "<$w enter>"; eval { my $dbh = DBI->connect("dbi:SQLite:$dbfile", "", "", {RaiseError => 1}); $dbh->do("PRAGMA synchronous = OFF"); my $sth0 = $dbh->prepare_retry("select id, child, n, take from queue where take is NULL limit 1"); my $sth1 = $dbh->prepare_retry("update queue set take=? where id=? and take is NULL"); my $sth2 = $dbh->prepare_retry("delete from queue where id=?"); print "<$w with sth1 and sth2>"; my ($id, $child); while (1) { lazy; eval { $sth0->execute; ($id, $child) = $sth0->fetchrow_array; if (defined $id) { if ($sth1->execute($w, $id)) { # do something here! $sth2->execute($id); print $w } else { print "!"; } } }; if ($@) { if ($dbh->err == 5) { print "!"; sleep 1; redo; } else { print "<$w error: " . $dbh->errstr . "(".$dbh->err.")>"; die; } } last unless defined $id; } }; # $@ and print "<$w error: " . $dbh->errstr . "(".$dbh->err.")>"; print "<$w exit>"; exit(0); } } 1 while (wait > 0); my $sth = $dbh->prepare("select count(*) from queue"); $sth->execute; my $row = $sth->fetchrow_arrayref; print "\n@$row\n";