universal MQTT / Timer / Localbus event loop - myg - 02.06.2020
I've tried to gather best practices across the forum on writing non-blocking loops to handle events from mqtt / bus and timer events in order to write constantly running logic for sending / receiving / syncing data or similar. I've come up with user library below that simplifies API to initiate and attach to event sources. Hopefully it will be useful to others.
Usage (put code below in a resident script, timeout doesn't matter since loop never leaves):
Code: local mqtt_cfg = {
broker = 'IP',
login = 'USERNAME',
pwd = 'PASSWORD',
id = 'CLIENT_ID',
topics = {'zigbee/+', 'zigbee/bridge/config'},
-- alternative for single topic subscription
-- topics = 'zigbee/+',
}
function mqtt_onmessage(mid, topic, payload)
-- logic
end
function localbus_onmessage(event)
-- logic
end
function on_timer()
-- logic
end
function setup()
local lp = loop.get()
-- use one or more of the below in any combination
-- mqtt client with callback function to receive messages
-- store mqtt.client to local variable if you want to publish messages later
local mqtt = loop.create_mqtt(mqtt_cfg, mqtt_onmessage)
-- add 'groupread = read_callback' to add read callback handler
local bus = loop.create_localbus({ groupwrite = localbus_onmessage })
-- timer that fires callback every 10 seconds, period can be a fraction of a second
local timer = loop.create_timer(10, on_timer)
-- add all workers to our loop
lp:add(mqtt)
lp:add(bus)
lp:add(timer)
end
setup()
-- run loop forever until script is terminated
loop.get():run_loop()
Create user library 'user.eventloop'
Code: socket = require('socket')
local function safe_callback(callback)
return function(...)
--return callback(...)
local res, errMsg = pcall(callback, ...)
if(not res) then
error(string.format('unhandled exception: %s', errMsg))
end
return res, errMsg
end
end
package.preload['user.eventloop.loop'] = (function(...)
local loop = {}
loop.__index = loop
function loop:add(worker)
self.workers[#self.workers + 1] = worker
if(worker.on_add) then
worker:on_add(self)
end
end
function loop:remove(worker)
local w = self.workers
for i = 1, #w do
if(w[i] == worker) then
table.remove(w, i)
break
end
end
end
function loop:prepare_fds()
local fds = {}
local actions = {}
for i = 1, #self.workers do
local v = self.workers[i]
local fd = v:get_fd()
if(fd) then
fds[#fds + 1] = fd
actions[#actions + 1] = v
end
end
return fds, actions
end
function loop:run_loop()
while(#self.workers > 0) do
local fds, actions = self:prepare_fds()
local res = { socket.selectfds(10, unpack(fds)) }
-- first result is true if returned something, nil if not
if(res[1]) then
for i = 1, #fds do
local fd = res[i + 1]
if(fd) then
actions[i]:step(fd)
end
end
end
end
end
function loop.create_timer(...)
return require('user.eventloop.timer').new(...)
end
function loop.create_localbus(...)
return require('user.eventloop.localbus').new(...)
end
function loop.create_mqtt(...)
return require('user.eventloop.mqtt').new(...)
end
local instance
function loop.get()
if(not instance) then
instance = setmetatable({}, loop)
instance.workers = {}
end
return instance
end
return loop
end)
package.preload['user.eventloop.timer'] = (function(...)
local timer = { type = 'timer' }
timer.__index = timer
function timer:get_fd()
return self.fd
end
function timer:step()
-- disarm timer
self.timer:read()
self.callback()
end
function timer:on_add(loop)
self.loop = loop
end
function timer:stop()
self.timer:close()
self.loop:remove(self)
end
function timer.new(interval, callback)
local t = setmetatable({}, timer)
t.callback = safe_callback(callback)
t.timer = require('timerfd').new(interval)
t.fd = socket.fdmaskset(t.timer:getfd(), 'r')
return t
end
return timer
end)
package.preload['user.eventloop.localbus'] = (function(...)
local lbus = { type = 'localbus' }
lbus.__index = lbus
function lbus:get_fd()
return self.fd
end
function lbus:step()
self.bus:step()
end
function lbus.new(handlers)
local b = setmetatable({}, lbus)
b.bus = require('localbus').new(1)
if(handlers.groupwrite) then
b.bus:sethandler('groupwrite', safe_callback(handlers.groupwrite))
end
if(handlers.groupread) then
b.bus:sethandler('groupread', safe_callback(handlers.groupread))
end
b.fd = socket.fdmaskset(b.bus:getfd(), 'r')
return b
end
return lbus
end)
package.preload['user.eventloop.mqtt'] = (function(...)
local bind = function(obj, func) return function(...) func(obj, ...) end end
local mqtt = { type = 'mqtt' }
mqtt.__index = mqtt
function mqtt:get_fd()
if(self.soc) then
return socket.fdmaskset(self.soc, self.client:want_write() and 'rw' or 'r')
else
return nil
end
end
function mqtt:on_add(loop)
-- need to do misc() every second according to mosquitto doc
local timer = loop.create_timer(1, bind(self, self.step_misc))
loop:add(timer)
end
function mqtt:step_misc()
if(self.soc) then
self.client:loop_misc()
else
self:connect()
end
end
function mqtt:step(fd)
if(socket.fdmaskread(fd)) then
self.client:loop_read()
end
if(socket.fdmaskwrite(fd)) then
self.client:loop_write()
end
end
function mqtt:connect()
self.client:connect(self.broker)
self.soc = self.client:socket() or nil
end
function mqtt:on_connect(success)
if(success) then
log('connected to MQTT')
if(type(self.topics) == 'string') then
self.client:subscribe(self.topics)
else
for i, t in ipairs(self.topics) do
self.client:subscribe(t)
end
end
end
end
function mqtt:on_disconnect(...)
log('mqtt disconnect', ...)
self.soc = nil
end
function mqtt.new(cfg, msg_callback)
local m = setmetatable({}, mqtt)
m.client = require('mosquitto').new(cfg.id)
m.client:login_set(cfg.login, cfg.pwd)
m.client.ON_CONNECT = bind(m, m.on_connect)
m.client.ON_DISCONNECT = bind(m, m.on_disconnect)
-- callback have (mid, topic, payload) arguments
m.client.ON_MESSAGE = safe_callback(msg_callback)
m.broker = cfg.broker
m.topics = cfg.topics
return m
end
return mqtt
end)
return require('user.eventloop.loop')
RE: universal MQTT / Timer / Localbus event loop - admin - 02.06.2020
Looks good but you need to check on_connect first argument. It will be false if connection failed.
RE: universal MQTT / Timer / Localbus event loop - myg - 02.06.2020
Thanks, i've added this check to code above to make it cleaner. It seems it wasn't critical – in case socket connection was established, but server did not accept connection (for ex. wrong credentials), disconnect event happened quickly after that. In other cases (server not available) connect event wasn't fired at all.
RE: universal MQTT / Timer / Localbus event loop - benanderson_475 - 13.09.2020
Hi i am running some tests with the above code for mqtt, some reason i receive an error in the log as below, i don't know why the loop.get() function is not returning?
Resident script:29: attempt to index global 'loop' (a nil value)
stack traceback:
Resident script:29: in function 'setup'
resident script line 29 has, local lp = loop.get()
i have added the library called 'eventloop' as above.
[Edit]
ahhh i have it now, i removed the first local before loop ={} in the library i am connected now...
RE: universal MQTT / Timer / Localbus event loop - myg - 16.09.2020
I've published updated library version to include safe callbacks in case of error in the user function, which would eventually ruin resident script with undesired behaviour if not handled.
RE: universal MQTT / Timer / Localbus event loop - benanderson_475 - 23.09.2020
if i want to stop a timer from the resident script callback function how can i realize this?
Many Thanks
Code: function on_timer2()
if (some condition) then
-- stop Timer here
end
end
function setup()
local lp = loop.get()
-- use one or more of the below in any combination
-- mqtt client with callback function to receive messages
-- store mqtt.client to local variable if you want to publish messages later
local mqtt = loop.create_mqtt(mqtt_cfg, mqtt_onmessage)
-- add 'groupread = read_callback' to add read callback handler
local bus = loop.create_localbus({ groupwrite = localbus_onmessage })
-- timer that fires callback every 10 seconds, period can be a fraction of a second
local timer = loop.create_timer(10, on_timer)
local timer = loop.create_timer(10, on_timer2)
-- add all workers to our loop
lp:add(mqtt)
lp:add(bus)
lp:add(timer)
end
RE: universal MQTT / Timer / Localbus event loop - myg - 23.09.2020
(23.09.2020, 01:26)benanderson_475 Wrote: if i want to stop a timer from the resident script callback function how can i realize this?
Many Thanks
I've updated library code, you can now call
when you need it to be permanently stopped
Note that this also removes timer from the event loop, as a side effect, when you have nothing to run, the loop will exit, but resident script will be restarted after specified delay and everything will start over.
RE: universal MQTT / Timer / Localbus event loop - benanderson_475 - 28.09.2020
(23.09.2020, 14:20)myg Wrote: (23.09.2020, 01:26)benanderson_475 Wrote: if i want to stop a timer from the resident script callback function how can i realize this?
Many Thanks
I've updated library code, you can now call
when you need it to be permanently stopped
Note that this also removes timer from the event loop, as a side effect, when you have nothing to run, the loop will exit, but resident script will be restarted after specified delay and everything will start over.
@myg Many thanks,
RE: universal MQTT / Timer / Localbus event loop - spyder - 22.03.2023
Hi,
I am running this code to react to incoming MQTT messages.
During execution, I access storage using a semaphore, but after a while I get 'failed to create semaphore'.
Code: require('sem')
-- wait for 5 seconds max before executing callback
res1, res2 = sem.runlocked('eventlock', 5, function(lockres)
-- lock acquired
if lockres then
storage.set('my_storage', '')
return true, 'ok'
-- failed to acquire lock
elseif lockres == nil then
return nil, 'failed to create semaphore'
else
return nil, 'failed to lock semaphore'
end
end)
log(res1, res2)
In the documentation for semaphore:close(), it is stated:
Quote:Closes the semaphore. Not required, semaphore handles are closed automatically when script execution ends
Do I explicitely need to do it in the case of an eventloop? What can I do to prevent failure to create a semaphore?
Thanks
RE: universal MQTT / Timer / Localbus event loop - admin - 24.03.2023
sem.runlocked closes the semaphore automatically. This issue might happen when there are too many open handles (files, sockets, semaphores etc) which are not closed when needed.
If you only need to receive values from MQTT you can simply use client:loop_forever without any additional libraries.
|