I've tried to gather best practices across the forum on writing non-blocking loops to handle events from mqtt / bus and timer events in order to write constantly running logic for sending / receiving / syncing data or similar. I've come up with user library below that simplifies API to initiate and attach to event sources. Hopefully it will be useful to others.
Usage (put code below in a resident script, timeout doesn't matter since loop never leaves):
Create user library 'user.eventloop'
	
	
	
	
Usage (put code below in a resident script, timeout doesn't matter since loop never leaves):
Code:
local mqtt_cfg = {
  broker = 'IP',
  login = 'USERNAME',
  pwd = 'PASSWORD',
  id = 'CLIENT_ID',
  topics = {'zigbee/+', 'zigbee/bridge/config'},
-- alternative for single topic subscription
-- topics = 'zigbee/+',
}
function mqtt_onmessage(mid, topic, payload)
-- logic
end
function localbus_onmessage(event)
-- logic
end
function on_timer()
-- logic
end
function setup()
  local lp = loop.get()
  
  -- use one or more of the below in any combination
  -- mqtt client with callback function to receive messages
  -- store mqtt.client to local variable if you want to publish messages later
  local mqtt = loop.create_mqtt(mqtt_cfg, mqtt_onmessage)
  -- add 'groupread = read_callback' to add read callback handler
  local bus = loop.create_localbus({ groupwrite = localbus_onmessage })
  -- timer that fires callback every 10 seconds, period can be a fraction of a second
  local timer = loop.create_timer(10, on_timer)
  
  -- add all workers to our loop
  lp:add(mqtt)
  lp:add(bus)
  lp:add(timer)
end
setup()
-- run loop forever until script is terminated
loop.get():run_loop()Create user library 'user.eventloop'
Code:
socket = require('socket')
local function safe_callback(callback)
  return function(...)
    --return callback(...)
  local res, errMsg = pcall(callback, ...)
    if(not res) then
      error(string.format('unhandled exception: %s', errMsg))
    end
    return res, errMsg
  end
end
package.preload['user.eventloop.loop'] = (function(...)
    local loop = {}
    loop.__index = loop
    
    function loop:add(worker)
      self.workers[#self.workers + 1] = worker
      if(worker.on_add) then
        worker:on_add(self)
      end
    end
    
    function loop:remove(worker)
      local w = self.workers
      for i = 1, #w do
        if(w[i] == worker) then
          table.remove(w, i)
          break
        end
      end
    end
    function loop:prepare_fds()
      local fds = {}
      local actions = {}
      for i = 1, #self.workers do
        local v = self.workers[i]
        local fd = v:get_fd()
        if(fd) then
          fds[#fds + 1] = fd
          actions[#actions + 1] = v
        end
      end
      
      return fds, actions
    end
    
    function loop:run_loop()
      while(#self.workers > 0) do
        local fds, actions = self:prepare_fds()
        local res = { socket.selectfds(10, unpack(fds)) }
        
        -- first result is true if returned something, nil if not
        if(res[1]) then
          for i = 1, #fds do
            local fd = res[i + 1]
            if(fd) then
              actions[i]:step(fd)
            end
          end
        end
      end
    end
    
    function loop.create_timer(...)
      return require('user.eventloop.timer').new(...)
    end
    
    function loop.create_localbus(...)
      return require('user.eventloop.localbus').new(...)
    end
    
    function loop.create_mqtt(...)
      return require('user.eventloop.mqtt').new(...)
    end
    
    local instance
    function loop.get()
      if(not instance) then
        instance = setmetatable({}, loop)
        instance.workers = {}
      end
      return instance
    end
      
    return loop
  end)
package.preload['user.eventloop.timer'] = (function(...)
    local timer = { type = 'timer' }
    timer.__index = timer
    
    function timer:get_fd()
    return self.fd
    end
    
    function timer:step()
      -- disarm timer
      self.timer:read()
      self.callback()
    end
function timer:on_add(loop)
      self.loop = loop
    end
    
    function timer:stop()
      self.timer:close()
      self.loop:remove(self)
    end
    
    function timer.new(interval, callback)
      local t = setmetatable({}, timer)
      t.callback = safe_callback(callback)
      t.timer = require('timerfd').new(interval)
      t.fd = socket.fdmaskset(t.timer:getfd(), 'r')
      
      return t
    end
    
    return timer
  end)
package.preload['user.eventloop.localbus'] = (function(...)
    local lbus = { type = 'localbus' }
    lbus.__index = lbus
    
    function lbus:get_fd()
      return self.fd
    end
    
    function lbus:step()
      self.bus:step()
    end
    
    function lbus.new(handlers)
      local b = setmetatable({}, lbus)
      
      b.bus = require('localbus').new(1)
      if(handlers.groupwrite) then
        b.bus:sethandler('groupwrite', safe_callback(handlers.groupwrite))
      end
      if(handlers.groupread) then
      b.bus:sethandler('groupread', safe_callback(handlers.groupread))
      end
      b.fd = socket.fdmaskset(b.bus:getfd(), 'r')
      
      return b
    end
    
    return lbus
  end)
package.preload['user.eventloop.mqtt'] = (function(...)
    local bind = function(obj, func) return function(...) func(obj, ...) end end
    local mqtt = { type = 'mqtt' }
    mqtt.__index = mqtt
    
    function mqtt:get_fd()
      if(self.soc) then
        return socket.fdmaskset(self.soc, self.client:want_write() and 'rw' or 'r')
      else
        return nil
      end
    end
    
    function mqtt:on_add(loop)
      -- need to do misc() every second according to mosquitto doc
      local timer = loop.create_timer(1, bind(self, self.step_misc))
      loop:add(timer)
    end
      
    function mqtt:step_misc()
      if(self.soc) then
        self.client:loop_misc()
      else
        self:connect()
      end
    end
    
    function mqtt:step(fd)
      if(socket.fdmaskread(fd)) then
        self.client:loop_read()
      end
      if(socket.fdmaskwrite(fd)) then
        self.client:loop_write()
      end
    end
    
    function mqtt:connect()
      self.client:connect(self.broker)
      self.soc = self.client:socket() or nil
    end
    
    function mqtt:on_connect(success)
      if(success) then
        log('connected to MQTT')
        if(type(self.topics) == 'string') then
          self.client:subscribe(self.topics)
        else
          for i, t in ipairs(self.topics) do
            self.client:subscribe(t)
          end
        end
      end
    end
    
    function mqtt:on_disconnect(...)
      log('mqtt disconnect', ...)
      self.soc = nil
    end
    
    function mqtt.new(cfg, msg_callback)
      local m = setmetatable({}, mqtt)
      
      m.client = require('mosquitto').new(cfg.id)
      m.client:login_set(cfg.login, cfg.pwd)
      m.client.ON_CONNECT = bind(m, m.on_connect)
      m.client.ON_DISCONNECT = bind(m, m.on_disconnect)
      -- callback have (mid, topic, payload) arguments
      m.client.ON_MESSAGE = safe_callback(msg_callback)
      
      m.broker = cfg.broker
      m.topics = cfg.topics
      
      return m
    end
    
    return mqtt
  end)
return require('user.eventloop.loop') 
 

 
