MQTT protocol using resident scripts - achang - 23.05.2022
Hello,
I have a few questions about the MQTT protocol.
I’ve tried setting the LM up so that it subscribes to each topic associated with a corresponding group address using a resident script and setting the interval to 0.
Then I tried publishing 40 topics, but it took approximately 2 seconds for the process to finish, and from the logs I can tell that it is taking time between the end of a single iteration of client.ON_MESSAGE and the beginning of the next iteration. However, I am not sure why that is the case. Any ideas on why it is taking some time between iterations within the for-loop? I’ve tried 2 variations of the script but had no luck.
Script 1:
Code: require('json')
broker = "xx.xxx.xxx.xxx"
port = 1883
username = "xxx"
password = "yyy"
topic = "floor1/room1"
mqtt = require("mosquitto")
client = mqtt.new()
client.ON_CONNECT = function(status, rc, msg) -- true, 0, string connection accepted
if status then
log("mqtt connected")
client:subscribe("floor1/room1/0")
client:subscribe("floor1/room1/1")
client:subscribe("floor1/room1/2")
client:subscribe("floor1/room1/3")
client:subscribe("floor1/room1/4")
client:subscribe("floor1/room1/5")
client:subscribe("floor1/room1/6")
client:subscribe("floor1/room1/7")
client:subscribe("floor1/room1/8")
client:subscribe("floor1/room1/9")
client:subscribe("floor1/room1/10")
else
log("mqtt connect failed " .. tostring(msg))
client:disconnect()
end
end
client.ON_MESSAGE = function(mid, topic, payload_string)
log("result: " .. payload_string)
decoded = json.decode(payload_string)
for i,m in ipairs(decoded.commands) do
local ms = string.match(tostring(os.clock()), "%d%.(%d+)")
log("write start group_address: "..tostring(m.alias).. ", value: " ..tostring(m.value).. ", time: " ..os.date("%Y-%m-%d %H:%M:%S") ..ms)
grp.write(m.alias, m.value)
log("write end group_address: "..tostring(m.alias).. ", value: " ..tostring(m.value).. ", time: " ..os.date("%Y-%m-%d %H:%M:%S") ..ms)
end
end
client:login_set(username, password)
status, rc, msg = client:connect(broker, port)
if status then
client:loop_forever()
else
log("connect failed: " .. tostring(msg))
end
Script 2:
Code: if not broker then
broker = 'xx.xxx.xxx.xxx'
function multiply(mult)
return function(value)
local num = tonumber(value)
if num then
return num * mult
else
return value
end
end
end
-- topic to object map
mqtt_to_object = {
['floor1/room1/0'] = '0/0/2',
['floor1/room1/1'] = '0/0/5',
['floor1/room1/2'] = '0/0/10',
['floor1/room1/3'] = '0/0/11',
['floor1/room1/4'] = '0/0/12',
['floor1/room1/5'] = '0/0/13',
}
-- optional topic value conversion function
mqtt_to_object_conv = {
['floor1/room1/0'] = multiply(100),
['floor1/room1/1'] = multiply(0.01),
}
-- object to topic map
object_to_mqtt = {
['floor1/room1/status'] = 'floor1/room1/0',
['floor1/room1/status'] = 'floor1/room1/1',
['floor1/room1/status'] = 'floor1/room1/2',
['floor1/room1/status'] = 'floor1/room1/3',
['floor1/room1/status'] = 'floor1/room1/4',
['floor1/room1/status'] = 'floor1/room1/5',
}
datatypes = {}
grp.sender = 'mq'
require('socket')
for addr, _ in pairs(object_to_mqtt) do
local obj = grp.find(addr)
if obj then
datatypes[ addr ] = obj.datatype
end
end
mclient = require('mosquitto').new()
mclient.ON_CONNECT = function(res, ...)
log('mqtt connect status', res, ...)
if res then
for topic, _ in pairs(mqtt_to_object) do
mclient:subscribe(topic)
end
else
mclient:disconnect()
end
end
mclient.ON_MESSAGE = function(mid, topic, payload)
local addr = mqtt_to_object[ topic ]
if addr then
local ms = string.match(tostring(os.clock()), "%d%.(%d+)")
log("write start group_address: "..tostring(topic).. ", value: " ..tostring(payload).. ", time: " ..os.date("%Y-%m-%d %H:%M:%S") ..ms)
grp.write(addr, payload)
log("write end group_address: "..tostring(topic).. ", value: " ..tostring(payload).. ", time: " ..os.date("%Y-%m-%d %H:%M:%S") ..ms)
end
end
mclient.ON_DISCONNECT = function(...)
log('mqtt disconnect', ...)
mclientfd = nil
end
function mconnect()
local fd
mclient:connect(broker)
fd = mclient:socket()
-- fd ref is valid
if fd then
mclientfd = fd
end
end
mconnect()
function publishvalue(event)
-- message from us or client is not connected
if event.sender == 'mq' or not mclientfd then
return
end
local addr = event.dst
local dpt = datatypes[ addr ]
local topic = object_to_mqtt[ addr ]
-- unknown object
if not dpt or not topic then
return
end
local value = busdatatype.decode(event.datahex, dpt)
if value ~= nil then
if type(value) == 'boolean' then
value = value and 1 or 0
end
mclient:publish(topic, tostring(value))
end
end
lbclient = require('localbus').new(1)
lbclient:sethandler('groupwrite', publishvalue)
lbclientfd = socket.fdmaskset(lbclient:getfd(), 'r')
-- run timer every 5 seconds
timer = require('timerfd').new(5)
timerfd = socket.fdmaskset(timer:getfd(), 'r')
end
-- mqtt connected
if mclientfd then
mclientfdset = socket.fdmaskset(mclientfd, mclient:want_write() and 'rw' or 'r')
res, lbclientstat, timerstat, mclientstat =
socket.selectfds(10, lbclientfd, timerfd, mclientfdset)
-- mqtt not connected
else
res, lbclientstat, timerstat =
socket.selectfds(10, lbclientfd, timerfd)
end
if mclientfd and mclientstat then
if socket.fdmaskread(mclientstat) then
mclient:loop_read()
end
if socket.fdmaskwrite(mclientstat) then
mclient:loop_write()
end
end
if lbclientstat then
lbclient:step()
end
if timerstat then
-- clear armed timer
timer:read()
if mclientfd then
mclient:loop_misc()
else
mconnect()
end
end
For publishing, I am currently using the code below, which publishes the topics in a loop.
Code: const mqttPublishA = () => {
const aaa = [
{"commands":[{"alias":"0/0/2","value":0}]},
{"commands":[{"alias":"0/0/5","value":0}]},
{"commands":[{"alias":"0/0/10","value":0}]},
{"commands":[{"alias":"0/0/11","value":0}]},
{"commands":[{"alias":"0/0/12","value":0}]},
{"commands":[{"alias":"0/0/13","value":0}]},
{"commands":[{"alias":"0/0/15","value":0}]},
{"commands":[{"alias":"0/0/17","value":0}]},
{"commands":[{"alias":"0/0/18","value":0}]},
{"commands":[{"alias":"0/0/20","value":0}]},
{"commands":[{"alias":"0/0/21","value":0}]},
{"commands":[{"alias":"0/0/22","value":0}]},
{"commands":[{"alias":"0/0/23","value":0}]},
{"commands":[{"alias":"0/0/24","value":0}]},
{"commands":[{"alias":"1/0/2","value":0}]},
{"commands":[{"alias":"1/0/3","value":0}]},
{"commands":[{"alias":"1/0/4","value":0}]},
{"commands":[{"alias":"1/0/5","value":0}]},
{"commands":[{"alias":"1/0/6","value":0}]},
{"commands":[{"alias":"1/0/7","value":0}]},
{"commands":[{"alias":"1/0/8","value":0}]},
{"commands":[{"alias":"1/0/9","value":0}]},
{"commands":[{"alias":"1/0/10","value":0}]},
{"commands":[{"alias":"1/0/11","value":0}]},
{"commands":[{"alias":"1/0/12","value":0}]},
{"commands":[{"alias":"1/0/13","value":0}]},
{"commands":[{"alias":"1/0/15","value":0}]},
{"commands":[{"alias":"1/0/16","value":0}]},
]
aaa.forEach((element, index) => mqttPublish({
topic: `floor1/room1/${index}`,
qos: 0,
payload: JSON.stringify(element),
}));
}
const mqttPublish = (context) => {
console.log(context)
if (client) {
const { topic, qos, payload } = context;
client.publish(topic, payload, { qos }, error => {
if (error) {
console.log('Publish error: ', error);
}
});
}
}
Also, are there other ways to make the MQTT protocol process faster in general?
For example:
- parallelize the subscribe process
- create a new resident script for each group address, separating the subscribe process
Pretty much new to LM and MQTT, so any help is welcome.
Thank you in advance!
RE: MQTT protocol using resident scripts - admin - 23.05.2022
Are you using an external broker? If so, have you tried using the internal broker in LM? With the same script I get around 100 messages per second. Also check that LM is not overloaded by some other script that could cause a slow-down.
Keep in mind that having many log calls inside the script will also slow it down somewhat.
RE: MQTT protocol using resident scripts - Hadeel - 24.05.2022
Hi, admin!
achang and I are working on the same team.
I tried using the MQTT Broker app in LM, but I cannot figure out how to connect to the internal broker from the LM scripts.
I tried connecting to broker IP 127.0.0.1 but got the error message "mqtt connect failed connection refused - Connection Refused: not authorised."
I didn't set any username or password in the MQTT Broker app config.
Could you share the script that works on your side and handles around 100 messages per second?
RE: MQTT protocol using resident scripts - admin - 24.05.2022
For testing you can enable "Allow anonymous connections (no username/password)" and remove client:login_set(...) from your script.
Instead of adding log calls to the script you can enable logging for a certain object that will be written to via MQTT. Then you can check the timestamps in Object logs to determine how many telegrams are handled per second.
Code: require('json')
broker = '127.0.0.1'
port = 1883
mqtt = require('mosquitto')
client = mqtt.new()
client.ON_CONNECT = function(status, rc, msg)
log('connect', status, rc, msg)
if status then
client:subscribe('#')
else
client:disconnect()
end
end
client.ON_MESSAGE = function(mid, topic, payload)
local decoded = json.pdecode(payload)
if type(decoded) == 'table' then
for i, m in ipairs(decoded.commands) do
grp.write(m.alias, m.value)
end
end
end
status, rc, msg = client:connect(broker, port)
client:loop_forever()
Some further improvements can be made with object datatype caching. Otherwise each grp.write call requires a database query to determine the object data type. But keep in mind that the main bottleneck is KNX/TP low speed.
RE: MQTT protocol using resident scripts - Hadeel - 24.05.2022
Hi Admin,
Thank you for your comment and for your code!
I am able to connect to the internal MQTT Broker now.
I also removed the log calls from the script and used object logging instead.
However, executing grp.write() for 28 group addresses still took 2 seconds.
I don't have other resident scripts and these group addresses are not associated with event scripts.
I don't think I can get around 100 messages per second ... am I missing something ?
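The rate actually achieved can be worked out from the Object log timestamps. Below is a minimal sketch, assuming the timestamps have been copied out by hand as milliseconds; the function name `telegramsPerSecond` and the sample data are hypothetical, not part of LM.

```javascript
// Telegrams per second from a list of log timestamps (in milliseconds).
// n timestamps span n - 1 intervals, so the rate is (n - 1) / elapsed seconds.
function telegramsPerSecond(timestampsMs) {
  if (timestampsMs.length < 2) return 0;
  const sorted = [...timestampsMs].sort((a, b) => a - b);
  const elapsedSec = (sorted[sorted.length - 1] - sorted[0]) / 1000;
  return elapsedSec > 0 ? (sorted.length - 1) / elapsedSec : Infinity;
}

// Hypothetical data: 5 writes logged 500 ms apart.
const sample = [0, 500, 1000, 1500, 2000];
console.log(telegramsPerSecond(sample)); // 2
```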
RE: MQTT protocol using resident scripts - admin - 25.05.2022
As I've already mentioned, the main bottleneck is KNX/TP. In theory it can handle around 46 telegrams per second, but it is recommended to keep the bus load at 80% maximum. Otherwise, if there's a large burst of telegrams, some of them can be lost. There's also some overhead when sending from LM, so I would say around 30 is a reasonable limit.
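As a rough worked example of these figures: the constants below are the numbers quoted in this post, while the function name `burstSeconds` is made up for illustration. It only estimates the best-case time on the bus and ignores any other delay.

```javascript
// Figures quoted in the post above (not measured values).
const THEORETICAL_PER_SEC = 46; // KNX/TP theoretical telegrams per second
const MAX_BUS_LOAD = 0.8;       // recommended maximum bus load
const PRACTICAL_PER_SEC = 30;   // suggested limit, including LM overhead

// Best-case time in seconds to send a burst at a given rate.
function burstSeconds(telegrams, ratePerSec) {
  return telegrams / ratePerSec;
}

const recommendedPerSec = THEORETICAL_PER_SEC * MAX_BUS_LOAD;
console.log(recommendedPerSec.toFixed(1));                      // 36.8
console.log(burstSeconds(28, PRACTICAL_PER_SEC).toFixed(2));    // 0.93
console.log(burstSeconds(40, PRACTICAL_PER_SEC).toFixed(2));    // 1.33
```

So even in the best case, the 28 writes from the test take close to a second on KNX/TP, and 40 writes take more than a second.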
RE: MQTT protocol using resident scripts - Hadeel - 25.05.2022
I see.
Do you think switching the KNX connection mode to IP Router from TP-UART might make the situation better?
(Which KNX connection mode did you use when you got around 100 messages per second?)
RE: MQTT protocol using resident scripts - admin - 25.05.2022
KNX connection type does not matter. Telegrams bound to KNX/TP have a separate queue but it's possible to overflow it if sending too many telegrams at once.
Can you explain why 100 telegrams per second are needed? Are you using a single LM in a large KNX/IP network? In that case I would recommend splitting the network into several segments and using multiple LMs.
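One way to avoid sending too many telegrams at once is to pace the messages on the publisher side instead of firing them all in a single forEach. The sketch below is only an illustration: `paceOffsets` and `publishPaced` are hypothetical helper names, and `publish` stands for any function that sends one message, such as the `mqttPublish` function from the first post. The rate to use is an assumption to be tuned against the bus.

```javascript
// Delay in ms before each message so that at most `perSecond` go out per second.
function paceOffsets(count, perSecond) {
  const intervalMs = 1000 / perSecond;
  return Array.from({ length: count }, (_, i) => Math.round(i * intervalMs));
}

// Schedule each message with its own delay instead of publishing all at once.
// `publish` is supplied by the caller and receives one message per call.
function publishPaced(messages, perSecond, publish) {
  const offsets = paceOffsets(messages.length, perSecond);
  messages.forEach((msg, i) => setTimeout(() => publish(msg), offsets[i]));
}

console.log(paceOffsets(4, 25)); // [ 0, 40, 80, 120 ]
```

For example, `publishPaced(contexts, 25, mqttPublish)` would spread the messages 40 ms apart, where `contexts` is an array of `{ topic, qos, payload }` objects like those built in the first post.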