This forum uses cookies
This forum makes use of cookies to store your login information if you are registered, and your last visit if you are not. Cookies are small text documents stored on your computer; the cookies set by this forum can only be used on this website and pose no security risk. Cookies on this forum also track the specific topics you have read and when you last read them. Please confirm whether you accept or reject these cookies being set.

universal MQTT / Timer / Localbus event loop
#1
I've tried to gather best practices from across the forum on writing non-blocking loops that handle MQTT, bus and timer events, in order to write constantly running logic for sending, receiving or syncing data and similar tasks. I've come up with the user library below, which simplifies the API for creating event sources and attaching to them. Hopefully it will be useful to others.

Usage (put the code below in a resident script; the timeout doesn't matter since the loop never exits):


Code:
-- Load the event loop library. The library returns the loop module and does
-- not define a global `loop`, so it has to be required explicitly here.
local loop = require('user.eventloop')

-- MQTT connection settings passed to loop.create_mqtt()
local mqtt_cfg = {
  broker = 'IP',
  login = 'USERNAME',
  pwd = 'PASSWORD',
  id = 'CLIENT_ID',
  topics = {'zigbee/+', 'zigbee/bridge/config'},
-- alternative for single topic subscription
-- topics = 'zigbee/+',
}

-- called for every received MQTT message
local function mqtt_onmessage(mid, topic, payload)
-- logic
end

-- called for every group write seen on the local bus
local function localbus_onmessage(event)
-- logic
end

-- called each time the timer fires
local function on_timer()
-- logic
end

-- Creates the workers and registers them with the shared loop instance.
local function setup()
  local lp = loop.get()

  -- use one or more of the below in any combination

  -- mqtt client with callback function to receive messages
  -- store mqtt.client to local variable if you want to publish messages later
  local mqtt = loop.create_mqtt(mqtt_cfg, mqtt_onmessage)
  -- add 'groupread = read_callback' to add read callback handler
  local bus = loop.create_localbus({ groupwrite = localbus_onmessage })
  -- timer that fires callback every 10 seconds, period can be a fraction of a second
  local timer = loop.create_timer(10, on_timer)

  -- add all workers to our loop
  lp:add(mqtt)
  lp:add(bus)
  lp:add(timer)
end

setup()
-- run loop forever until script is terminated
loop.get():run_loop()




Create user library 'user.eventloop'
Code:
socket = require('socket')

--- Wrap `callback` so that a failure inside it is reported with its real message.
-- The returned function forwards all of its arguments to `callback` through
-- pcall. On success it returns `true` followed by the callback's first return
-- value. On failure it calls `error` with ONE string that contains the
-- original error text.
-- The text must not be passed as a second argument: in standard Lua the
-- second parameter of `error` is the numeric `level`, and a non-numeric string
-- there replaces the real message with a "bad argument" error.
-- @param callback function to protect
-- @return function wrapper around `callback`
local function safe_callback(callback)
  return function(...)
    local ok, result = pcall(callback, ...)
    if not ok then
      -- tostring() keeps the concatenation valid for non-string error values
      error('unhandled exception: ' .. tostring(result))
    end
    return ok, result
  end
end

-- Module 'user.eventloop.loop': a shared loop object that collects workers,
-- polls the descriptors they expose and calls step() on the ones reported back.
package.preload['user.eventloop.loop'] = (function(...)
  local loop = {}
  loop.__index = loop

  -- the one shared loop object, created on the first loop.get() call
  local singleton

  -- Appends `worker` to the loop; a worker that has on_add gets it called
  -- with this loop as argument.
  function loop:add(worker)
    local list = self.workers
    list[#list + 1] = worker
    if worker.on_add then
      worker:on_add(self)
    end
  end

  -- Asks every worker for its descriptor and returns two parallel arrays:
  -- the descriptors and the workers that supplied them. Workers whose
  -- get_fd() returns nothing are left out.
  function loop:prepare_fds()
    local fds, actions = {}, {}
    for _, worker in ipairs(self.workers) do
      local fd = worker:get_fd()
      if fd then
        local slot = #fds + 1
        fds[slot] = fd
        actions[slot] = worker
      end
    end

    return fds, actions
  end

  -- Runs forever: rebuilds the descriptor list, waits on it with
  -- socket.selectfds and dispatches each reported descriptor to its worker.
  function loop:run_loop()
    while true do
      local fds, actions = self:prepare_fds()

      -- NOTE(review): 10 is presumably the wait timeout in seconds - confirm
      local ready = { socket.selectfds(10, unpack(fds)) }

      -- first result is true if returned something, nil if not;
      -- ready[i + 1] is the result slot for fds[i]
      if ready[1] then
        for i, worker in ipairs(actions) do
          local fd = ready[i + 1]
          if fd then
            worker:step(fd)
          end
        end
      end
    end
  end

  -- loop.create_timer / loop.create_localbus / loop.create_mqtt:
  -- each forwards its arguments to new() of the matching worker module
  for _, kind in ipairs({ 'timer', 'localbus', 'mqtt' }) do
    loop['create_' .. kind] = function(...)
      return require('user.eventloop.' .. kind).new(...)
    end
  end

  -- Returns the shared loop object, creating it on first use.
  function loop.get()
    singleton = singleton or setmetatable({ workers = {} }, loop)
    return singleton
  end

  return loop
end)


-- Module 'user.eventloop.timer': loop worker built on a timerfd object.
package.preload['user.eventloop.timer'] = (function(...)
  local timer = { type = 'timer' }
  timer.__index = timer

  -- Creates a worker from a timerfd made with `interval`; `callback` is
  -- wrapped with safe_callback and invoked from step().
  function timer.new(interval, callback)
    local tfd = require('timerfd').new(interval)
    return setmetatable({
      callback = safe_callback(callback),
      timer = tfd,
      -- the descriptor is registered for reading only
      fd = socket.fdmaskset(tfd:getfd(), 'r'),
    }, timer)
  end

  -- Descriptor handed to the loop for polling.
  function timer:get_fd()
    return self.fd
  end

  -- Called by the loop when the descriptor is reported: the timer is read
  -- first (this disarms it), then the user callback runs without arguments.
  function timer:step()
    self.timer:read()
    self.callback()
  end

  return timer
end)


-- Module 'user.eventloop.localbus': loop worker wrapping a localbus client.
package.preload['user.eventloop.localbus'] = (function(...)
  local lbus = { type = 'localbus' }
  lbus.__index = lbus

  -- handler names looked up in the `handlers` table, registered in this order
  local EVENTS = { 'groupwrite', 'groupread' }

  -- Creates the worker. `handlers` is a table that may hold `groupwrite`
  -- and/or `groupread` functions; each one present is wrapped with
  -- safe_callback and installed on the bus client.
  function lbus.new(handlers)
    local bus = require('localbus').new(1)

    for _, event in ipairs(EVENTS) do
      local handler = handlers[event]
      if handler then
        bus:sethandler(event, safe_callback(handler))
      end
    end

    return setmetatable({
      bus = bus,
      -- the descriptor is registered for reading only
      fd = socket.fdmaskset(bus:getfd(), 'r'),
    }, lbus)
  end

  -- Descriptor handed to the loop for polling.
  function lbus:get_fd()
    return self.fd
  end

  -- Called by the loop when the descriptor is reported; delegates to the
  -- bus client's own step().
  function lbus:step()
    self.bus:step()
  end

  return lbus
end)

-- Module 'user.eventloop.mqtt': loop worker wrapping a mosquitto client.
-- It connects lazily from a one-second helper timer and subscribes to the
-- configured topics once the broker accepts the connection.
package.preload['user.eventloop.mqtt'] = (function(...)
  -- Returns a function that calls func(obj, ...) and passes its results through.
  local function bind(obj, func)
    return function(...)
      return func(obj, ...)
    end
  end

  local mqtt = { type = 'mqtt' }
  mqtt.__index = mqtt

  -- Descriptor handed to the loop: nil while there is no socket, otherwise
  -- the socket registered for reading, plus writing when the client reports
  -- pending output through want_write().
  function mqtt:get_fd()
    if not self.soc then
      return nil
    end
    return socket.fdmaskset(self.soc, self.client:want_write() and 'rw' or 'r')
  end

  -- Called by the loop when this worker is added.
  function mqtt:on_add(loop)
    -- need to do misc() every second according to mosquitto doc
    local timer = loop.create_timer(1, bind(self, self.step_misc))
    loop:add(timer)
  end

  -- Runs once per second from the helper timer: housekeeping while a socket
  -- exists, otherwise a (re)connect attempt.
  function mqtt:step_misc()
    if self.soc then
      self.client:loop_misc()
    else
      self:connect()
    end
  end

  -- Called by the loop with the descriptor mask returned for our socket.
  function mqtt:step(fd)
    if socket.fdmaskread(fd) then
      self.client:loop_read()
    end

    if socket.fdmaskwrite(fd) then
      self.client:loop_write()
    end
  end

  -- Starts a connection to the configured broker and remembers the socket.
  -- The port is passed only when cfg.port was given, so the call is unchanged
  -- for configurations without a port.
  function mqtt:connect()
    if self.port then
      self.client:connect(self.broker, self.port)
    else
      self.client:connect(self.broker)
    end
    -- `or nil` normalises a false result so get_fd() sees "no socket"
    self.soc = self.client:socket() or nil
  end

  -- ON_CONNECT handler; `success` is false when the connection failed.
  -- Topics may be a single string, an array of strings, or omitted
  -- (publish-only client), in which case nothing is subscribed.
  function mqtt:on_connect(success)
    if not success then
      return
    end

    log('connected to MQTT')

    local topics = self.topics
    if type(topics) == 'string' then
      self.client:subscribe(topics)
    elseif type(topics) == 'table' then
      for _, topic in ipairs(topics) do
        self.client:subscribe(topic)
      end
    end
  end

  -- ON_DISCONNECT handler: forget the socket so step_misc() reconnects.
  function mqtt:on_disconnect(...)
    log('mqtt disconnect', ...)
    self.soc = nil
  end

  -- Creates the worker.
  -- cfg fields read here: id, login, pwd, broker, topics and the optional port.
  -- msg_callback is wrapped with safe_callback and installed as ON_MESSAGE.
  function mqtt.new(cfg, msg_callback)
    local m = setmetatable({}, mqtt)

    m.client = require('mosquitto').new(cfg.id)
    m.client:login_set(cfg.login, cfg.pwd)
    m.client.ON_CONNECT = bind(m, m.on_connect)
    m.client.ON_DISCONNECT = bind(m, m.on_disconnect)
    -- callback have (mid, topic, payload) arguments
    m.client.ON_MESSAGE = safe_callback(msg_callback)

    m.broker = cfg.broker
    m.port = cfg.port
    m.topics = cfg.topics

    return m
  end

  return mqtt
end)

-- Export the loop module as the result of loading this library.
return require('user.eventloop.loop')
Reply
#2
Looks good, but you need to check the first argument of on_connect. It will be false if the connection failed.
Reply
#3
Thanks, I've added this check to the code above to make it cleaner. It seems it wasn't critical: when the socket connection was established but the server did not accept the connection (e.g. wrong credentials), a disconnect event followed shortly afterwards. In other cases (server not available), the connect event wasn't fired at all.
Reply
#4
Hi, I am running some tests with the above code for MQTT. For some reason I receive the error below in the log, and I don't know why the loop.get() function is not returning.

Resident script:29: attempt to index global 'loop' (a nil value)
stack traceback:
Resident script:29: in function 'setup'


resident script line 29 has, local lp = loop.get()

i have added the library called 'eventloop' as above.

[Edit]
Ahhh, I have it now: I removed the first `local` before `loop = {}` in the library, and I am connected now...
Reply
#5
I've published an updated library version that includes safe callbacks, in case of an error in the user function that would otherwise eventually ruin the resident script with undesired behaviour if not handled.
Reply


Forum Jump: