MQTT CLIENT AutoRefreshing - emme - 19.03.2024
Ciao,
I wanted to find a way for the MQTT client script to automatically refresh its configuration, so I made some changes to the MQTT script available at
https://kb.logicmachine.net/integration/mqtt-client/
My changes work in this way:
the script uses the object name as topic and defines a local table to convert values when needed.
I use the object comment field to enter a short Lua table definition that will be parsed by the script.
The comment syntax is (starting at column 1):
Code: config = { 'topic', 'conversion method' }
The conversion method must be one of the keys in the CONVERTER table.
this is my code for handler script:
Code: local BROKERADDRESS = '127.0.0.1' -- broker address later passed to mclient:connect()
local BROKERUSER = 'userid' -- use nil if no user/pass needed
local BROKERPASSWD = 'password' -- use nil if no user/pass needed
-- VALUE CONVERSION TABLE
-- Maps a conversion name (the second element of an object's `config`
-- definition) to a function that receives one value and returns the
-- converted value.
local CONVERTER = {
  -- true >> 'true' / false >> 'false'
  toBoolString = function(input_state)
    return tostring(input_state)
  end,

  -- true >> 'ON' / false >> 'OFF'
  BooltoString = function(input_state)
    return input_state and 'ON' or 'OFF'
  end,

  -- 'ON' or 'TRUE' (any letter case) >> true / anything else >> false
  StringtoBool = function(input_state)
    -- tostring() keeps non-string input from raising on :upper()
    local text = tostring(input_state):upper()
    return text == 'ON' or text == 'TRUE'
  end,

  -- true >> 1 / false >> 0
  BooltoNum = function(input_state)
    return input_state and 1 or 0
  end,

  -- 1 >> true / 0 >> false
  -- NOTE(review): toboolean is not part of standard Lua; presumably it is
  -- provided by the LogicMachine runtime -- confirm.
  NumtoBool = function(input_state)
    return toboolean(input_state)
  end,

  -- '#FFFFFF' >> 16777215
  HextoDec = function(input_state)
    -- gsub returns (string, count); keep only the string for tonumber
    local digits = input_state:gsub('#', '')
    return tonumber(digits, 16)
  end,

  -- 16777215 >> '#FFFFFF' / 255 >> '#0000FF'
  DectoHex = function(input_state)
    -- zero-pad to six digits so values round-trip with HextoDec;
    -- %X already emits upper-case digits
    return string.format('#%06X', input_state)
  end,

  -- '1' >> 1
  toNumber = function(input_state)
    return tonumber(input_state)
  end,
}
--- Apply a named conversion to a value.
-- @param input_state value to convert
-- @param conversion_request key looked up in the CONVERTER table
-- @return the result of the matching CONVERTER function, or input_state
--         unchanged when no entry exists for conversion_request
function converter(input_state, conversion_request)
  local convert = CONVERTER[conversion_request]
  if not convert then
    return input_state
  end
  return convert(input_state)
end
-- One-time initialisation: runs only while the global `broker` is unset.
-- NOTE(review): presumably the globals created here persist between
-- iterations of the resident script -- confirm against the runtime.
if not broker then
  broker = BROKERADDRESS
  username, password = BROKERUSER, BROKERPASSWD

  -- topic to object map
  mqtt_to_object = {}
  -- optional topic value conversion function
  mqtt_to_object_conv = {}
  -- object to topic map
  object_to_mqtt = {}

  -- Read the `config = { 'topic', 'conversion method' }` definition stored
  -- in an object's comment.
  -- Returns topic and conversion name, or nil when the comment does not
  -- start with "config" or does not yield a usable definition. Problems are
  -- logged and the object is skipped, so one bad comment cannot stop the
  -- whole script.
  -- NOTE(review): the comment text is executed as Lua code; only trusted
  -- users should be able to edit object comments.
  local function read_config(obj)
    local comment = obj.comment
    if type(comment) ~= 'string' or comment:find('config', 1, true) ~= 1 then
      return nil
    end

    local chunk, err = loadstring(comment)
    if not chunk then
      log('mqtt config: invalid comment on ' .. tostring(obj.address), err)
      return nil
    end

    -- drop the definition left over from the previous object so a comment
    -- that never assigns `config` cannot inherit it
    config = nil
    local ok, runerr = pcall(chunk)
    if not ok or type(config) ~= 'table' or config[1] == nil then
      log('mqtt config: unusable definition on ' .. tostring(obj.address), runerr)
      return nil
    end

    return config[1], config[2]
  end

  -- objects whose bus writes are published to MQTT
  objs = grp.tag('mqtt-control')
  for _, obj in ipairs(objs) do
    local topic, convert = read_config(obj)
    if topic then
      object_to_mqtt[ obj.address ] = {
        topic = topic,
        convert = convert
      }
    end
  end

  -- objects that are written when a message arrives on their topic
  objs = grp.tag('mqtt-status')
  for _, obj in ipairs(objs) do
    local topic, convert = read_config(obj)
    if topic then
      mqtt_to_object[ topic ] = {
        address = obj.address,
        convert = convert
      }
    end
  end

  datatypes = {}

  -- sender tag checked in publishvalue to skip writes made by this script
  grp.sender = 'mq'

  require('socket')

  -- remember the datatype of every published object for decoding bus data
  for addr, _ in pairs(object_to_mqtt) do
    local obj = grp.find(addr)
    if obj then
      datatypes[ addr ] = obj.datatype
    end
  end

  mclient = require('mosquitto').new()
  if username and password then
    mclient:login_set(username, password)
  end

  -- subscribe to every mapped topic on success, otherwise disconnect
  mclient.ON_CONNECT = function(res, ...)
    log('mqtt connect status', res, ...)
    if res then
      for topic, _ in pairs(mqtt_to_object) do
        mclient:subscribe(topic)
      end
    else
      mclient:disconnect()
    end
  end

  -- convert the payload and write it to the object mapped to the topic
  mclient.ON_MESSAGE = function(mid, topic, payload)
    local target = mqtt_to_object[ topic ]
    if target then
      payload = converter(payload, target.convert)
      grp.write(target.address, payload)
    end
  end

  -- clearing mclientfd marks the client as not connected
  mclient.ON_DISCONNECT = function(...)
    log('mqtt disconnect', ...)
    mclientfd = nil
  end

  -- Try to connect to the broker; mclientfd is set only when the client
  -- reports a socket.
  function mconnect()
    local fd

    mclient:connect(broker)
    fd = mclient:socket()

    -- fd ref is valid
    if fd then
      mclientfd = fd
    end
  end

  mconnect()

  -- Group write handler: publish the decoded (and optionally converted)
  -- value of a mapped object to its topic.
  function publishvalue(event)
    -- message from us or client is not connected
    if (event.sender == 'mq' or not mclientfd) or not object_to_mqtt[ event.dst ] then
      return
    end

    local addr = event.dst
    local dpt = datatypes[ addr ]
    local topic = object_to_mqtt[ addr ].topic

    -- unknown object
    if not dpt or not topic then
      return
    end

    local value = busdatatype.decode(event.datahex, dpt)
    log(value)
    if value ~= nil then
      if object_to_mqtt[ addr ].convert then
        value = converter(value, object_to_mqtt[ addr ].convert)
      end
      mclient:publish(topic, tostring(value))
    end
  end

  lbclient = require('localbus').new(1)
  lbclient:sethandler('groupwrite', publishvalue)
  lbclientfd = socket.fdmaskset(lbclient:getfd(), 'r')

  -- run timer every 5 seconds
  timer = require('timerfd').new(5)
  timerfd = socket.fdmaskset(timer:getfd(), 'r')
end
-- One iteration of the event loop: wait on the local bus, the timer and,
-- while a broker socket exists, the MQTT socket.
-- NOTE(review): the first argument of selectfds is presumably a timeout
-- in seconds -- confirm.
if not mclientfd then
  -- mqtt not connected
  res, lbclientstat, timerstat =
    socket.selectfds(10, lbclientfd, timerfd)
else
  -- mqtt connected: also wait for writability when the client asks for it
  local mask = mclient:want_write() and 'rw' or 'r'
  mclientfdset = socket.fdmaskset(mclientfd, mask)
  res, lbclientstat, timerstat, mclientstat =
    socket.selectfds(10, lbclientfd, timerfd, mclientfdset)
end

-- service the MQTT socket
if mclientfd and mclientstat then
  if socket.fdmaskread(mclientstat) then
    mclient:loop_read()
  end

  if socket.fdmaskwrite(mclientstat) then
    mclient:loop_write()
  end
end

-- service the local bus
if lbclientstat then
  lbclient:step()
end

-- periodic work: client housekeeping or a reconnect attempt
if timerstat then
  -- clear armed timer
  timer:read()

  if not mclientfd then
    mconnect()
  else
    mclient:loop_misc()
  end
end
This script should be a resident script with a 0-second sleep interval, named LMIta_MQTT_Handler.
Next, add the following code as a scheduled script (I run it every 5 minutes, but you can choose your own interval):
Code: local STORAGE_FILE_IN = 'lmita_mqtt_in' -- storage key for the saved topic -> object map
local STORAGE_FILE_OUT = 'lmita_mqtt_out' -- storage key for the saved object -> topic map
local SCRIPT_HANDLE_NAME = 'LMIta_MQTT_Handler' -- handler script name!
-- maps saved by a previous run (storage.get is given nil as the default)
local TABSTORE_IN = storage.get(STORAGE_FILE_IN, nil)
local TABSTORE_OUT = storage.get(STORAGE_FILE_OUT, nil)
-- maps rebuilt from the current object tags and comments by buildLocal()
local TABLOCAL_IN = {}
local TABLOCAL_OUT = {}
local J = require('json')
--- Deep structural comparison of two tables.
-- Values are compared directly instead of through their JSON text, so the
-- result does not depend on the key order the encoder happens to emit.
-- @param tableA first table
-- @param tableB second table
-- @return true when both arguments are tables with the same keys and
--         (recursively) equal values; false otherwise, including when
--         either argument is not a table
local function compareTables(tableA, tableB)
  -- verifying input parameters
  if type(tableA) ~= 'table' or type(tableB) ~= 'table' then
    return false
  end

  local function same(a, b)
    if type(a) ~= 'table' or type(b) ~= 'table' then
      return a == b
    end

    -- every entry of a must match the entry of b
    for key, value in pairs(a) do
      if not same(value, b[key]) then
        return false
      end
    end

    -- b must not hold keys that a lacks
    for key in pairs(b) do
      if a[key] == nil then
        return false
      end
    end

    return true
  end

  return same(tableA, tableB)
end
-- Read the `config = { 'topic', 'conversion method' }` definition stored in
-- an object's comment.
-- Returns topic and conversion name, or nil when the comment does not start
-- with "config" or does not yield a usable definition. Problems are logged
-- and the object is skipped instead of aborting the script.
-- NOTE(review): the comment text is executed as Lua code; only trusted
-- users should be able to edit object comments.
local function parseObjectConfig(obj)
  local comment = obj.comment
  if type(comment) ~= 'string' or comment:find('config', 1, true) ~= 1 then
    return nil
  end

  local chunk, err = loadstring(comment)
  if not chunk then
    log('mqtt config: invalid comment on ' .. tostring(obj.address), err)
    return nil
  end

  -- drop the definition left over from the previous object so a comment
  -- that never assigns `config` cannot inherit it
  config = nil
  local ok, runerr = pcall(chunk)
  if not ok or type(config) ~= 'table' or config[1] == nil then
    log('mqtt config: unusable definition on ' .. tostring(obj.address), runerr)
    return nil
  end

  return config[1], config[2]
end

--- Rebuild TABLOCAL_OUT (object address -> topic/conversion) and
-- TABLOCAL_IN (topic -> object address/conversion) from the objects tagged
-- 'mqtt-control' and 'mqtt-status'.
function buildLocal()
  for _, obj in ipairs(grp.tag('mqtt-control')) do
    local topic, convert = parseObjectConfig(obj)
    if topic then
      TABLOCAL_OUT[ obj.address ] = {
        topic = topic,
        convert = convert
      }
    end
  end

  for _, obj in ipairs(grp.tag('mqtt-status')) do
    local topic, convert = parseObjectConfig(obj)
    if topic then
      TABLOCAL_IN[ topic ] = {
        address = obj.address,
        convert = convert
      }
    end
  end
end
-- Rebuild the maps from the current object configuration and compare them
-- with the stored copies; on any difference save the new maps and restart
-- the handler script so it picks them up.
buildLocal()

local unchanged = compareTables(TABSTORE_IN, TABLOCAL_IN)
  and compareTables(TABSTORE_OUT, TABLOCAL_OUT)

if not unchanged then
  storage.set(STORAGE_FILE_IN, TABLOCAL_IN)
  storage.set(STORAGE_FILE_OUT, TABLOCAL_OUT)
  log("Config changed, restarting script")

  -- disable, pause, then enable again to restart the handler
  script.disable(SCRIPT_HANDLE_NAME)
  os.sleep(1)
  script.enable(SCRIPT_HANDLE_NAME)
end
This script checks the tag tables and compares them to a stored copy; if they do not match (different keys or content), it saves the new configuration to storage and restarts the handler script.
I've tested it with several objects and different conversions, and it runs quite well.
It may still have some bugs... but the main code is this.
I hope this helps you.
Ciao
M
|