02.06.2020, 12:11
Based on several examples from this forum of working with MQTT, I decided I needed a more universal, configuration-based binding to integrate my Zigbee devices into a KNX network through the zigbee2mqtt bridge. With the script below it is also possible to bind MQTT endpoints together directly, without writing event scripts.
You also need the eventloop script in your user library. Feel free to use or modify any of the code.
1) Create user script "user.mqtt2knx" below and modify your MQTT bindings using supplied examples. You'll need to restart resident script each time you modify bindings.
2) Create a resident script (the timeout doesn't matter) and update it with your MQTT broker config and MQTT topic.
You also need the eventloop script in your user library. Feel free to use or modify any of the code.
1) Create user script "user.mqtt2knx" below and modify your MQTT bindings using supplied examples. You'll need to restart resident script each time you modify bindings.
Code:
--- Value transformers shared by the MQTT <-> KNX bindings.
-- A transformer receives the source value and returns the value to send.
-- Returning nil means "send nothing": the message converters in
-- 'mqtt2knx.converter' drop nil results.
package.preload['mqtt2knx.transformers'] = (function(...)
  local transformers = {
    --- Map an "on"/"off" style string to a boolean (case-insensitive).
    -- Booleans are passed through unchanged and any other non-string value
    -- yields false, so an unexpected payload type no longer raises an
    -- "attempt to index" error.
    -- @param val string such as "ON"/"off", or an already-boolean value
    -- @return true only for "on" (any letter case) or boolean true
    onoff_to_bool = function(val)
      if type(val) == 'boolean' then
        return val
      end
      if type(val) ~= 'string' then
        return false
      end
      return val:lower() == 'on'
    end,

    --- Map a truthy/falsy value to the string "on" or "off".
    bool_to_onoff = function(val)
      return val and "on" or "off"
    end,

    --- Build a transformer that ignores its input and always returns n.
    const = function(n)
      return function() return n end
    end,

    --- Transformer that never produces a value (suppresses sending).
    nosend = function() return nil end,

    --- Transformer that returns its input unchanged.
    identity = function(x) return x end,
  }
  return transformers
end)
--- Binding configuration: edit this list to describe your own devices.
-- Every entry is a table with these fields (all but `mqtt` are optional):
--   mqtt            source query: 'device', 'device[property]' or
--                   'device[property=value]'
--   knx             KNX group address bound to the MQTT property
--   to_knx          transformer applied to values going MQTT -> KNX
--   to_mqtt         transformer applied to values going KNX -> MQTT
--   force_overwrite write to KNX even when the value did not change
--   mqtt_bind       target query of a direct MQTT -> MQTT binding
--   bind_conv       transformer applied for the direct binding
package.preload['mqtt2knx.bindings'] = (function(...)
  local transformers = require('mqtt2knx.transformers')

  -- Short aliases for use in the binding list below.
  local ONOFF_TO_BOOL = transformers.onoff_to_bool
  local BOOL_TO_ONOFF = transformers.bool_to_onoff
  local CONST = transformers.const
  local NOSEND = transformers.nosend
  local IDENTITY = transformers.identity

  -- Example bindings (written for zigbee2mqtt devices).
  local bindings = {
    -- Relay: two-way on/off binding through the "state" attribute.
    {
      mqtt = 'relay[state]',
      knx = '1/1/1',
      to_knx = ONOFF_TO_BOOL,
      to_mqtt = BOOL_TO_ONOFF
    },

    -- Plain sensor: one-way binding, MQTT -> KNX only.
    {
      mqtt = 'door[contact]',
      knx = '1/1/1',
    },

    -- Motion sensor with forced writes: needed when the device repeats
    -- the same message on every activation.
    {
      mqtt = 'motion[occupancy]',
      knx = '1/1/1',
      force_overwrite = true,
    },

    -- Button click: the button reports click=single|double|long.
    {
      mqtt = 'button[click=single]',
      knx = '1/1/1',
      to_knx = CONST(true),
      force_overwrite = true
    },

    -- Double click: update KNX and toggle an MQTT relay at the same time.
    {
      mqtt = 'button[click=double]',
      knx = '1/1/1',
      to_knx = CONST(true),
      force_overwrite = true,
      -- a value in the target query is sent as a constant
      mqtt_bind = 'relay[state=toggle]'
    },

    -- Water sensor wired straight to a protection relay, no KNX involved.
    {
      mqtt = 'water_leak[water_leak]',
      mqtt_bind = 'relay[state]',
      bind_conv = BOOL_TO_ONOFF,
    },
  }

  return bindings
end)
--- Multimap: each key holds a list of values kept in insertion order.
-- The lists are stored directly as fields of the instance, keyed by `key`.
package.preload['mqtt2knx.maplist'] = (function(...)
  local MapList = {}
  MapList.__index = MapList

  --- Create an empty multimap.
  function MapList.new()
    local instance = {}
    return setmetatable(instance, MapList)
  end

  --- Append `value` to the list under `key`, creating the list on first use.
  function MapList:add(key, value)
    local bucket = self[key]
    if bucket then
      bucket[#bucket + 1] = value
      return
    end
    self[key] = { value }
  end

  --- Call `callback(value)` for every value stored under `key`, in order.
  -- Does nothing when nothing is stored under `key`.
  function MapList:each_match(key, callback)
    local bucket = self[key]
    if not bucket then
      return
    end
    for _, item in ipairs(bucket) do
      callback(item)
    end
  end

  return MapList
end)
--- Building blocks that turn an incoming message into an outgoing one.
-- A "selector" extracts a value from an incoming message, a "msg converter"
-- wraps a value into an outgoing message table, and `create` chains the two.
package.preload['mqtt2knx.converter'] = (function(...)
  -- Captures: topic, the whole bracket part (unused), property, value.
  local query_pattern = '^([%w_]+)(%[?([%w_]*)=?([%w_]*)%]?)'

  --- Parse a binding query such as 'relay', 'relay[state]' or
  -- 'button[click=single]'.
  -- @param mqtt_prefix topic prefix placed in front of the device name
  -- @param query       query string from the binding configuration
  -- @return table { topic = prefix/device, property = string, value = string|nil };
  --         `property` is '' when the query has no bracket part
  -- Raises a descriptive error when the query cannot be parsed, instead of
  -- failing later on a nil concatenation.
  local function parse_mqtt_query(mqtt_prefix, query)
    local topic, _, prop, val
    if type(query) == 'string' then
      topic, _, prop, val = string.match(query, query_pattern)
    end
    if topic == nil then
      error(string.format(
        "invalid mqtt query '%s', expected 'topic', 'topic[property]' or 'topic[property=value]'",
        tostring(query)))
    end
    -- an empty value capture means "no value filter"
    if val == '' then
      val = nil
    end
    return {
      topic = mqtt_prefix .. '/' .. topic,
      property = prop,
      value = val
    }
  end

  --- Build a selector that decodes the value of a KNX bus event.
  -- The object's datatype is looked up once, when the selector is created.
  -- @param addr KNX group address of the bound object
  -- @return function(event) -> decoded value of event.datahex
  -- Raises a descriptive error when grp.find returns no object for `addr`.
  local function knx_value_selector(addr)
    local object = grp.find(addr)
    if not object then
      error(string.format("KNX group address '%s' is not defined", tostring(addr)))
    end
    local dt = object.datatype
    return function(event)
      return knxdatatype.decode(event.datahex, dt)
    end
  end

  --- Build a converter producing a KNX write message.
  -- @param addr            target group address
  -- @param fn_valueconv    transformer applied to the value first
  -- @param force_overwrite flag copied into the message
  -- @return function(value) -> message table, or nil when the transformer
  --         returns nil
  local function knx_msg_converter(addr, fn_valueconv, force_overwrite)
    return function(value)
      value = fn_valueconv(value)
      if value ~= nil then
        return {
          type = 'knx',
          addr = addr,
          value = value,
          force_overwrite = force_overwrite
        }
      end
    end
  end

  --- Build a selector that reads the bound property from a decoded MQTT
  -- message.
  -- @param queryobj parsed query (see parse_mqtt_query)
  -- @return function(msg) -> property value, or nil when the property is
  --         missing or does not equal the query's value filter
  local function mqtt_value_selector(queryobj)
    return function(msg)
      local val = msg[queryobj.property]
      -- value filter for multi-state objects, e.g. a button reporting
      -- click=single|double|long
      local filter = queryobj.value
      if val ~= nil and (filter == nil or filter == val) then
        return val
      end
    end
  end

  --- Build a converter producing an MQTT publish message for '<topic>/set'.
  -- @param queryobj     parsed query of the target device
  -- @param fn_valueconv transformer applied to the value first
  -- @return function(value) -> message table, or nil when the transformer
  --         returns nil
  local function mqtt_msg_converter(queryobj, fn_valueconv)
    return function(value)
      value = fn_valueconv(value)
      if value ~= nil then
        return {
          type = 'mqtt',
          topic = queryobj.topic .. '/set',
          value = {
            [queryobj.property] = value
          }
        }
      end
    end
  end

  --- Chain a selector and a message converter.
  -- @return function(msg) -> outgoing message, or nil when the selector
  --         yields nil
  local function create_converter(selector, msg_converter)
    return function(msg)
      local value = selector(msg)
      if value ~= nil then
        return msg_converter(value)
      end
    end
  end

  return {
    parse_query = parse_mqtt_query,
    knx_value = knx_value_selector,
    knx_msg = knx_msg_converter,
    mqtt_value = mqtt_value_selector,
    mqtt_msg = mqtt_msg_converter,
    create = create_converter,
  }
end)
--- Mapper that routes messages between MQTT topics and KNX group addresses.
-- It is built from the list returned by 'mqtt2knx.bindings'; converters are
-- stored in a multimap keyed by MQTT topic or KNX group address.
package.preload['mqtt2knx'] = (function(...)
  local transformers = require('mqtt2knx.transformers')
  local converter = require('mqtt2knx.converter')
  local maplist = require('mqtt2knx.maplist')
  local bindings = require('mqtt2knx.bindings')

  local Mapper = {}
  Mapper.__index = Mapper

  --- Expand one binding entry: parse its queries and fill in defaults.
  local function normalize(mqtt_prefix, cfg)
    local source = converter.parse_query(mqtt_prefix, cfg.mqtt)

    -- optional direct binding to another mqtt object
    local target = nil
    if cfg.mqtt_bind then
      target = converter.parse_query(mqtt_prefix, cfg.mqtt_bind)
    end

    local bind_conv = cfg.bind_conv or transformers.identity
    if target and target.value ~= nil then
      -- a value given in the target query is always sent as a constant
      bind_conv = transformers.const(target.value)
    end

    return {
      mqtt = source,
      knx = cfg.knx,
      to_knx = cfg.to_knx or transformers.identity,
      -- most mqtt endpoints are sensors that never receive a value
      to_mqtt = cfg.to_mqtt or transformers.nosend,
      force_overwrite = cfg.force_overwrite or false,
      mqtt_bind = target,
      bind_conv = bind_conv
    }
  end

  --- Create a mapper for every configured binding.
  -- @param mqtt_prefix topic prefix shared by all devices, e.g. 'zigbee'
  function Mapper.create(mqtt_prefix)
    local routes = maplist.new()
    local mapper = setmetatable({ list = routes }, Mapper)

    for _, binding in ipairs(bindings) do
      local cfg = normalize(mqtt_prefix, binding)
      local from_mqtt = converter.mqtt_value(cfg.mqtt)

      if cfg.knx then
        local to_mqtt_msg = converter.mqtt_msg(cfg.mqtt, cfg.to_mqtt)
        local from_knx = converter.knx_value(cfg.knx)
        local to_knx_msg = converter.knx_msg(cfg.knx, cfg.to_knx, cfg.force_overwrite)
        -- register both directions: mqtt -> knx and knx -> mqtt
        routes:add(cfg.mqtt.topic, converter.create(from_mqtt, to_knx_msg))
        routes:add(cfg.knx, converter.create(from_knx, to_mqtt_msg))
      end

      if cfg.mqtt_bind then
        -- direct mqtt -> mqtt binding
        local to_bound_msg = converter.mqtt_msg(cfg.mqtt_bind, cfg.bind_conv)
        routes:add(cfg.mqtt.topic, converter.create(from_mqtt, to_bound_msg))
      end
    end

    return mapper
  end

  --- Run every converter registered under `key` against `msg`.
  -- @param key MQTT topic or KNX group address
  -- @param msg decoded MQTT message or KNX bus event
  -- @return array of outgoing messages; converters that return nil add nothing
  function Mapper:convert(key, msg)
    local results = {}
    local function collect(fn_convert)
      local out = fn_convert(msg)
      if out ~= nil then
        results[#results + 1] = out
      end
    end
    self.list:each_match(key, collect)
    return results
  end

  --- Tell whether anything is registered under `key`.
  function Mapper:has_key(key)
    return self.list[key] ~= nil
  end

  return Mapper
end)
-- Loading this user library yields the mapper module registered above.
local mqtt2knx = require('mqtt2knx')
return mqtt2knx
2) Create a resident script (the timeout doesn't matter) and update it with your MQTT broker config and MQTT topic.
Code:
local socket = require('socket')
-- create mapper for zigbee/device_name topics
local mapper = require('user.mqtt2knx').create('zigbee')
local JSON = require('json')
local eventloop = require('user.eventloop')
-- MQTT connection settings passed to the event loop's MQTT handler in
-- setup(); replace the placeholder values with your broker details.
local mqtt_cfg = {
  broker = 'IP',
  login = 'USERNAME',
  pwd = 'PASSWORD',
  id = 'CLIENT_ID',
  -- subscription filter below the 'zigbee' prefix used by the mapper
  topics = 'zigbee/+',
}

-- MQTT client handle; assigned by setup()
local mqtt_client
--- Handle an incoming MQTT message: decode the JSON payload, convert it with
-- the mapper and send the resulting messages.
-- @param mid     first callback argument (presumably the message id); unused
-- @param topic   topic the message arrived on, used as the mapper key
-- @param payload raw payload, expected to hold a JSON object
function mqtt_onmessage(mid, topic, payload)
  local json = JSON.pdecode(payload)
  -- The mapper looks properties up by indexing the decoded value, so only
  -- tables are usable. A bare JSON scalar such as 5 or true decodes fine but
  -- would raise an error once it is indexed, so it is rejected here as well.
  if type(json) ~= 'table' then
    log('mqtt payload is not a json', payload)
    return
  end
  local messages = mapper:convert(topic, json)
  send_messages(messages)
end
--- Handle a KNX group write: convert the event with the mapper and send the
-- resulting messages.
-- @param event bus event; `sender` and `dst` are read here
function localbus_onmessage(event)
  -- send_messages() tags its own KNX writes with sender 'mq'; those are
  -- skipped here.
  if event.sender ~= 'mq' then
    send_messages(mapper:convert(event.dst, event))
  end
end
--- Handle a KNX group read: when the address is registered in the mapper,
-- reply with the object's current stored value.
-- @param event bus event; `dst` and `datatype` are read here
function localbus_onread(event)
  local addr = event.dst
  if not mapper:has_key(addr) then
    -- not one of the bound objects, nothing to respond with
    return
  end
  grp.response(addr, grp.getvalue(addr), event.datatype)
end
--- Deliver the messages produced by the mapper.
-- 'mqtt' messages are published as JSON on their topic; 'knx' messages are
-- written to their group address. Any other type is ignored.
-- @param list array of message tables as returned by mapper:convert
function send_messages(list)
  for _, message in ipairs(list) do
    local kind = message.type
    if kind == 'mqtt' then
      mqtt_client:publish(message.topic, JSON.encode(message.value))
    elseif kind == 'knx' then
      -- write only when forced or when the stored value differs
      if message.force_overwrite or grp.getvalue(message.addr) ~= message.value then
        -- tag the write so localbus_onmessage can recognise and skip it
        grp.sender = 'mq'
        grp.write(message.addr, message.value)
      end
    end
  end
end
--- Create the MQTT and local-bus handlers, register them with the event
-- loop and remember the MQTT client for publishing.
function setup()
  local loop = eventloop.get()

  local mqtt_handler = loop.create_mqtt(mqtt_cfg, mqtt_onmessage)
  local bus_callbacks = {
    groupwrite = localbus_onmessage,
    groupread = localbus_onread
  }
  local bus_handler = loop.create_localbus(bus_callbacks)

  loop:add(mqtt_handler)
  loop:add(bus_handler)

  -- send_messages() publishes through this client
  mqtt_client = mqtt_handler.client
end
-- Create the handlers only when no MQTT client has been set up yet.
if not mqtt_client then
  setup()
end

log('starting mqtt2knx loop')

-- run the event loop that drives the MQTT and bus handlers
local loop = eventloop.get()
loop:run_loop()