#!/usr/bin/perl =foo Update: added rules Update: on my darwin i set up the relevant rule with ipfw add 65500 divert 65500 tcp from me to any via en0 replace with your iface of choice, naturally. I haven't set this up on linux (yet), but I know divert sockets were ported at some point... ciao ciao! This little script was hell to write, because every time i didn't do the firewall rule right, and had a fix for the script, my netinfo server couldn't be contacted... this ment that i couldn't run ipfw flush regardless, i've finally made it. It now prioritises using a size based queue, which is really nothing compared to what you can do. Suprisingly it's not a resource hog, and there's still some stuff to implement, such as timeout based dropping using a journal (the purger is written, but not the logger), and priority queue clipping using mapl's janus heaps... I guess i'll prioritize the cmp sub so that it puts tcp SYN packets, and non piggybacked ACKS, aswell as short ICMP and UDP at the top automatically. Then I'll prioritise by service... so far only the heap has made bittorrent and perlmonks chat possible at the same time, even when running several bts at a time... I must say I was surprised. Replace "#print" with ";print" for debugging info. Unresolved bug: within the SIGIO handler recv will sometimes return undefined, but the loop check doesn't pick it up, so pack complains. I have no idea why. Bah. Enough yak, here's the stuff: =cut use strict; use warnings; use Socket; use Time::HiRes qw(time sleep alarm); use Fcntl qw(F_SETOWN F_GETFL F_SETFL O_ASYNC O_NONBLOCK); use Array::Heap; #use Array::Heap2; # same thing, with a different namespacs use NetPacket::IP qw(IP_PROTO_ICMP IP_PROTO_TCP IP_PROTO_UDP); use NetPacket::TCP; use NetPacket::UDP; use NetPacket::ICMP; use Scalar::Util qw(dualvar); ### KNOWN BUGS # this code is very dirty and hackish, may be resolved in the future ### CHANGES # added real rules to order the heaps. A simple API and examples at the bottom... # currently aggressively user oriented, with a bit of space for standard unixish servers ### TODO # score changes dynamically based on time... Instability in heap could # be negligeable at this point, as TCP is fault tolerant and UDP # shouldn't care. ICMP should a have high enough priority # # purge using timeout, not just by clipping the queue # # both of these are theoretically solveable using references within the queue, # so that you could undef and change the value of the referants (thingys) in # the heap itself. I wonder if make_heap instead of push_heap would be much worse. # This way we could take a reference to an element in the heap. # aggressive purging minimizes wasted bandwidth: # have an x second maximum queue life (to be implemented) # purge heap size: # if (overflow){ # reverse order # reverse heapify # reverse pop # reverse order # } # # push # user config sub PORT () { 65500 }; # the port to bind on sub WANTED_BYTES_PER_SECOND () { 94 * 128 }; # cap ( * 128 for kilobits, * 1024 for kilobytes )... divide by ERROR sub SLEEP_INTERVAL () { 0.05 } ## smaller = smoother, more overhead. For some perspective: ## 0.01 is usually the size of a time slice on unices... (linux lets you get picky) ## 0.03 gives good response on ~10K, 0.05 on 100K and more. the question is really ## how many packets do you burst before delaying for them all. If you send 100 packets ## a second the overhead of calling sleep will cause a significant delay sub PURGE_INTERVAL () { 2 } ## how often are packets purged sub PURGE_TIMEOUT () { 20 } ## how long can a packet live in the queue... only as accurate as PURGE_INTERVAL sub QUEUE_LIMIT () { 32 } ## how many packets are allowed to be in the queue at any given time # constants sub ERROR () { 0.996 }; # clipped average on my machine, for 12 * 1024, 64 * 1024, 128 * 1024 sub BYTES_PER_SECOND () { WANTED_BYTES_PER_SECOND / ERROR }; sub SLICE () { BYTES_PER_SECOND * SLEEP_INTERVAL }; # a slice of data corresponding to SLEEP_INTERVAL sub NOQUEUE () { O_ASYNC | O_NONBLOCK }; sub PACK_QUEUE () { "a16 a*" }; sub PACK_DEPT () { "d S" }; ## F: less overhead, innacurate. d: accurate, more overhead, ## D: less overhead, accurate, needs long double support (perl -V). sub PURGE_MARKERS () { int(PURGE_TIMEOUT / PURGE_INTERVAL) } # variables my ($dept,@dept) = (0); my ($qcnt,@qjournal,@queue) = (0,((undef) x PURGE_MARKERS)); #print "Initializing...\n"; #print "SLICE=", SLICE, "\n"; my $rules = new Rules; install_rules($rules); ### hackery for priority queue implmentation sub queue (@) { ## use $rules to compute score, push, janusify and pop if needed my @packets = @_; foreach my $packet (@packets){ $qcnt++; $packet = dualvar $rules->evaluate(substr($packet,16)), $packet; # add journal entry } if ($qcnt > QUEUE_LIMIT){ #print "queue has exceeded limit, clipping\n"; @queue = reverse @queue; make_heap_cmp { $b <=> $a } @queue; while ($qcnt > QUEUE_LIMIT){ pop_heap_cmp { $b <=> $a } @queue; $qcnt--; } @queue = reverse @queue; } push_heap @queue, @packets; } sub dequeue (){ ### pop while defined $qcnt--; pop_heap @queue; } ### hackery ends ## set up socket socket D, PF_INET, SOCK_RAW, getprotobyname("divert") or die "$!"; bind D,sockaddr_in(PORT,inet_aton("0.0.0.0")) or die "$!"; fcntl D, F_SETOWN, $$; #print "fcntl returned async ", ((fcntl D, F_GETFL, 0 & NOQUEUE) ? "on" : "off"), "\n"; #$SIG{ALRM} = sub { # if ($qcnt){ # while (defined $qjournal[0]){ # undef ${shift @qjournal}; # undefine the reference # if (--$qcnt == 1){ # @queue = (); # @qjournal = ((undef) x PURGE_MARKERS); # return; # } # }; shift @qjournal; # take out one marker # push @qjournal, undef; # put another in # } #}; #alarm PURGE_INTERVAL,PURGE_INTERVAL; # purge old packets every n my ($ts,$p); # lexical for SIGIO temp usage... don't alloc/daelloc every time $SIG{IO} = sub { #print time(), " SIGIO: queuing up\n"; while (defined($ts = recv D, $p, 65535, 0)){ # we assume packets can come in faster than SIGIO can be called # (on my machine, 128 * 1024 cap, this loops usually picks up 3-4 # packets), so we'll save some context switching on high load, #print "undefined p ($!)\n" unless defined $p; #print "undefined ts ($!)\n" unless defined $ts; queue(pack(PACK_QUEUE, $ts, $p)); } }; #print "Initialization complete, starting read loop\n"; # start loop my ($to, $t, $s, $l); #my ($start, $end, $total); # used to compute ERROR PACKET: { if (defined ($to = recv D, $_, 65535, 0)){ # blocking read. the queue is empty. $to is reassigned # because the packet could come from various rules. hack at it # if it ticks you off. #print time(), " received packet\n"; #print "received: " . length($to) . "\n"; if ($dept < SLICE){ #print time(), " dept is $dept - short circuited, should take ", length($_) / BYTES_PER_SECOND, " seconds to deliver\n"; send D, $_, 0, $to; $dept += length($_); push @dept, pack(PACK_DEPT, time(), length($_) ); redo PACKET; } else { #print time(), " queued (too much dept: $dept)\n"; queue(pack(PACK_QUEUE, $to, $_)); # pack is about 1.5 times faster than refs (benchmark) } # the queue is not empty, or dept needs purging #print time(), " clearing up queue\n"; fcntl D, F_SETFL, ((fcntl D, F_GETFL, 0)|NOQUEUE); # switch to async #print "fcntl is now noqueue ", ((fcntl D, F_GETFL, 0 & NOQUEUE) ? "on" : "off"), "\n"; # use to compute ERROR #$start = time; #$total = 0; until (not $qcnt){ # until the queue is empty do { #print time(), " cleaning out and making up for dept\n"; $t = time; for (my $i = 0; $i < @dept; $i++){ defined $dept[$i] or next; ($s, $l) = unpack(PACK_DEPT, $dept[$i]); #print time(), " diff is ", time - $s, ", ", ($l / BYTES_PER_SECOND)," diff needed queue length is $#queue, dept joural has $#dept entries ($dept)\n"; if ($t > $s + ($l / BYTES_PER_SECOND) ){ $dept -= $l; delete($dept[$i]); # faster than splice } } while (@dept and not exists $dept[0]){ shift @dept }; ## clean out those which are easy #print time(), " dept is now $dept\n"; #print time(), " will sleep for ", $dept / BYTES_PER_SECOND,"\n" if $dept > SLICE; } while (($dept > SLICE) and sleep $dept / BYTES_PER_SECOND); # sleep (one should suffice, but in case a sig came # (IO, ALRM are used)) until we've cleared the dept #print time(), " dept is now $dept, flushing a packet\n"; my ($to,$p) = unpack(PACK_QUEUE, dequeue() ); $dept += length($p); push @dept, pack(PACK_DEPT, time(), length($p) ); #$total += length($p); used to compute ERROR #print time(), " sent one from queue, dept is now $dept, should take ", length($p) / BYTES_PER_SECOND, " seconds to deliver (queue left: $#queue)\n"; send D, $p, 0, $to; !$qcnt ? fcntl D, F_SETFL, ((fcntl D, F_GETFL, 0)&!NOQUEUE) : redo ; # unset async. checking here will skip checking # until(!queue), up to the time fcntl is called. # Then a double check is made to avoid a packet # getting stuck in the queue while others are # getting short circuited #print "fcntl is now noqueue ", ((fcntl D, F_GETFL, 0 & NOQUEUE) ? "on" : "off"), "\n"; } # use this code to determine ERROR #$end = time; #my $bps = ($total/($end-$start)); # print "during high load we sent $total bytes in ", $end-$start, " seconds, which means ", $bps, " bytes per second.\n"; # print "the ratio of actual rate versus cap is ", $bps/BYTES_PER_SECOND, "\n"; #print time(), " queue empty, returned to synchronious IO\n"; # the queue is empty } redo } 1; # Keep your mother happy. sub install_rules { ## the rules $_[0]->install( ### DEPENDANCIES Rule::Dependancy::Simple->new({ # basic (network unrelated) data provides => ["basic"], evaluate => sub { my $packet = shift; my $basic = new Dependancy::Simple; $basic->set("size",length($packet)); {basic => $basic}; }, }), Rule::Dependancy::Simple->new({ # ip packet data provides => ["ip"], evaluate => sub { {ip => NetPacket::IP->decode($_[0]) } } }), Rule::Dependancy::Simple->new({ # tcp packet data needs => ["ip"], provides => ["tcp"], evaluate => sub { #print "providing tcp packet dependancy\n"; ##print "got packet: ", unpack("H*",$_[0]), "\n"; ##print "Available dependancies:\n\n", do { use Data::Dumper; Dumper $_[1] },"\n"; ($_[1]{ip}{proto} == IP_PROTO_TCP) ? {tcp => NetPacket::TCP->decode($_[1]{ip}{data}) } : {} } }), Rule::Dependancy::Simple->new({ # udp packet data needs => ["ip"], provides => ["udp"], evaluate => sub { ($_[1]{ip}{proto} == IP_PROTO_UDP) ? {udp => NetPacket::UDP->decode($_[1]{ip}{data}) } : {} } }), Rule::Dependancy::Simple->new({ # icmp packet data needs => ["ip"], provides => ["icmp"], evaluate => sub { ($_[1]{ip}{proto} == IP_PROTO_ICMP) ? {icmp => NetPacket::ICMP->decode($_[1]{ip}{data}) } : {} } }), ### RULES Rule::Simple->new({ # handle Type of Service et cetera (delay += 8, thoroughput += 5, reliability += 4, cost += 1, congestion += 2) needs => ["ip"], evaluate => sub { 0 }, }), Rule::Simple->new({ # packet size needs => ["basic"], evaluate => sub { #print "evaluating size based score adjustment\n"; length($_[1]{basic}->get("size")) ? (1.5 * log(length($_[1]{basic}->get("size")))) : 0 } }), Rule::Simple->new({ # tcp window size needs => ["tcp"], evaluate => sub { #print "evaluating window size score adjustment\n"; $_[1]{tcp}{winsize} ? 0.1 * log($_[1]{tcp}{winsize}) : 0 } }), Rule::Simple->new({ # icmp conditional needs => ["icmp"], evaluate => sub { #print "packet is icmp, -20\n"; -20 }, }), Rule::Simple->new({ # udp conditional needs => ["udp"], evaluate => sub { #print "packet is UDP, -6\n"; -6 }, }), Rule::Simple->new({ # tcp flags needs => ["tcp"], evaluate => sub { #print "evaluating tcp flags\n"; my $flags = $_[1]{tcp}{flags}; my $ret = 0; # tcp messages with special information have varying degrees of additional importance $ret -= 1 if $flags & FIN; $ret -= 8 if $flags & SYN; $ret -= 20 if $flags & RST; # attempt to help prevent waste by pushing as fast as possible. They're pretty rare anyway $ret -= 5 if $flags & PSH; $ret -= 2 if $flags & ACK; # packets without acks aren't as urgent $ret -= 20 if $flags & URG; # $ret += 0 if $flags & ECE; # $ret += 0 if $flags & CWR; #print "final score: $ret\n"; $ret; } }), Rule::Simple->new({ # generic (udp, tcp) port handling wants => ["tcp","udp"], # we either have tcp, or tcp evaluate => sub { #print "evaluating port rules\n"; my $prot = (exists $_[1]->{tcp}) ? $_[1]{tcp} : $_[1]{udp}; my $ret = 0; my $src = $prot->{src_port}; my $dst = $prot->{dest_port}; #print "ports: dest=$dst, src=$src\n"; SWITCH: { # source port # unpriviliged ports $src > 1024 and do { $ret += 2; #print "source port is unpriviliged\n"; $ret += 18, last if ($src >= 6881 and $src <= 6888); # bittorrent $ret += 17, last if $src == 5501; # hotline $ret += 15, last if $src == 20; # ftp data last; }; # important services $src == 80 and $ret -= 1, last; # http $src == 443 and $ret -= 1, last; # https $src == 143 and $ret -= 4, last; # imap $src == 110 and $ret -= 4, last; # pop3 $src == 25 and $ret -= 5, last; # smtp $src == 22 and $ret -= 7, last; # ssh $src == 21 and $ret -= 6, last; # ftp control } SWITCH: { # destination port $dst > 1024 and do { $ret += 3; #print "destination port is unpriviliged\n"; $ret += 16, last if ($dst >= 6881 and $dst <= 6888) and not ($src >= 6881 and $src <= 6888); $ret += 15, last if $dst == 5501; $ret += 14, last if $dst == 20; last; }; $dst == 80 and $ret -= 6, last; # http $dst == 443 and $ret -= 6, last; # https $dst == 143 and $ret -= 4, last; # imap $dst == 110 and $ret -= 4, last; # pop3 $dst == 25 and $ret -= 2, last; # smtp $dst == 22 and $ret -= 10, last; # ssh $dst == 23 and $ret -= 10, last; # telnet $dst == 21 and $ret -= 6, last; # ftp ctrl } #print "port score: $ret\n"; $ret; } }), ) } package Rules; # API for joint abstraction - rules depend on common shared data, and may be added and removed. # rules evaluate recursive, possibly asynchroniously in the future. # once a dependancy is solved it may not be altered, and all it's children may be computed on it with no locking - methods are supposed to return static or unrelated data. # a dependancy gets it's own # dependancy is either or: (more complexity may be built by creating empty dependnancy rules) # needs -> a strong dependancy list. every dependancy must be met (evaluated as soon as all are met) # wants -> a weak dependnancy list, at least one must be met (evaluated as soon as one is met) # evaluate -> run the rule, and return either a hash of dependancy objects, or a score modification # provides -> currently irrelevant. for hinting install in the future sub new { bless [],shift; # dependancy tree, inverse dependancy tree, rules pending parent, execution tree } sub install { # clear rules that will never have all their dependancies met, and then filter for duplicates my $self = shift; # filter here #print "installing score rules\n"; push @$self,@_; } sub evaluate { # evaluate all of the rules and return the sum my $self = shift; my $packet = shift; #no warnings; # perl doesn't like me playing with closures my %offers; my %deferred; my @ruleq; my $score = 0; #print "evaluating entire ruleset\n"; foreach my $rule (@$self){ my $dep = [ 0, $rule ]; # build dependancy counter if ($rule->has_deps){ my @needs; if ($rule->strong_deps){ @needs = grep { not exists $offers{$_} } $rule->needs; $dep->[0] = scalar @needs; } else { $dep->[0] = 1; @needs = grep { $dep->[0] and (exists $offers{$_} ? (($dep->[0] = 0),undef) : 1) } $rule->needs; $dep->[0] or @needs = (); } #print "this rule needs (@needs)\n"; foreach my $dependancy (@needs){ $deferred{$dependancy} = $dep; } } push @ruleq,try($packet,\$score,\%offers,\%deferred,$dep); } my $last = scalar @ruleq; while(@ruleq){ # finish the loop #print "attempting to evaluate remaining rules\n"; push @ruleq, try($packet,\$score,\%offers,\%deferred,shift @ruleq); (last == @ruleq) ? last : ($last = @ruleq); # break an infinite loop } #print "Final score is $score\n"; return $score; sub try { my ($packet,$score,$offers,$deferred,$dep) = (@_); #print "trying to evaluate rule\n"; if ($dep->[0] < 1){ #print "all dependancies met\n"; my $ret = $dep->[1]->evaluate($packet,$offers); if (ref $ret){ #print "rule introduced new offerings:"; foreach my $key (keys %{$ret}){ #print " $key,"; $offers->{$key} = $ret->{$key}; # install dependancies foreach my $dependant (@{$deferred->{$key}}){ $dependant->[0]--; # dependancy count goes down by one } } #print "\n"; } else { #print "rule adjusted score by $ret\n"; $$score += $ret; } # don't forget this is a closure return (); # we have nothing to requeue } else { #print "unmet dependancies\n"; return $dep; # requeue the current one } } } ## base packages for rules package Rule::Simple; # a rule is something that fits in rules, and works via a certain API. a leaf in a dependancy tree sub new { my $pkg = shift; bless shift, $pkg; } sub has_deps { (exists $_[0]{needs} or exists $_[0]{wants}) ? 1 : undef }; sub strong_deps { (exists $_[0]{needs}) ? 1 : undef }; sub needs { (exists $_[0]{needs}) ? @{$_[0]{needs}} : @{$_[0]{wants}} } sub evaluate { goto &{shift->{evaluate}} } package Rule::Dependancy::Simple; # a dependancy rule is something another dependancy rule or plain rule needs. a node in a dependancy tree. use base "Rule::Simple"; # a simple rule that also provides(); sub provides { @{$_[0]{provides}} } package Dependancy::Simple; # a dependancy is something a dependancy rule creates - This is just a base class for dependancy objects to work on. It contains plain values, and is basically a blessed hash sub new { bless {},shift } sub set { # set a value $_[0]{$_[1]} = $_[2]; } sub get { # get a value $_[0]{$_[1]} } __END__