This forum uses cookies
This forum makes use of cookies to store your login information if you are registered, and your last visit if you are not. Cookies are small text documents stored on your computer; the cookies set by this forum can only be used on this website and pose no security risk. Cookies on this forum also track the specific topics you have read and when you last read them. Please confirm that you accept these cookies being set.

universal MQTT / Timer / Localbus event loop
#1
I've tried to gather best practices from across the forum on writing non-blocking loops that handle MQTT, local bus and timer events, in order to write constantly running logic for sending / receiving / syncing data or similar. I've come up with the user library below, which simplifies the API for initiating and attaching to event sources. Hopefully it will be useful to others.

Usage (put the code below in a resident script; the timeout doesn't matter since the loop normally never exits):


Code:
-- load the event loop library created as user library 'user.eventloop';
-- the library keeps its table local and returns it, so it must be
-- required explicitly before `loop` can be used
local loop = require('user.eventloop')

-- MQTT connection settings passed to loop.create_mqtt()
local mqtt_cfg = {
  broker = 'IP',
  login = 'USERNAME',
  pwd = 'PASSWORD',
  id = 'CLIENT_ID',
  topics = {'zigbee/+', 'zigbee/bridge/config'},
-- alternative for single topic subscription
-- topics = 'zigbee/+',
}

-- called for every received MQTT message
function mqtt_onmessage(mid, topic, payload)
-- logic
end

-- called for every group write seen on the local bus
function localbus_onmessage(event)
-- logic
end

-- called every time the timer created in setup() fires
function on_timer()
-- logic
end

-- creates the workers and registers them in the shared loop instance
function setup()
  local lp = loop.get()

  -- use one or more of the below in any combination

  -- mqtt client with callback function to receive messages
  -- store mqtt.client to local variable if you want to publish messages later
  local mqtt = loop.create_mqtt(mqtt_cfg, mqtt_onmessage)
  -- add 'groupread = read_callback' to add read callback handler
  local bus = loop.create_localbus({ groupwrite = localbus_onmessage })
  -- timer that fires callback every 10 seconds, period can be a fraction of a second
  local timer = loop.create_timer(10, on_timer)

  -- add all workers to our loop
  lp:add(mqtt)
  lp:add(bus)
  lp:add(timer)
end

setup()
-- run loop forever until script is terminated
loop.get():run_loop()




Create user library 'user.eventloop'

Code:
socket = require('socket')

--- Wraps `callback` so that every call goes through pcall.
-- The returned function forwards all of its arguments to `callback`.
-- On success it returns `true` followed by the FIRST value returned by
-- `callback` (further return values are dropped).
-- On failure the message is passed to error() with an
-- 'unhandled exception: ' prefix.
-- NOTE(review): with the stock Lua error() this raises again; if the host
-- environment replaces error() with a logging function, control falls
-- through and `false, message` is returned instead - confirm on the target.
local function safe_callback(callback)
  local function guarded(...)
    local ok, result = pcall(callback, ...)
    if(ok) then
      return ok, result
    end

    error(string.format('unhandled exception: %s', result))
    return ok, result
  end

  return guarded
end

-- Event loop module: keeps a list of workers, waits on their file
-- descriptors with socket.selectfds() and calls worker:step(fd) for every
-- descriptor that was reported back.
-- A worker must provide get_fd() and step(fd); on_add(loop) is optional.
package.preload['user.eventloop.loop'] = (function(...)
    -- works both where unpack is a global and where it lives in table
    local unpack = table.unpack or unpack

    local loop = {}
    loop.__index = loop

    -- returns true when `worker` is still present in the `workers` list
    local function is_registered(workers, worker)
      for i = 1, #workers do
        if(workers[i] == worker) then
          return true
        end
      end
      return false
    end

    -- registers a worker and lets it hook into the loop via on_add(loop)
    function loop:add(worker)
      self.workers[#self.workers + 1] = worker
      if(worker.on_add) then
        worker:on_add(self)
      end
    end

    -- removes the first occurrence of `worker`; unknown workers are ignored
    function loop:remove(worker)
      local w = self.workers
      for i = 1, #w do
        if(w[i] == worker) then
          table.remove(w, i)
          break
        end
      end
    end

    -- collects the descriptors of all workers that currently have one;
    -- returns two parallel lists: fds[i] belongs to worker actions[i]
    function loop:prepare_fds()
      local fds = {}
      local actions = {}
      for i = 1, #self.workers do
        local v = self.workers[i]
        local fd = v:get_fd()
        if(fd) then
          fds[#fds + 1] = fd
          actions[#actions + 1] = v
        end
      end

      return fds, actions
    end

    -- runs until no workers are left
    function loop:run_loop()
      while(#self.workers > 0) do
        local fds, actions = self:prepare_fds()

        local res = { socket.selectfds(10, unpack(fds)) }

        -- first result is true if returned something, nil if not
        if(res[1]) then
          for i = 1, #fds do
            local fd = res[i + 1]
            local worker = actions[i]
            -- `actions` is a snapshot taken before select: a worker may have
            -- been removed by an earlier step of this very iteration (for
            -- example a timer stopped from another callback), in which case
            -- it must not be stepped any more
            if(fd and is_registered(self.workers, worker)) then
              worker:step(fd)
            end
          end
        end
      end
    end

    -- factory helpers, each forwards its arguments to the worker module
    function loop.create_timer(...)
      return require('user.eventloop.timer').new(...)
    end

    function loop.create_localbus(...)
      return require('user.eventloop.localbus').new(...)
    end

    function loop.create_mqtt(...)
      return require('user.eventloop.mqtt').new(...)
    end

    -- lazily created shared loop instance
    local instance
    function loop.get()
      if(not instance) then
        instance = setmetatable({}, loop)
        instance.workers = {}
      end
      return instance
    end

    return loop
  end)


-- Timer worker: wraps a timerfd object so the event loop can wait on it
-- and run a callback every time it fires.
package.preload['user.eventloop.timer'] = (function(...)
    local timer = { type = 'timer' }
    timer.__index = timer

    -- descriptor mask for the loop; nil once the timer has been stopped
    function timer:get_fd()
      return self.fd
    end

    -- called by the loop when the timer descriptor is readable
    function timer:step()
      -- a stopped timer may still be part of the loop's current snapshot
      if(not self.timer) then
        return
      end

      -- disarm timer
      self.timer:read()
      self.callback()
    end

    -- remembers the owning loop so that stop() can unregister the timer
    function timer:on_add(loop)
      self.loop = loop
    end

    -- Permanently stops the timer: closes the underlying timerfd object and
    -- removes the worker from the loop it was added to.
    -- Safe to call on a timer that was never added to a loop and safe to
    -- call more than once.
    function timer:stop()
      if(self.timer) then
        self.timer:close()
        self.timer = nil
        self.fd = nil
      end

      if(self.loop) then
        self.loop:remove(self)
        self.loop = nil
      end
    end

    -- interval: timer period passed to timerfd.new()
    -- callback: function called without arguments on every expiration,
    --           wrapped with safe_callback
    function timer.new(interval, callback)
      local t = setmetatable({}, timer)
      t.callback = safe_callback(callback)
      t.timer = require('timerfd').new(interval)
      t.fd = socket.fdmaskset(t.timer:getfd(), 'r')

      return t
    end

    return timer
  end)


-- Local bus worker: exposes a localbus client to the event loop and
-- dispatches bus events to the handlers supplied by the caller.
package.preload['user.eventloop.localbus'] = (function(...)
    -- events that can be hooked, in the order they are registered
    local EVENTS = { 'groupwrite', 'groupread' }

    local worker = { type = 'localbus' }
    worker.__index = worker

    -- descriptor mask the loop waits on
    worker.get_fd = function(self)
      return self.fd
    end

    -- called by the loop when the bus descriptor is readable
    worker.step = function(self)
      self.bus:step()
    end

    -- handlers: table with optional `groupwrite` / `groupread` functions;
    -- every supplied handler is wrapped with safe_callback
    worker.new = function(handlers)
      local client = require('localbus').new(1)

      for _, event in ipairs(EVENTS) do
        local handler = handlers[event]
        if(handler) then
          client:sethandler(event, safe_callback(handler))
        end
      end

      local self = setmetatable({ bus = client }, worker)
      self.fd = socket.fdmaskset(client:getfd(), 'r')

      return self
    end

    return worker
  end)

-- MQTT worker: drives a mosquitto client from the event loop
-- (socket read/write through step(), housekeeping and reconnects through
-- a one second timer).
package.preload['user.eventloop.mqtt'] = (function(...)
    -- returns a function that calls func with obj as the first argument
    local bind = function(obj, func) return function(...) func(obj, ...) end end
    local mqtt = { type = 'mqtt' }
    mqtt.__index = mqtt

    -- descriptor mask for the loop: nil while there is no socket; write
    -- readiness is requested only when the client reports want_write()
    function mqtt:get_fd()
      if(self.soc) then
        return socket.fdmaskset(self.soc, self.client:want_write() and 'rw' or 'r')
      else
        return nil
      end
    end

    function mqtt:on_add(loop)
      -- need to do misc() every second according to mosquitto doc
      local timer = loop.create_timer(1, bind(self, self.step_misc))
      loop:add(timer)
    end

    -- periodic housekeeping; (re)connects while there is no socket
    function mqtt:step_misc()
      if(self.soc) then
        self.client:loop_misc()
      else
        self:connect()
      end
    end

    -- called by the loop with the descriptor mask returned by select
    function mqtt:step(fd)
      -- self.soc is cleared by on_disconnect; once that has happened (from
      -- the housekeeping timer or from loop_read below) there is nothing
      -- left to read from or write to
      if(self.soc and socket.fdmaskread(fd)) then
        self.client:loop_read()
      end

      if(self.soc and socket.fdmaskwrite(fd)) then
        self.client:loop_write()
      end
    end

    -- starts a connection attempt and remembers the client socket (if any)
    function mqtt:connect()
      self.client:connect(self.broker)
      self.soc = self.client:socket() or nil
    end

    -- ON_CONNECT handler: subscribes to the configured topics on success.
    -- `topics` may be a single string, a list of strings, or omitted
    -- altogether for a client that only publishes.
    function mqtt:on_connect(success)
      if(not success) then
        return
      end

      log('connected to MQTT')

      local topics = self.topics
      if(type(topics) == 'string') then
        topics = { topics }
      end

      for _, topic in ipairs(topics or {}) do
        self.client:subscribe(topic)
      end
    end

    -- ON_DISCONNECT handler: forgets the socket so that step_misc reconnects
    function mqtt:on_disconnect(...)
      log('mqtt disconnect', ...)
      self.soc = nil
    end

    -- cfg: table with `id`, `login`, `pwd`, `broker` and optional `topics`
    -- msg_callback: message handler, wrapped with safe_callback
    function mqtt.new(cfg, msg_callback)
      local m = setmetatable({}, mqtt)

      m.client = require('mosquitto').new(cfg.id)
      m.client:login_set(cfg.login, cfg.pwd)
      m.client.ON_CONNECT = bind(m, m.on_connect)
      m.client.ON_DISCONNECT = bind(m, m.on_disconnect)
      -- callback have (mid, topic, payload) arguments
      m.client.ON_MESSAGE = safe_callback(msg_callback)

      m.broker = cfg.broker
      m.topics = cfg.topics

      return m
    end

    return mqtt
  end)

return require('user.eventloop.loop')
Reply
#2
Looks good, but you need to check the first argument of on_connect. It will be false if the connection failed.
Reply
#3
Thanks, I've added this check to the code above to make it cleaner. It seems it wasn't critical: when the socket connection was established but the server did not accept the connection (e.g. wrong credentials), a disconnect event followed quickly after that. In other cases (server not available) the connect event wasn't fired at all.
Reply
#4
Hi, I am running some tests with the above code for MQTT. For some reason I receive the error below in the log; I don't know why the loop.get() function is not returning.

Resident script:29: attempt to index global 'loop' (a nil value)
stack traceback:
Resident script:29: in function 'setup'


resident script line 29 has, local lp = loop.get()

i have added the library called 'eventloop' as above.

[Edit]
Ahh, I have it now: I removed the first `local` before `loop = {}` in the library, and I am connected now...
Reply
#5
I've published an updated version of the library that includes safe callbacks for errors in user functions, which, if not handled, would eventually break the resident script with undesired behaviour.
Reply
#6
If I want to stop a timer from the resident script callback function, how can I do this?
Many Thanks

Code:
-- Callback for the second timer created in setup().
-- NOTE: `some condition` is a placeholder from the question, not valid Lua;
-- it has to be replaced with a real expression.
function on_timer2()

if (some condition) then
-- stop Timer here
end
end


-- creates the workers and registers them in the shared loop instance
function setup()
  local lp = loop.get()

  -- use one or more of the below in any combination

  -- mqtt client with callback function to receive messages
  -- store mqtt.client to local variable if you want to publish messages later
  local mqtt = loop.create_mqtt(mqtt_cfg, mqtt_onmessage)
  -- add 'groupread = read_callback' to add read callback handler
  local bus = loop.create_localbus({ groupwrite = localbus_onmessage })
  -- timers that fire their callback every 10 seconds, period can be a fraction of a second
  -- each timer needs its own variable: declaring `local timer` twice would
  -- shadow the first timer, which would then never be added to the loop
  local timer = loop.create_timer(10, on_timer)
  local timer2 = loop.create_timer(10, on_timer2)

  -- add all workers to our loop
  lp:add(mqtt)
  lp:add(bus)
  lp:add(timer)
  lp:add(timer2)

end
Reply
#7
(23.09.2020, 01:26)benanderson_475 Wrote: if i want to stop a timer from the resident script callback function how can i realize this? 
Many Thanks

I've updated library code, you can now call
Code:
-- closes the timer and removes it from the event loop it was added to
timer:stop()
when you need it to be permanently stopped

Note that this also removes the timer from the event loop. As a side effect, when there is nothing left to run, the loop will exit, but the resident script will be restarted after the specified delay and everything will start over.
Reply
#8
(23.09.2020, 14:20)myg Wrote:
(23.09.2020, 01:26)benanderson_475 Wrote: if i want to stop a timer from the resident script callback function how can i realize this? 
Many Thanks

I've updated library code, you can now call
Code:
-- closes the timer and removes it from the event loop it was added to
timer:stop()
when you need it to be permanently stopped

Note that this also removes timer from the event loop, as a side effect, when you have nothing to run, the loop will exit, but resident script will be restarted after specified delay and everything will start over.

@myg Many thanks,
Reply
#9
Hi,

I am running this code to react to incoming MQTT messages.

During execution, I access storage using a semaphore, but after a while I get 'failed to create semaphore'.

Code:
-- Writes to storage while holding the semaphore named 'eventlock' and logs
-- the two values returned through sem.runlocked.
require('sem')
-- wait for 5 seconds max before executing callback
res1, res2 = sem.runlocked('eventlock', 5, function(lockres)
  -- lock acquired
  if lockres then
    storage.set('my_storage', '')
    return true, 'ok'
  -- lockres is nil: reported as a failure to create the semaphore
  elseif lockres == nil then
    return nil, 'failed to create semaphore'
  -- any other falsy value: reported as a failure to lock the semaphore
  else
    return nil, 'failed to lock semaphore'
  end
end)
log(res1, res2)


In the documentation for semaphore:close(), it is stated:
Quote:
Code:
semaphore:close()
Closes the semaphore. Not required, semaphore handles are closed automatically when script execution ends

Do I need to do it explicitly in the case of an event loop? What can I do to prevent the failure to create a semaphore?

Thanks
Reply
#10
sem.runlocked closes the semaphore automatically. This issue might happen when there are too many open handles (files, sockets, semaphores etc) which are not closed when needed.
If you only need to receive values from MQTT you can simply use client:loop_forever without any additional libraries.
Reply


Forum Jump: