This forum uses cookies
This forum makes use of cookies to store your login information if you are registered, and your last visit if you are not. Cookies are small text documents stored on your computer; the cookies set by this forum can only be used on this website and pose no security risk. Cookies on this forum also track the specific topics you have read and when you last read them. Please confirm that you accept these cookies being set.

universal MQTT / Timer / Localbus event loop
#1
I've tried to gather best practices across the forum on writing non-blocking loops to handle events from mqtt / bus and timer events in order to write constantly running logic for sending / receiving / syncing data or similar. I've come up with user library below that simplifies API to initiate and attach to event sources. Hopefully it will be useful to others.

Usage (put code below in a resident script, timeout doesn't matter since loop never leaves):


Code:
local mqtt_cfg = {
  broker = 'IP',
  login = 'USERNAME',
  pwd = 'PASSWORD',
  id = 'CLIENT_ID',
  topics = {'zigbee/+', 'zigbee/bridge/config'},
-- alternative for single topic subscription
-- topics = 'zigbee/+',
}

function mqtt_onmessage(mid, topic, payload)
-- logic
end

function localbus_onmessage(event)
-- logic
end

function on_timer()
-- logic
end

function setup()
  local lp = loop.get()
 
  -- use one or more of the below in any combination

  -- mqtt client with callback function to receive messages
  -- store mqtt.client to local variable if you want to publish messages later
  local mqtt = loop.create_mqtt(mqtt_cfg, mqtt_onmessage)
  -- add 'groupread = read_callback' to add read callback handler
  local bus = loop.create_localbus({ groupwrite = localbus_onmessage })
  -- timer that fires callback every 10 seconds, period can be a fraction of a second
  local timer = loop.create_timer(10, on_timer)
 
  -- add all workers to our loop
  lp:add(mqtt)
  lp:add(bus)
  lp:add(timer)
end

setup()
-- run loop forever until script is terminated
loop.get():run_loop()




Create user library 'user.eventloop'

Code:
socket = require('socket')

local function safe_callback(callback)
  return function(...)
    --return callback(...)
  local res, errMsg = pcall(callback, ...)
    if(not res) then
      error(string.format('unhandled exception: %s', errMsg))
    end
    return res, errMsg
  end
end

package.preload['user.eventloop.loop'] = (function(...)
    local loop = {}
    loop.__index = loop
   
    function loop:add(worker)
      self.workers[#self.workers + 1] = worker
      if(worker.on_add) then
        worker:on_add(self)
      end
    end
   
    function loop:remove(worker)
      local w = self.workers
      for i = 1, #w do
        if(w[i] == worker) then
          table.remove(w, i)
          break
        end
      end
    end

    function loop:prepare_fds()
      local fds = {}
      local actions = {}
      for i = 1, #self.workers do
        local v = self.workers[i]
        local fd = v:get_fd()
        if(fd) then
          fds[#fds + 1] = fd
          actions[#actions + 1] = v
        end
      end
     
      return fds, actions
    end
   
    function loop:run_loop()
      while(#self.workers > 0) do
        local fds, actions = self:prepare_fds()

        local res = { socket.selectfds(10, unpack(fds)) }
       
        -- first result is true if returned something, nil if not
        if(res[1]) then
          for i = 1, #fds do
            local fd = res[i + 1]
            if(fd) then
              actions[i]:step(fd)
            end
          end
        end
      end
    end
   
    function loop.create_timer(...)
      return require('user.eventloop.timer').new(...)
    end
   
    function loop.create_localbus(...)
      return require('user.eventloop.localbus').new(...)
    end
   
    function loop.create_mqtt(...)
      return require('user.eventloop.mqtt').new(...)
    end
   
    local instance
    function loop.get()
      if(not instance) then
        instance = setmetatable({}, loop)
        instance.workers = {}
      end
      return instance
    end
     
    return loop
  end)


package.preload['user.eventloop.timer'] = (function(...)
    local timer = { type = 'timer' }
    timer.__index = timer
   
    function timer:get_fd()
    return self.fd
    end
   
    function timer:step()
      -- disarm timer
      self.timer:read()
      self.callback()
    end

function timer:on_add(loop)
      self.loop = loop
    end
   
    function timer:stop()
      self.timer:close()
      self.loop:remove(self)
    end
   
    function timer.new(interval, callback)
      local t = setmetatable({}, timer)
      t.callback = safe_callback(callback)
      t.timer = require('timerfd').new(interval)
      t.fd = socket.fdmaskset(t.timer:getfd(), 'r')
     
      return t
    end
   
    return timer
  end)


package.preload['user.eventloop.localbus'] = (function(...)
    local lbus = { type = 'localbus' }
    lbus.__index = lbus
   
    function lbus:get_fd()
      return self.fd
    end
   
    function lbus:step()
      self.bus:step()
    end
   
    function lbus.new(handlers)
      local b = setmetatable({}, lbus)
     
      b.bus = require('localbus').new(1)
      if(handlers.groupwrite) then
        b.bus:sethandler('groupwrite', safe_callback(handlers.groupwrite))
      end
      if(handlers.groupread) then
      b.bus:sethandler('groupread', safe_callback(handlers.groupread))
      end
      b.fd = socket.fdmaskset(b.bus:getfd(), 'r')
     
      return b
    end
   
    return lbus
  end)

package.preload['user.eventloop.mqtt'] = (function(...)
    local bind = function(obj, func) return function(...) func(obj, ...) end end
    local mqtt = { type = 'mqtt' }
    mqtt.__index = mqtt
   
    function mqtt:get_fd()
      if(self.soc) then
        return socket.fdmaskset(self.soc, self.client:want_write() and 'rw' or 'r')
      else
        return nil
      end
    end
   
    function mqtt:on_add(loop)
      -- need to do misc() every second according to mosquitto doc
      local timer = loop.create_timer(1, bind(self, self.step_misc))
      loop:add(timer)
    end
     
    function mqtt:step_misc()
      if(self.soc) then
        self.client:loop_misc()
      else
        self:connect()
      end
    end
   
    function mqtt:step(fd)
      if(socket.fdmaskread(fd)) then
        self.client:loop_read()
      end

      if(socket.fdmaskwrite(fd)) then
        self.client:loop_write()
      end
    end
   
    function mqtt:connect()
      self.client:connect(self.broker)
      self.soc = self.client:socket() or nil
    end
   
    function mqtt:on_connect(success)
      if(success) then
        log('connected to MQTT')
        if(type(self.topics) == 'string') then
          self.client:subscribe(self.topics)
        else
          for i, t in ipairs(self.topics) do
            self.client:subscribe(t)
          end
        end
      end
    end
   
    function mqtt:on_disconnect(...)
      log('mqtt disconnect', ...)
      self.soc = nil
    end
   
    function mqtt.new(cfg, msg_callback)
      local m = setmetatable({}, mqtt)
     
      m.client = require('mosquitto').new(cfg.id)
      m.client:login_set(cfg.login, cfg.pwd)
      m.client.ON_CONNECT = bind(m, m.on_connect)
      m.client.ON_DISCONNECT = bind(m, m.on_disconnect)
      -- callback have (mid, topic, payload) arguments
      m.client.ON_MESSAGE = safe_callback(msg_callback)
     
      m.broker = cfg.broker
      m.topics = cfg.topics
     
      return m
    end
   
    return mqtt
  end)

return require('user.eventloop.loop')
Reply


Messages In This Thread
universal MQTT / Timer / Localbus event loop - by myg - 02.06.2020, 10:48

Forum Jump: