This forum uses cookies
This forum makes use of cookies to store your login information if you are registered, and your last visit if you are not. Cookies are small text documents stored on your computer; the cookies set by this forum can only be used on this website and pose no security risk. Cookies on this forum also track the specific topics you have read and when you last read them. Please confirm that you accept these cookies being set.

MQTT CLIENT AutoRefreshing
#1
Ciao,

I wanted to find a way for the MQTT client script to automatically refresh its configuration, so I made some changes to the MQTT script available at

https://kb.logicmachine.net/integration/mqtt-client/

My changes work this way:
the script uses the object name as the topic and defines a local table to convert values when needed

I use the object comment field to enter a short Lua table definition that will be parsed by the script.
The comment syntax is (starting at column 1):
Code:
config = { 'topic', 'conversion method' }
The conversion method should be one of the keys in the CONVERTER table.
This is my code for the handler script:
Code:
-- MQTT broker connection settings; edit these to match your installation.
-- BROKERADDRESS is the host passed to mclient:connect(). The user name and
-- password are passed to mclient:login_set() only when both are non-nil.
local BROKERADDRESS = '127.0.0.1'
local BROKERUSER    = 'userid'    -- use nil if no user/pass needed
local BROKERPASSWD  = 'password'  -- use nil if no user/pass needed


-- VALUE CONVERSION TABLE
-- Maps a conversion method name (the second element of an object's `config`
-- comment) to a function that transforms one value. Each function takes the
-- value to convert and returns the converted value.
local CONVERTER = {
  -- true >> 'true' / false >> 'false'
  toBoolString = function(input_state)
    return tostring(input_state)
  end,

  -- true >> 'ON' / false >> 'OFF'
  BooltoString = function(input_state)
    return input_state and 'ON' or 'OFF'
  end,

  -- 'ON' or 'TRUE' (any letter case) >> true / anything else >> false
  StringtoBool = function(input_state)
    local upper = input_state:upper()
    return upper == 'ON' or upper == 'TRUE'
  end,

  -- true >> 1 / false >> 0
  BooltoNum = function(input_state)
    return input_state and 1 or 0
  end,

  -- 1 >> true / 0 >> false
  -- NOTE(review): `toboolean` is not defined in this script; presumably it
  -- is a global supplied by the LogicMachine runtime -- confirm.
  NumtoBool = function(input_state)
    return toboolean(input_state)
  end,

  -- '#FFFFFF' >> 16777215
  HextoDec = function(input_state)
    -- Extra parentheses keep only gsub's first result (the string) so the
    -- replacement count can never be mistaken for the numeric base.
    return tonumber((input_state:gsub('#', '')), 16)
  end,

  -- 16777215 >> '#FFFFFF'
  DectoHex = function(input_state)
    -- Zero-pad to six digits so small values keep the '#RRGGBB' shape
    -- (255 >> '#0000FF') and round-trip through HextoDec.
    return string.format('#%06X', input_state)
  end,

  -- '1' >> 1
  toNumber = function(input_state)
    return tonumber(input_state)
  end,
}

--- Apply a named conversion to a value.
-- Looks `conversion_request` up in CONVERTER and runs the matching function
-- on `input_state`. When no entry matches (including a nil name) the value
-- is returned unchanged.
function converter(input_state, conversion_request)
  local convert = CONVERTER[conversion_request]
  if not convert then
    return input_state
  end
  return convert(input_state)
end

-- One-time initialisation. `broker` is a global, so this block is skipped
-- on every later pass once it has been set.
if not broker then
  broker = BROKERADDRESS
  username, password = BROKERUSER, BROKERPASSWD

  -- topic to object map
  mqtt_to_object = {}

  -- optional topic value conversion function
  mqtt_to_object_conv = {}

  -- object to topic map
  object_to_mqtt = {}

  -- Read the `config = { 'topic', 'conversion method' }` definition stored
  -- in an object's comment. Returns the config table, or nil when the
  -- comment is missing, does not start with "config", fails to compile or
  -- run, or does not yield a table whose first element is a non-empty
  -- topic string.
  local function read_config(obj)
    local comment = obj.comment
    if type(comment) ~= 'string' or comment:find('config', 1, true) ~= 1 then
      return nil
    end

    local chunk, compile_err = loadstring(comment)
    if not chunk then
      -- A malformed comment must not take the whole handler down.
      log('mqtt config syntax error', obj.address, compile_err)
      return nil
    end

    -- Run the comment in an empty environment: it cannot reach or modify
    -- globals, and a comment that never assigns `config` cannot inherit
    -- the value left behind by a previously processed object.
    local env = {}
    setfenv(chunk, env)
    local ok, run_err = pcall(chunk)
    if not ok then
      log('mqtt config runtime error', obj.address, run_err)
      return nil
    end

    local cfg = env.config
    if type(cfg) ~= 'table' or type(cfg[1]) ~= 'string' or cfg[1] == '' then
      -- A nil topic would otherwise be used as a table key and raise.
      log('mqtt config without topic', obj.address)
      return nil
    end

    return cfg
  end

  -- objects whose bus writes are published to MQTT
  for _, obj in ipairs(grp.tag('mqtt-control')) do
    local cfg = read_config(obj)
    if cfg then
      object_to_mqtt[ obj.address ] = {
        topic   = cfg[1],
        convert = cfg[2],
      }
    end
  end

  -- objects that are written when a message arrives on their topic
  for _, obj in ipairs(grp.tag('mqtt-status')) do
    local cfg = read_config(obj)
    if cfg then
      mqtt_to_object[ cfg[1] ] = {
        address = obj.address,
        convert = cfg[2],
      }
    end
  end

  datatypes = {}

  -- bus writes made by this script carry this sender id; publishvalue()
  -- uses it to ignore them
  grp.sender = 'mq'
  require('socket')

  -- remember the datatype of every published object for decoding
  for addr, _ in pairs(object_to_mqtt) do
    local obj = grp.find(addr)
    if obj then
      datatypes[ addr ] = obj.datatype
    end
  end

  mclient = require('mosquitto').new()
  if username and password then
    mclient:login_set(username, password)
  end

  -- subscribe to every mapped topic on success, otherwise drop the link
  mclient.ON_CONNECT = function(res, ...)
    log('mqtt connect status', res, ...)

    if res then
      for topic, _ in pairs(mqtt_to_object) do
        mclient:subscribe(topic)
      end
    else
      mclient:disconnect()
    end
  end

  -- incoming message: convert the payload and write it to the mapped object
  mclient.ON_MESSAGE = function(mid, topic, payload)
    local target = mqtt_to_object[ topic ]
    if target then
      local value = converter(payload, target.convert)
      -- A converter can return nil (e.g. toNumber on non-numeric text);
      -- skip the write instead of passing nil to grp.write.
      if value ~= nil then
        grp.write(target.address, value)
      end
    end
  end

  -- clearing mclientfd marks the client as disconnected for the main loop
  mclient.ON_DISCONNECT = function(...)
    log('mqtt disconnect', ...)
    mclientfd = nil
  end

  -- Try to connect to the broker; mclientfd is set only when the client
  -- reports a socket descriptor.
  function mconnect()
    local fd

    mclient:connect(broker)
    fd = mclient:socket()

    -- fd ref is valid
    if fd then
      mclientfd = fd
    end
  end

  mconnect()

  -- Local bus 'groupwrite' handler: publish the decoded (and optionally
  -- converted) value of a mapped object to its topic.
  function publishvalue(event)
    -- message from us or client is not connected
    if (event.sender == 'mq' or not mclientfd) or not object_to_mqtt[ event.dst ] then
      return
    end

    local addr = event.dst
    local dpt = datatypes[ addr ]
    local topic = object_to_mqtt[ addr ].topic

    -- unknown object
    if not dpt or not topic then
      return
    end

    local value = busdatatype.decode(event.datahex, dpt)
    log(value)
    if value ~= nil then
      local convert = object_to_mqtt[ addr ].convert
      if convert then
        value = converter(value, convert)
      end
      mclient:publish(topic, tostring(value))
    end
  end

  lbclient = require('localbus').new(1)
  lbclient:sethandler('groupwrite', publishvalue)

  lbclientfd = socket.fdmaskset(lbclient:getfd(), 'r')

  -- run timer every 5 seconds
  timer = require('timerfd').new(5)
  timerfd = socket.fdmaskset(timer:getfd(), 'r')
end

-- One pass of the event loop: wait on the local bus, the timer and (when
-- connected) the MQTT socket, then service whichever of them is ready.
-- NOTE(review): the first argument of selectfds is presumably a timeout in
-- seconds -- confirm against the socket library documentation.

-- mqtt connected
if mclientfd then
  -- also watch for writability only while the client has data to send
  mclientfdset = socket.fdmaskset(mclientfd, mclient:want_write() and 'rw' or 'r')
  res, lbclientstat, timerstat, mclientstat =
      socket.selectfds(10, lbclientfd, timerfd, mclientfdset)
-- mqtt not connected
else
  res, lbclientstat, timerstat =
    socket.selectfds(10, lbclientfd, timerfd)
end

-- service the MQTT socket; mclientstat is only assigned by the connected
-- branch above, hence the additional mclientfd check
if mclientfd and mclientstat then
  if socket.fdmaskread(mclientstat) then
    mclient:loop_read()
  end

  if socket.fdmaskwrite(mclientstat) then
    mclient:loop_write()
  end
end

-- local bus activity: step the client, which is where the 'groupwrite'
-- handler (publishvalue) gets invoked
if lbclientstat then
  lbclient:step()
end

-- timer tick: run MQTT housekeeping while connected, otherwise retry the
-- connection
if timerstat then
  -- clear armed timer
  timer:read()

  if mclientfd then
    mclient:loop_misc()
  else
    mconnect()
  end
end

This script should be a resident script with a 0-second sleep interval, named LMIta_MQTT_Handler.

Next, add the following code as a scheduled script (I run it every 5 minutes, but you can choose your own interval):
Code:
-- Storage keys holding the last saved topic >> object (IN) and
-- object >> topic (OUT) configuration tables.
local STORAGE_FILE_IN    = 'lmita_mqtt_in'
local STORAGE_FILE_OUT   = 'lmita_mqtt_out'
local SCRIPT_HANDLE_NAME = 'LMIta_MQTT_Handler'   -- handler script name!

-- Previously stored configuration.
-- NOTE(review): the second argument is presumably the default returned
-- when the key does not exist yet -- confirm against the storage API.
local TABSTORE_IN  = storage.get(STORAGE_FILE_IN, nil)
local TABSTORE_OUT = storage.get(STORAGE_FILE_OUT, nil)

-- Configuration rebuilt from the currently tagged objects by buildLocal().
local TABLOCAL_IN  = {}
local TABLOCAL_OUT = {}

local J = require('json')
--- Deep structural comparison of two tables.
-- Two tables are equal when they have exactly the same keys and every pair
-- of values is equal, with nested tables compared recursively. The values
-- are compared directly rather than through their serialised text, so the
-- result does not depend on the order in which keys are enumerated.
-- @param tableA first table
-- @param tableB second table
-- @return true when both arguments are tables with identical content;
--         false otherwise, including when either argument is not a table
local function compareTables(tableA, tableB)
  -- verifying input parameters
  if type(tableA) ~= 'table' or type(tableB) ~= 'table' then
    return false
  end

  local function deepEqual(a, b)
    if a == b then
      return true
    end
    if type(a) ~= 'table' or type(b) ~= 'table' then
      return false
    end

    -- every entry of a must be present in b with an equal value
    for key, value in pairs(a) do
      if not deepEqual(value, b[key]) then
        return false
      end
    end

    -- b must not hold keys that a lacks
    for key in pairs(b) do
      if a[key] == nil then
        return false
      end
    end

    return true
  end

  return deepEqual(tableA, tableB)
end

-- Read the `config = { 'topic', 'conversion method' }` definition stored in
-- an object's comment. Returns the config table, or nil when the comment is
-- missing, does not start with "config", fails to compile or run, or does
-- not yield a table whose first element is a non-empty topic string.
local function readMqttConfig(obj)
  local comment = obj.comment
  if type(comment) ~= 'string' or comment:find('config', 1, true) ~= 1 then
    return nil
  end

  local chunk, compileErr = loadstring(comment)
  if not chunk then
    -- A malformed comment must not abort the whole check.
    log('mqtt config syntax error', obj.address, compileErr)
    return nil
  end

  -- Run the comment in an empty environment: it cannot reach or modify
  -- globals, and a comment that never assigns `config` cannot inherit the
  -- value left behind by a previously processed object.
  local env = {}
  setfenv(chunk, env)
  local ok, runErr = pcall(chunk)
  if not ok then
    log('mqtt config runtime error', obj.address, runErr)
    return nil
  end

  local cfg = env.config
  if type(cfg) ~= 'table' or type(cfg[1]) ~= 'string' or cfg[1] == '' then
    -- A nil topic would otherwise be used as a table key and raise.
    log('mqtt config without topic', obj.address)
    return nil
  end

  return cfg
end

--- Rebuild the local configuration tables from the tagged objects.
-- Fills TABLOCAL_OUT (object address >> topic/convert) from objects tagged
-- 'mqtt-control' and TABLOCAL_IN (topic >> address/convert) from objects
-- tagged 'mqtt-status'. Objects without a usable config comment are skipped.
function buildLocal()
  for _, obj in ipairs(grp.tag('mqtt-control')) do
    local cfg = readMqttConfig(obj)
    if cfg then
      TABLOCAL_OUT[ obj.address ] = {
        topic   = cfg[1],
        convert = cfg[2],
      }
    end
  end

  for _, obj in ipairs(grp.tag('mqtt-status')) do
    local cfg = readMqttConfig(obj)
    if cfg then
      TABLOCAL_IN[ cfg[1] ] = {
        address = obj.address,
        convert = cfg[2],
      }
    end
  end
end

-- Refresh the local tables from the currently tagged objects.
buildLocal()

-- The configuration is unchanged only when both directions match their
-- stored snapshots.
local unchanged = compareTables(TABSTORE_IN, TABLOCAL_IN)
  and compareTables(TABSTORE_OUT, TABLOCAL_OUT)

if not unchanged then
  -- Persist the new snapshot, then bounce the handler script.
  storage.set(STORAGE_FILE_IN, TABLOCAL_IN)
  storage.set(STORAGE_FILE_OUT, TABLOCAL_OUT)
  log("Config changed, restarting script")
  script.disable(SCRIPT_HANDLE_NAME)
  os.sleep(1)
  script.enable(SCRIPT_HANDLE_NAME)
end

This script checks the tag tables and compares them to a stored copy; if they do not match (different keys or content), it saves the new configuration to storage and restarts the handler script.


I've tested it with several objects and different conversions and it runs quite well.

It could still have some bugs... but the main code is this...

I hope this helps you
Ciao
M
Reply


Forum Jump: