23.05.2022, 13:11
Hello,
I have a few questions about the MQTT protocol.
I’ve tried setting the LM up so that it subscribes to each topic associated with a corresponding group address using a resident script and setting the interval to 0.
Then I tried publishing 40 topics, but it took approximately 2 seconds for the process to finish, and from the logs I can tell that it is taking time between the end of a single iteration of client.ON_MESSAGE and the beginning of the next iteration. However, I am not sure why that is the case. Any ideas on why it is taking some time between iterations within the for-loop? I’ve tried 2 variations of the script but had no luck.
Script 1:
Script 2
For publishing, I am currently using the below code. I am using a for-loop to publish topics.
Also, are there other ways to make the MQTT protocol process faster in general?
For example,
Thank you in advance!
I have a few questions about the MQTT protocol.
I’ve tried setting the LM up so that it subscribes to each topic associated with a corresponding group address using a resident script and setting the interval to 0.
Then I tried publishing 40 topics, but it took approximately 2 seconds for the process to finish, and from the logs I can tell that it is taking time between the end of a single iteration of client.ON_MESSAGE and the beginning of the next iteration. However, I am not sure why that is the case. Any ideas on why it is taking some time between iterations within the for-loop? I’ve tried 2 variations of the script but had no luck.
Script 1:
Code:
require('json')
broker = "xx.xxx.xxx.xxx"
port = 1883
username = "xxx"
password = "yyy"
topic = "floor1/room1"
mqtt = require("mosquitto")
client = mqtt.new()
client.ON_CONNECT = function(status, rc, msg) -- true, 0, string connection accepted
if status then
log("mqtt connected")
client:subscribe("floor1/room1/0")
client:subscribe("floor1/room1/1")
client:subscribe("floor1/room1/2")
client:subscribe("floor1/room1/3")
client:subscribe("floor1/room1/4")
client:subscribe("floor1/room1/5")
client:subscribe("floor1/room1/6")
client:subscribe("floor1/room1/7")
client:subscribe("floor1/room1/8")
client:subscribe("floor1/room1/9")
client:subscribe("floor1/room1/10")
else
log("mqtt connect failed " .. tostring(msg))
client:disconnect()
end
end
client.ON_MESSAGE = function(mid, topic, payload_string)
log("結果" .. payload_string)
decoded = json.decode(payload_string)
for i,m in ipairs(decoded.commands) do
local ms = string.match(tostring(os.clock()), "%d%.(%d+)")
log("書き込み開始 group_address: "..tostring(m.alias).. ", value: " ..tostring(m.value).. ", time: " ..os.date("%Y-%m-%d %H:%M:%S") ..ms)
grp.write(m.alias, m.value)
log("書き込み終了 group_address: "..tostring(m.alias).. ", value: " ..tostring(m.value).. ", time: " ..os.date("%Y-%m-%d %H:%M:%S") ..ms)
end
end
client:login_set(username, password)
status, rc, msg = client:connect(broker, port)
if status then
client:loop_forever()
else
log("connect failed: " .. tostring(msg))
end
Script 2
Code:
if not broker then
broker = 'xx.xxx.xxx.xxx'
function multiply(mult)
return function(value)
local num = tonumber(value)
if num then
return num * mult
else
return value
end
end
end
-- topic to object map
mqtt_to_object = {
['floor1/room1/0'] = '0/0/2',
['floor1/room1/1'] = '0/0/5',
['floor1/room1/2'] = '0/0/10',
['floor1/room1/3'] = '0/0/11',
['floor1/room1/4'] = '0/0/12',
['floor1/room1/5'] = '0/0/13',
}
-- optional topic value conversion function
mqtt_to_object_conv = {
['floor1/room1/0'] = multiply(100),
['floor1/room1/1'] = multiply(0.01),
}
-- object to topic map
object_to_mqtt = {
['floor1/room1/status'] = 'floor1/room1/0',
['floor1/room1/status'] = 'floor1/room1/1',
['floor1/room1/status'] = 'floor1/room1/2',
['floor1/room1/status'] = 'floor1/room1/3',
['floor1/room1/status'] = 'floor1/room1/4',
['floor1/room1/status'] = 'floor1/room1/5',
}
datatypes = {}
grp.sender = 'mq'
require('socket')
for addr, _ in pairs(object_to_mqtt) do
local obj = grp.find(addr)
if obj then
datatypes[ addr ] = obj.datatype
end
end
mclient = require('mosquitto').new()
mclient.ON_CONNECT = function(res, ...)
log('mqtt connect status', res, ...)
if res then
for topic, _ in pairs(mqtt_to_object) do
mclient:subscribe(topic)
end
else
mclient:disconnect()
end
end
mclient.ON_MESSAGE = function(mid, topic, payload)
local addr = mqtt_to_object[ topic ]
if addr then
local ms = string.match(tostring(os.clock()), "%d%.(%d+)")
log("書き込み開始 group_address: "..tostring(topic).. ", value: " ..tostring(payload).. ", time: " ..os.date("%Y-%m-%d %H:%M:%S") ..ms)
grp.write(addr, payload)
log("書き込み終了 group_address: "..tostring(topic).. ", value: " ..tostring(payload).. ", time: " ..os.date("%Y-%m-%d %H:%M:%S") ..ms)
end
end
mclient.ON_DISCONNECT = function(...)
log('mqtt disconnect', ...)
mclientfd = nil
end
function mconnect()
local fd
mclient:connect(broker)
fd = mclient:socket()
-- fd ref is valid
if fd then
mclientfd = fd
end
end
mconnect()
function publishvalue(event)
-- message from us or client is not connected
if event.sender == 'mq' or not mclientfd then
return
end
local addr = event.dst
local dpt = datatypes[ addr ]
local topic = object_to_mqtt[ addr ]
-- unknown object
if not dpt or not topic then
return
end
local value = busdatatype.decode(event.datahex, dpt)
if value ~= nil then
if type(value) == 'boolean' then
value = value and 1 or 0
end
mclient:publish(topic, tostring(value))
end
end
lbclient = require('localbus').new(1)
lbclient:sethandler('groupwrite', publishvalue)
lbclientfd = socket.fdmaskset(lbclient:getfd(), 'r')
-- run timer every 5 seconds
timer = require('timerfd').new(5)
timerfd = socket.fdmaskset(timer:getfd(), 'r')
end
-- mqtt connected
if mclientfd then
mclientfdset = socket.fdmaskset(mclientfd, mclient:want_write() and 'rw' or 'r')
res, lbclientstat, timerstat, mclientstat =
socket.selectfds(10, lbclientfd, timerfd, mclientfdset)
-- mqtt not connected
else
res, lbclientstat, timerstat =
socket.selectfds(10, lbclientfd, timerfd)
end
if mclientfd and mclientstat then
if socket.fdmaskread(mclientstat) then
mclient:loop_read()
end
if socket.fdmaskwrite(mclientstat) then
mclient:loop_write()
end
end
if lbclientstat then
lbclient:step()
end
if timerstat then
-- clear armed timer
timer:read()
if mclientfd then
mclient:loop_misc()
else
mconnect()
end
end
For publishing, I am currently using the below code. I am using a for-loop to publish topics.
Code:
const mqttPublishA = () => {
const aaa = [
{"commands":[{"alias":"0/0/2","value":0}]},
{"commands":[{"alias":"0/0/5","value":0}]},
{"commands":[{"alias":"0/0/10","value":0}]},
{"commands":[{"alias":"0/0/11","value":0}]},
{"commands":[{"alias":"0/0/12","value":0}]},
{"commands":[{"alias":"0/0/13","value":0}]},
{"commands":[{"alias":"0/0/15","value":0}]},
{"commands":[{"alias":"0/0/17","value":0}]},
{"commands":[{"alias":"0/0/18","value":0}]},
{"commands":[{"alias":"0/0/20","value":0}]},
{"commands":[{"alias":"0/0/21","value":0}]},
{"commands":[{"alias":"0/0/22","value":0}]},
{"commands":[{"alias":"0/0/23","value":0}]},
{"commands":[{"alias":"0/0/24","value":0}]},
{"commands":[{"alias":"1/0/2","value":0}]},
{"commands":[{"alias":"1/0/3","value":0}]},
{"commands":[{"alias":"1/0/4","value":0}]},
{"commands":[{"alias":"1/0/5","value":0}]},
{"commands":[{"alias":"1/0/6","value":0}]},
{"commands":[{"alias":"1/0/7","value":0}]},
{"commands":[{"alias":"1/0/8","value":0}]},
{"commands":[{"alias":"1/0/9","value":0}]},
{"commands":[{"alias":"1/0/10","value":0}]},
{"commands":[{"alias":"1/0/11","value":0}]},
{"commands":[{"alias":"1/0/12","value":0}]},
{"commands":[{"alias":"1/0/13","value":0}]},
{"commands":[{"alias":"1/0/15","value":0}]},
{"commands":[{"alias":"1/0/16","value":0}]},
]
aaa.forEach((element, index) => mqttPublish({
topic: `floor1/room1/${index}`,
qos: 0,
payload: JSON.stringify(element),
}));
}
const mqttPublish = (context) => {
console.log(context)
if (client) {
const { topic, qos, payload } = context;
client.publish(topic, payload, { qos }, error => {
if (error) {
console.log('Publish error: ', error);
}
});
}
}
Also, are there other ways to make the MQTT protocol process faster in general?
For example,
- parallelize the subscribe process
- create a new resident script for each group address, separating the subscribe process
Thank you in advance!