vxp has asked for the wisdom of the Perl Monks concerning the following question:

Possible duplicate of Slow mass mailer (used for sending out a newsletter, not spam)


Here are 3 files for sending out newsletters.
They are fully functional, although some parts still
require work (for example, log management could be a bit better,
I suppose :).
1st file: ds2srv the server. you just launch it in the background and let it sit.
2nd file: ds2clt, the client. You launch it as soon as
the newsletter and the list of people to mail it to are sitting in the queue directory. The queue directory should be in the same dir as ds2srv; ds2srv will search for something.msg and something.list in queue, where something.msg is the newsletter and something.list is the list of people to send it to.
3rd file: ds2cmd used for sending commands to the server
such as shutdown, stats, etc.

any suggestions/patches/better written code is appreciated :)

#!/usr/bin/perl
#########################################################
#
# ds2srv - newsletter queue server.
#
# Listens on TCP port 6900 and hands work to delivery
# clients.  A job is a pair of files in the queue
# directory: <id>.msg (the newsletter) and <id>.list (one
# recipient address per line).
#
# Wire format, both directions: a 4-byte big-endian length
# followed by that many payload bytes.
#
# Usage: ds2srv [-v|--verbose] [-d|--debug]
#
#########################################################
use strict;
use warnings;

use IO::File;
use IO::Socket;
use IO::Select;
use Queue;

my $PORT       = 6900;
my $LOG_FILE   = "log/ds2.log";
my $MAX_PACKET = 1_000_000;    # largest request payload accepted, in bytes

my ($svr_shutdown, $verbose, $debug) = (0, 0, 0);
my ($select, $log);
my %conn_info;                 # per-connection state, keyed by fileno
my $msg_id;                    # id of the currently loaded job
my $msg;                       # body of the currently loaded job
my @addr_list;                 # recipients of the currently loaded job
my $list_pos = 0;              # index of the next recipient to hand out

my $qdir = "queue";
my $q    = Queue->new($qdir);

#########################################################
#
# The %cmds hash keeps the protocol commands and their
# associated subrefs. To add functionality, just
# declare the sub and add a key to the hash.
# Every handler is called as handler($client_socket, $args).
#
#########################################################
my %cmds = (
    'next_addr' => \&next_addr,
    'succ_addr' => \&succ_addr,
    'fail_addr' => \&fail_addr,
    'get_msg'   => \&get_msg,
    'quit'      => \&close_client,
    'stat'      => \&svr_stat,
    'svr_down'  => \&svr_down,
    'hup_log'   => \&hup_log,
);

#########################################################
#
# The protocol handlers actually live here - don't forget
# to map the sub to a command in the %cmds hash.
#
#########################################################

# Forget the current job once its list is exhausted.  $list_pos is left
# untouched so that 'stat' can still report how many addresses were
# processed; get_msg resets it when the next job is loaded.
sub flush_current_msg {
    my ($fd) = @_;
    if ($debug) {
        print("0: flush:" . $fd . ": listpos: $list_pos : addrlist: "
            . scalar(@addr_list) . "\n");
    }
    printlog("$msg_id.msg finished. flushing.");
    $msg       = undef;
    $msg_id    = undef;
    @addr_list = ();
    return;
}

# next_addr: send the next recipient of the current job, or "FIN" when the
# client is not working on the current job or no addresses are left.
sub next_addr {
    my ($clt) = @_;
    my $fd         = $clt->fileno;
    my $clt_msg_id = $conn_info{$fd}{"msg_id"};
    my $is_current = defined $msg
        && defined $clt_msg_id
        && $msg_id eq $clt_msg_id;

    if ($is_current && $list_pos < @addr_list) {
        my $addr = $addr_list[$list_pos];
        if ($debug) {
            print("0: send_addr: $addr " . $fd
                . ": listpos: $list_pos : addrlist: "
                . scalar(@addr_list) . "\n");
        }
        snd($clt, $addr);
        $list_pos++;
        flush_current_msg($fd) if $list_pos >= @addr_list;
        return;
    }

    # A job whose list is empty has nothing to hand out: flush it, and fall
    # through so the client still receives an answer instead of hanging.
    flush_current_msg($fd) if $is_current;

    if ($debug) { print("0: FIN:" . $fd . "\n") }
    delete $conn_info{$fd}{"msg_id"};
    snd($clt, "FIN");
    return;
}

# Shared body of succ_addr/fail_addr: log the outcome and acknowledge it.
sub log_delivery {
    my ($clt, $outcome, $addr) = @_;
    if (defined $addr && length $addr) {
        printlog("$outcome: $addr");
        snd($clt, "OK");
    }
    else {
        snd($clt, "ERR:no addr!");
    }
    return;
}

# succ_addr <addr>: the client reports a successful delivery.
sub succ_addr {
    my ($clt, $addr) = @_;
    return log_delivery($clt, "succ", $addr);
}

# fail_addr <addr>: the client reports a failed delivery.
sub fail_addr {
    my ($clt, $addr) = @_;
    return log_delivery($clt, "fail", $addr);
}

# Load the next <id>.msg / <id>.list pair from the queue into the job state.
# Leaves $msg undefined when the queue is empty or the message is unreadable.
sub load_next_msg {
    my $msg_file = $q->next_by_ext(".msg");
    if (!$msg_file) {
        $msg    = undef;
        $msg_id = undef;
        return;
    }

    (my $list_file = $msg_file) =~ s/\.msg$/.list/;
    ($msg_id) = $msg_file =~ m{\Q$qdir\E/(.*)\.msg};
    $msg_id = $msg_file if !defined $msg_id;
    if ($debug) { print "Setting msg_id: $msg_id\n" }

    if (open(my $msg_fh, '<', $msg_file)) {
        local $/ = undef;
        $msg = <$msg_fh>;
        close($msg_fh);
    }
    else {
        printlog("cannot read $msg_file: $!");
        $msg = undef;
    }

    @addr_list = ();
    if (open(my $list_fh, '<', $list_file)) {
        # Blank lines are dropped: an empty payload cannot be told apart
        # from a dead connection on the receiving side.
        @addr_list = grep { /\S/ } <$list_fh>;
        chomp @addr_list;
        close($list_fh);
    }
    else {
        printlog("cannot read $list_file: $!");
    }
    $list_pos = 0;

    $q->dequeue($msg_file);
    $q->dequeue($list_file);

    if (defined $msg) {
        printlog("$msg_id.msg loaded; " . scalar(@addr_list) . " addresses");
    }
    else {
        $msg_id = undef;
    }
    return;
}

# get_msg: send the current message (loading the next queued job first if
# none is loaded) and remember which job this client works on; "FIN" when
# the queue is empty.
sub get_msg {
    my ($clt) = @_;
    my $fd = $clt->fileno;

    load_next_msg() if !defined $msg;

    if (defined $msg) {
        snd($clt, $msg);
        if ($debug) { print "conn:" . $fd . ": msg_id=$msg_id\n" }
        $conn_info{$fd}{"msg_id"} = $msg_id;
    }
    else {
        snd($clt, "FIN");
        delete $conn_info{$fd}{"msg_id"};
    }
    return;
}

# stat: report progress through the current address list.
sub svr_stat {
    my ($clt) = @_;
    if ($list_pos >= @addr_list) {
        snd($clt, "List finished - $list_pos addresses processed");
    }
    elsif ($list_pos == 0) {
        snd($clt, "List ready. Waiting for client.");
    }
    else {
        snd($clt, $list_pos . " of " . scalar(@addr_list));
    }
    return;
}

# svr_down: ask the main loop to stop after the current batch of requests.
sub svr_down {
    my ($clt) = @_;
    $svr_shutdown = 1;
    snd($clt, "Bye!");
    return;
}

# hup_log: close and reopen the log file (e.g. after it has been rotated).
sub hup_log {
    my ($clt) = @_;
    $log->close() if $log;
    $log = IO::File->new($LOG_FILE, ">>");
    if ($log) {
        snd($clt, "log released");
    }
    else {
        snd($clt, "ERR:cannot reopen $LOG_FILE: $!");
    }
    return;
}

################################
#
# Packet I/O and housekeeping
#
################################

# Send one length-prefixed packet.  Returns 1 on success, nothing when the
# peer went away in mid-write.
sub snd {
    my ($sock, $payload) = @_;
    my $len = length($payload);
    if ($debug) { print $sock->fileno . ": > $len : $payload\n" }

    my $packet = pack('N', $len) . $payload;
    my $offset = 0;
    while ($offset < length($packet)) {
        # syswrite may accept only part of the buffer; keep going.
        my $written = syswrite($sock, $packet, length($packet) - $offset, $offset);
        return if !defined $written;
        $offset += $written;
    }
    return 1;
}

# Read exactly $want bytes.  Returns nothing on EOF or error, so a peer that
# disconnects in mid-packet cannot trap the server in an endless loop.
sub read_exact {
    my ($sock, $want) = @_;
    my $data = '';
    while (length($data) < $want) {
        my $got = sysread($sock, $data, $want - length($data), length($data));
        return if !$got;    # undef: error, 0: EOF
    }
    return $data;
}

# Receive one length-prefixed packet.  Returns the payload, or undef on
# EOF, read error, or an empty/oversized length field.
sub rcv {
    my ($sock) = @_;
    my $header = read_exact($sock, 4);
    return if !defined $header;

    my $len = unpack('N', $header);
    # Quick sanity check on our packet length
    # Bob help you if you're trying to read over 1MB packets :-)
    return if $len <= 0 || $len > $MAX_PACKET;

    return read_exact($sock, $len);
}

# Append a timestamped line (MM/DD/YYYY hh:mm:ss) to the log file.
sub printlog {
    return if !$log;    # log could not be reopened by hup_log
    my @date = localtime(time);
    my $timestamp = sprintf("%02d/%02d/%04d %02d:%02d:%02d",
        $date[4] + 1, $date[3], $date[5] + 1900,
        $date[2], $date[1], $date[0]);
    print $log "$timestamp: @_\n";
    $log->flush();
    return;
}

# Drop a client: forget its state, stop selecting on it, close the socket.
# Also serves as the handler for the 'quit' command.
sub close_client {
    my ($socket) = @_;
    my $fd = $socket->fileno;
    if ($debug) { print($fd . ": disconnected\n") }
    delete $conn_info{$fd};
    $select->remove($socket);
    $socket->close;
    return;
}

#########################################################
#
# Start things running...
#
#########################################################
$| = 1;

# Writing to a client that has vanished must not kill the whole server.
$SIG{PIPE} = 'IGNORE';

while (defined(my $arg = shift @ARGV)) {
    if    ($arg eq '-v' || $arg eq '--verbose') { $verbose = 1 }
    elsif ($arg eq '-d' || $arg eq '--debug')   { $debug   = 1 }
}

my $server = IO::Socket::INET->new(
    Proto     => 'tcp',
    LocalPort => $PORT,
    Listen    => SOMAXCONN,
    ReuseAddr => 1,
) or die "Cannot listen on port $PORT: $@\n";

$select = IO::Select->new($server);
$log = IO::File->new($LOG_FILE, ">>")
    or die "Cannot open $LOG_FILE: $!\n";

while (!$svr_shutdown && (my @ready = $select->can_read)) {
    for my $socket (@ready) {
        if ($socket == $server) {
            my $new_clt = $server->accept() or next;
            $new_clt->autoflush(1);
            $select->add($new_clt);
            if ($debug) { print($new_clt->fileno . ": connected\n") }
            snd($new_clt, "HELO " . $new_clt->fileno);
            next;
        }

        my $request = rcv($socket);
        if (!defined $request) {
            close_client($socket);
            next;
        }

        chomp $request;
        if ($debug) { print($socket->fileno . ": $request\n") }

        # First word is the command, the rest of the line its argument.
        my ($clt_cmd, $clt_args) = $request =~ /^\s*(\w*)\s*(.*)/;
        if (exists $cmds{$clt_cmd}) {
            $cmds{$clt_cmd}->($socket, $clt_args);
        }
        else {
            snd($socket, "Invalid command: $clt_cmd");
        }
    }
}

$log->close() if $log;
#!/usr/bin/perl
#########################################################
#
# ds2clt - newsletter delivery client.
#
# Forks a number of workers.  Each worker connects to
# ds2srv on localhost:6900, fetches the current message
# (first line: From address, second line: Subject, rest:
# HTML body) and then keeps asking for recipient
# addresses, delivering to each one by SMTP and reporting
# the outcome back to the server.
#
# Usage: ds2clt [-d] [-s <simultaneous deliveries>]
#   -d  debug: print what would be sent, deliver nothing
#   -s  number of worker processes (default 30)
#
#########################################################
use strict;
use warnings;

use IO::Socket;
use Net::DNS;
use MIME::Lite;

my $debug        = 0;
my $sim_delivery = 30;
my $conn_id;    # connection id assigned by the server, set in each worker

$| = 1;

while (defined(my $arg = shift @ARGV)) {
    if ($arg eq '-d') {
        $debug = 1;
    }
    elsif ($arg eq '-s') {
        $sim_delivery = shift @ARGV;
        die "You must specify the number of simultaneous deliveries!\n"
            if !defined $sim_delivery || $sim_delivery !~ /\A[1-9]\d*\z/;
    }
}

# Send one length-prefixed packet (4-byte big-endian length + payload).
# Returns 1 on success, nothing when the server went away in mid-write.
sub snd {
    my ($sock, $payload) = @_;
    if ($debug) { print $conn_id . ": sent: $payload\n" }

    my $packet = pack('N', length($payload)) . $payload;
    my $offset = 0;
    while ($offset < length($packet)) {
        my $written = syswrite($sock, $packet, length($packet) - $offset, $offset);
        return if !defined $written;
        $offset += $written;
    }
    return 1;
}

# Read exactly $want bytes; returns nothing on EOF or error, so a dead
# server cannot trap the worker in an endless read loop.
sub read_exact {
    my ($sock, $want) = @_;
    my $data = '';
    while (length($data) < $want) {
        my $got = sysread($sock, $data, $want - length($data), length($data));
        return if !$got;    # undef: error, 0: EOF
    }
    return $data;
}

# Receive one length-prefixed packet.  Returns the payload, or undef on
# EOF, read error, or an empty packet.
sub rcv {
    my ($sock) = @_;
    my $header = read_exact($sock, 4);
    return if !defined $header;

    my $len = unpack('N', $header);
    return if $len <= 0;

    return read_exact($sock, $len);
}

# Ask the server for the current message.  Returns (from, subject, body),
# or an empty list when the server has nothing left (or is gone).
sub fetch_msg {
    my ($server) = @_;
    if ($debug) { print "$conn_id: Getting message\n" }
    snd($server, "get_msg");

    my $raw = rcv($server);
    if (!defined $raw || $raw eq 'FIN') {
        if ($debug) { print "$conn_id: server said: FIN (msg)\n" }
        return;
    }

    my ($from, $subject, $body) = split /\n/, $raw, 3;
    $subject = '' if !defined $subject;
    $body    = '' if !defined $body;
    return ($from, $subject, $body);
}

# Worker body: fetch a message, then deliver it to every address the server
# hands out, until the server reports that the queue is empty.
sub mail_run {
    # A server that dies in mid-conversation must not kill us with SIGPIPE.
    local $SIG{PIPE} = 'IGNORE';

    my $server = IO::Socket::INET->new(
        Proto    => 'tcp',
        PeerAddr => 'localhost',
        PeerPort => 6900,
    ) or die "Couldn't connect to server: $@\n";
    $server->autoflush();

    my $greeting = rcv($server);
    die "No greeting from server\n" if !defined $greeting;
    ($conn_id) = $greeting =~ /HELO (.*)/;
    $conn_id = '?' if !defined $conn_id;
    if ($debug) { print "$conn_id: Connected to server\n" }

    my ($from_addr, $subject, $body);
    my $have_msg = 0;

    while (1) {
        if (!$have_msg) {
            my @parts = fetch_msg($server);
            last if !@parts;
            ($from_addr, $subject, $body) = @parts;
            $have_msg = 1;
            next;
        }

        if ($debug) { print "$conn_id: Requesting next address\n" }
        snd($server, "next_addr");
        if ($debug) { print "$conn_id: Request sent...waiting\n" }

        my $resp = rcv($server);
        last if !defined $resp;    # server went away
        $resp =~ s/[\r\n]//g;
        if ($debug) { print "$conn_id: server said: $resp\n" }

        if ($resp eq 'FIN') {
            # Current list is finished (or belongs to an older message):
            # go round again and fetch whatever is next in the queue.
            $have_msg = 0;
            next;
        }

        if ($debug) {
            # Dry run: show the envelope, deliver and report nothing.
            print "$conn_id: From: $from_addr\nTo: $resp\nSubject: $subject\n";
            sleep 2;
            next;
        }

        my $report = send_letter($from_addr, $resp, $subject, $body)
            ? "succ_addr"
            : "fail_addr";
        snd($server, "$report $resp");
        last if !defined rcv($server);    # consume the OK/ERR acknowledgement
    }

    snd($server, "quit");
    return;
}

# Deliver one HTML message directly to the recipient's mail exchanger.
#   $from_addr, $to_addr, $subject, $body - envelope and content
# Returns 1 as soon as one host accepts the message, 0 otherwise.
sub send_letter {
    my ($from_addr, $to_addr, $subject, $body) = @_;

    # Minimal validation: something before the last '@', and a domain
    # without '@' or whitespace after it.
    my ($domain) = $to_addr =~ /\A.+\@([^\@\s]+)\z/s;
    return 0 if !defined $domain;

    my $mail = MIME::Lite->new(
        From    => $from_addr,
        To      => $to_addr,
        Subject => $subject,
        Data    => $body,
        Type    => 'text/html',
    );

    my $res   = Net::DNS::Resolver->new();
    my @hosts = map { $_->exchange } mx($res, $domain);
    # No MX record: the domain itself is the implicit mail exchanger.
    @hosts = ($domain) if !@hosts;

    for my $host (@hosts) {
        # send_by_smtp dies on failure; try the next exchanger in that case.
        my $sent = eval { $mail->send_by_smtp($host, Timeout => 60); 1 };
        return 1 if $sent;
    }
    return 0;
}

# Fork the workers, then wait for all of them to finish.
my $child_count = 0;
while ($child_count < $sim_delivery) {
    my $kid_pid = fork();
    die "Couldn't fork!\n" if !defined $kid_pid;

    if ($kid_pid == 0) {
        mail_run();
        exit;
    }
    $child_count++;
}

while ($child_count > 0) {
    wait;
    $child_count--;
}
#!/usr/bin/perl
#########################################################
#
# ds2cmd - send commands to ds2srv.
#
# Usage: ds2cmd <command>   send one command (e.g. stat,
#                           svr_down, hup_log), print the
#                           reply and disconnect
#        ds2cmd             interactive: read commands
#                           from STDIN, print each reply
#
#########################################################
use strict;
use warnings;

use IO::Socket;

my $debug = 0;

$| = 1;

# Send one length-prefixed packet (4-byte big-endian length + payload).
# Returns 1 on success, nothing when the server went away in mid-write.
sub snd {
    my ($sock, $payload) = @_;
    my $len = length($payload);
    if ($debug) { print $sock->fileno . ": $len : $payload" }

    my $packet = pack('N', $len) . $payload;
    my $offset = 0;
    while ($offset < length($packet)) {
        my $written = syswrite($sock, $packet, length($packet) - $offset, $offset);
        return if !defined $written;
        $offset += $written;
    }
    return 1;
}

# Read exactly $want bytes; returns nothing on EOF or error, so a closed
# connection cannot cause an endless read loop.
sub read_exact {
    my ($sock, $want) = @_;
    my $data = '';
    while (length($data) < $want) {
        my $got = sysread($sock, $data, $want - length($data), length($data));
        return if !$got;    # undef: error, 0: EOF
    }
    return $data;
}

# Receive one length-prefixed packet.  Returns the payload, or undef on
# EOF, read error, or an empty packet.
sub rcv {
    my ($sock) = @_;
    my $header = read_exact($sock, 4);
    return if !defined $header;

    my $len = unpack('N', $header);
    return if $len <= 0;

    return read_exact($sock, $len);
}

# The server may close the connection at any moment (quit, svr_down).
$SIG{PIPE} = 'IGNORE';

my $server = IO::Socket::INET->new(
    Proto    => 'tcp',
    PeerAddr => 'localhost',
    PeerPort => 6900,
) or die "Couldn't connect to server: $@\n";
$server->autoflush();

my $greeting = rcv($server);
die "No greeting from server\n" if !defined $greeting;

if (defined(my $cmd = shift @ARGV)) {
    # One-shot mode.
    snd($server, $cmd);
    my $reply = rcv($server);
    if (defined $reply) {
        print "$reply\n";
        snd($server, "quit");
    }
}
else {
    # Interactive mode: one command per input line.
    print "$greeting\n";
    my $connected = 1;
    while (defined(my $line = <STDIN>)) {
        # The line goes out with its newline; the server chomps it.
        snd($server, $line);

        my $reply = rcv($server);
        if (!defined $reply) {
            # Server closed the connection (e.g. after 'quit').
            $connected = 0;
            last;
        }
        print "$reply\n";

        if ($reply eq 'Bye!') {
            # Server is shutting down; nothing more to say.
            $connected = 0;
            last;
        }
    }
    # End of input while still connected: leave politely.
    snd($server, "quit") if $connected;
}