I've tried to gather best practices from across the forum on writing non-blocking loops that handle MQTT, bus and timer events, in order to write constantly running logic for sending, receiving or syncing data and similar tasks. I've come up with the user library below, which simplifies the API for creating event sources and attaching to them. Hopefully it will be useful to others.
Setup: create a user library named 'user.eventloop' (its code is the second listing below), then use it from a resident script as shown in the first listing.
Usage (put code below in a resident script, timeout doesn't matter since loop never leaves):
Code:
-- Load the event loop library. The 'user.eventloop' library returns the loop
-- module but does not define a global, so it has to be required explicitly
-- before `loop` is used further down in this script.
local loop = require('user.eventloop')

-- MQTT connection settings handed to loop.create_mqtt()
local mqtt_cfg = {
  -- broker address passed to the client's connect()
  broker = 'IP',
  -- credentials passed to the client's login_set()
  login = 'USERNAME',
  pwd = 'PASSWORD',
  -- client id used when the mosquitto client is created
  id = 'CLIENT_ID',
  -- topics subscribed after every successful connect (list form)
  topics = {'zigbee/+', 'zigbee/bridge/config'},
  -- alternative for single topic subscription
  -- topics = 'zigbee/+',
}
-- MQTT message callback; passed to loop.create_mqtt() in setup(), where the
-- library installs it (wrapped) as the client's ON_MESSAGE handler.
-- Receives (mid, topic, payload) as documented by the library.
function mqtt_onmessage(mid, topic, payload)
-- application logic for an incoming MQTT message goes here
end
-- Local bus callback; registered in setup() as the 'groupwrite' handler.
-- `event` is whatever the bus client passes to that handler; its fields are
-- not visible in this script (presumably address and value of the write,
-- verify against the localbus documentation).
function localbus_onmessage(event)
-- application logic for a bus event goes here
end
-- Timer callback; passed to loop.create_timer() in setup() and invoked
-- without arguments each time that timer fires.
function on_timer()
-- periodic application logic goes here
end
-- Creates the event sources used by this script and registers them with the
-- shared event loop. Use one or more of the sources below in any combination.
function setup()
  local event_loop = loop.get()

  -- mqtt client with callback function to receive messages
  -- keep a reference to mqtt_worker.client if you want to publish messages later
  local mqtt_worker = loop.create_mqtt(mqtt_cfg, mqtt_onmessage)

  -- add 'groupread = read_callback' to the table to handle read requests too
  local bus_worker = loop.create_localbus({ groupwrite = localbus_onmessage })

  -- timer that fires callback every 10 seconds, period can be a fraction of a second
  local timer_worker = loop.create_timer(10, on_timer)

  -- add all workers to our loop, in the same order they were created
  for _, worker in ipairs({ mqtt_worker, bus_worker, timer_worker }) do
    event_loop:add(worker)
  end
end
-- register all workers first
setup()

-- hand control to the event loop; run_loop() keeps dispatching events for as
-- long as at least one worker is registered, i.e. until the script is terminated
local main_loop = loop.get()
main_loop:run_loop()
Create user library 'user.eventloop'
Code:
socket = require('socket')
--- Wraps `callback` so that it is always invoked through pcall.
-- On success the wrapper returns `true` followed by the FIRST value returned
-- by `callback` (any further return values are dropped).
-- On failure the error is raised again through error(), with the message
-- prefixed by 'unhandled exception: '.
-- @param callback function to protect
-- @return wrapper function accepting the same arguments as `callback`
local function safe_callback(callback)
  local function guarded(...)
    local ok, result = pcall(callback, ...)
    if ok then
      return ok, result
    end
    error(string.format('unhandled exception: %s', result))
  end

  return guarded
end
-- Event loop module: keeps a list of workers, polls their file descriptors
-- with socket.selectfds() and calls worker:step(fd) for every ready one.
-- A worker must provide get_fd() and step(fd); on_add(loop) is optional.
package.preload['user.eventloop.loop'] = function(...)
  local loop = {}
  loop.__index = loop

  -- the single loop object shared by everyone calling loop.get()
  local singleton

  -- Returns the shared loop instance, creating it on first use.
  function loop.get()
    if not singleton then
      singleton = setmetatable({ workers = {} }, loop)
    end
    return singleton
  end

  -- Registers a worker and lets it react through its optional on_add hook.
  function loop:add(worker)
    local list = self.workers
    list[#list + 1] = worker
    if worker.on_add then
      worker:on_add(self)
    end
  end

  -- Unregisters the first occurrence of `worker`; does nothing if it is absent.
  function loop:remove(worker)
    for index, candidate in ipairs(self.workers) do
      if candidate == worker then
        table.remove(self.workers, index)
        return
      end
    end
  end

  -- Collects the descriptors to poll. Returns two parallel arrays: the fd
  -- values and the workers owning them. Workers whose get_fd() returns a
  -- false value are left out of this round.
  function loop:prepare_fds()
    local fds, owners = {}, {}
    local list = self.workers
    for i = 1, #list do
      local worker = list[i]
      local fd = worker:get_fd()
      if fd then
        local slot = #fds + 1
        fds[slot] = fd
        owners[slot] = worker
      end
    end
    return fds, owners
  end

  -- Dispatches events for as long as at least one worker is registered.
  function loop:run_loop()
    while #self.workers > 0 do
      local fds, owners = self:prepare_fds()
      -- wait up to 10 seconds; the first result is true if something is
      -- ready, the remaining results line up with the descriptors passed in
      local results = { socket.selectfds(10, unpack(fds)) }
      if results[1] then
        for i, worker in ipairs(owners) do
          local ready_fd = results[i + 1]
          if ready_fd then
            worker:step(ready_fd)
          end
        end
      end
    end
  end

  -- Factories for the bundled worker types; arguments are forwarded to the
  -- respective module's new().
  function loop.create_timer(...)
    local timer = require('user.eventloop.timer')
    return timer.new(...)
  end

  function loop.create_localbus(...)
    local localbus = require('user.eventloop.localbus')
    return localbus.new(...)
  end

  function loop.create_mqtt(...)
    local mqtt = require('user.eventloop.mqtt')
    return mqtt.new(...)
  end

  return loop
end
-- Timer worker: wraps a timerfd object so the event loop can poll it and
-- invoke a callback every time it expires.
package.preload['user.eventloop.timer'] = (function(...)
  local timer = { type = 'timer' }
  timer.__index = timer

  -- Returns the fd mask to poll, or nil once the timer has been stopped
  -- (the loop skips workers without a descriptor).
  function timer:get_fd()
    return self.fd
  end

  -- Called by the loop when the timer descriptor is readable.
  function timer:step()
    -- The loop dispatches from a snapshot taken before select; if another
    -- callback stopped this timer in the same round, the descriptor is
    -- already closed and must not be read.
    if self.stopped then
      return
    end
    -- disarm timer
    self.timer:read()
    self.callback()
  end

  -- Remembers the owning loop so stop() can unregister the timer.
  function timer:on_add(loop)
    self.loop = loop
  end

  -- Closes the timer and removes it from its loop. Safe to call more than
  -- once, and safe to call on a timer that was never added to a loop.
  function timer:stop()
    if self.stopped then
      return
    end
    self.stopped = true
    self.fd = nil
    self.timer:close()
    if self.loop then
      self.loop:remove(self)
      self.loop = nil
    end
  end

  -- Creates a timer worker.
  -- interval: value passed to timerfd.new() (the usage example passes seconds)
  -- callback: function called without arguments on every expiry
  function timer.new(interval, callback)
    local t = setmetatable({}, timer)
    t.callback = safe_callback(callback)
    t.timer = require('timerfd').new(interval)
    t.fd = socket.fdmaskset(t.timer:getfd(), 'r')
    return t
  end

  return timer
end)
-- Local bus worker: exposes a localbus client to the event loop and forwards
-- bus events to the handlers supplied by the caller.
package.preload['user.eventloop.localbus'] = function(...)
  local lbus = { type = 'localbus' }
  lbus.__index = lbus

  -- events for which a caller-supplied handler is registered, in this order
  local EVENTS = { 'groupwrite', 'groupread' }

  -- Returns the fd mask the loop should poll for this worker.
  function lbus:get_fd()
    return self.fd
  end

  -- Called by the loop when the bus descriptor is readable; lets the bus
  -- client process pending data (which in turn invokes the handlers).
  function lbus:step()
    self.bus:step()
  end

  -- Creates a bus worker.
  -- handlers: table with optional `groupwrite` and `groupread` functions;
  -- each one is wrapped with safe_callback before being registered.
  function lbus.new(handlers)
    local worker = setmetatable({}, lbus)
    local bus = require('localbus').new(1)
    worker.bus = bus

    for _, event in ipairs(EVENTS) do
      local handler = handlers[event]
      if handler then
        bus:sethandler(event, safe_callback(handler))
      end
    end

    worker.fd = socket.fdmaskset(bus:getfd(), 'r')
    return worker
  end

  return lbus
end
-- MQTT worker: drives a mosquitto client from the event loop, reconnects
-- when the connection is lost and (re)subscribes after every connect.
package.preload['user.eventloop.mqtt'] = (function(...)
  -- Turns a method into a plain callback bound to `obj`; return values of
  -- `func` are not passed on.
  local bind = function(obj, func) return function(...) func(obj, ...) end end

  local mqtt = { type = 'mqtt' }
  mqtt.__index = mqtt

  -- Returns the fd mask to poll, or nil while there is no socket (the loop
  -- then skips this worker). Write readiness is requested only when the
  -- client reports pending outgoing data.
  function mqtt:get_fd()
    if(self.soc) then
      return socket.fdmaskset(self.soc, self.client:want_write() and 'rw' or 'r')
    else
      return nil
    end
  end

  -- Hook called by loop:add(); registers the housekeeping timer.
  function mqtt:on_add(loop)
    -- need to do misc() every second according to mosquitto doc
    local timer = loop.create_timer(1, bind(self, self.step_misc))
    loop:add(timer)
  end

  -- Once-per-second housekeeping: client maintenance while a socket exists,
  -- otherwise a (re)connect attempt.
  function mqtt:step_misc()
    if(self.soc) then
      self.client:loop_misc()
    else
      self:connect()
    end
  end

  -- Called by the loop with the ready fd mask for the client socket.
  function mqtt:step(fd)
    if(socket.fdmaskread(fd)) then
      self.client:loop_read()
    end
    if(socket.fdmaskwrite(fd)) then
      self.client:loop_write()
    end
  end

  -- Starts a connection attempt and records the client socket, if any.
  function mqtt:connect()
    -- only pass the port when one was configured, so the call is unchanged
    -- for existing configurations
    if(self.port) then
      self.client:connect(self.broker, self.port)
    else
      self.client:connect(self.broker)
    end
    self.soc = self.client:socket() or nil
  end

  -- ON_CONNECT handler. `success` tells whether the broker accepted the
  -- connection; any further arguments supplied by the client library are
  -- only used for logging.
  function mqtt:on_connect(success, ...)
    if(not success) then
      -- previously ignored silently, which made bad credentials hard to spot
      log('mqtt connect failed', ...)
      return
    end

    log('connected to MQTT')

    local topics = self.topics
    if(type(topics) == 'string') then
      self.client:subscribe(topics)
    elseif(type(topics) == 'table') then
      for _, topic in ipairs(topics) do
        self.client:subscribe(topic)
      end
    end
    -- no topics configured: publish-only client, nothing to subscribe
  end

  -- ON_DISCONNECT handler; dropping the socket makes step_misc reconnect.
  function mqtt:on_disconnect(...)
    log('mqtt disconnect', ...)
    self.soc = nil
  end

  -- Creates an MQTT worker.
  -- cfg: table with `broker`, `id`, optional `login`/`pwd`, optional `port`
  --      and optional `topics` (a single topic string or a list of topics)
  -- msg_callback: optional function called with (mid, topic, payload)
  function mqtt.new(cfg, msg_callback)
    local m = setmetatable({}, mqtt)
    m.client = require('mosquitto').new(cfg.id)

    -- credentials are optional (anonymous brokers)
    if(cfg.login) then
      m.client:login_set(cfg.login, cfg.pwd)
    end

    m.client.ON_CONNECT = bind(m, m.on_connect)
    m.client.ON_DISCONNECT = bind(m, m.on_disconnect)

    -- callback have (mid, topic, payload) arguments; without a callback no
    -- handler is installed instead of one that fails on every message
    if(msg_callback) then
      m.client.ON_MESSAGE = safe_callback(msg_callback)
    end

    m.broker = cfg.broker
    m.port = cfg.port
    m.topics = cfg.topics
    return m
  end

  return mqtt
end)
-- requiring 'user.eventloop' yields the loop module registered above
local loop_module = require('user.eventloop.loop')
return loop_module