package Thread::OrderedQueue;

# A queue shared between threads that releases items strictly in token order.
#
# Usage pattern supported by the methods below:
#   1. call get_token() to reserve the next position in the output order;
#   2. call enqueue($token, $item) whenever the item for that position is
#      ready (items may be enqueued in any order);
#   3. call dequeue() to receive items in ascending token order (0, 1, 2, ...);
#   4. call end() once no more items will be enqueued.

use strict;
use warnings;
use threads::shared;

# new()
#
# Builds an empty queue. The object itself, the item store and the token
# counter are all declared :shared.
sub new {
    my $class = shift;

    my $self = bless(\( my %self :shared ), $class);

    # Pending items, keyed by token.
    $self->{q} = \( my %q :shared );

    # The token counter lives in its own shared scalar so that get_token()
    # can lock just the counter instead of the whole object.
    $self->{last_given_ref} = \( my $last_given :shared = -1 );

    # Token of the item the next successful dequeue() will return.
    $self->{next_read} = 0;

    # Set to 1 by end().
    $self->{ended} = 0;

    return $self;
}

# get_token()
#
# Returns the next token: 0 on the first call, then 1, 2, ...
# Only the counter is locked here, not the object.
sub get_token {
    my $self = shift;

    lock(${ $self->{last_given_ref} });

    # We can't check if ended because we didn't lock %$self.
    return ++${ $self->{last_given_ref} };
}

# end()
#
# Marks the queue as ended. Later enqueue() calls die; dequeue() keeps
# returning items that are already available in order and then returns undef.
sub end {
    my $self = shift;

    lock(%$self);
    $self->{ended} = 1;

    # Wake one waiter; dequeue() re-signals before it returns, so any other
    # waiters get a chance to re-check the queue state in turn.
    cond_signal(%$self);

    return;
}

# enqueue($token, $item)
#
# Stores a shared clone of $item under $token. Dies if end() has already
# been called. A waiter is signalled only when $token is the one dequeue()
# is currently waiting for.
sub enqueue {
    my ($self, $token, $item) = @_;

    lock(%$self);

    die("Can't enqueue: the queue has already been ended")
        if $self->{ended};

    $self->{q}{$token} = shared_clone($item);

    cond_signal(%$self) if $token == $self->{next_read};

    return;
}

# dequeue()
#
# Returns the item for the next token in sequence, waiting until it has been
# enqueued. Returns undef once the queue has been ended and the item for the
# next token is not present.
sub dequeue {
    my $self = shift;

    lock(%$self);

    while (1) {
        if (exists( $self->{q}{ $self->{next_read} } )) {
            # Remove the item and advance to the following token.
            my $rv = delete( $self->{q}{ $self->{next_read}++ } );

            # Pass the wake-up on: the following item may already be queued.
            cond_signal(%$self);
            return $rv;
        }

        if ($self->{ended}) {
            # Pass the wake-up on so other waiters also see the end.
            cond_signal(%$self);

            # Deliberately a single undef (not an empty list) so that
            # dequeue() always yields exactly one value.
            return undef;
        }

        cond_wait(%$self);
    }
}

1;