http://qs1969.pair.com?node_id=11123645


in reply to Apache Pulsar modules

Does anyone have or know of Perl modules to publish data to an Apache Pulsar messaging bus?

I don't see any, sorry. However, according to the Pulsar docs:

Pulsar's WebSocket API is meant to provide a simple way to interact with Pulsar using languages that do not have an official client library. Through WebSockets you can publish and consume messages and use all the features available in the Java, Go, Python and C++ client libraries.

Here's a simple producer and consumer implemented using Mojolicious, based on the Node.js examples in the above link:

consumer.pl

use Mojo::Base -strict, -signatures;
use Mojo::UserAgent;
use Mojo::IOLoop;
use Mojo::Util qw/b64_decode/;
use Data::Dump qw/dd pp/;
my $ADDR = 'ws://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub';
say "Opening WebSocket...";
my $ua = Mojo::UserAgent->new;
$ua->websocket($ADDR => sub ($ua, $tx) {
    die 'WebSocket handshake failed!'.pp($tx) unless $tx->is_websocket;
    $tx->on(finish => sub ($tx, $code, $reason=undef) {
        say "WebSocket closed ", pp($code, $reason);
    });
    $tx->on(json => sub ($tx, $json) {
        say "Received: ", pp($json);
        say "Payload: ", pp( b64_decode($json->{payload}) );
        say "Sending ack";
        $tx->send({ json => { messageId => $json->{messageId} } });
    });
});
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
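If you want to check the decode-and-ack step without a running broker, here is a minimal sketch using only core modules. The helper name handle_frame and the sample frame (including its messageId value) are made up for illustration; the only things taken from the consumer above are that the payload arrives base64-encoded and that the ack is a JSON object carrying the same messageId.

```perl
use strict;
use warnings;
use JSON::PP qw(decode_json encode_json);
use MIME::Base64 qw(decode_base64);

# Hypothetical helper: takes one raw JSON frame as received on the
# consumer socket and returns the decoded payload plus the JSON ack
# frame that should be sent back for it.
sub handle_frame {
    my ($raw) = @_;
    my $msg     = decode_json($raw);
    my $payload = decode_base64($msg->{payload});
    my $ack     = encode_json({ messageId => $msg->{messageId} });
    return ($payload, $ack);
}

# Sample frame invented for this sketch; a real broker sends more fields.
my $frame = '{"messageId":"CAAQAw==","payload":"SGVsbG8gV29ybGQ=","properties":{"key1":"value1"}}';
my ($payload, $ack) = handle_frame($frame);
print "Payload: $payload\n";
print "Ack:     $ack\n";
```

Mojolicious does the JSON decoding and encoding for you through the json event and send({ json => ... }), so this is only meant to make the message shapes visible.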

producer.pl

use Mojo::Base -strict, -signatures;
use Mojo::UserAgent;
use Mojo::IOLoop;
use Mojo::Util qw/b64_encode/;
use Data::Dump qw/dd pp/;
my $ADDR = 'ws://localhost:8080/ws/v2/producer/persistent/public/default/my-topic';
say "Opening WebSocket...";
my $ua = Mojo::UserAgent->new;
$ua->websocket($ADDR => sub ($ua, $tx) {
    die 'WebSocket handshake failed!'.pp($tx) unless $tx->is_websocket;
    $tx->on(finish => sub ($tx, $code, $reason=undef) {
        say "WebSocket closed ", pp($code, $reason);
    });
    $tx->on(json => sub ($tx, $json) {
        say "Received ack: ", pp($json);
        $tx->finish; # close socket after sending test message
    });
    $tx->send({ json => {
        payload    => b64_encode("Hello World", ''),
        properties => {
            key1 => "value1",
            key2 => "value2",
        },
        context    => "1",
    } });
});
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
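The producer above closes the socket on the first ack without looking at it. A rough sketch of how you might build the publish frame and check the ack, again with core modules only: build_publish_frame and ack_ok are hypothetical names, and the ack fields result and context are my reading of the Pulsar WebSocket docs, so treat them as an assumption and verify against your broker version.

```perl
use strict;
use warnings;
use JSON::PP qw(decode_json);
use MIME::Base64 qw(encode_base64);

# Hypothetical helper: builds the JSON text of one publish frame.
# The payload is base64-encoded without a trailing newline, and the
# context is forced to a string so it can be matched against the ack.
sub build_publish_frame {
    my ($body, $context, %properties) = @_;
    return JSON::PP->new->canonical->encode({
        payload    => encode_base64($body, ''),
        properties => \%properties,
        context    => "$context",
    });
}

# Hypothetical helper: true if the ack reports success for our context.
# ASSUMPTION: the ack carries "result" and "context" fields.
sub ack_ok {
    my ($raw, $context) = @_;
    my $ack = decode_json($raw);
    return ($ack->{result} // '') eq 'ok'
        && ($ack->{context} // '') eq "$context";
}

print build_publish_frame("Hello World", 1, key1 => "value1", key2 => "value2"), "\n";
```

In the Mojolicious version you would do the same comparison inside the json handler, for example checking $json->{result} before calling $tx->finish.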