This forum uses cookies
This forum makes use of cookies to store your login information if you are registered, and your last visit if you are not. Cookies are small text documents stored on your computer; the cookies set by this forum can only be used on this website and pose no security risk. Cookies on this forum also track the specific topics you have read and when you last read them. Please confirm that you accept these cookies being set.

universal MQTT / Timer / Localbus event loop
#1
I've tried to gather best practices across the forum on writing non-blocking loops to handle events from mqtt / bus and timer events in order to write constantly running logic for sending / receiving / syncing data or similar. I've come up with user library below that simplifies API to initiate and attach to event sources. Hopefully it will be useful to others.

Usage (put code below in a resident script, timeout doesn't matter since loop never leaves):


Code:
1234567891011121314151617181920212223242526272829303132333435363738394041424344
local mqtt_cfg = {   broker = 'IP',   login = 'USERNAME',   pwd = 'PASSWORD',   id = 'CLIENT_ID',   topics = {'zigbee/+', 'zigbee/bridge/config'}, -- alternative for single topic subscription -- topics = 'zigbee/+', } function mqtt_onmessage(mid, topic, payload) -- logic end function localbus_onmessage(event) -- logic end function on_timer() -- logic end function setup()   local lp = loop.get()     -- use one or more of the below in any combination   -- mqtt client with callback function to receive messages   -- store mqtt.client to local variable if you want to publish messages later   local mqtt = loop.create_mqtt(mqtt_cfg, mqtt_onmessage)   -- add 'groupread = read_callback' to add read callback handler   local bus = loop.create_localbus({ groupwrite = localbus_onmessage })   -- timer that fires callback every 10 seconds, period can be a fraction of a second   local timer = loop.create_timer(10, on_timer)     -- add all workers to our loop   lp:add(mqtt)   lp:add(bus)   lp:add(timer) end setup() -- run loop forever until script is terminated loop.get():run_loop()




Create user library 'user.eventloop'

Code:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
socket = require('socket') local function safe_callback(callback)   return function(...)     --return callback(...)   local res, errMsg = pcall(callback, ...)     if(not res) then       error(string.format('unhandled exception: %s', errMsg))     end     return res, errMsg   end end package.preload['user.eventloop.loop'] = (function(...)     local loop = {}     loop.__index = loop         function loop:add(worker)       self.workers[#self.workers + 1] = worker       if(worker.on_add) then         worker:on_add(self)       end     end         function loop:remove(worker)       local w = self.workers       for i = 1, #w do         if(w[i] == worker) then           table.remove(w, i)           break         end       end     end     function loop:prepare_fds()       local fds = {}       local actions = {}       for i = 1, #self.workers do         local v = self.workers[i]         local fd = v:get_fd()         if(fd) then           fds[#fds + 1] = fd           actions[#actions + 1] = v         end       end             return fds, actions     end         function loop:run_loop()       while(#self.workers > 0) do         local fds, actions = self:prepare_fds()         local res = { socket.selectfds(10, unpack(fds)) }                 -- first result is true if returned something, nil if not         if(res[1]) then           for i = 1, #fds do             local fd = res[i + 1]             if(fd) then               actions[i]:step(fd)             end           end         end       end     end         function loop.create_timer(...)       return require('user.eventloop.timer').new(...)     end         function loop.create_localbus(...)       return require('user.eventloop.localbus').new(...)     end         function loop.create_mqtt(...)       return require('user.eventloop.mqtt').new(...)     end         local instance     function loop.get()       if(not instance) then         instance = setmetatable({}, loop)         instance.workers = {}       end       return instance     end           return loop   end) package.preload['user.eventloop.timer'] = (function(...)     local timer = { type = 'timer' }     timer.__index = timer         function timer:get_fd()     return self.fd     end         function timer:step()       -- disarm timer       self.timer:read()       self.callback()     end function timer:on_add(loop)       self.loop = loop     end         function timer:stop()       self.timer:close()       self.loop:remove(self)     end         function timer.new(interval, callback)       local t = setmetatable({}, timer)       t.callback = safe_callback(callback)       t.timer = require('timerfd').new(interval)       t.fd = socket.fdmaskset(t.timer:getfd(), 'r')             return t     end         return timer   end) package.preload['user.eventloop.localbus'] = (function(...)     local lbus = { type = 'localbus' }     lbus.__index = lbus         function lbus:get_fd()       return self.fd     end         function lbus:step()       self.bus:step()     end         function lbus.new(handlers)       local b = setmetatable({}, lbus)             b.bus = require('localbus').new(1)       if(handlers.groupwrite) then         b.bus:sethandler('groupwrite', safe_callback(handlers.groupwrite))       end       if(handlers.groupread) then       b.bus:sethandler('groupread', safe_callback(handlers.groupread))       end       b.fd = socket.fdmaskset(b.bus:getfd(), 'r')             return b     end         return lbus   end) package.preload['user.eventloop.mqtt'] = (function(...)     local bind = function(obj, func) return function(...) func(obj, ...) end end     local mqtt = { type = 'mqtt' }     mqtt.__index = mqtt         function mqtt:get_fd()       if(self.soc) then         return socket.fdmaskset(self.soc, self.client:want_write() and 'rw' or 'r')       else         return nil       end     end         function mqtt:on_add(loop)       -- need to do misc() every second according to mosquitto doc       local timer = loop.create_timer(1, bind(self, self.step_misc))       loop:add(timer)     end           function mqtt:step_misc()       if(self.soc) then         self.client:loop_misc()       else         self:connect()       end     end         function mqtt:step(fd)       if(socket.fdmaskread(fd)) then         self.client:loop_read()       end       if(socket.fdmaskwrite(fd)) then         self.client:loop_write()       end     end         function mqtt:connect()       self.client:connect(self.broker)       self.soc = self.client:socket() or nil     end         function mqtt:on_connect(success)       if(success) then         log('connected to MQTT')         if(type(self.topics) == 'string') then           self.client:subscribe(self.topics)         else           for i, t in ipairs(self.topics) do             self.client:subscribe(t)           end         end       end     end         function mqtt:on_disconnect(...)       log('mqtt disconnect', ...)       self.soc = nil     end         function mqtt.new(cfg, msg_callback)       local m = setmetatable({}, mqtt)             m.client = require('mosquitto').new(cfg.id)       m.client:login_set(cfg.login, cfg.pwd)       m.client.ON_CONNECT = bind(m, m.on_connect)       m.client.ON_DISCONNECT = bind(m, m.on_disconnect)       -- callback have (mid, topic, payload) arguments       m.client.ON_MESSAGE = safe_callback(msg_callback)             m.broker = cfg.broker       m.topics = cfg.topics             return m     end         return mqtt   end) return require('user.eventloop.loop')
Reply
#2
Looks good but you need to check on_connect first argument. It will be false if connection failed.
Reply
#3
Thanks, i've added this check to code above to make it cleaner. It seems it wasn't critical – in case socket connection was established, but server did not accept connection (for ex. wrong credentials), disconnect event happened quickly after that. In other cases (server not available) connect event wasn't fired at all.
Reply
#4
Hi i am running some tests with the above code for mqtt, some reason i receive an error in the log as below, i don't know why the loop.get() function is not returning?  

Resident script:29: attempt to index global 'loop' (a nil value)
stack traceback:
Resident script:29: in function 'setup'


resident script line 29 has, local lp = loop.get()

i have added the library called 'eventloop' as above.

[Edit]
ahhh i have it now, i removed the first local before loop ={} in the library i am connected now...
Reply
#5
I've published updated library version to include safe callbacks in case of error in the user function, which would eventually ruin resident script with undesired behaviour if not handled.
Reply
#6
if i want to stop a timer from the resident script callback function how can i realize this? 
Many Thanks

Code:
1234567891011121314151617181920212223242526272829
function on_timer2() if (some condition) then -- stop Timer here end end function setup()   local lp = loop.get()      -- use one or more of the below in any combination   -- mqtt client with callback function to receive messages   -- store mqtt.client to local variable if you want to publish messages later   local mqtt = loop.create_mqtt(mqtt_cfg, mqtt_onmessage)   -- add 'groupread = read_callback' to add read callback handler   local bus = loop.create_localbus({ groupwrite = localbus_onmessage })   -- timer that fires callback every 10 seconds, period can be a fraction of a second      local timer = loop.create_timer(10, on_timer)   local timer = loop.create_timer(10, on_timer2)      -- add all workers to our loop   lp:add(mqtt)   lp:add(bus)   lp:add(timer)    end
Reply
#7
(23.09.2020, 01:26)benanderson_475 Wrote: if i want to stop a timer from the resident script callback function how can i realize this? 
Many Thanks

I've updated library code, you can now call
Code:
1
timer:stop()
when you need it to be permanently stopped

Note that this also removes timer from the event loop, as a side effect, when you have nothing to run, the loop will exit, but resident script will be restarted after specified delay and everything will start over.
Reply
#8
(23.09.2020, 14:20)myg Wrote:
(23.09.2020, 01:26)benanderson_475 Wrote: if i want to stop a timer from the resident script callback function how can i realize this? 
Many Thanks

I've updated library code, you can now call
Code:
1
timer:stop()
when you need it to be permanently stopped

Note that this also removes timer from the event loop, as a side effect, when you have nothing to run, the loop will exit, but resident script will be restarted after specified delay and everything will start over.

@myg Many thanks,
Reply
#9
Hi,

I am running this code to react to incoming MQTT messages.

During execution, I access storage using a semaphore, but after a while I get 'failed to create semaphore'.

Code:
123456789101112131415
require('sem') -- wait for 5 seconds max before executing callback res1, res2 = sem.runlocked('eventlock', 5, function(lockres)   -- lock acquired   if lockres then     storage.set('my_storage', '')     return true, 'ok'   -- failed to acquire lock   elseif lockres == nil then     return nil, 'failed to create semaphore'   else     return nil, 'failed to lock semaphore'   end end) log(res1, res2)


In the documentation for semaphore:close(), it is stated:
Quote:
Code:
1
semaphore:close()
Closes the semaphore. Not required, semaphore handles are closed automatically when script execution ends

Do I explicitely need to do it in the case of an eventloop? What can I do to prevent failure to create a semaphore?

Thanks
Reply
#10
sem.runlocked closes the semaphore automatically. This issue might happen when there are too many open handles (files, sockets, semaphores etc) which are not closed when needed.
If you only need to receive values from MQTT you can simply use client:loop_forever without any additional libraries.
Reply


Forum Jump: