#!/usr/dev/perl/bin/perl -w
#
# UDP broadcast messenger.
#
# Lines read from STDIN are wrapped in a `noid` record, serialized with
# Storable and broadcast over UDP.  Datagrams received on the same port are
# deserialized and dumped to STDOUT.  Three worker threads (receive, send,
# keyboard input) communicate with the main thread through two queues.

# Written as 5.008 on purpose: a bare "5.8" is interpreted as 5.800
# (Perl v5.800.0), which no interpreter satisfies.
use 5.008;

package Trance;

use strict;
use warnings;
use IO::Socket;    # the Socket constants and helpers used below come in through this
#use Storable qw/freeze thaw/;

$| = 1;            # unbuffered output on the currently selected handle

my $bcast = '10.42.143.255';    # broadcast address datagrams are sent to
my $port  = 4478;               # UDP port used for both sending and receiving

# Constructor.  The object carries no state; it only serves as an invocant
# for listen() and send().
sub new {
    my $class = shift;
    return bless {}, $class;
}

# Receive UDP datagrams on $port and push each payload onto $queue.
#
#   $queue - queue object providing enqueue()
#
# Dies if the listening socket cannot be created.
# NOTE: $SIG{INT} is not assigned anywhere in this file, so unless it is
# already set when the script starts the loop condition stays false and
# this runs until the process is terminated.
sub listen {
    my ( $self, $queue ) = @_;

    my $sock = IO::Socket::INET->new(
        LocalPort => $port,
        Proto     => 'udp',
    ) or die "pulsar §$!";

    do {
        my $data;
        $sock->recv( $data, 65536, 0 );
        print "Recieved!";
        $queue->enqueue($data);
        print " and Queued.\n";
    } until ( $SIG{INT} );
}

# Take payloads off $queue and broadcast each one as a UDP datagram to
# $bcast:$port.  A fresh socket is opened and closed for every payload.
#
#   $queue - queue object providing a blocking dequeue()
#
# Dies if the socket cannot be created or configured, or if sending fails.
sub send {
    my ( $self, $queue ) = @_;

    do {
        my $data = $queue->dequeue;

        socket( my $con, PF_INET, SOCK_DGRAM, scalar getprotobyname('udp') )
            or die "socket: $!$@";
        setsockopt( $con, SOL_SOCKET, SO_BROADCAST, 1 )
            or die "setsockopt: $!$@";

        my $dest = sockaddr_in( $port, inet_aton($bcast) );

        # CORE:: is required here: this package defines its own send().
        # Test definedness rather than truth so that a successful send
        # reporting 0 bytes is not mistaken for a failure.
        defined( CORE::send( $con, $data, 0, $dest ) )
            or die "send $!";

        close $con;
    } until ( $SIG{INT} );
}

package noid;

# Build a record object from key/value pairs; the pairs become the fields
# of the blessed hash.
sub new {
    my $class  = shift;
    my %fields = @_;
    return bless \%fields, $class;
}

package main;

use Data::Dumper;
use Storable qw/freeze thaw/;
use threads;
use threads::shared;
use Thread::Queue;    # core queue class; same enqueue/dequeue interface

my $nodeID = `hostname`;
chomp($nodeID);

my $inQueue  = Thread::Queue->new;    # payloads received from the network
my $outQueue = Thread::Queue->new;    # payloads waiting to be broadcast

my $trance = Trance->new;

my $hear = threads->create( \&pickup );    # network receiver
my $tell = threads->create( \&send );      # network sender
my $face = threads->create( \&input );     # keyboard reader

# Main loop: decode and display every payload the receiver thread queued.
# NOTE: $SIG{INT} is not assigned anywhere in this file (see Trance::listen).
do {
    my $packet = $inQueue->dequeue;
    if ($packet) {

        # SECURITY: the payload comes straight off the network and Storable
        # is not safe on untrusted input.  The eval only keeps a malformed
        # datagram from killing the program; it does not make thaw() safe.
        my $record = eval { thaw($packet) };
        if ( defined $record ) {
            print Dumper($record);
        }
        else {
            warn 'Discarding undecodable packet' . ( $@ ? ": $@" : "\n" );
        }
    }
} until ( $SIG{INT} );

# Thread entry point: receive datagrams into $inQueue.
sub pickup { $trance->listen($inQueue) }

# Thread entry point: broadcast everything placed on $outQueue.
sub send { $trance->send($outQueue) }

# Thread entry point: read lines from STDIN and queue each one, wrapped in
# a serialized noid record, for broadcast.  An empty line is still queued
# and then ends the loop; end-of-file is treated like an empty line.
sub input {
    my $line;
    do {
        $line = <STDIN>;
        $line = '' unless defined $line;    # EOF
        chomp $line;

        my $ref = noid->new(
            pid  => $$,
            time => time,
            node => $nodeID,
            data => $line,
        );
        $outQueue->enqueue( freeze $ref );
    } until ( $line eq '' );
}