use strict; use Thread::Pool; #use Thread::Semaphore; open( OUTFILE, ">/crnch_data/foo.csv" ) or die "Cannot open output"; print OUTFILE "\n"; #select(STDIN); $| = 1; open( STDIN, "/usr/sbin/tcpdump -vttttnner /crnch_data/tcpdump_infile |" ); select(OUTFILE); $| = 1; my $res = ""; my $pool = Thread::Pool->new( { workers => 10, do => \&do, stream => \&monitor, } ); $pool->job($_) while (); $pool->shutdown; sub do { chomp; my $input = $_; $input = shift; if ( $input =~ /(^\d\d\d\d-\d\d-\d\d .*)/ ) { if ( $input =~ /(.*) \(tos (\S+), ttl +(\d+), id (\d+), offset (\d+), flags ([\S\+]+)(?:, ([\S\+]+))?, proto +(\S+ \(\d+\)), length (\d+)(?:, .*?)?\) (.*)/ ) { $input = "$1 $2 $3 $4 $5 $6$7 $8 $9 $10"; } else { if ( $input =~ /(.*) (((?:(\d{1,2}|[a-fA-F]{1,2}){2})(?::|-*)){6}) (\>) (((?:(\d{1,2}|[a-fA-F]{1,2}){2})(?::|-*)){6}), (.*?), length (\d+)\:(.*)/ ) { $input = "$1 $2 x $6 x x x x $10"; } else { $res = "error"; } } } else { $res = "error"; } my @fields = split( " ", $input ); my $timestamp = $fields[0] . " " . $fields[1]; my $microsecond = $fields[1]; $timestamp =~ s/(.*?)\.\d+$/\1/; $microsecond =~ s/(.*?)\.(\d+$)/\2/; my $sourcemac = $fields[2]; my $destmac = $fields[4]; $destmac =~ s/,//g; $fields[4] =~ s/,$//; my $len = $fields[9]; $len =~ s/:$//; my $tos = $fields[11]; my $ttl = $fields[12]; my $id = $fields[13]; my $offset = $fields[14]; my $ipflags = $fields[15] . " " . $fields[16]; $ipflags =~ s/\[(.*)\]/\1/g; my $sip = $fields[18]; $sip =~ s/([^\.]+\.[^\.]+\.[^\.]+\.[^\.]+).*/\1/; $sip =~ s/:$//; my $sport = $fields[18]; if ( $sport =~ /[^\.]+\.[^\.]+\.[^\.]+\.[^\.]+\.(.*)/ ) { $sport = $1; $sport =~ s/:$//; } else { $sport = "null"; } my $dip = $fields[20]; $dip =~ s/([^\.]+\.[^\.]+\.[^\.]+\.[^\.]+).*/\1/; $dip =~ s/:$//; my $dport = $fields[20]; if ( $dport =~ /[^\.]+\.[^\.]+\.[^\.]+\.[^\.]+\.(.*)/ ) { $dport = $1; $dport =~ s/:$//; } else { $dport = "null"; } my $proto = "null"; my $flags = $fields[21] if ( $fields[21] =~ /[SRPU.]+/ ); my $proto = "tcp" if ( $fields[21] =~ /[SRPU.]+/ ); $_ = "//$timestamp//$microsecond//$sourcemac//$destmac//$sip//$dip//$sport//$dport//$proto//$flags//$len//$ttl//$id//$tos//$ipflags//$offset?"; } sub monitor : locked { my $line = $_; # my $semaphore = new Thread::Semaphore; $line = shift; unless ( $line eq "error" ) { # $semaphore->down; print OUTFILE $line; # $semaphore->up; } } close(OUTFILE);