-based webservice stream its results to a client (incrementally) as they become available on the server-side.
I called for SOAP::Lite to stream the partially constructed XML response to the client incrementally by dispatching to webservice methods implemented as Continuations. SOAP::Lite would then re-enter the Continuation when it felt the client was ready for more data.
module which emulates a Continuation via threads. So far it:
Despite all of that, it works well enough to implement the scheme I envisioned (with some caveats for developers of webservices).
Continuation/Thread.pm
package Continuation::Thread;
#=====================================================================
# TODO
# * translate implicit return from continuation to yield_final
# * properly handle missing ';' on return/creturn translation
# * properly deal with continuation's wantarray semantics
# * fix translation to work on anonymous subs
# * don't touch 'return' statements in closures inside continuations
# * what to do about variables closed over by the continuation
# (right now they're unsynchronised copies)
# * add caching for translated code
#=====================================================================
# Imports
# translation-related imports
use Filter::Simple;
use PPI;
use Carp;
use Digest::MD5;
# thread-related imports
use threads;
use Thread::Queue::Any;
use Time::HiRes;
# bdsm-related imports
use strict;
use warnings;
#=====================================================================
# Call source filter on import (only executable code)
FILTER_ONLY executable => sub {
    # $_ holds the executable part of the source being filtered.
    my $doc = get_doc($_);

    # PPI::Document->new yields undef when it cannot parse the text.
    # Fail here with a readable message rather than with an obscure
    # "method on undefined value" error from deep inside the tree walk.
    die "Continuation::Thread: could not parse source for translation: "
        . (PPI::Document->errstr || 'unknown PPI error') . "\n"
        unless defined $doc;

    # Rewrite creturn/return statements in place; the empty hash tracks
    # which subs have already received their continuation scaffolding.
    convert_continuations($doc, {});

    # Hand the translated source back to the filter.
    $_ = $doc->serialize;
};
#=====================================================================
# Class-methods called by filtered continuations
#
# Create a continuation
# INPUT: a reference to a (translated) sub
#
sub call {
    # Class method: start a translated continuation sub in its own thread
    # and return an object wrapping it.
    #   $class - class to bless the continuation object into
    #   $code  - reference to the (translated) continuation sub
    #   @args  - extra arguments forwarded to the continuation
    # Dies if the first message from the thread is not 'ready'.
    my ($class, $code, @args) = @_;

    # $queue carries messages from the continuation to the caller,
    # $bqueue carries "go ahead" tokens from the caller to the continuation.
    my $queue  = Thread::Queue::Any->new();
    my $bqueue = Thread::Queue::Any->new();

    # Marker object that check_direct() looks for as the first argument.
    my $mock   = bless {}, 'Thread::Continuation::Startup';
    my $thread = threads->create($code, $mock, $queue, $bqueue, @args);

    # Release the continuation's ready() handshake, then wait for its reply.
    $bqueue->enqueue(1);
    my @vals = $queue->dequeue();
    if (!@vals || $vals[0] ne 'ready') {
        die "continuation was not ready!";
    }

    # Until the first continue(), result refers to the handshake message.
    my %self = (
        thread => $thread,
        queue  => $queue,
        bqueue => $bqueue,
        result => \@vals,
    );
    return bless \%self, $class;
}
#
# Inform the caller that the continuation is ready to run its payload.
# (Automatically inserted by translation process.)
#
sub ready {
    # Handshake run inside the continuation thread: wait for the caller's
    # token on $bqueue, then announce 'ready' on $queue.
    # The invocant (first argument) is not used.
    my (undef, $queue, $bqueue) = @_;

    $bqueue->dequeue();
    $queue->enqueue('ready');
}
#
# Yield a value back to the caller, block the continuation;
# the value is available by calling $cont->result.
# (Automatically inserted by translation process.)
#
sub yield {
    # Run inside the continuation thread: wait for the caller's token on
    # $bqueue, then send status 1 plus an array reference holding the
    # yielded values on $queue.
    my ($class, $queue, $bqueue, @payload) = @_;

    $bqueue->dequeue();
    $queue->enqueue(1, \@payload);
}
#
# Yield the final value back to the caller, allow the continuation's
# thread to join, mark the continuation as 'done'; the value is
# available by calling $cont->result.
# (Automatically inserted by translation process.)
#
sub yield_final {
    # Run inside the continuation thread: wait for the caller's token on
    # $bqueue, then send status 2 plus an array reference holding the
    # final values on $queue.
    my ($class, $queue, $bqueue, @payload) = @_;

    $bqueue->dequeue();
    $queue->enqueue(2, \@payload);
}
#
# Cause the caller to throw an exception with information taken from
# the continuation's exception.
# (Automatically inserted by the translation process.)
#
sub yield_exception {
    # Run inside the continuation thread: send status 3 plus the exception
    # on $queue. Unlike yield/yield_final, $bqueue is accepted but not read.
    my ($class, $queue, $bqueue, $exception) = @_;

    $queue->enqueue(3, $exception);
}
#
# Throw an exception if the continuation is being called directly
# as a normal sub; the caller should instead use
# Continuation::Thread->call.
# (Automatically inserted by the translation process.)
#
sub check_direct {
    # Guard placed at the top of every translated continuation.
    #   $class   - invocant (unused)
    #   $startup - first argument the continuation was called with
    # Dies with "Don't call continuations directly!" unless $startup is an
    # object that isa 'Thread::Continuation::Startup'.
    my $class   = shift;
    my $startup = shift;

    # Loaded locally so this sub does not depend on a file-level import.
    require Scalar::Util;

    # Report the error from the perspective of whoever called the
    # continuation, not from this helper.
    local $Carp::CarpLevel = 2;

    # blessed() rejects every kind of unblessed value (including ref types
    # such as VSTRING that a hand-written list of type names misses), so
    # ->isa is only ever invoked on a real object.
    die Carp::shortmess("Don't call continuations directly!")
        unless defined(Scalar::Util::blessed($startup))
            && $startup->isa('Thread::Continuation::Startup');
}
#=====================================================================
# Object-methods allowing callers to use continuations
#
# Unblock the continuation, let it run until it would yield its next
# value, or exit naturally; If a value is yielded, make it available
# via $cont->result and re-block the continuation.
#
sub continue {
    # Let the continuation run until its next message and record it.
    # Returns $self. Status codes received on the result queue:
    #   1 - a value was yielded; it becomes the new result
    #   2 - final value; the thread is joined and the object marked done
    #   3 - the continuation died; the thread is joined, the object is
    #       marked done and the exception text is re-thrown here
    # Croaks if the continuation is already done.
    my $self = shift;

    # Once the thread has been joined nothing will ever answer again, so
    # polling below would spin forever; refuse up front instead.
    Carp::croak("continuation is already done; cannot continue it")
        if $self->{done};

    # Hand the continuation its "go ahead" token.
    $self->{bqueue}->enqueue(1);

    # Poll for the reply, sleeping 10ms between attempts.
    my @vals = $self->{queue}->dequeue_dontwait();
    while (!@vals) {
        Time::HiRes::usleep(10000);
        @vals = $self->{queue}->dequeue_dontwait();
    }

    my ($status, $payload) = @vals;
    if ($status == 2 || $status == 3) {
        $self->{thread}->join();
        $self->{done} = 1;
        die "$payload\n" if $status == 3;
    }

    $self->{result} = $payload;
    return $self;
}
#
# Return whether the continuation could yield further values; will be
# set to false when the continuation exits naturally, or due to some
# exception.
#
sub done {
    # Normalise the object's 'done' flag to exactly 1 or 0.
    my ($self) = @_;

    return 1 if $self->{done};
    return 0;
}
#
# Get the value most recently yielded by the continuation.
#
sub result {
    # Return whatever is currently stored under the object's 'result' key.
    my ($self) = @_;

    my $latest = $self->{result};
    return $latest;
}
#=====================================================================
# Continuation translation code (via PPI)
#
# Parse the supplied perl into a PPI parse tree.
#
sub get_doc
{
    # Build a PPI::Document from a string of Perl source and return
    # whatever the PPI constructor produced.
    my ($source) = @_;

    # PPI takes a reference to the source text.
    my $tree = PPI::Document->new(\$source);
    return $tree;
}
#
# Recursively traverse the parse tree from the supplied element,
# calling the supplied code at each element visited.
#
sub traverse {
    # Depth-first, pre-order walk.
    #   $node    - element to start from
    #   $visitor - code ref called as
    #              $visitor->($depth, $node, $node->class [, $node->content])
    #   $depth   - nesting level reported for $node; children get $depth + 1
    my ($node, $visitor, $depth) = @_;

    # The content is only passed along for nodes that can provide it.
    my @visit_args = ($node, $node->class);
    push @visit_args, $node->content if $node->can('content');

    $visitor->($depth, @visit_args);

    # Descend only into nodes that report at least one child.
    if ($node->can('children') && scalar($node->children)) {
        foreach my $kid ($node->children()) {
            traverse($kid, $visitor, $depth + 1);
        }
    }
}
#
# Travel upward in the parse tree from the supplied element,
# returning the first element found matching (ISA) the supplied
# class, if any such element is found.
#
sub find_parent_of_class {
    # Walk up through ->parent links, starting above $elem, and return the
    # first ancestor that isa $class; undef when the chain runs out.
    my ($elem, $class) = @_;

    for (my $ancestor = $elem->parent;
         defined $ancestor;
         $ancestor = $ancestor->parent) {
        return $ancestor if $ancestor->isa($class);
    }

    return undef;
}
#
# Iterate through the child elements of the supplied element,
# returning the first child found matching (EQ) the supplied
# class, if any such element is found.
#
sub find_first_child_of_class {
    # Return the first direct child whose ->class is exactly $class (string
    # equality, not isa); undef if $elem cannot have children, has none,
    # or none of them match.
    my ($elem, $class) = @_;

    return undef unless $elem->can('children');
    return undef unless ($elem->children) > 0;

    foreach my $kid ($elem->children) {
        next unless $kid->class eq $class;
        return $kid;
    }

    return undef;
}
#
# Traverse trailing sibling elements of the supplied element,
# returning the first sibling found to be 'significant' according
# to PPI's rules, if any such element is found.
#
sub get_next_significant_sibling
{
    # Step through ->next_sibling links after $start and return the first
    # sibling whose ->significant is true; undef when the chain ends first.
    my ($start) = @_;

    my $sibling = $start->next_sibling;
    while ($sibling) {
        return $sibling if $sibling->significant;
        $sibling = $sibling->next_sibling;
    }

    return undef;
}
#
# Given a PPI::Token::Word element, translate the containing
# PPI::Statement such that the token is removed, and the statement
# has supplied content prepended and appended.
#
sub markup_call
{
    # Wrap the statement containing $elem in generated code.
    #   $elem   - the keyword token (creturn/return) being translated
    #   $class  - class of the enclosing element to treat as the statement
    #   $pre    - text inserted immediately before $elem
    #   $post   - text inserted before the statement's first
    #             PPI::Token::Structure child, or at the end of the
    #             statement when it has none (no trailing ';')
    #   $delete - when true, $elem itself is removed afterwards
    #   $debug  - accepted for compatibility; unused
    # Dies with a short tag ([mc0]..[mc3]) naming the step that failed.
    my ($elem, $class, $pre, $post, $delete, $debug) = @_;

    # Locate the enclosing statement before modifying anything, so a
    # failure here leaves the tree untouched.
    my $st = find_parent_of_class($elem, $class);
    die "[mc0]" unless defined $st;
    my $es = find_first_child_of_class($st, 'PPI::Token::Structure');

    # The generated text is carried in Comment tokens so that it is
    # emitted verbatim when the document is serialized.
    my $ys = PPI::Token->new('Comment', $pre);
    die "[mc1]" unless $elem->insert_before($ys);

    my $cp = PPI::Token->new('Comment', $post);
    if (defined $es) {
        # Usual case: close the call just before the statement marker.
        die "[mc2]" unless $es->insert_before($cp);
    }
    else {
        # Statement without a terminating ';' (e.g. the last statement in
        # a block): close the call at the very end of the statement.
        die "[mc2]" unless $st->add_element($cp);
    }

    # Finally drop the original keyword token if requested.
    die "[mc3]" unless !$delete || $elem->delete;
}
#
# Translate subs containing 'creturn' statement into continuations:
# * insert continuation starting/ending infrastructure
# * translate creturn/return -> yield/yield_final
# * trap & translate exceptions -> yield_exception
#
sub convert_continuations
{
    # Rewrite, in place, every named sub in $doc that contains 'creturn'.
    #   $doc           - PPI document to translate
    #   $continuations - hash ref recording (by sub name) which subs have
    #                    already received their start/end scaffolding
    # Dies with a tagged message when a creturn is found outside a named
    # sub or when the parse tree cannot be modified.
    my ($doc, $continuations) = @_;

    # traverse perl parse-tree
    traverse($doc, sub {
        my ($level, $elem, $class, $content) = @_;

        # we're looking for the 'creturn' keyword,
        # denoting a continuation
        return unless $class eq 'PPI::Token::Word' &&
            $content eq 'creturn';

        # make sure we're in a subroutine, abort otherwise
        my $sub = find_parent_of_class($elem, 'PPI::Statement::Sub');
        die "illegal creturn outside of Continuation! [1]"
            unless defined $sub;

        # get the name of the subroutine
        my $subk = find_first_child_of_class($sub, 'PPI::Token::Word');
        die "illegal creturn outside of Continuation! [2]"
            unless defined $subk && $subk->content eq 'sub';
        my $subn = get_next_significant_sibling($subk);
        die "illegal creturn outside of Continuation! [2]"
            unless defined $subn && $subn->content =~ /\w/;
        my $name = $subn->content;

        # source text of the two per-sub queue variables used by all of
        # the generated code below
        my $yq = "\$__cont_${name}_yqueue";
        my $bq = "\$__cont_${name}_bqueue";

        # convert this 'creturn' statement to use 'yield'.
        eval {
            markup_call($elem,
                'PPI::Statement',
                "Continuation::Thread->yield($yq, $bq, ", ")", 1);
        }; die "could not create Continuation! [y] $@\n" if $@;

        # there may be many creturn's in a single sub,
        # only translate the surrounding sub once
        return if exists($continuations->{$name});
        $continuations->{$name} = 1;

        # find the subroutine's block
        my $block = find_first_child_of_class(
            $sub, 'PPI::Structure::Block');
        die "illegal creturn outside of Continuation! [3]"
            unless defined $block;

        # at the start of the block, add 'starting'
        # continuation infrastructure.
        my $firstChild = ($block->children)[0];
        my $vs = PPI::Token->new('Comment', " ".
            "Continuation::Thread->check_direct(shift);\n ".
            "my $yq = shift;\n ".
            "my $bq = shift;\n ".
            "eval {\n Continuation::Thread->ready(".
            "$yq, $bq);");
        die "could not create Continuation! [4]"
            unless $firstChild->insert_before($vs);

        # at the end of the block, add 'ending' continuation
        # infrastructure. It is appended after the last child so that the
        # final statement stays inside the eval even when no whitespace
        # follows it.
        my $cl = PPI::Token->new('Comment', "\n };".
            "\n Continuation::Thread->yield_exception(".
            "$yq, $bq, ".
            "\$@) if \$@;");
        die "could not create Continuation! [6]"
            unless $block->add_element($cl);

        # FIXME: if the sub returns a value implicitly (without final
        # return), insert return token...
        # iterate all 'return' statements within the current subroutine
        # FIXME: should not touch 'return' statements inside
        # internal closures
        traverse($block, sub {
            my ($lvl, $relem, $rclass, $rcont) = @_;
            return unless $rclass eq 'PPI::Token::Word' &&
                $rcont eq 'return';

            # convert this 'return' statement to use 'yield_final'.
            # The generated text keeps a 'return' in front of the call:
            # without it, execution would carry on past a non-final
            # return after the final value had been sent, and a later
            # yield would block while the caller waits in join().
            eval {
                markup_call($relem, 'PPI::Statement',
                    "return Continuation::Thread->yield_final(".
                    "$yq, $bq, ", ")", 1);
            }; die "could not create Continuation! [yr] $@\n" if $@;
        }, 0);

        # FIXME: deal with wantarray semantics
        # (presuming list context for now)
    }, 0);
}
#=====================================================================
1;