This forum uses cookies
This forum makes use of cookies to store your login information if you are registered, and your last visit if you are not. Cookies are small text documents stored on your computer; the cookies set by this forum can only be used on this website and pose no security risk. Cookies on this forum also track the specific topics you have read and when you last read them. Please confirm that you accept these cookies being set.

universal MQTT / Timer / Localbus event loop
#1
I've tried to gather best practices from across the forum on writing non-blocking loops that handle MQTT, bus and timer events, in order to write constantly running logic for sending / receiving / syncing data or similar. I've come up with the user library below, which simplifies the API for creating event sources and attaching to them. Hopefully it will be useful to others.

Usage (put the code below in a resident script; the timeout doesn't matter since the loop never exits):


Code:
-- Example resident script for the 'user.eventloop' library.
-- The library returns the loop module from require(); it does not define a
-- global named `loop`, so it has to be bound to a local here.
local loop = require('user.eventloop')

local mqtt_cfg = {
  broker = 'IP',
  login = 'USERNAME',
  pwd = 'PASSWORD',
  id = 'CLIENT_ID',
  topics = {'zigbee/+', 'zigbee/bridge/config'},
  -- alternative for single topic subscription
  -- topics = 'zigbee/+',
}

-- called for every received MQTT message
local function mqtt_onmessage(mid, topic, payload)
  -- logic
end

-- called for every group write seen on the local bus
local function localbus_onmessage(event)
  -- logic
end

-- called every time the timer fires
local function on_timer()
  -- logic
end

-- creates the event sources and registers them with the shared loop
local function setup()
  local lp = loop.get()

  -- use one or more of the below in any combination

  -- mqtt client with callback function to receive messages
  -- store mqtt.client to local variable if you want to publish messages later
  local mqtt = loop.create_mqtt(mqtt_cfg, mqtt_onmessage)

  -- add 'groupread = read_callback' to add read callback handler
  local bus = loop.create_localbus({ groupwrite = localbus_onmessage })

  -- timer that fires callback every 10 seconds, period can be a fraction of a second
  local timer = loop.create_timer(10, on_timer)

  -- add all workers to our loop
  lp:add(mqtt)
  lp:add(bus)
  lp:add(timer)
end

setup()

-- run loop forever until script is terminated
loop.get():run_loop()




Create user library 'user.eventloop'

Code:
-- user.eventloop: a small select()-style event loop that multiplexes timer,
-- local bus and MQTT workers. A worker is any table that provides
-- get_fd() and step(fd); on_add(loop) and on_remove(loop) are optional hooks.

local socket = require('socket')
-- The original version of this library assigned `socket` as a global. That
-- side effect is kept, but made explicit, so scripts relying on it still work.
_G.socket = socket

-- unpack is a global in Lua 5.1 and lives in table.unpack from 5.2 on
local unpack = table.unpack or unpack

--- Wrap a callback so that an error raised inside it is re-raised with an
-- 'unhandled exception: ' prefix.
-- @param callback function to wrap
-- @return function forwarding its arguments to `callback`; on success it
--   returns true followed by the first result of `callback`
local function safe_callback(callback)
  return function(...)
    local res, errMsg = pcall(callback, ...)
    if not res then
      -- tostring(): error values are not always strings, and '%s' in
      -- Lua 5.1 string.format rejects tables/userdata/nil
      error(string.format('unhandled exception: %s', tostring(errMsg)))
    end
    return res, errMsg
  end
end

package.preload['user.eventloop.loop'] = (function(...)
  local loop = {}
  loop.__index = loop

  --- Register a worker and call its optional on_add(loop) hook.
  function loop:add(worker)
    self.workers[#self.workers + 1] = worker
    if worker.on_add then
      worker:on_add(self)
    end
  end

  --- Unregister a worker. If the worker was registered and defines
  -- on_remove(loop), the hook is called after removal so the worker can
  -- release anything it registered on its own (e.g. helper timers).
  function loop:remove(worker)
    local w = self.workers
    for i = 1, #w do
      if w[i] == worker then
        table.remove(w, i)
        if worker.on_remove then
          worker:on_remove(self)
        end
        break
      end
    end
  end

  --- Collect the fd descriptors of all workers that currently have one.
  -- @return array of fds, and a parallel array of the owning workers
  function loop:prepare_fds()
    local fds = {}
    local actions = {}
    for i = 1, #self.workers do
      local v = self.workers[i]
      local fd = v:get_fd()
      if fd then
        fds[#fds + 1] = fd
        actions[#actions + 1] = v
      end
    end

    return fds, actions
  end

  --- Run until no workers are left. Each pass waits (up to 10, the first
  -- argument given to socket.selectfds) for activity and steps every worker
  -- whose fd was reported.
  function loop:run_loop()
    while #self.workers > 0 do
      local fds, actions = self:prepare_fds()
      local res = { socket.selectfds(10, unpack(fds)) }

      -- first result is true if returned something, nil if not;
      -- res[i + 1] is then treated as the result for the i-th fd passed in
      if res[1] then
        for i = 1, #fds do
          local fd = res[i + 1]
          if fd then
            actions[i]:step(fd)
          end
        end
      end
    end
  end

  --- Create a timer worker; arguments are passed to timer.new.
  function loop.create_timer(...)
    return require('user.eventloop.timer').new(...)
  end

  --- Create a local bus worker; arguments are passed to localbus.new.
  function loop.create_localbus(...)
    return require('user.eventloop.localbus').new(...)
  end

  --- Create an MQTT worker; arguments are passed to mqtt.new.
  function loop.create_mqtt(...)
    return require('user.eventloop.mqtt').new(...)
  end

  local instance

  --- Return the shared loop instance, creating it on first use.
  function loop.get()
    if not instance then
      instance = setmetatable({}, loop)
      instance.workers = {}
    end
    return instance
  end

  return loop
end)

package.preload['user.eventloop.timer'] = (function(...)
  local timer = { type = 'timer' }
  timer.__index = timer

  --- fd descriptor to wait on; nil once the timer has been stopped.
  function timer:get_fd()
    return self.fd
  end

  --- Acknowledge the expiration and invoke the callback.
  function timer:step()
    -- The timer may have been stopped by another worker earlier in the same
    -- loop pass; the loop still holds it in its snapshot, so ignore the step.
    if not self.timer then
      return
    end
    -- disarm timer
    self.timer:read()
    self.callback()
  end

  --- Remember the owning loop so stop() can unregister from it.
  function timer:on_add(loop)
    self.loop = loop
  end

  --- Close the timer and unregister it from its loop, if it was added to
  -- one. Calling stop() more than once is a no-op.
  function timer:stop()
    if not self.timer then
      return
    end
    self.timer:close()
    self.timer = nil
    self.fd = nil
    if self.loop then
      self.loop:remove(self)
      self.loop = nil
    end
  end

  --- Create a timer worker.
  -- @param interval value passed to timerfd.new (the usage example passes
  --   seconds)
  -- @param callback function called on every expiration
  function timer.new(interval, callback)
    local t = setmetatable({}, timer)
    t.callback = safe_callback(callback)
    t.timer = require('timerfd').new(interval)
    t.fd = socket.fdmaskset(t.timer:getfd(), 'r')

    return t
  end

  return timer
end)

package.preload['user.eventloop.localbus'] = (function(...)
  local lbus = { type = 'localbus' }
  lbus.__index = lbus

  --- fd descriptor to wait on.
  function lbus:get_fd()
    return self.fd
  end

  --- Let the bus client process pending data; it dispatches to the
  -- handlers installed in lbus.new.
  function lbus:step()
    self.bus:step()
  end

  --- Create a local bus worker.
  -- @param handlers table with optional `groupwrite` and `groupread`
  --   callback functions
  function lbus.new(handlers)
    local b = setmetatable({}, lbus)

    -- NOTE(review): the meaning of the argument 1 is not visible here
    -- (presumably a timeout) - confirm against the localbus documentation
    b.bus = require('localbus').new(1)
    if handlers.groupwrite then
      b.bus:sethandler('groupwrite', safe_callback(handlers.groupwrite))
    end
    if handlers.groupread then
      b.bus:sethandler('groupread', safe_callback(handlers.groupread))
    end
    b.fd = socket.fdmaskset(b.bus:getfd(), 'r')

    return b
  end

  return lbus
end)

package.preload['user.eventloop.mqtt'] = (function(...)
  -- Turn a method into a plain function bound to `obj`.
  local function bind(obj, func)
    return function(...)
      return func(obj, ...)
    end
  end

  local mqtt = { type = 'mqtt' }
  mqtt.__index = mqtt

  --- fd descriptor to wait on, or nil while there is no socket. Write
  -- readiness is only requested when the client reports want_write().
  function mqtt:get_fd()
    if self.soc then
      return socket.fdmaskset(self.soc, self.client:want_write() and 'rw' or 'r')
    else
      return nil
    end
  end

  --- Register a 1-second helper timer that drives step_misc().
  function mqtt:on_add(loop)
    -- need to do misc() every second according to mosquitto doc
    self.misc_timer = loop.create_timer(1, bind(self, self.step_misc))
    loop:add(self.misc_timer)
  end

  --- Stop the helper timer when this worker is removed from the loop, so
  -- it does not keep running (and reconnecting) on its own.
  function mqtt:on_remove()
    if self.misc_timer then
      self.misc_timer:stop()
      self.misc_timer = nil
    end
  end

  --- Periodic housekeeping: loop_misc() while a socket exists, otherwise
  -- try to (re)connect.
  function mqtt:step_misc()
    if self.soc then
      self.client:loop_misc()
    else
      self:connect()
    end
  end

  --- Handle read/write readiness reported for the client socket.
  function mqtt:step(fd)
    if socket.fdmaskread(fd) then
      self.client:loop_read()
    end
    -- self.soc is cleared by on_disconnect; skip the write if the read
    -- above ended with a disconnect
    if self.soc and socket.fdmaskwrite(fd) then
      self.client:loop_write()
    end
  end

  --- Connect to the configured broker and remember the client socket
  -- (nil when the client has none).
  function mqtt:connect()
    self.client:connect(self.broker)
    self.soc = self.client:socket() or nil
  end

  --- ON_CONNECT handler: subscribe to the configured topics on success.
  -- `topics` may be a single string, an array of strings, or absent.
  function mqtt:on_connect(success)
    if not success then
      return
    end

    log('connected to MQTT')

    local topics = self.topics
    if type(topics) == 'string' then
      self.client:subscribe(topics)
    elseif type(topics) == 'table' then
      for _, t in ipairs(topics) do
        self.client:subscribe(t)
      end
    end
  end

  --- ON_DISCONNECT handler: forget the socket so step_misc() reconnects.
  function mqtt:on_disconnect(...)
    log('mqtt disconnect', ...)
    self.soc = nil
  end

  --- Create an MQTT worker. The connection is made from step_misc(), i.e.
  -- once the worker has been added to a loop.
  -- @param cfg table with fields broker, login, pwd, id and optional topics
  -- @param msg_callback function(mid, topic, payload) called per message
  function mqtt.new(cfg, msg_callback)
    local m = setmetatable({}, mqtt)

    m.client = require('mosquitto').new(cfg.id)
    m.client:login_set(cfg.login, cfg.pwd)
    m.client.ON_CONNECT = bind(m, m.on_connect)
    m.client.ON_DISCONNECT = bind(m, m.on_disconnect)
    -- callback have (mid, topic, payload) arguments
    m.client.ON_MESSAGE = safe_callback(msg_callback)

    m.broker = cfg.broker
    m.topics = cfg.topics

    return m
  end

  return mqtt
end)

return require('user.eventloop.loop')
Reply


Messages In This Thread
universal MQTT / Timer / Localbus event loop - by myg - 02.06.2020, 10:48

Forum Jump: