This forum uses cookies
This forum makes use of cookies to store your login information if you are registered, and your last visit if you are not. Cookies are small text documents stored on your computer; the cookies set by this forum can only be used on this website and pose no security risk. Cookies on this forum also track the specific topics you have read and when you last read them. Please confirm that you accept these cookies being set.

MQTT protocol using resident scripts
#1
Hello,

I have a few questions about the MQTT protocol.
I’ve tried setting the LM up so that it subscribes to each topic associated with a corresponding group address using a resident script and setting the interval to 0.
Then I tried publishing 40 topics, but it took approximately 2 seconds for the process to finish, and from the logs I can tell that it is taking time between the end of a single iteration of client.ON_MESSAGE and the beginning of the next iteration. However, I am not sure why that is the case. Any ideas on why it is taking some time between iterations within the for-loop? I’ve tried 2 variations of the script but had no luck.

Script 1:
Code:
-- Libraries: mosquitto MQTT bindings and the JSON module (the message
-- callback calls json.decode on each payload).
mqtt = require("mosquitto")
require('json')

-- Broker connection settings (placeholders).
broker, port = "xx.xxx.xxx.xxx", 1883
username, password = "xxx", "yyy"

-- Base topic. NOTE(review): the subscriptions below hard-code their topic
-- strings and do not appear to read this variable.
topic = "floor1/room1"

-- MQTT client instance shared by the callbacks and the connect code below.
client = mqtt.new()

-- Connection callback.
-- On success it subscribes to floor1/room1/0 .. floor1/room1/10 (in that
-- order); on failure it logs the broker message and disconnects.
client.ON_CONNECT = function(status, rc, msg) -- true, 0, string connection  accepted
  if not status then
    log("mqtt connect failed " .. tostring(msg))
    client:disconnect()
    return
  end

  log("mqtt connected")
  for index = 0, 10 do
    client:subscribe("floor1/room1/" .. index)
  end
end

-- Builds the time suffix used by the timing log lines: the wall-clock date
-- from os.date followed directly by the fractional digits of os.clock().
-- string.format("%.6f", ...) always produces a decimal part, so the match
-- cannot return nil. With tostring(os.clock()) a whole value ("0", "3") or an
-- exponent form ("1e-05") has no "<digit>.<digits>" part, the match returned
-- nil and the string concatenation raised an error inside the callback.
-- NOTE: os.clock() is CPU time used by the program, not wall-clock time, so
-- these digits are not the milliseconds of the printed second.
local function log_stamp()
  local fraction = string.match(string.format("%.6f", os.clock()), "%.(%d+)")
  return os.date("%Y-%m-%d %H:%M:%S") .. fraction
end

-- Message callback.
-- Expects a JSON payload of the form {"commands":[{"alias":..., "value":...}]}
-- and writes every command's value to the object named by its alias.
--   mid            message id passed by the MQTT library (unused)
--   topic          topic the message arrived on
--   payload_string raw message payload
-- NOTE(review): every log() and grp.write() call here runs inline in the
-- callback; presumably they account for most of the per-message time -- verify
-- by temporarily removing the log lines.
client.ON_MESSAGE = function(mid, topic, payload_string)
  log("結果" .. tostring(payload_string))

  -- pcall guards against json.decode raising on malformed input; the type
  -- checks cover a decode that succeeds but does not have the expected shape.
  local ok, decoded = pcall(json.decode, payload_string)
  if not ok or type(decoded) ~= "table" or type(decoded.commands) ~= "table" then
    log("invalid payload on topic " .. tostring(topic))
    return
  end

  for _, m in ipairs(decoded.commands) do
    if type(m) == "table" then
      -- The stamp is rebuilt for each line so the start and end entries can differ.
      log("書き込み開始  group_address: "..tostring(m.alias).. ", value: " ..tostring(m.value).. ",   time:  " .. log_stamp())
      grp.write(m.alias, m.value)
      log("書き込み終了  group_address: "..tostring(m.alias).. ", value: " ..tostring(m.value).. ",   time:  " .. log_stamp())
    end
  end
end

-- Authenticate, connect, then hand control to the library's blocking loop.
-- If the connect call reports failure, only the error is logged and the
-- script falls through to its end.
client:login_set(username, password)
status, rc, msg = client:connect(broker, port)

if not status then
  log("connect failed: " .. tostring(msg))
else
  client:loop_forever()
end

Script 2:
Code:
if not broker then
  broker = 'xx.xxx.xxx.xxx'

  function multiply(mult)
    return function(value)
      local num = tonumber(value)
      if num then
        return num * mult
      else
        return value
      end
    end
  end

  -- topic to object map
  mqtt_to_object = {
    ['floor1/room1/0'] = '0/0/2',
    ['floor1/room1/1'] = '0/0/5',
     ['floor1/room1/2'] = '0/0/10',
     ['floor1/room1/3'] = '0/0/11',
     ['floor1/room1/4'] = '0/0/12',
     ['floor1/room1/5'] = '0/0/13',
  }

  -- optional topic value conversion function
  -- Maps a topic to a converter built by multiply().
  -- NOTE(review): nothing in this script reads mqtt_to_object_conv -- the
  -- message callback passes the payload straight to grp.write -- so these
  -- converters currently have no effect. Confirm whether the lookup was
  -- removed on purpose.
  mqtt_to_object_conv = {
    ['floor1/room1/0'] = multiply(100),
    ['floor1/room1/1'] = multiply(0.01),
  }

  -- object to topic map
  -- Keys are looked up with grp.find() during start-up and compared against
  -- event.dst in publishvalue(); values are the topics published to.
  -- NOTE(review): all six entries use the same key, so they overwrite each
  -- other and the table ends up with a single entry. The key also looks like
  -- a topic rather than an object address, so grp.find() presumably returns
  -- nothing for it and publishvalue() then never publishes -- confirm the
  -- intended address -> topic pairs and give every entry its own key.
  object_to_mqtt = {
    ['floor1/room1/status'] = 'floor1/room1/0',
    ['floor1/room1/status'] = 'floor1/room1/1',
     ['floor1/room1/status'] = 'floor1/room1/2',
     ['floor1/room1/status'] = 'floor1/room1/3',
     ['floor1/room1/status'] = 'floor1/room1/4',
     ['floor1/room1/status'] = 'floor1/room1/5',
  }

  datatypes = {}

  grp.sender = 'mq'
  require('socket')

  for addr, _ in pairs(object_to_mqtt) do
    local obj = grp.find(addr)
    if obj then
      datatypes[ addr ] = obj.datatype
    end
  end

  mclient = require('mosquitto').new()

  mclient.ON_CONNECT = function(res, ...)
    log('mqtt connect status', res, ...)

    if res then
      for topic, _ in pairs(mqtt_to_object) do
        mclient:subscribe(topic)
      end
    else
      mclient:disconnect()
    end
  end

  mclient.ON_MESSAGE = function(mid, topic, payload)
    local addr = mqtt_to_object[ topic ]
    if addr then
      local ms = string.match(tostring(os.clock()), "%d%.(%d+)")
      log("書き込み開始  group_address: "..tostring(topic).. ", value: " ..tostring(payload).. ",   time:  " ..os.date("%Y-%m-%d %H:%M:%S") ..ms)
      grp.write(addr, payload)
      log("書き込み終了  group_address: "..tostring(topic).. ", value: " ..tostring(payload).. ",   time:  " ..os.date("%Y-%m-%d %H:%M:%S") ..ms)
    end
  end

  mclient.ON_DISCONNECT = function(...)
    log('mqtt disconnect', ...)
    mclientfd = nil
  end

  function mconnect()
    local fd

    mclient:connect(broker)
    fd = mclient:socket()

    -- fd ref is valid
    if fd then
      mclientfd = fd
    end
  end

  mconnect()

  function publishvalue(event)
    -- message from us or client is not connected
    if event.sender == 'mq' or not mclientfd then
      return
    end

    local addr = event.dst
    local dpt = datatypes[ addr ]
    local topic = object_to_mqtt[ addr ]

    -- unknown object
    if not dpt or not topic then
      return
    end

    local value = busdatatype.decode(event.datahex, dpt)
    if value ~= nil then
      if type(value) == 'boolean' then
        value = value and 1 or 0
      end

      mclient:publish(topic, tostring(value))
    end
  end

  lbclient = require('localbus').new(1)
  lbclient:sethandler('groupwrite', publishvalue)

  lbclientfd = socket.fdmaskset(lbclient:getfd(), 'r')

  -- run timer every 5 seconds
  timer = require('timerfd').new(5)
  timerfd = socket.fdmaskset(timer:getfd(), 'r')
end

-- Body executed on every run of the script: wait for activity on the local
-- bus, the timer and (when connected) the MQTT socket, then service whichever
-- of them became ready.

-- mqtt connected
if mclientfd then
  -- ask for write readiness only while the client reports pending output
  mclientfdset = socket.fdmaskset(mclientfd, mclient:want_write() and 'rw' or 'r')
  -- NOTE(review): the first argument (10) is presumably the select timeout in
  -- seconds -- confirm against the socket.selectfds documentation
  res, lbclientstat, timerstat, mclientstat =
      socket.selectfds(10, lbclientfd, timerfd, mclientfdset)
-- mqtt not connected
else
  -- mclientstat is not reassigned in this branch; the mclientfd check below
  -- keeps a value left over from an earlier run from being used
  res, lbclientstat, timerstat =
    socket.selectfds(10, lbclientfd, timerfd)
end

-- service the MQTT socket: read incoming data and/or flush pending output
if mclientfd and mclientstat then
  if socket.fdmaskread(mclientstat) then
    mclient:loop_read()
  end

  if socket.fdmaskwrite(mclientstat) then
    mclient:loop_write()
  end
end

-- local bus activity: let the bus client process it (the 'groupwrite'
-- handler registered during setup is publishvalue)
if lbclientstat then
  lbclient:step()
end

if timerstat then
  -- clear armed timer
  timer:read()

  -- periodic work: client housekeeping while connected, otherwise try to
  -- connect again
  if mclientfd then
    mclient:loop_misc()
  else
    mconnect()
  end
end

For publishing, I am currently using the below code. I am using a for-loop to publish topics.
Code:
const mqttPublishA = () => {
    const aaa = [
      {"commands":[{"alias":"0/0/2","value":0}]},
      {"commands":[{"alias":"0/0/5","value":0}]},
      {"commands":[{"alias":"0/0/10","value":0}]},
      {"commands":[{"alias":"0/0/11","value":0}]},
      {"commands":[{"alias":"0/0/12","value":0}]},
      {"commands":[{"alias":"0/0/13","value":0}]},
      {"commands":[{"alias":"0/0/15","value":0}]},
      {"commands":[{"alias":"0/0/17","value":0}]},
      {"commands":[{"alias":"0/0/18","value":0}]},
      {"commands":[{"alias":"0/0/20","value":0}]},
      {"commands":[{"alias":"0/0/21","value":0}]},
      {"commands":[{"alias":"0/0/22","value":0}]},
      {"commands":[{"alias":"0/0/23","value":0}]},
      {"commands":[{"alias":"0/0/24","value":0}]},
      {"commands":[{"alias":"1/0/2","value":0}]},
      {"commands":[{"alias":"1/0/3","value":0}]},
      {"commands":[{"alias":"1/0/4","value":0}]},
      {"commands":[{"alias":"1/0/5","value":0}]},
      {"commands":[{"alias":"1/0/6","value":0}]},
      {"commands":[{"alias":"1/0/7","value":0}]},
      {"commands":[{"alias":"1/0/8","value":0}]},
      {"commands":[{"alias":"1/0/9","value":0}]},
      {"commands":[{"alias":"1/0/10","value":0}]},
      {"commands":[{"alias":"1/0/11","value":0}]},
      {"commands":[{"alias":"1/0/12","value":0}]},
      {"commands":[{"alias":"1/0/13","value":0}]},
      {"commands":[{"alias":"1/0/15","value":0}]},
      {"commands":[{"alias":"1/0/16","value":0}]},
    ]

    aaa.forEach((element, index) => mqttPublish({
      topic: `floor1/room1/${index}`,
      qos: 0,
      payload: JSON.stringify(element),
    }));
  }

  const mqttPublish = (context) => {
    console.log(context)
    if (client) {
      const { topic, qos, payload } = context;
      client.publish(topic, payload, { qos }, error => {
        if (error) {
          console.log('Publish error: ', error);
        }
      });
    }
  }


Also, are there other ways to make the MQTT protocol process faster in general?
For example,
  • parallelize the subscribe process
  • create a new resident script for each group address, separating the subscribe process
I am pretty new to LM and MQTT, so any help is welcome.
Thank you in advance!
Reply


Messages In This Thread
MQTT protocol using resident scripts - by achang - 23.05.2022, 13:11

Forum Jump: