Anonymous Monk has asked for the wisdom of the Perl Monks concerning the following question:

Does anyone have or know of Perl modules to publish data to an Apache Pulsar messaging bus?

Replies are listed 'Best First'.
Re: Apache Pulsar modules
by haukex (Archbishop) on Nov 14, 2020 at 12:37 UTC
    Does anyone have or know of Perl modules to publish data to an Apache Pulsar messaging bus?

    I don't see any, sorry. However, according to the Plusar docs:

    Pulsar's WebSocket API is meant to provide a simple way to interact with Pulsar using languages that do not have an official client library. Through WebSockets you can publish and consume messages and use all the features available in the Java, Go, Python and C++ client libraries.

    Here's a simple producer and consumer implemented using Mojolicious, based on the Node.js examples in the above link:

    consumer.pl

    use Mojo::Base -strict, -signatures; use Mojo::UserAgent; use Mojo::IOLoop; use Mojo::Util qw/b64_decode/; use Data::Dump qw/dd pp/; my $ADDR = 'ws://localhost:8080/ws/v2/consumer/persistent/public/defau +lt/my-topic/my-sub'; say "Opening WebSocket..."; my $ua = Mojo::UserAgent->new; $ua->websocket($ADDR => sub ($ua, $tx) { die 'WebSocket handshake failed!'.pp($tx) unless $tx->is_websocket; $tx->on(finish => sub ($tx, $code, $reason=undef) { say "WebSocket closed ", pp($code, $reason); }); $tx->on(json => sub ($tx, $json) { say "Received: ", pp($json); say "Payload: ", pp( b64_decode($json->{payload}) ); say "Sending ack"; $tx->send({ json => { messageId => $json->{messageId} } }); }); }); Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

    producer.pl

    use Mojo::Base -strict, -signatures; use Mojo::UserAgent; use Mojo::IOLoop; use Mojo::Util qw/b64_encode/; use Data::Dump qw/dd pp/; my $ADDR = 'ws://localhost:8080/ws/v2/producer/persistent/public/defau +lt/my-topic'; say "Opening WebSocket..."; my $ua = Mojo::UserAgent->new; $ua->websocket($ADDR => sub ($ua, $tx) { die 'WebSocket handshake failed!'.pp($tx) unless $tx->is_websocket; $tx->on(finish => sub ($tx, $code, $reason=undef) { say "WebSocket closed ", pp($code, $reason); }); $tx->on(json => sub ($tx, $json) { say "Received ack: ", pp($json); $tx->finish; # close socket after sending test message }); $tx->send({ json => { payload => b64_encode("Hello World", ''), properties => { key1 => "value1", key2 => "value2", }, context => "1", } }); }); Mojo::IOLoop->start unless Mojo::IOLoop->is_running;