Tanktalus has asked for the wisdom of the Perl Monks concerning the following question:

I may be a bit off my rocker, so this may end up being an XY Problem, but I suppose I'll ask anyway. Quite often I look at code and see it working in a stream fashion: not just taking a list and returning another list like map, but working on a stream that flows along, where the current code doesn't need to process the entire list right now - it can pass what it has done so far on to the next phase and get to the next piece of data at some future point. Or the delay may not be optional (think "flush", or Expect, where you want to wait for the next trigger before deciding what the next item to be processed will be).

Of course, just to make this more difficult, I'm also thinking about streams being actual streams, as in multi-process. Simple iterators (my $next = $iter->();) don't quite work here... subprocesses are rather stubborn about not calling such things.

The reason for this line of thought is duplicating shell pipelines and interjecting perl code - at the beginning, middle, or end, or any combination. Perhaps, for example, you want to feed data that you pulled from a database into some other commands, modify the results before they go into the next commands, and then take the final output and do something with it (maybe send it back via CGI to the browser). Ok, I'm not doing all of that, but as long as I'm generifying it in the first place, I may as well dream big.

I've started, but have only done the easy part: the end. That is, no producing, just reading from an IO::Handle interface (specifically IO::Pipe::End, I think). I was thinking that if there were a convenient derivation of IO::Handle that could handle such callbacks, that'd be great, though I'm not sure how well that would actually work, or whether a much larger setup (subprocesses and/or using select or IO::Select) would be needed.
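
To make "the end" concrete, here's a minimal sketch of the sort of thing I mean - consuming the read side of an IO::Pipe (the data and the structure are placeholders, not my real code):

use strict;
use warnings;
use IO::Pipe;

my $pipe = IO::Pipe->new;

if (my $pid = fork) {
    # Parent: become the read end (an IO::Pipe::End) and consume the stream.
    $pipe->reader;
    while (my $line = <$pipe>) {
        print "consumed: $line";
    }
    waitpid $pid, 0;
}
else {
    die "fork failed: $!" unless defined $pid;
    # Child: become the write end and produce a few records.
    $pipe->writer;
    print {$pipe} "record $_\n" for 1 .. 3;
    exit 0;
}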

Thoughts?

Update: I'm really bad at examples ;-) Ignore the CGI part - it was just being used as an example where the output needed to be modified and sent somewhere dynamic that wasn't necessarily a file. Maybe a different example. In shell, you may have:

eval $cmd1_with_opts | grep "$myfilter" | ssh someuser@somehost -e 'cat > /mnt/usbdrive/somefile.txt'
The eval is there because $cmd1_with_opts might be something like cat "/home/user/My Documents"/*.txt (yes, this gets really tricky when you combine spaces and globs). It's actually that eval that gets annoying to get right - so I was thinking that reproducing it in perl would let me use File::Glob::bsd_glob and avoid the shell, and thus avoid breaking things up on spaces.

And then I got to thinking that the grep in the middle could get complex enough that it'd be awesome to let perl sit there (stream in, stream out) without having to put that code into another file, possibly without another process at all. And then, what if we wanted perl to be the one grabbing the input to the pipeline, so that perl would be the first process in the stream, too?

I thought that if I could just create some sort of IO::Handle-derivative to sit in there, that'd be great. I'm just not sure how to do that, and I'm kind of doubting it can be done. If it can be, I'm sure there's a monk out there who could give me an appropriate push - not necessarily write it for me, but maybe point to an existing module (which I've not been able to find, though I may be misinterpreting it) or give hints on how to proceed, if at all possible.
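
For illustration, a rough, untested sketch of the front of that pipeline done in perl - bsd_glob expands the pattern without a shell, and the list form of open means nothing gets re-split on spaces (the pattern and $myfilter are just placeholders):

use strict;
use warnings;
use File::Glob qw(bsd_glob);

my $myfilter = qr/interesting/;                 # placeholder filter

# Expand the glob in perl; the space in "My Documents" never sees a shell.
my @files = bsd_glob('/home/user/My Documents/*.txt');

# List-form open: cat receives the filenames as-is, no shell quoting involved.
open my $in, '-|', 'cat', @files or die "can't start cat: $!";

while (my $line = <$in>) {
    print $line if $line =~ $myfilter;          # the "grep in the middle", in perl
}
close $in;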

What I do realise is probably possible is playing with open's '-|' or '|-' options to create a subprocess to do that - though that works for the beginning, it's not quite so good for the ending. Also possible is creating an IO::Pipe and forking. That could work for both input and output, but it's precisely the syntax I was trying to avoid in the first place by putting all of this in one well-tested place. I could pass in a CODE ref and fork it off in my utility module, but that gets a bit ugly in that it seems like action at a distance: since it would be forked off, it wouldn't really behave like a closure anymore - any variables it changed would only change in the child. Without forking, those CODE refs could change any variable they closed over, and the change would still be visible after the pipe was completed.
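
In other words, roughly these two shapes (just a sketch, not working code from the module I have in mind):

# '-|': perl sits at the end of a pipe, reading a child's output.
open my $from, '-|', 'ls', '-l' or die "can't fork: $!";
while (my $line = <$from>) {
    # ... filter / transform $line here ...
}
close $from;

# '|-': perl sits at the beginning, feeding a child's input.
open my $to, '|-', 'sort' or die "can't fork: $!";
print {$to} "$_\n" for qw(pear apple orange);
close $to;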

Re: IO:: ... dynamic?
by ELISHEVA (Prior) on Apr 28, 2009 at 03:41 UTC
    I'm not quite sure I understand what you would like to do. Are you thinking of injecting bits of code that could be carried along with the "stream", waiting for the right trigger to execute? If so, what sort of trigger do you have in mind?

    Delayed execution is a perfect application for functional programming. For a recent discussion and some examples, see Functional programming ?.
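
    For instance, a tiny sketch of delayed evaluation with closures - purely illustrative, not tied to any particular module:

    use strict;
    use warnings;

    # A lazy "stream": a value plus a thunk that produces the rest on demand.
    sub stream    { my ($head, $tail) = @_; [ $head, $tail ] }
    sub head      { $_[0][0] }
    sub tail      { $_[0][1]->() }        # nothing runs until someone asks

    # An infinite stream of integers: only as many are computed as are consumed.
    sub ints_from { my $n = shift; stream($n, sub { ints_from($n + 1) }) }

    my $s = ints_from(1);
    print head($s), "\n";                 # 1
    print head(tail($s)), "\n";           # 2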

    The specific application you describe for the web sounds a lot like a mashup tool. If that is what you had in mind, you might also want to check out Yahoo Pipes and the Google Mashup Editor (now a part of Google Apps). These are intended as graphical programming tools, but I believe the guts (especially for Yahoo Pipe) do something like what you are talking about. If you had something else in mind, perhaps you might add some notes explaining how your idea is different or goes beyond that?

    Best, beth

Re: IO:: ... dynamic?
by ambrus (Abbot) on Apr 28, 2009 at 14:51 UTC

    I'm not sure what you want. Do you perhaps want communicating coroutines, implemented as Coro threads or perl threads?
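
    If it's the Coro route, the shape would be roughly this - an untested sketch using Coro::Channel as the "pipe" between two coroutines:

    use strict;
    use warnings;
    use Coro;
    use Coro::Channel;

    # A bounded channel plays the role of the pipe between two Coro threads.
    my $ch = Coro::Channel->new(10);

    my $producer = async {
        $ch->put("line $_") for 1 .. 5;
        $ch->shutdown;                    # end-of-stream for the reader
    };

    my $consumer = async {
        while (defined(my $line = $ch->get)) {
            print "got: $line\n";
        }
    };

    $_->join for $producer, $consumer;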

Re: IO:: ... dynamic?
by dk (Chaplain) on Apr 29, 2009 at 12:02 UTC
    I've recently seen IPC::Exe, which looks like the idea I think you're trying to describe.
Re: IO:: ... dynamic?
by jorgegv (Novice) on Apr 29, 2009 at 07:03 UTC

    Maybe something like the following? Each stage reads from its standard input and writes to an output pipe, which it sets up before it starts processing. Each stage recursively sets up the following stage by opening the pipe to it. The last stage is a bit different: it writes to STDOUT, so there is no need to open anything.

    I haven't refactored the code, the better to illustrate that the stageX functions can (should?) be different.

    The best way to develop a program like this would be to write the stageX functions as independent scripts (reading from STDIN and writing to STDOUT), then adapt them to this framework.

    Try feeding it a large text file and see the results:

    #!/usr/bin/perl -w
    use Getopt::Std;

    sub stage1 {
        my $out;
        open $out, "|$0 -m2" or die "Can't fork.\n";
        while (<STDIN>) {
            chomp;
            printf $out "S1>%s\n", $_;
        }
        close $out;
    }

    sub stage2 {
        my $out;
        open $out, "|$0 -m3" or die "Can't fork.\n";
        while (<STDIN>) {
            chomp;
            printf $out "S2>%s\n", $_;
        }
        close $out;
    }

    sub stage3 {
        my $out;
        open $out, "|$0 -m4" or die "Can't fork.\n";
        while (<STDIN>) {
            chomp;
            printf $out "S3>%s\n", $_;
        }
        close $out;
    }

    sub stage4 {
        while (<STDIN>) {
            chomp;
            printf "S4>%s\n", $_;
        }
    }

    my %mode = (
        1 => \&stage1,
        2 => \&stage2,
        3 => \&stage3,
        4 => \&stage4,
    );

    getopt("m:");
    if (defined($opt_m)) {
        if (exists($mode{$opt_m})) {
            $mode{$opt_m}();
        }
        else {
            die "Mode '$opt_m' undefined.\n";
        }
    }
    else {
        die "usage: $0 -m <mode>\n";
    }