Thanks for all the suggestions. In the end I found this script, which uses two non-blocking threads and MQTT messages to send and receive data. It does exactly what I needed.
#!/usr/bin/perl
use threads;
use threads::shared;
use JSON;
use WebSphere::MQTT::Client;
use Device::SerialPort;
use Time::HiRes qw(time);
use strict;
# Serial device the JeeLink is attached to
my $jeelink_port = '/dev/ttyUSB0';

# Host name and port of the MQTT server
my ( $mqtt_hostname, $mqtt_port ) = ( 'localhost', 1883 );

# Queue of commands waiting to be written to the JeeLink; shared between
# the threads, so lock it before reading or modifying it
my @queue :shared;
# Thread body: subscribe to MQTT topics and turn each received message into
# a JeeLink command, which is pushed onto the shared @queue.
#
# Takes no arguments and loops forever. Dies if the connection to the MQTT
# server cannot be established. Messages that cannot be decoded are reported
# with warn() and skipped, so one bad payload does not end the thread.
sub mqtt {
    my $mqtta = WebSphere::MQTT::Client->new(
        Hostname => $mqtt_hostname,
        Port     => $mqtt_port,
        clientid => 'rf12_received_mqtt',
    );

    # connect to the MQTT server; a true return value is treated as an error
    my $res = $mqtta->connect();
    die "Failed to connect: $res\n" if ($res);

    # Subscribe to the topics to be forwarded to the JeeLink; multiple
    # topics can be added here
    $mqtta->subscribe('TIME');

    for (;;) {
        # receivePub() yields the topic followed by the payload
        # (presumably it blocks until a publication arrives -- confirm)
        my ( $topic, $payload ) = $mqtta->receivePub();

        # from_json() croaks on malformed input, and the fields below are
        # read as hash keys, so anything but a JSON object is rejected here
        my $obj = eval { from_json($payload) };
        my $err = $@;
        if ( ref($obj) ne 'HASH' ) {
            my $shown = defined $topic ? $topic : '(unknown)';
            warn "Ignoring malformed message on topic '$shown': "
                . ( $err || "payload is not a JSON object\n" );
            next;
        }

        my $msg = undef;
        if ( defined $topic && $topic eq 'TIME' ) {
            # pack the data into binary (one unsigned byte per field)
            my $binary = pack( "C C C",
                $obj->{hour}, $obj->{minute}, $obj->{second} );

            # convert the bytes into a command for the JeeLink:
            # comma-separated decimal byte values followed by ",0s"
            $msg = join( ',', unpack( "C*", $binary ) ) . ',0s';
        }

        # encoders for other topics can be added here

        # push the message to be sent to the JeeLink onto the queue; the
        # lock is released at the end of the enclosing block
        if ( defined $msg ) {
            lock(@queue);
            push( @queue, $msg );
        }
    }
}
# Decode one "OK ..." line received from the JeeLink.
#
# The line is "OK" followed by space-separated byte values. The first value,
# masked with 0x1F, is the node id; the remaining values are the payload.
#
# Returns ($topic, $obj), where $topic is the MQTT topic to publish on and
# $obj is a hash reference with the decoded fields, or an empty list when
# the node id is not one of those handled below.
sub _decode_packet {
    my ($line) = @_;

    my @words = split( ' ', $line );
    shift(@words);    # drop the leading "OK"

    # shift and mask the node id
    my $obj = { id => shift(@words) & 0x1F };
    my $binary = pack( "C*", @words );

    # message from the energy meter: six native floats
    if ( $obj->{id} == 0x05 ) {
        @{$obj}{
            qw(realpower apparentpower powerfactor vrms irms netfrequency)
        } = unpack( "f f f f f f", $binary );
        return ( 'POWERMETER', $obj );
    }

    # message from a room node: two bytes and a native short
    if ( $obj->{id} == 0x03 ) {
        my ( $light, $packed, $temperature ) = unpack( "C C s", $binary );
        $obj->{light}    = $light;
        $obj->{moved}    = $packed & 1;     # lowest bit of the second byte
        $obj->{humidity} = $packed >> 1;    # remaining seven bits

        # NOTE(review): only the low 10 bits are kept, without sign
        # extension; if the node can report negative temperatures this
        # needs confirming against the node firmware
        $obj->{temperature} = $temperature & 0x03FF;
        return ( 'ROOMNODE', $obj );
    }

    return;
}

# Thread body: talk to the JeeLink on the serial port.
#
# Lines starting with "OK " read from the JeeLink are decoded and published
# to the MQTT server as JSON; commands found on the shared @queue are
# written to the JeeLink, one per line.
#
# Takes no arguments and loops forever. Dies if the MQTT connection or the
# serial port cannot be opened.
sub serial {
    # create an MQTT client
    my $mqtt = WebSphere::MQTT::Client->new(
        Hostname => $mqtt_hostname,
        Port     => $mqtt_port,
        clientid => 'rf12_sender_mqtt',
    );

    # connect to the MQTT server; a true return value is treated as an error
    my $res = $mqtt->connect();
    die "Failed to connect: $res\n" if ($res);

    # A status request is sent to the MQTT server when nothing has been
    # published for five seconds; $ts keeps track of the last activity
    my $ts = time();

    # Set up the serial port; new() yields a false value when the device
    # cannot be opened
    my $port = Device::SerialPort->new($jeelink_port)
        or die "Cannot open serial port $jeelink_port: $!\n";
    $port->databits(8);
    $port->baudrate(57600);
    $port->parity("none");
    $port->stopbits(1);

    # start endless loop
    for (;;) {
        my $busy = 0;

        # look for data on the serial port
        my $recv = $port->lookfor();
        if ($recv) {
            $busy = 1;

            # we're only interested in good messages
            if ( $recv =~ m/^OK / ) {
                my ( $mqtt_topic, $obj ) = _decode_packet($recv);

                # send the message as an MQTT message encoded in JSON
                if ($mqtt_topic) {
                    $mqtt->publish( to_json($obj), $mqtt_topic );

                    # record the time we sent the last message
                    $ts = time();
                }
            }
        }

        # if no message was sent for 5 seconds, send a status message to
        # keep us connected to the server
        if ( time() - $ts > 5 ) {
            $mqtt->status();
            $ts = time();
        }

        # check for new packets on the queue; the lock is held only while
        # the queue itself is being accessed
        my $msg = undef;
        {
            lock(@queue);
            $msg = shift(@queue) if scalar(@queue) > 0;
        }

        # write the message to the serial port
        if ($msg) {
            $port->write( $msg . "\n" );
            $busy = 1;
        }

        # nothing was received or sent in this pass: pause for 10 ms
        # instead of spinning at full CPU load
        select( undef, undef, undef, 0.01 ) unless $busy;
    }
}
# Start both threads and wait for them. Both thread bodies loop forever, so
# under normal operation the joins below do not return.
my $thr1 = threads->create( \&mqtt )
    or die "Could not start the mqtt thread\n";
my $thr2 = threads->create( \&serial )
    or die "Could not start the serial thread\n";
$thr1->join;
$thr2->join;