This forum uses cookies
This forum makes use of cookies to store your login information if you are registered, and your last visit if you are not. Cookies are small text documents stored on your computer; the cookies set by this forum can only be used on this website and pose no security risk. Cookies on this forum also track the specific topics you have read and when you last read them. Please confirm that you accept these cookies being set.

MQTT protocol using resident scripts
#1
Hello,

I have a few questions about the MQTT protocol.
I’ve tried setting the LM up so that it subscribes to each topic associated with a corresponding group address using a resident script and setting the interval to 0.
Then I tried publishing 40 topics, but it took approximately 2 seconds for the process to finish, and from the logs I can tell that it is taking time between the end of a single iteration of client.ON_MESSAGE and the beginning of the next iteration. However, I am not sure why that is the case. Any ideas on why it is taking some time between iterations within the for-loop? I’ve tried 2 variations of the script but had no luck.

Script 1:
Code:
-- Resident script: connects to an MQTT broker, subscribes to
-- floor1/room1/0 .. floor1/room1/10 and, for every received JSON payload of
-- the form {"commands":[{"alias":..., "value":...}, ...]}, writes each
-- command's value to the group address given by its alias.
require('json')

local broker = "xx.xxx.xxx.xxx"
local port = 1883
local username = "xxx"
local password = "yyy"
local topic = "floor1/room1" -- common prefix of all subscribed topics

local mqtt = require("mosquitto")
local client = mqtt.new()

-- Returns the timestamp text appended to the log lines: wall-clock date/time
-- followed by the fractional digits of os.clock().
-- It is evaluated on every call so that the "start" and "end" log entries
-- carry their own values instead of sharing one captured before the write.
-- NOTE(review): os.clock() is CPU time, not wall-clock time, so the trailing
-- digits are not real milliseconds of the printed second.
local function timestamp()
  local ms = string.match(tostring(os.clock()), "%d%.(%d+)")
  -- ms is nil when os.clock() prints without a fractional part
  return os.date("%Y-%m-%d %H:%M:%S") .. (ms or "")
end

-- Connection callback. According to the original note it is called with
-- (true, 0, <"connection accepted" text>) on success.
client.ON_CONNECT = function(status, rc, msg)
  if status then
    log("mqtt connected")
    -- same eleven topics as before: floor1/room1/0 .. floor1/room1/10
    for i = 0, 10 do
      client:subscribe(topic .. "/" .. i)
    end
  else
    log("mqtt connect failed " .. tostring(msg))
    client:disconnect()
  end
end

-- Message callback: decodes the payload and writes every command to the bus.
client.ON_MESSAGE = function(mid, topic, payload_string)
  log("結果" .. payload_string)

  -- A malformed payload must not raise inside the callback.
  local ok, decoded = pcall(json.decode, payload_string)
  if not ok or type(decoded) ~= "table" or type(decoded.commands) ~= "table" then
    log("invalid payload on topic " .. tostring(topic))
    return
  end

  for _, m in ipairs(decoded.commands) do
    log("書き込み開始  group_address: " .. tostring(m.alias) .. ", value: " .. tostring(m.value) .. ",   time:  " .. timestamp())
    grp.write(m.alias, m.value)
    log("書き込み終了  group_address: " .. tostring(m.alias) .. ", value: " .. tostring(m.value) .. ",   time:  " .. timestamp())
  end
end

client:login_set(username, password)

local status, rc, msg = client:connect(broker, port)
if status then
  -- does not return while the connection is being serviced
  client:loop_forever()
else
  log("connect failed: " .. tostring(msg))
end

Script 2
Code:
-- Resident script: bridges MQTT topics and group objects.
-- The first block runs only while the global `broker` is unset (one-time
-- setup); the statements after it service the MQTT socket, the local bus
-- client and a 5-second timer once per script run.
if not broker then
  broker = 'xx.xxx.xxx.xxx'

  -- Returns a converter that multiplies numeric values by `mult` and passes
  -- non-numeric values through unchanged.
  function multiply(mult)
    return function(value)
      local num = tonumber(value)
      if num then
        return num * mult
      else
        return value
      end
    end
  end

  -- topic to object map
  mqtt_to_object = {
    ['floor1/room1/0'] = '0/0/2',
    ['floor1/room1/1'] = '0/0/5',
    ['floor1/room1/2'] = '0/0/10',
    ['floor1/room1/3'] = '0/0/11',
    ['floor1/room1/4'] = '0/0/12',
    ['floor1/room1/5'] = '0/0/13',
  }

  -- optional topic value conversion function
  -- NOTE(review): this table is never read by ON_MESSAGE below, so the
  -- converters are currently not applied - confirm whether that is intended.
  mqtt_to_object_conv = {
    ['floor1/room1/0'] = multiply(100),
    ['floor1/room1/1'] = multiply(0.01),
  }

  -- object to topic map
  -- NOTE(review): all six entries use the same key, so only the last one
  -- survives, and publishvalue() looks this table up by event.dst - confirm
  -- the intended keys.
  object_to_mqtt = {
    ['floor1/room1/status'] = 'floor1/room1/0',
    ['floor1/room1/status'] = 'floor1/room1/1',
    ['floor1/room1/status'] = 'floor1/room1/2',
    ['floor1/room1/status'] = 'floor1/room1/3',
    ['floor1/room1/status'] = 'floor1/room1/4',
    ['floor1/room1/status'] = 'floor1/room1/5',
  }

  datatypes = {}

  -- marks writes made by this script so publishvalue() can ignore them
  grp.sender = 'mq'

  require('socket')

  -- remember the datatype of every object that has a topic mapping
  for addr, _ in pairs(object_to_mqtt) do
    local obj = grp.find(addr)
    if obj then
      datatypes[ addr ] = obj.datatype
    end
  end

  -- Returns the timestamp text appended to the log lines: wall-clock
  -- date/time followed by the fractional digits of os.clock().
  -- Evaluated on every call so the "start" and "end" entries differ.
  -- NOTE(review): os.clock() is CPU time, not wall-clock time.
  local function timestamp()
    local ms = string.match(tostring(os.clock()), "%d%.(%d+)")
    -- ms is nil when os.clock() prints without a fractional part
    return os.date("%Y-%m-%d %H:%M:%S") .. (ms or "")
  end

  mclient = require('mosquitto').new()

  -- subscribe to every mapped topic on success, otherwise drop the connection
  mclient.ON_CONNECT = function(res, ...)
    log('mqtt connect status', res, ...)
    if res then
      for topic, _ in pairs(mqtt_to_object) do
        mclient:subscribe(topic)
      end
    else
      mclient:disconnect()
    end
  end

  -- write the received payload to the object mapped to the topic
  mclient.ON_MESSAGE = function(mid, topic, payload)
    local addr = mqtt_to_object[ topic ]
    if addr then
      log("書き込み開始  group_address: " .. tostring(topic) .. ", value: " .. tostring(payload) .. ",   time:  " .. timestamp())
      grp.write(addr, payload)
      log("書き込み終了  group_address: " .. tostring(topic) .. ", value: " .. tostring(payload) .. ",   time:  " .. timestamp())
    end
  end

  -- clearing mclientfd makes the timer branch below call mconnect() again
  mclient.ON_DISCONNECT = function(...)
    log('mqtt disconnect', ...)
    mclientfd = nil
  end

  -- Connects to the broker and stores the socket descriptor in mclientfd
  -- when one is available.
  function mconnect()
    local fd

    mclient:connect(broker)
    fd = mclient:socket()

    -- fd ref is valid
    if fd then
      mclientfd = fd
    end
  end

  mconnect()

  -- Local bus 'groupwrite' handler: publishes the decoded value of a mapped
  -- object to its topic.
  function publishvalue(event)
    -- message from us or client is not connected
    if event.sender == 'mq' or not mclientfd then
      return
    end

    local addr = event.dst
    local dpt = datatypes[ addr ]
    local topic = object_to_mqtt[ addr ]

    -- unknown object
    if not dpt or not topic then
      return
    end

    local value = busdatatype.decode(event.datahex, dpt)
    if value ~= nil then
      -- booleans are published as 1 / 0
      if type(value) == 'boolean' then
        value = value and 1 or 0
      end

      mclient:publish(topic, tostring(value))
    end
  end

  lbclient = require('localbus').new(1)
  lbclient:sethandler('groupwrite', publishvalue)
  lbclientfd = socket.fdmaskset(lbclient:getfd(), 'r')

  -- run timer every 5 seconds
  timer = require('timerfd').new(5)
  timerfd = socket.fdmaskset(timer:getfd(), 'r')
end

-- mqtt connected
if mclientfd then
  -- also wait for writability when the client has pending outgoing data
  mclientfdset = socket.fdmaskset(mclientfd, mclient:want_write() and 'rw' or 'r')
  res, lbclientstat, timerstat, mclientstat =
    socket.selectfds(10, lbclientfd, timerfd, mclientfdset)
-- mqtt not connected
else
  res, lbclientstat, timerstat =
    socket.selectfds(10, lbclientfd, timerfd)
end

if mclientfd and mclientstat then
  if socket.fdmaskread(mclientstat) then
    mclient:loop_read()
  end

  if socket.fdmaskwrite(mclientstat) then
    mclient:loop_write()
  end
end

if lbclientstat then
  lbclient:step()
end

if timerstat then
  -- clear armed timer
  timer:read()

  if mclientfd then
    mclient:loop_misc()
  else
    -- not connected: retry on every timer tick
    mconnect()
  end
end

For publishing, I am currently using the below code. I am using a for-loop to publish topics.
Code:
/**
 * Publishes one message per group address. The message at position `index`
 * goes to topic `floor1/room1/<index>` with QoS 0 and carries the JSON text
 * {"commands":[{"alias":<address>,"value":0}]}.
 */
const mqttPublishA = () => {
  const aliases = [
    '0/0/2', '0/0/5', '0/0/10', '0/0/11', '0/0/12', '0/0/13', '0/0/15',
    '0/0/17', '0/0/18', '0/0/20', '0/0/21', '0/0/22', '0/0/23', '0/0/24',
    '1/0/2', '1/0/3', '1/0/4', '1/0/5', '1/0/6', '1/0/7', '1/0/8',
    '1/0/9', '1/0/10', '1/0/11', '1/0/12', '1/0/13', '1/0/15', '1/0/16',
  ];

  // Key order (alias, then value) is kept so the serialized text is unchanged.
  const messages = aliases.map((alias) => ({ commands: [{ alias, value: 0 }] }));

  messages.forEach((message, index) => {
    mqttPublish({
      topic: `floor1/room1/${index}`,
      qos: 0,
      payload: JSON.stringify(message),
    });
  });
};

/**
 * Logs the publish request and hands it to `client` when one exists.
 *
 * @param {{topic: string, qos: number, payload: string}} context
 */
const mqttPublish = (context) => {
  console.log(context);

  // Nothing to do without a client.
  if (!client) {
    return;
  }

  const { topic, qos, payload } = context;
  const onPublished = (error) => {
    if (!error) {
      return;
    }
    console.log('Publish error: ', error);
  };

  client.publish(topic, payload, { qos }, onPublished);
};


Also, are there other ways to make the MQTT protocol process faster in general?
For example,
  • parallelize the subscribe process
  • create a new resident script for each group address, separating the subscribe process
I am pretty new to LM and MQTT, so any help is welcome.
Thank you in advance!
Reply


Messages In This Thread
MQTT protocol using resident scripts - by achang - 23.05.2022, 13:11

Forum Jump: