http://qs1969.pair.com?node_id=11111391

cLive ;-) has asked for the wisdom of the Perl Monks concerning the following question:

I have a requirement to connect to a Kafka server over SSL. I have the certs, private key, and password, but I can't work out how to even test a connection. Looking at the Kafka modules, they use Socket to create the connection, and the Kafka modules don't support SSL.

I've started playing with IO::Socket::SSL to try to connect to the server, but because of how Kafka::IO is written, I think I'll have to work out how to do this at the lower-level Socket interface even if I do manage to connect using IO::Socket::SSL. I know very little about socket coding, and I don't know why the decision was made to use Socket over IO::Socket, so this feels like a very steep learning curve. So far, I've managed to create the socket and I can send data to the server, but I'm guessing the server thinks it's just gibberish, because it closes the socket straight away. Or, possibly, I'm not doing whatever it is I need to do with the password to get a valid response.

Does anyone have any pointers that could get me going in the right direction? I did drag out my old copy of Stein's Network Programming with Perl, but I don't see any references to SSL in it.

I get no errors when running this at debug level 3:

    use IO::Socket::SSL;    # exports $SSL_ERROR and SSL_VERIFY_NONE

    my $socket = IO::Socket::SSL->new(
        PeerAddr        => $host,
        PeerPort        => 9094,
        SSL_verify_mode => SSL_VERIFY_NONE,
        SSL_key_file    => './key.pem',
        SSL_cert_file   => './cert.pem',
    ) or die "failed to connect: $SSL_ERROR";

but that doesn't include using the password (I'm not sure whether that's for the SSL level or Kafka level ATM). Is there something I can print to the socket to get metadata from the server?
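If the password turns out to protect the private key file (the SSL level), IO::Socket::SSL documents an SSL_passwd_cb option: a code reference that returns the passphrase for an encrypted key. The sketch below is only an illustration under that assumption. The helper name ssl_client_args and the KAFKA_KEY_PASSWORD environment variable are made up for this example, and the file paths are the ones from the snippet above.

```perl
use strict;
use warnings;

# Hypothetical helper: build the constructor arguments for a TLS client.
# SSL_passwd_cb is only consulted when the private key file is encrypted.
sub ssl_client_args {
    my ($host, $port, $key_password) = @_;
    return (
        PeerAddr        => $host,
        PeerPort        => $port,
        SSL_verify_mode => 0,    # numeric value of SSL_VERIFY_NONE; testing only
        SSL_key_file    => './key.pem',
        SSL_cert_file   => './cert.pem',
        SSL_passwd_cb   => sub { return $key_password },
    );
}

# Only attempt a connection when a host is given on the command line.
if (@ARGV) {
    require IO::Socket::SSL;
    my ($host) = @ARGV;
    # KAFKA_KEY_PASSWORD is an assumed variable name, not a Kafka convention.
    my $password = $ENV{KAFKA_KEY_PASSWORD} // '';
    my $socket = IO::Socket::SSL->new( ssl_client_args($host, 9094, $password) )
        or die "failed to connect: " . IO::Socket::SSL->errstr;
    printf "connected, cipher: %s\n", $socket->get_cipher;
}
```

If the handshake succeeds both with and without the callback, the key is probably not encrypted and the password belongs to something else (for example a keystore file).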

I tried looking for info on the raw socket commands that Kafka uses, but I'm only finding API docs right now :/
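On the question of what can be printed to the socket: Kafka speaks a length-prefixed binary protocol, so plain text will be rejected. As far as I understand the protocol guide, the simplest request is ApiVersions (API key 18), whose version 0 has an empty body. The sketch below frames such a request with pack and reads the first two fields of a response. The function names are hypothetical, and the layout (32-bit size, 16-bit API key and version, 32-bit correlation id, 16-bit length-prefixed client id) should be checked against the official protocol documentation before relying on it.

```perl
use strict;
use warnings;

# Frame an ApiVersions v0 request: 4-byte size prefix, then the header.
# Assumed header layout: api_key (int16), api_version (int16),
# correlation_id (int32), client_id (int16 length + bytes). Body is empty.
sub api_versions_request {
    my ($correlation_id, $client_id) = @_;
    my $header = pack('n n N n/a*', 18, 0, $correlation_id, $client_id);
    return pack('N/a*', $header);    # prepend the 32-bit length
}

# Decode the first 8 bytes of a response: message size and correlation id.
sub parse_response_prefix {
    my ($bytes) = @_;
    my ($size, $correlation_id) = unpack('N N', $bytes);
    return ($size, $correlation_id);
}

# Usage against an already connected socket ($socket is assumed):
#   print {$socket} api_versions_request(1, 'perl-probe');
#   read($socket, my $prefix, 8) or die "no response";
#   my ($size, $corr) = parse_response_prefix($prefix);
```

If the correlation id that comes back matches the one sent, the broker understood the request, which at least confirms that the TLS layer is working.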

All pointers welcomed...

update

Got it working with Net::Kafka after following the pointers below. For future reference for those that need it, here's the code I used:

    use v5.22;
    use strictures 2;
    use JSON::XS qw(decode_json);
    use Data::Dumper 'Dumper';
    use Net::Kafka::Consumer;

    my $consumer = Net::Kafka::Consumer->new(
        'bootstrap.servers'     => 'server1.com:9094,server2.com:9094,server3.com:9094',
        'group.id'              => 'my_consumer_group',
        'security.protocol'     => 'SSL',
        'ssl.keystore.location' => '/path/to/kafka.client.keystore.p12',
        'ssl.keystore.password' => 'password',
        'ssl.key.password'      => 'password',
        error_cb                => sub {
            my ($self, $err, $msg) = @_;
            warn Dumper({ ERR => $err, MSG => $msg });
        },
    );
    # see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for all options

    $consumer->subscribe( [ "my_topic" ] );
    warn Dumper({ consumer => $consumer });

    while (1) {
        my $msg = $consumer->poll(1000);
        if ($msg) {
            if ( my $err = $msg->err ) {
                say "Error: ", Net::Kafka::Error::to_string($err);
            }
            else {
                say Dumper({ PAYLOAD => decode_json $msg->payload });
            }
        }
    }

The module is sparsely documented, so I ended up doing a little hit-and-miss testing to get here.