use strict; use warnings; package Interval { use Time::HiRes; use MCE::Channel; use constant HAS_CLOCK_MONOTONIC => eval { # use monotonic clock if available Time::HiRes::clock_gettime( Time::HiRes::CLOCK_MONOTONIC() ); 1; }; sub new { my ($class, $delay) = @_; my $chnl = MCE::Channel->new; $chnl->send( undef ); return bless [ $delay, $chnl ], $class; } sub yield { my ($delay, $chnl) = @{ $_[0] }; my $lapse = $chnl->recv; my $time = HAS_CLOCK_MONOTONIC ? Time::HiRes::clock_gettime( Time::HiRes::CLOCK_MONOTONIC() ) : Time::HiRes::time(); if (!$delay || !defined $lapse) { $lapse = $time; } elsif ($lapse + $delay - $time < 0) { $lapse += int(abs($time - $lapse) / $delay + 0.5) * $delay; } $chnl->send($lapse += $delay); Time::HiRes::usleep(($lapse - $time) * 1e6); return; } } use MCE; use Time::HiRes qw{ sleep time }; my $interval = Interval->new(0.1); sub func { my ($last_value, $seq) = @_; printf "%0.3f $last_value $seq\n", time; sleep 0.1 if $seq == 3; # simulate extra time return $seq; } MCE->new( max_workers => 4, chunk_size => 1, sequence => [ 1, 20 ], init_relay => 0, user_func => sub { my $seq = $_; # wait for data from the last execution # this value will be 0 for the worker processing MCE->chunk_id 1 my $last_value = MCE->relay_recv; # delay until the next time period $interval->yield; my $value = func($last_value, $seq); # relay data to the next worker MCE::relay { $_ = $value }; } )->run; __END__ 1597215117.883 0 1 1597215117.983 1 2 1597215118.083 2 3 # 0.1 gap between sequence 3 and 4 due to 3 taking extra time 1597215118.283 3 4 1597215118.382 4 5 1597215118.482 5 6 1597215118.583 6 7 1597215118.683 7 8 1597215118.783 8 9 1597215118.882 9 10 1597215118.983 10 11 1597215119.082 11 12 1597215119.182 12 13 1597215119.283 13 14 1597215119.383 14 15 1597215119.483 15 16 1597215119.583 16 17 1597215119.683 17 18 1597215119.782 18 19 1597215119.883 19 20