This forum uses cookies
This forum makes use of cookies to store your login information if you are registered, and your last visit if you are not. Cookies are small text documents stored on your computer; the cookies set by this forum can only be used on this website and pose no security risk. Cookies on this forum also track the specific topics you have read and when you last read them. Please confirm that you accept these cookies being set.

MQTT protocol using resident scripts
#1
Hello,

I have a few questions about the MQTT protocol.
I’ve tried setting the LM up so that it subscribes to each topic associated with a corresponding group address using a resident script and setting the interval to 0.
Then I tried publishing 40 topics, but it took approximately 2 seconds for the process to finish, and from the logs I can tell that it is taking time between the end of a single iteration of client.ON_MESSAGE and the beginning of the next iteration. However, I am not sure why that is the case. Any ideas on why it is taking some time between iterations within the for-loop? I’ve tried 2 variations of the script but had no luck.

Script 1:
Code:
require('json')

broker = "xx.xxx.xxx.xxx"
port = 1883
username = "xxx"
password = "yyy"
topic = "floor1/room1"

mqtt = require("mosquitto")
client = mqtt.new()

client.ON_CONNECT = function(status, rc, msg) -- true, 0, string connection  accepted
 
  if status then
    log("mqtt connected")
     client:subscribe("floor1/room1/0")
    client:subscribe("floor1/room1/1")
    client:subscribe("floor1/room1/2")
    client:subscribe("floor1/room1/3")
    client:subscribe("floor1/room1/4")
    client:subscribe("floor1/room1/5")
    client:subscribe("floor1/room1/6")
    client:subscribe("floor1/room1/7")
    client:subscribe("floor1/room1/8")
    client:subscribe("floor1/room1/9")
    client:subscribe("floor1/room1/10")
  else
    log("mqtt connect failed " .. tostring(msg))
    client:disconnect()
  end
end

client.ON_MESSAGE = function(mid, topic, payload_string)
 
  log("結果" .. payload_string)
  decoded = json.decode(payload_string)
 
    for i,m in ipairs(decoded.commands) do
    local ms = string.match(tostring(os.clock()), "%d%.(%d+)")
     log("書き込み開始  group_address: "..tostring(m.alias).. ", value: " ..tostring(m.value).. ",   time:  " ..os.date("%Y-%m-%d %H:%M:%S") ..ms)
     grp.write(m.alias, m.value)
     log("書き込み終了  group_address: "..tostring(m.alias).. ", value: " ..tostring(m.value).. ",   time:  " ..os.date("%Y-%m-%d %H:%M:%S") ..ms)
    end
  end

client:login_set(username, password)
status, rc, msg = client:connect(broker, port)

if status then
  client:loop_forever()
else
  log("connect failed: " .. tostring(msg))
end

Script 2
Code:
if not broker then
  broker = 'xx.xxx.xxx.xxx'

  function multiply(mult)
    return function(value)
      local num = tonumber(value)
      if num then
        return num * mult
      else
        return value
      end
    end
  end

  -- topic to object map
  mqtt_to_object = {
    ['floor1/room1/0'] = '0/0/2',
    ['floor1/room1/1'] = '0/0/5',
     ['floor1/room1/2'] = '0/0/10',
     ['floor1/room1/3'] = '0/0/11',
     ['floor1/room1/4'] = '0/0/12',
     ['floor1/room1/5'] = '0/0/13',
  }

  -- optional topic value conversion function
  mqtt_to_object_conv = {
    ['floor1/room1/0'] = multiply(100),
    ['floor1/room1/1'] = multiply(0.01),
  }

  -- object to topic map
  object_to_mqtt = {
    ['floor1/room1/status'] = 'floor1/room1/0',
    ['floor1/room1/status'] = 'floor1/room1/1',
     ['floor1/room1/status'] = 'floor1/room1/2',
     ['floor1/room1/status'] = 'floor1/room1/3',
     ['floor1/room1/status'] = 'floor1/room1/4',
     ['floor1/room1/status'] = 'floor1/room1/5',
  }

  datatypes = {}

  grp.sender = 'mq'
  require('socket')

  for addr, _ in pairs(object_to_mqtt) do
    local obj = grp.find(addr)
    if obj then
      datatypes[ addr ] = obj.datatype
    end
  end

  mclient = require('mosquitto').new()

  mclient.ON_CONNECT = function(res, ...)
    log('mqtt connect status', res, ...)

    if res then
      for topic, _ in pairs(mqtt_to_object) do
        mclient:subscribe(topic)
      end
    else
      mclient:disconnect()
    end
  end

  mclient.ON_MESSAGE = function(mid, topic, payload)
    local addr = mqtt_to_object[ topic ]
    if addr then
      local ms = string.match(tostring(os.clock()), "%d%.(%d+)")
      log("書き込み開始  group_address: "..tostring(topic).. ", value: " ..tostring(payload).. ",   time:  " ..os.date("%Y-%m-%d %H:%M:%S") ..ms)
      grp.write(addr, payload)
      log("書き込み終了  group_address: "..tostring(topic).. ", value: " ..tostring(payload).. ",   time:  " ..os.date("%Y-%m-%d %H:%M:%S") ..ms)
    end
  end

  mclient.ON_DISCONNECT = function(...)
    log('mqtt disconnect', ...)
    mclientfd = nil
  end

  function mconnect()
    local fd

    mclient:connect(broker)
    fd = mclient:socket()

    -- fd ref is valid
    if fd then
      mclientfd = fd
    end
  end

  mconnect()

  function publishvalue(event)
    -- message from us or client is not connected
    if event.sender == 'mq' or not mclientfd then
      return
    end

    local addr = event.dst
    local dpt = datatypes[ addr ]
    local topic = object_to_mqtt[ addr ]

    -- unknown object
    if not dpt or not topic then
      return
    end

    local value = busdatatype.decode(event.datahex, dpt)
    if value ~= nil then
      if type(value) == 'boolean' then
        value = value and 1 or 0
      end

      mclient:publish(topic, tostring(value))
    end
  end

  lbclient = require('localbus').new(1)
  lbclient:sethandler('groupwrite', publishvalue)

  lbclientfd = socket.fdmaskset(lbclient:getfd(), 'r')

  -- run timer every 5 seconds
  timer = require('timerfd').new(5)
  timerfd = socket.fdmaskset(timer:getfd(), 'r')
end

-- mqtt connected
if mclientfd then
  mclientfdset = socket.fdmaskset(mclientfd, mclient:want_write() and 'rw' or 'r')
  res, lbclientstat, timerstat, mclientstat =
      socket.selectfds(10, lbclientfd, timerfd, mclientfdset)
-- mqtt not connected
else
  res, lbclientstat, timerstat =
    socket.selectfds(10, lbclientfd, timerfd)
end

if mclientfd and mclientstat then
  if socket.fdmaskread(mclientstat) then
    mclient:loop_read()
  end

  if socket.fdmaskwrite(mclientstat) then
    mclient:loop_write()
  end
end

if lbclientstat then
  lbclient:step()
end

if timerstat then
  -- clear armed timer
  timer:read()

  if mclientfd then
    mclient:loop_misc()
  else
    mconnect()
  end
end

For publishing, I am currently using the below code. I am using a for-loop to publish topics.
Code:
const mqttPublishA = () => {
    const aaa = [
      {"commands":[{"alias":"0/0/2","value":0}]},
      {"commands":[{"alias":"0/0/5","value":0}]},
      {"commands":[{"alias":"0/0/10","value":0}]},
      {"commands":[{"alias":"0/0/11","value":0}]},
      {"commands":[{"alias":"0/0/12","value":0}]},
      {"commands":[{"alias":"0/0/13","value":0}]},
      {"commands":[{"alias":"0/0/15","value":0}]},
      {"commands":[{"alias":"0/0/17","value":0}]},
      {"commands":[{"alias":"0/0/18","value":0}]},
      {"commands":[{"alias":"0/0/20","value":0}]},
      {"commands":[{"alias":"0/0/21","value":0}]},
      {"commands":[{"alias":"0/0/22","value":0}]},
      {"commands":[{"alias":"0/0/23","value":0}]},
      {"commands":[{"alias":"0/0/24","value":0}]},
      {"commands":[{"alias":"1/0/2","value":0}]},
      {"commands":[{"alias":"1/0/3","value":0}]},
      {"commands":[{"alias":"1/0/4","value":0}]},
      {"commands":[{"alias":"1/0/5","value":0}]},
      {"commands":[{"alias":"1/0/6","value":0}]},
      {"commands":[{"alias":"1/0/7","value":0}]},
      {"commands":[{"alias":"1/0/8","value":0}]},
      {"commands":[{"alias":"1/0/9","value":0}]},
      {"commands":[{"alias":"1/0/10","value":0}]},
      {"commands":[{"alias":"1/0/11","value":0}]},
      {"commands":[{"alias":"1/0/12","value":0}]},
      {"commands":[{"alias":"1/0/13","value":0}]},
      {"commands":[{"alias":"1/0/15","value":0}]},
      {"commands":[{"alias":"1/0/16","value":0}]},
    ]

    aaa.forEach((element, index) => mqttPublish({
      topic: `floor1/room1/${index}`,
      qos: 0,
      payload: JSON.stringify(element),
    }));
  }

  const mqttPublish = (context) => {
    console.log(context)
    if (client) {
      const { topic, qos, payload } = context;
      client.publish(topic, payload, { qos }, error => {
        if (error) {
          console.log('Publish error: ', error);
        }
      });
    }
  }


Also, are there other ways to make the MQTT protocol process faster in general?
For example,
  • parallelize the subscribe process
  • create a new resident script for each group address, separating the subscribe process
Pretty much new to LM and MQTT, so any help is welcome.
Thank you in advance!
Reply
#2
Are you using an external broker? If so, have you tried using internal broker in LM? With the same script I get around 100 messages per second. Also check that LM is not overloaded with some other script that can cause a slow-down.
Keep in mind that having many log calls inside the script will also slow it down somewhat.
Reply
#3
Hi, admin!

Me and archang is working on the same team.

I tried using MQTT Broker app in LM but I cannot figure out how to connect to the internal broker from the LM scripts.

I've tried connecting broker IP 127.0.0.1 but got error message "mqtt connect failed connection refused - Connection Refused: not authorised."

I didn't set any username or password in MQTT Broker app config.

Could you share the script that works on your side and got around 100 messsages per sec ?
Reply
#4
For testing you can enable "Allow anonymous connections (no username/password)" and remove client:login_set(...) from your script.

Instead of adding log calls to the script you can enable logging for a certain object that will be written to via MQTT. Then you can check the timestamps in Object logs to determine how many telegrams are handled per second.

Code:
require('json')
broker = '127.0.0.1'
port = 1883

mqtt = require('mosquitto')
client = mqtt.new()

client.ON_CONNECT = function(status, rc, msg)
  log('connect', status, rc, msg)

  if status then
    client:subscribe('#')
  else
    client:disconnect()
  end
end

client.ON_MESSAGE = function(mid, topic, payload)
  local decoded = json.pdecode(payload)

  if type(decoded) == 'table' then
    for i, m in ipairs(decoded.commands) do
      grp.write(m.alias, m.value)
    end
  end
end

status, rc, msg = client:connect(broker, port)

client:loop_forever()

Some further improvements can be made with object datatype caching. Otherwise each grp.write call requires a database query to determine the object data type. But keep in mind that the main bottleneck is KNX/TP low speed.
Reply
#5
Hi Admin,

Thank you for your comment and for your code!
I am able to connect to the internal MQTT Broker now.

I also tried not to use log in script and did the object logging instead.
However, executing grp.write() for 28 group addresses still took 2 seconds.

I don't have other resident scripts and these group addresses are not associated with event scripts.
I don't think I can get around 100 messages per second ... am I missing something ?
Reply
#6
As I've already mentioned there main bottleneck is KNX/TP. In theory it can handle around 46 telegrams per second. But it is recommended to keep the bus load at 80% maximum. Otherwise if there's a large burst of telegrams some of them can be lost. There's also some overhead when sending from LM so I would say around 30 is a reasonable limit.
Reply
#7
I see.
Do you think switching the KNX connection mode to IP Router from TP-UART might make the situation better?
(Which KNX connection mode did you use when you got around 100 messages per second? )
Reply
#8
KNX connection type does not matter. Telegrams bound to KNX/TP have a separate queue but it's possible to overflow it if sending too many telegrams at once.
Can you explain why 100 telegrams per second are needed? Are you using single LM in a large KNX/IP network? In this case I would recommend splitting the network into several segments and using multiple LMs.
Reply


Forum Jump: