This forum uses cookies
This forum makes use of cookies to store your login information if you are registered, and your last visit if you are not. Cookies are small text documents stored on your computer; the cookies set by this forum can only be used on this website and pose no security risk. Cookies on this forum also track the specific topics you have read and when you last read them. Please confirm that you accept these cookies being set.

MQTT protocol using resident scripts
#1
Hello,

I have a few questions about the MQTT protocol.
I’ve tried setting the LM up so that it subscribes to each topic associated with a corresponding group address using a resident script and setting the interval to 0.
Then I tried publishing 40 topics, but it took approximately 2 seconds for the process to finish, and from the logs I can tell that it is taking time between the end of a single iteration of client.ON_MESSAGE and the beginning of the next iteration. However, I am not sure why that is the case. Any ideas on why it is taking some time between iterations within the for-loop? I’ve tried 2 variations of the script but had no luck.

Script 1:
Code:
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
require('json') broker = "xx.xxx.xxx.xxx" port = 1883 username = "xxx" password = "yyy" topic = "floor1/room1" mqtt = require("mosquitto") client = mqtt.new() client.ON_CONNECT = function(status, rc, msg) -- true, 0, string connection  accepted     if status then     log("mqtt connected")      client:subscribe("floor1/room1/0")     client:subscribe("floor1/room1/1")     client:subscribe("floor1/room1/2")     client:subscribe("floor1/room1/3")     client:subscribe("floor1/room1/4")     client:subscribe("floor1/room1/5")     client:subscribe("floor1/room1/6")     client:subscribe("floor1/room1/7")     client:subscribe("floor1/room1/8")     client:subscribe("floor1/room1/9")     client:subscribe("floor1/room1/10")   else     log("mqtt connect failed " .. tostring(msg))     client:disconnect()   end end client.ON_MESSAGE = function(mid, topic, payload_string)     log("結果" .. payload_string)   decoded = json.decode(payload_string)       for i,m in ipairs(decoded.commands) do     local ms = string.match(tostring(os.clock()), "%d%.(%d+)")      log("書き込み開始  group_address: "..tostring(m.alias).. ", value: " ..tostring(m.value).. ",   time:  " ..os.date("%Y-%m-%d %H:%M:%S") ..ms)      grp.write(m.alias, m.value)      log("書き込み終了  group_address: "..tostring(m.alias).. ", value: " ..tostring(m.value).. ",   time:  " ..os.date("%Y-%m-%d %H:%M:%S") ..ms)     end   end client:login_set(username, password) status, rc, msg = client:connect(broker, port) if status then   client:loop_forever() else   log("connect failed: " .. tostring(msg)) end

Script 2
Code:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
if not broker then   broker = 'xx.xxx.xxx.xxx'   function multiply(mult)     return function(value)       local num = tonumber(value)       if num then         return num * mult       else         return value       end     end   end   -- topic to object map   mqtt_to_object = {     ['floor1/room1/0'] = '0/0/2',     ['floor1/room1/1'] = '0/0/5',      ['floor1/room1/2'] = '0/0/10',      ['floor1/room1/3'] = '0/0/11',      ['floor1/room1/4'] = '0/0/12',      ['floor1/room1/5'] = '0/0/13',   }   -- optional topic value conversion function   mqtt_to_object_conv = {     ['floor1/room1/0'] = multiply(100),     ['floor1/room1/1'] = multiply(0.01),   }   -- object to topic map   object_to_mqtt = {     ['floor1/room1/status'] = 'floor1/room1/0',     ['floor1/room1/status'] = 'floor1/room1/1',      ['floor1/room1/status'] = 'floor1/room1/2',      ['floor1/room1/status'] = 'floor1/room1/3',      ['floor1/room1/status'] = 'floor1/room1/4',      ['floor1/room1/status'] = 'floor1/room1/5',   }   datatypes = {}   grp.sender = 'mq'   require('socket')   for addr, _ in pairs(object_to_mqtt) do     local obj = grp.find(addr)     if obj then       datatypes[ addr ] = obj.datatype     end   end   mclient = require('mosquitto').new()   mclient.ON_CONNECT = function(res, ...)     log('mqtt connect status', res, ...)     if res then       for topic, _ in pairs(mqtt_to_object) do         mclient:subscribe(topic)       end     else       mclient:disconnect()     end   end   mclient.ON_MESSAGE = function(mid, topic, payload)     local addr = mqtt_to_object[ topic ]     if addr then       local ms = string.match(tostring(os.clock()), "%d%.(%d+)")       log("書き込み開始  group_address: "..tostring(topic).. ", value: " ..tostring(payload).. ",   time:  " ..os.date("%Y-%m-%d %H:%M:%S") ..ms)       grp.write(addr, payload)       log("書き込み終了  group_address: "..tostring(topic).. ", value: " ..tostring(payload).. ",   time:  " ..os.date("%Y-%m-%d %H:%M:%S") ..ms)     end   end   mclient.ON_DISCONNECT = function(...)     log('mqtt disconnect', ...)     mclientfd = nil   end   function mconnect()     local fd     mclient:connect(broker)     fd = mclient:socket()     -- fd ref is valid     if fd then       mclientfd = fd     end   end   mconnect()   function publishvalue(event)     -- message from us or client is not connected     if event.sender == 'mq' or not mclientfd then       return     end     local addr = event.dst     local dpt = datatypes[ addr ]     local topic = object_to_mqtt[ addr ]     -- unknown object     if not dpt or not topic then       return     end     local value = busdatatype.decode(event.datahex, dpt)     if value ~= nil then       if type(value) == 'boolean' then         value = value and 1 or 0       end       mclient:publish(topic, tostring(value))     end   end   lbclient = require('localbus').new(1)   lbclient:sethandler('groupwrite', publishvalue)   lbclientfd = socket.fdmaskset(lbclient:getfd(), 'r')   -- run timer every 5 seconds   timer = require('timerfd').new(5)   timerfd = socket.fdmaskset(timer:getfd(), 'r') end -- mqtt connected if mclientfd then   mclientfdset = socket.fdmaskset(mclientfd, mclient:want_write() and 'rw' or 'r')   res, lbclientstat, timerstat, mclientstat =       socket.selectfds(10, lbclientfd, timerfd, mclientfdset) -- mqtt not connected else   res, lbclientstat, timerstat =     socket.selectfds(10, lbclientfd, timerfd) end if mclientfd and mclientstat then   if socket.fdmaskread(mclientstat) then     mclient:loop_read()   end   if socket.fdmaskwrite(mclientstat) then     mclient:loop_write()   end end if lbclientstat then   lbclient:step() end if timerstat then   -- clear armed timer   timer:read()   if mclientfd then     mclient:loop_misc()   else     mconnect()   end end

For publishing, I am currently using the below code. I am using a for-loop to publish topics.
Code:
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
const mqttPublishA = () => {     const aaa = [       {"commands":[{"alias":"0/0/2","value":0}]},       {"commands":[{"alias":"0/0/5","value":0}]},       {"commands":[{"alias":"0/0/10","value":0}]},       {"commands":[{"alias":"0/0/11","value":0}]},       {"commands":[{"alias":"0/0/12","value":0}]},       {"commands":[{"alias":"0/0/13","value":0}]},       {"commands":[{"alias":"0/0/15","value":0}]},       {"commands":[{"alias":"0/0/17","value":0}]},       {"commands":[{"alias":"0/0/18","value":0}]},       {"commands":[{"alias":"0/0/20","value":0}]},       {"commands":[{"alias":"0/0/21","value":0}]},       {"commands":[{"alias":"0/0/22","value":0}]},       {"commands":[{"alias":"0/0/23","value":0}]},       {"commands":[{"alias":"0/0/24","value":0}]},       {"commands":[{"alias":"1/0/2","value":0}]},       {"commands":[{"alias":"1/0/3","value":0}]},       {"commands":[{"alias":"1/0/4","value":0}]},       {"commands":[{"alias":"1/0/5","value":0}]},       {"commands":[{"alias":"1/0/6","value":0}]},       {"commands":[{"alias":"1/0/7","value":0}]},       {"commands":[{"alias":"1/0/8","value":0}]},       {"commands":[{"alias":"1/0/9","value":0}]},       {"commands":[{"alias":"1/0/10","value":0}]},       {"commands":[{"alias":"1/0/11","value":0}]},       {"commands":[{"alias":"1/0/12","value":0}]},       {"commands":[{"alias":"1/0/13","value":0}]},       {"commands":[{"alias":"1/0/15","value":0}]},       {"commands":[{"alias":"1/0/16","value":0}]},     ]     aaa.forEach((element, index) => mqttPublish({       topic: `floor1/room1/${index}`,       qos: 0,       payload: JSON.stringify(element),     }));   }   const mqttPublish = (context) => {     console.log(context)     if (client) {       const { topic, qos, payload } = context;       client.publish(topic, payload, { qos }, error => {         if (error) {           console.log('Publish error: ', error);         }       });     }   }


Also, are there other ways to make the MQTT protocol process faster in general?
For example,
  • parallelize the subscribe process
  • create a new resident script for each group address, separating the subscribe process
Pretty much new to LM and MQTT, so any help is welcome.
Thank you in advance!
Reply
#2
Are you using an external broker? If so, have you tried using internal broker in LM? With the same script I get around 100 messages per second. Also check that LM is not overloaded with some other script that can cause a slow-down.
Keep in mind that having many log calls inside the script will also slow it down somewhat.
Reply
#3
Hi, admin!

Me and archang is working on the same team.

I tried using MQTT Broker app in LM but I cannot figure out how to connect to the internal broker from the LM scripts.

I've tried connecting broker IP 127.0.0.1 but got error message "mqtt connect failed connection refused - Connection Refused: not authorised."

I didn't set any username or password in MQTT Broker app config.

Could you share the script that works on your side and got around 100 messsages per sec ?
Reply
#4
For testing you can enable "Allow anonymous connections (no username/password)" and remove client:login_set(...) from your script.

Instead of adding log calls to the script you can enable logging for a certain object that will be written to via MQTT. Then you can check the timestamps in Object logs to determine how many telegrams are handled per second.

Code:
123456789101112131415161718192021222324252627282930
require('json') broker = '127.0.0.1' port = 1883 mqtt = require('mosquitto') client = mqtt.new() client.ON_CONNECT = function(status, rc, msg)   log('connect', status, rc, msg)   if status then     client:subscribe('#')   else     client:disconnect()   end end client.ON_MESSAGE = function(mid, topic, payload)   local decoded = json.pdecode(payload)   if type(decoded) == 'table' then     for i, m in ipairs(decoded.commands) do       grp.write(m.alias, m.value)     end   end end status, rc, msg = client:connect(broker, port) client:loop_forever()

Some further improvements can be made with object datatype caching. Otherwise each grp.write call requires a database query to determine the object data type. But keep in mind that the main bottleneck is KNX/TP low speed.
Reply
#5
Hi Admin,

Thank you for your comment and for your code!
I am able to connect to the internal MQTT Broker now.

I also tried not to use log in script and did the object logging instead.
However, executing grp.write() for 28 group addresses still took 2 seconds.

I don't have other resident scripts and these group addresses are not associated with event scripts.
I don't think I can get around 100 messages per second ... am I missing something ?
Reply
#6
As I've already mentioned there main bottleneck is KNX/TP. In theory it can handle around 46 telegrams per second. But it is recommended to keep the bus load at 80% maximum. Otherwise if there's a large burst of telegrams some of them can be lost. There's also some overhead when sending from LM so I would say around 30 is a reasonable limit.
Reply
#7
I see.
Do you think switching the KNX connection mode to IP Router from TP-UART might make the situation better?
(Which KNX connection mode did you use when you got around 100 messages per second? )
Reply
#8
KNX connection type does not matter. Telegrams bound to KNX/TP have a separate queue but it's possible to overflow it if sending too many telegrams at once.
Can you explain why 100 telegrams per second are needed? Are you using single LM in a large KNX/IP network? In this case I would recommend splitting the network into several segments and using multiple LMs.
Reply


Forum Jump: