#!/usr/bin/perl
#
# Thread Watcher: a small threaded HTTP proxy with a Win32::GUI status window.
#
#   * one "socketServer" thread accepts client connections on port 8080 and
#     queues the client's file descriptor number on $Qwork;
#   * $worker_cnt "worker" threads dequeue jobs and relay the request to the
#     origin server (processRequest);
#   * one "writer" thread prints messages queued on $Qprint to STDOUT;
#   * the main thread runs the GUI and, on a 250 ms timer, copies each
#     thread's entry in %ThreadStatus into that thread's label.
#
use strict;
use warnings;
use threads qw[ yield ];
use threads::shared;
use Thread::Queue;
use Thread::Queue::Any;
use Socket;
use Win32::GUI;
use Win32::GUI::AxWindow;
use POSIX qw[_exit];

$| = 1;

our $MainWindow = Win32::GUI::Window->new(
    -name    => "MainWindow",
    -text    => "Thread Watcher",
    -width   => 500,
    -height  => 500,
    -minsize => [500, 500],
    -maxsize => [500, 500],
    -pos     => [300, 100],
);
$MainWindow->AddLabel(
    -name => "Thread_Header",
    -text => pack("A5 A*", " Thr ", "Information"),
    -pos  => [10, 0],
    -size => [480, 20],
);

my $worker_cnt = 8;    # Number of worker threads to start

my $hostname = `hostname`;
$hostname = strip($hostname);
my $hostip = (gethostbyname($hostname))[4];
my %SocketInfo = (
    hostname => $hostname,
    hostip   => $hostip,
);

print "creating thread queues ...";
my $Qwork  = Thread::Queue::Any->new;    # jobs: (fileno, packed peer address, action)
my $Qprint = Thread::Queue::Any->new;    # log lines: (thread id, message)
my %ThreadStatus : shared = ();          # tid => current status text
my %ThreadLabel  : shared = ();          # tid => name of the GUI label showing it
my %ThreadClient : shared = ();          # tid => fileno of the job being handled
my %ThreadAddr   : shared = ();          # tid => packed peer address of that job
print "Done\n";

print "starting 1 writer thread ... \n";
my $cthread = threads->new(\&writer);
my $thrid   = $cthread->tid();
$MainWindow->AddLabel(
    -name => "Label1",
    # The original text here was "Sock Server", copy-pasted from the label
    # below; this label belongs to the writer thread.
    -text => pack("A5 A20 A*", $thrid, "Writer", ""),
    -pos  => [10, 20],
    -size => [480, 20],
);
$ThreadLabel{$thrid} = 'Label1';

print "starting 1 socketServer thread ... \n";
$cthread = threads->new(\&socketServer);
$thrid   = $cthread->tid();
$MainWindow->AddLabel(
    -name => "Label2",
    -text => pack("A5 A20 A*", $thrid, "Sock Server", ""),
    -pos  => [10, 40],
    -size => [480, 20],
);
$ThreadLabel{$thrid} = 'Label2';

print "starting $worker_cnt worker threads ... ";
for my $x (0 .. $worker_cnt - 1) {
    $cthread = threads->new(\&worker);

    # Add a label to the gui for each thread
    my $y          = int(60 + (30 * $x));
    my $worker_tid = $cthread->tid();
    my $label      = "Label" . int($x + 3);
    $MainWindow->AddLabel(
        -name => $label,
        -text => pack("A5 A20 A*", $worker_tid, "Started", ""),
        -pos  => [10, $y],
        -size => [480, 30],
    );
    $ThreadLabel{$worker_tid} = $label;
}
print "Done\n";

$MainWindow->Show();
$MainWindow->AddTimer('Report', 250);
Win32::GUI::Dialog();

### Listener on Socket for Client Requests
# Use fileno to pass Globs between threads
# http://www.perlmonks.org/index.pl?node_id=395373

### Send a die message to each thread by putting a die message in the queue
### for each thread to get.
# Thread::Queue::Any delivers everything passed to one enqueue() call as one
# set, so the three fields must be passed as a list. Passing a single array
# reference would arrive in the worker as $fno with $action undefined and the
# worker would never leave its loop.
my @threads   = keys(%ThreadLabel);
my $threadcnt = @threads;
$Qwork->enqueue("", "", 1) for 1 .. $threadcnt;

# Wait for keyboard input before exiting
print STDOUT "Press any key to exit\n";
my $inp = <STDIN>;

### Join threads - cleanup and exit
#$_->join for @threads; - this should work but causes a crash.. Let windows clean up the threads.
_exit(0);

#################################################################
# Report_Timer()
# Win32::GUI handler for the 'Report' timer added to $MainWindow (250 ms).
# Copies each thread's status text into its label. A thread whose status
# matches /died/i is replaced by a fresh worker thread that inherits the dead
# thread's label.
sub Report_Timer {
    foreach my $tid (sort(keys(%ThreadStatus))) {
        my $label  = $ThreadLabel{$tid};
        my $status = $ThreadStatus{$tid};
        if ($status =~ /died/is) {
            delete($ThreadLabel{$tid});
            delete($ThreadStatus{$tid});
            $Qprint->enqueue($tid, "Dead - Creating a new thread");
            my $replacement = threads->new(\&worker);
            my $new_tid     = $replacement->tid();
            $Qprint->enqueue($new_tid, "New thread created - $new_tid");
            $ThreadLabel{$new_tid}  = $label;
            $ThreadStatus{$new_tid} = 'new';
            $status = 'new';
        }
        # Threads without a registered label are skipped.
        if (defined $label && $MainWindow->{$label}) {
            $MainWindow->{$label}->Text(pack("A5 A*", $tid, $status));
        }
    }
}

# MainWindow_Terminate()
# Win32::GUI handler for closing the main window; returning -1 ends
# Win32::GUI::Dialog() so the shutdown code above runs.
sub MainWindow_Terminate {
    return -1;
}

#################################################################
# socketServer()
# Thread body: listens on TCP port 8080 and, for every accepted connection,
# queues (fileno of the client socket, packed peer address, "") on $Qwork.
# Dies (ending the thread) if the listening socket cannot be set up.
#
# NOTE(review): $Client is deliberately declared outside the loop so the
# accepted socket stays open while a worker re-opens it by descriptor number.
# The next accept() reuses the same handle, which looks like it can close a
# descriptor a slow worker is still using - confirm against the perlmonks node
# referenced above before changing the hand-off.
sub socketServer {
    my $tid = threads->tid();
    $ThreadStatus{$tid} = "Sock Server - Starting";

    my $port = 8080;
    if ($port !~ /^\d+$/) {
        # A non-numeric port is a service name; look it up by name
        # (the original called getservbyport here).
        $port = (getservbyname($port, 'tcp'))[2];
        die "Unknown service name for listening port" unless $port;
    }
    my $proto = getprotobyname('tcp');

    my $Server;
    socket($Server, PF_INET, SOCK_STREAM, $proto)
        || die "Socket Connection Error : $!";
    setsockopt($Server, SOL_SOCKET, SO_REUSEADDR, pack("l", 1))
        || die "SettingSocket Option Error: $!";
    bind($Server, sockaddr_in($port, INADDR_ANY))
        || die "Cannot bind Socket : $!";
    listen($Server, SOMAXCONN)
        || die "Error listening on Socket: $!";
    binmode $Server;

    # Unbuffer both the listening socket and STDOUT.
    select($Server);
    $| = 1;
    select(STDOUT);
    $| = 1;

    $ThreadStatus{$tid} = "Sock Server - Listening";
    my $Client;
    my $cnt = 0;
    while (my $addr = accept($Client, $Server)) {
        $cnt++;
        $ThreadStatus{$tid} = "Sock Server - Enqueue Job # $cnt";
        $Qwork->enqueue(fileno($Client), $addr, "");

        # sleep for 10 milliseconds to avoid pegging the CPU
        select(undef, undef, undef, .01);
        $ThreadStatus{$tid} = "Sock Server - Listening";
    }
}

#################################################################
# writer()
# Thread body: takes (thread id, message) sets from $Qprint and prints them to
# STDOUT as "[tid] message". A message of "DIE NOW" (case-insensitive) ends
# the loop. Returns 1.
sub writer {
    my $tid = threads->tid();
    $ThreadStatus{$tid} = 'Writer - idle';
    while (1) {
        # Poll every 10 ms until something is queued.
        select(undef, undef, undef, .01) until $Qprint->pending();
        $ThreadStatus{$tid} = 'Writer - Dequeue';
        my ($thrid, $msg) = $Qprint->dequeue;
        if (length($msg) && $msg =~ /^DIE NOW$/is) {
            $ThreadStatus{$tid} = 'Writer - Died';
            last;
        }
        print "[$thrid] $msg\n";

        #Show the state of each thread
        #foreach my $tid (sort(keys(%ThreadStatus))){
        #    print STDOUT "\t$tid is $ThreadStatus{$tid}\n";
        #}
        $ThreadStatus{$tid} = 'Writer - idle';

        # sleep for 10 milliseconds to avoid pegging the CPU
        select(undef, undef, undef, .01);
    }
    return 1;
}

# worker()
# Thread body: takes (fileno, packed peer address, action) sets from $Qwork,
# re-opens the client socket from its descriptor number and hands it to
# processRequest(). An action of 1 ends the loop. Any die inside the thread
# sets its status to 'died', which Report_Timer uses to start a replacement.
# Returns 1.
sub worker {
    my $tid = threads->tid();
    local $SIG{__DIE__} = sub {
        $ThreadStatus{$tid} = 'died';
        # $Qprint->enqueue($tid,"Thread $tid Died - ReQueueing the Job $ThreadClient{$tid}");
        # $Qwork->enqueue($ThreadClient{$tid},$ThreadAddr{$tid},"");
        return 1;
    };
    $ThreadStatus{$tid} = 'idle';
    while (1) {
        # Poll every 10 ms until a job is queued.
        select(undef, undef, undef, .01) until $Qwork->pending();
        $ThreadStatus{$tid} = 'DQ';
        my ($fno, $addr, $action) = $Qwork->dequeue;
        $ThreadClient{$tid} = $fno;
        $ThreadAddr{$tid}   = $addr;
        last if length($action) && int($action) == 1;

        my $Client;
        $ThreadStatus{$tid} = 'Open Client';
        # '+<&=' attaches a read/write handle to the existing descriptor
        # number instead of duplicating it.
        if (open($Client, '+<&=', $fno)) {
            $ThreadStatus{$tid} = 'Send Request for Processing';
            processRequest($Client, $addr, $tid);
            $ThreadStatus{$tid} = 'Done Processing - Closing Client';
            if ($Client) { close($Client); }
        }
        else {
            $ThreadStatus{$tid} = 'Open Client Failed';
        }
        $ThreadStatus{$tid} = 'idle';
    }
    $ThreadStatus{$tid} = 'Done - Returning';
    return 1;
}

# processRequest($Client, $addr, $tid)
# Relays one proxied HTTP request.
#   $Client - read/write handle on the browser connection
#   $addr   - packed peer address returned by accept()
#   $tid    - id of the calling thread, used as the %ThreadStatus key
# Reads the request from $Client, forwards it (downgraded to HTTP/1.0) to the
# requested host, and copies the reply back to $Client.
# Returns 1 after relaying a reply; returns nothing when an argument is
# missing, the request cannot be parsed, the upstream connection fails, or the
# URL matches the blocked .2o7.net pattern. The caller closes $Client.
#
# NOTE(review): readSocket() also parses CONNECT requests, but nothing here
# implements tunnelling for them.
sub processRequest {
    my $Client = shift || return;
    my $addr   = shift || return;
    my $tid    = shift;

    $ThreadStatus{$tid} = 'processing';

    my (undef, $inetaddr) = sockaddr_in($addr);
    my $client_ip   = inet_ntoa($inetaddr);
    my $client_host = gethostbyaddr($inetaddr, AF_INET) || $client_ip;

    $ThreadStatus{$tid} = "Reading Request from $client_host";
    my %Request = ();
    my @request = readSocket(\%Request, $Client);
    unless (@request && defined $Request{host} && length $Request{host}) {
        # Nothing usable arrived (peer closed, or an unrecognised request line).
        $ThreadStatus{$tid} = 'Bad Request';
        $Qprint->enqueue($tid, "Could not parse a request from $client_host");
        return;
    }
    my $url = defined $Request{url} ? $Request{url} : '';

    $ThreadStatus{$tid} = "Connect Internet Sock - $url";
    # Honour an explicit port in the request URL; default to 80.
    my ($UrlSock, $err) = connectToHost($Request{host}, $Request{port} || 80);
    unless ($UrlSock) {
        # Without this check, printing to the undefined handle below would
        # die and take the whole worker thread down.
        $err = 'unknown error' unless defined $err;
        $ThreadStatus{$tid} = 'Connect Internet Sock - Failed';
        $Qprint->enqueue($tid, "Connect to $Request{host} failed: $err");
        return;
    }

    # Send Request to the internet: strip the scheme and host from the
    # request line and downgrade the protocol version.
    $ThreadStatus{$tid} = 'Write Request to Internet Sock';
    $request[0] =~ s[http://[^/]+][];
    $request[0] =~ s[HTTP/1\.1][HTTP/1.0];
    foreach my $line (@request) {
        print $UrlSock $line;
    }
    if (exists $Request{postdata} and length($Request{postdata})) {
        print $UrlSock $Request{postdata};
    }

    # Get Reply Header
    $ThreadStatus{$tid} = 'Read Internet Sock Reply';
    my %Reply = ();
    my @reply = readSocket(\%Reply, $UrlSock);
    $ThreadStatus{$tid} = 'Read Internet Sock Reply - OK';
    binmode $UrlSock;
    binmode $Client;

    # Requests to this domain are dropped without relaying the reply.
    if ($url =~ /\.2o7\.net/is) {
        close($UrlSock);
        return;
    }

    # Print the Reply Header to the browser client
    $ThreadStatus{$tid} = 'Write Reply Header';
    print $Client @reply;

    $ThreadStatus{$tid} = 'Read Internet Sock Body';
    my $raw_length = defined $Reply{'content-length'} ? $Reply{'content-length'} : '';
    my $expected   = $raw_length =~ /^(\d+)/ ? $1 : 0;
    my $clen       = 0;
    my $readcnt    = 0;
    while (1) {
        my $line;
        $ThreadStatus{$tid} = "Reading Next Line";
        my $n = read($UrlSock, $line, 8192);
        last if !defined $n || $n == 0;    # read error or end of stream
        $clen += $n;
        $ThreadStatus{$tid} = "Reading Next Line Done - [$readcnt] [$clen] [$raw_length]";
        $readcnt++;
        if ($readcnt > 500) {
            # Hard cap on the number of chunks relayed for one reply.
            $ThreadStatus{$tid} = "timed out Reading";
            $Qprint->enqueue($tid, "Timed out");
            last;
        }
        $ThreadStatus{$tid} = "Read Internet Sock Body - line $readcnt";
        print $Client $line;
        if ($expected > 0 && $clen >= $expected) {
            $ThreadStatus{$tid} = "Content Length Reached";
            last;
        }
        # Heuristic end-of-document checks for replies with no usable length.
        last if $line =~ /\<\/html\>/is;
        last if $line =~ /^0$/s;
        if ($line =~ /\<\/body\>$/is) {
            # NOTE(review): this prints an empty string; the intended text
            # may have been lost from the source - confirm.
            print $Client qq||;
            last;
        }
        $ThreadStatus{$tid} = "Sleeping";
        select(undef, undef, undef, .01);
    }
    close($UrlSock);
    $ThreadStatus{$tid} = "Done Processing";
    return 1;
}

# readSocket(\%hash, $Socket)
# Reads an HTTP start line and headers (plus the body of a POST) from $Socket.
#   \%hash  - emptied, then filled with: request (trimmed start line), url
#             (without query string), getdata (query string), method, host,
#             port, one lower-cased entry per header, and postdata
#   $Socket - handle to read from; switched to binmode
# Returns the raw start line and header lines, including the terminating
# blank line, with Proxy-Connection, Keep-Alive and Accept-Encoding headers
# left out. Returns an empty list if nothing could be read, or a one-element
# error string if an argument is missing.
sub readSocket {
    my $hash   = shift || return "No Hash Reference";
    my $Socket = shift || return "No Socket";
    binmode $Socket;
    my @lines = ();
    %{$hash} = ();

    # Read in first line and process it
    my $firstline = <$Socket>;
    return @lines unless defined $firstline;    # peer closed before sending anything
    $hash->{request} = strip($firstline);
    push(@lines, $firstline);

    ($hash->{url}) = $firstline =~ m[(http://\S+)];
    if ($hash->{url} and $hash->{url} =~ m[\?(.+)]s) {
        $hash->{getdata} = $1;
    }

    # Method, host, port
    #POST http://www.basgetti.com/cgi-bin/wasql.pl HTTP/1.1
    #CONNECT www.netteller.com:443 HTTP/1.1
    my ($method, $host, $port);
    if ($firstline =~ m[(GET|POST|HEAD) http://([^/:]+):?(\d*)]) {
        ($method, $host, $port) = ($1, $2, $3);
    }
    elsif ($firstline =~ m[(CONNECT) ([^/:]+):?(\d*)]) {
        ($method, $host, $port) = ($1, $2, $3);
    }
    $hash->{method} = $method;
    $hash->{host}   = $host;
    $hash->{port}   = $port;

    # Read in rest of header, up to and including the blank line
    while (my $header = <$Socket>) {
        next if $header =~ m[(Proxy-Connection:|Keep-Alive:|Accept-Encoding:)];
        if ($header =~ m[^([a-z\-]+)\:(.+)]is) {
            my $attr = lc strip($1);
            my $val  = lc strip($2);
            $hash->{$attr} = $val;
        }
        push @lines, $header;
        last if $header =~ m[^[\s\x00]*$];
    }

    my $len = 0;
    if (defined $hash->{'content-length'} && $hash->{'content-length'} =~ /^(\d+)/) {
        $len = $1;
    }
    if (defined $method and $method =~ m[^POST$]is && $len) {
        my $data = '';
        while (length($data) < $len) {
            # Never ask for more than is still outstanding: a buffered read()
            # for bytes beyond Content-Length would block until the peer
            # closes the connection.
            my $want = $len - length($data);
            $want = 2048 if $want > 2048;
            my $cdata;
            my $n = read($Socket, $cdata, $want);
            last if !defined $n || $n == 0;
            $data .= $cdata;
            select(undef, undef, undef, .01);
        }
        $hash->{'postdata'} = $data;
    }

    $hash->{url} =~ s[\?.*][]s if exists $hash->{url} and defined $hash->{url};
    return @lines;
}

# connectToHost($remote_hostname, $port)
# Opens an unbuffered TCP connection to a host.
#   $remote_hostname - host name to resolve with gethostbyname
#   $port            - port number or service name; 80 if missing or unknown
# The socket is bound to this machine's address from %SocketInfo (any local
# address if that is unavailable) before connecting.
# Returns the connected handle on success, or (undef, $error_message).
sub connectToHost {
    my ($remote_hostname, $port) = @_;

    unless (defined $remote_hostname && length $remote_hostname) {
        return (undef, "Unknown host: ");
    }
    $port = 80 unless defined $port && length $port;
    if ($port !~ m[^\d+$]) {
        $port = (getservbyname($port, "tcp"))[2];
        $port = 80 unless $port;
    }

    my $proto       = (getprotobyname('tcp'))[2];
    my $remote_addr = (gethostbyname($remote_hostname))[4];
    if (!$remote_addr) {
        return (undef, "Unknown host: $remote_hostname");
    }

    # sockaddr_in() insists on a 4-byte address, so fall back to INADDR_ANY
    # when the local address could not be resolved at startup.
    my $local_ip = $SocketInfo{hostip};
    $local_ip = INADDR_ANY unless defined $local_ip && length($local_ip) == 4;
    my $local_sockaddr  = sockaddr_in(0,     $local_ip);
    my $remote_sockaddr = sockaddr_in($port, $remote_addr);

    my $Sock;
    socket($Sock, AF_INET, SOCK_STREAM, $proto)
        || return (undef, "Socket Error: $!");
    bind($Sock, $local_sockaddr)
        || return (undef, "Socket Bind Error: $!");
    connect($Sock, $remote_sockaddr)
        || return (undef, "Socket Connect Error: $!");

    # Disable buffering on the socket, then restore the selected handle.
    my $cur = select($Sock);
    $| = 1;
    select($cur);
    return $Sock;
}

# strip($str)
# usage: $str = strip($str);
# Removes leading and trailing whitespace (spaces, tabs, returns, newlines).
# Returns the trimmed string, or nothing when $str is undefined or empty.
sub strip {
    my $str = shift;
    return unless defined $str && length $str;
    $str =~ s/^\s+//;
    $str =~ s/\s+$//;
    return $str;
}