This forum uses cookies
This forum makes use of cookies to store your login information if you are registered, and your last visit if you are not. Cookies are small text documents stored on your computer; the cookies set by this forum can only be used on this website and pose no security risk. Cookies on this forum also track the specific topics you have read and when you last read them. Please confirm whether you accept or reject these cookies being set.

MQTT - KNX integration with direct bindings
#1
Based on several examples from this forum of working with MQTT, I decided I needed a more universal, configuration-based binding to integrate my Zigbee devices into the KNX network through the zigbee2mqtt bridge. With the script below it is also possible to bind MQTT endpoints together directly, without the need to write event scripts.

You also need the eventloop script in your user library. Feel free to use/modify any of the code.

1) Create the user script "user.mqtt2knx" below and modify your MQTT bindings using the supplied examples. You'll need to restart the resident script each time you modify the bindings.

Code:
package.preload['mqtt2knx.transformers'] = (function(...)
    -- Value transformers that bindings plug in as to_knx / to_mqtt / bind_conv.
    -- Each one takes the incoming value and returns the value to send;
    -- returning nil means "send nothing".
    local transformers = {}

    -- Convert an "on"/"off" style string (any letter case) to a boolean.
    -- Booleans are passed through unchanged; any other non-string value
    -- yields false instead of raising on the :lower() call.
    function transformers.onoff_to_bool(val)
      if type(val) == 'boolean' then
        return val
      end
      if type(val) ~= 'string' then
        return false
      end
      return val:lower() == "on"
    end

    -- Convert a truthy/falsy value to the string "on" or "off".
    function transformers.bool_to_onoff(val)
      return val and "on" or "off"
    end

    -- Build a transformer that ignores its input and always returns n.
    function transformers.const(n)
      return function() return n end
    end

    -- Transformer that suppresses sending by always returning nil.
    function transformers.nosend()
      return nil
    end

    -- Transformer that returns its input unchanged.
    function transformers.identity(x)
      return x
    end

    return transformers
  end)

package.preload['mqtt2knx.bindings'] = (function(...)
    -- Binding list consumed by the 'mqtt2knx' module; edit the entries below
    -- to match your own devices (the ones here target zigbee2mqtt).
    --
    -- Fields used by the examples:
    --   mqtt             source selector, 'device[property]' or
    --                    'device[property=value]'
    --   knx              group address paired with the MQTT property
    --   to_knx, to_mqtt  value converters for each direction
    --   force_overwrite  write to KNX even when the value is unchanged
    --   mqtt_bind        another MQTT endpoint that receives the value directly
    --   bind_conv        converter applied before sending to mqtt_bind
    local t = require('mqtt2knx.transformers')
    local ONOFF_TO_BOOL, BOOL_TO_ONOFF = t.onoff_to_bool, t.bool_to_onoff
    -- the remaining helpers stay in scope for use in custom bindings
    local CONST, NOSEND, IDENTITY = t.const, t.nosend, t.identity

    return {
      -- relay on/off, bound in both directions through the "state" attribute
      { knx = '1/1/1', mqtt = 'relay[state]', to_mqtt = BOOL_TO_ONOFF, to_knx = ONOFF_TO_BOOL },

      -- plain sensor, MQTT -> KNX only
      { knx = '1/1/1', mqtt = 'door[contact]' },

      -- motion sensor: force the KNX write because the device repeats
      -- the same message on every activation
      { knx = '1/1/1', mqtt = 'motion[occupancy]', force_overwrite = true },

      -- button click; the button reports click=single|double|long
      { knx = '1/1/1', mqtt = 'button[click=single]', force_overwrite = true, to_knx = CONST(true) },

      -- double click: update KNX and, in the same step, send the constant
      -- "toggle" straight to the MQTT relay
      {
        knx = '1/1/1',
        mqtt = 'button[click=double]',
        mqtt_bind = 'relay[state=toggle]',
        force_overwrite = true,
        to_knx = CONST(true),
      },

      -- water sensor wired directly to the protection relay, no KNX object
      { mqtt = 'water_leak[water_leak]', bind_conv = BOOL_TO_ONOFF, mqtt_bind = 'relay[state]' },
    }
  end)

package.preload['mqtt2knx.maplist'] = (function(...)
    -- Multimap: every key maps to an ordered list of values.
    -- The lists are stored directly as fields of the instance, so
    -- instance[key] yields the list (or nil) for that key.
    local maplist = {}
    maplist.__index = maplist

    -- Create an empty multimap.
    function maplist.new()
      local instance = {}
      return setmetatable(instance, maplist)
    end

    -- Append value to the list kept under key, creating the list on first use.
    function maplist:add(key, value)
      local bucket = self[key]
      if bucket then
        bucket[#bucket + 1] = value
        return
      end
      self[key] = { value }
    end

    -- Invoke callback(value) for each value stored under key, in insertion
    -- order; does nothing when the key has no entries.
    function maplist:each_match(key, callback)
      local bucket = self[key]
      if not bucket then
        return
      end
      for _, item in ipairs(bucket) do
        callback(item)
      end
    end

    return maplist
  end)

package.preload['mqtt2knx.converter'] = (function(...)
    -- Lua pattern for selectors of the form 'name', 'name[prop]' or
    -- 'name[prop=value]'. Captures: name, the bracket part, prop, value.
    local query_regex = '^([%w_]+)(%[?([%w_]*)=?([%w_]*)%]?)'

    -- Parse a selector string into { topic, property, value }.
    -- topic is mqtt_prefix..'/'..name; value is nil when the selector has no
    -- '=value' part. Raises a descriptive error when the selector is not a
    -- string or does not start with a valid name.
    local function parse_mqtt_query(mqtt_prefix, query)
      if(type(query) ~= 'string') then
        error('mqtt2knx: MQTT query must be a string, got '..type(query), 2)
      end

      local topic, _, prop, val = string.match(query, query_regex)
      if(topic == nil) then
        error('mqtt2knx: cannot parse MQTT query "'..query..'"', 2)
      end
      if(val == '') then val = nil end

      return {
          topic = mqtt_prefix..'/'..topic,
          property = prop,
          value = val
        }
    end

    -- Build a selector that decodes the value of a local-bus event using the
    -- datatype of the object found for addr. The object is looked up once,
    -- when the selector is created; an unknown address raises a clear error.
    local function knx_value_selector(addr)
      local obj = grp.find(addr)
      if(not obj) then
        error('mqtt2knx: KNX object "'..tostring(addr)..'" not found', 2)
      end
      local dt = obj.datatype

      return function(event)
        return knxdatatype.decode(event.datahex, dt)
      end
    end

    -- Build a converter that turns a value into a 'knx' message table for
    -- addr, after passing it through fn_valueconv. Returns nil (no message)
    -- when fn_valueconv returns nil.
    local function knx_msg_converter(addr, fn_valueconv, force_overwrite)
      return function(value)
        value = fn_valueconv(value)
        if(value ~= nil) then
          return {
            type = 'knx',
            addr = addr,
            value = value,
            force_overwrite = force_overwrite
          }
        end
      end
    end

    -- Build a selector that extracts queryobj.property from a decoded MQTT
    -- payload. When queryobj.value is set it acts as a filter, used for
    -- multi-state objects like a button with click=[single|double|long].
    local function mqtt_value_selector(queryobj)
      return function(msg)
        local val = msg[queryobj.property]
        local vf = queryobj.value

        if(val == nil) then
          return nil
        end
        -- the filter always comes from the selector string, while the payload
        -- value may be a JSON boolean or number: compare the string form too
        if(vf == nil or vf == val or vf == tostring(val)) then
          return val
        end
      end
    end

    -- Build a converter that turns a value into an 'mqtt' message table
    -- addressed to <topic>/set with payload { [property] = value }.
    -- Returns nil (no message) when fn_valueconv returns nil.
    local function mqtt_msg_converter(queryobj, fn_valueconv)
      return function(value)
        value = fn_valueconv(value)
        if(value ~= nil) then
          return {
            type = 'mqtt',
            topic = queryobj.topic..'/set',
            value = {
              [queryobj.property] = value
            }
          }
        end
      end
    end

    -- Chain a selector with a message converter: the converter only runs
    -- when the selector produced a non-nil value.
    local function create_converter(selector, msg_converter)
      return function(msg)
        local value = selector(msg)
        if(value ~= nil) then
          return msg_converter(value)
        end
      end
    end

    return {
      parse_query = parse_mqtt_query,
      knx_value = knx_value_selector,
      knx_msg = knx_msg_converter,
      mqtt_value = mqtt_value_selector,
      mqtt_msg = mqtt_msg_converter,
      create = create_converter,
    }

  end)


package.preload['mqtt2knx'] = (function(...)
    local trans = require('mqtt2knx.transformers')
    local converter = require('mqtt2knx.converter')
    local maplist = require('mqtt2knx.maplist')
    local bindings = require('mqtt2knx.bindings')

    local m2k = {}
    m2k.__index = m2k

    -- Expand one raw binding entry into a fully populated config:
    -- selectors are parsed and every optional field gets its default.
    local function normalize(mqtt_prefix, cfg)
      local source = converter.parse_query(mqtt_prefix, cfg.mqtt)

      -- optional direct binding to another MQTT object
      local bind_target = nil
      if(cfg.mqtt_bind) then
        bind_target = converter.parse_query(mqtt_prefix, cfg.mqtt_bind)
      end

      local bind_conv
      if(bind_target and bind_target.value ~= nil) then
        -- a target selector with '=value' always sends that constant
        bind_conv = trans.const(bind_target.value)
      else
        bind_conv = cfg.bind_conv or trans.identity
      end

      return {
        mqtt = source,
        knx = cfg.knx,
        to_knx = cfg.to_knx or trans.identity,
        -- most MQTT endpoints are sensors, which do not receive values
        to_mqtt = cfg.to_mqtt or trans.nosend,
        force_overwrite = cfg.force_overwrite or false,
        mqtt_bind = bind_target,
        bind_conv = bind_conv
      }
    end

    -- Build a mapper for topics under mqtt_prefix. Converters are registered
    -- in a maplist keyed by MQTT topic and by KNX group address.
    function m2k.create(mqtt_prefix)
      local registry = maplist.new()
      local instance = setmetatable({ list = registry }, m2k)

      for _, binding in ipairs(bindings) do
        local cfg = normalize(mqtt_prefix, binding)
        local from_mqtt = converter.mqtt_value(cfg.mqtt)

        if(cfg.knx) then
          local to_mqtt_msg = converter.mqtt_msg(cfg.mqtt, cfg.to_mqtt)
          local from_knx = converter.knx_value(cfg.knx)
          local to_knx_msg = converter.knx_msg(cfg.knx, cfg.to_knx, cfg.force_overwrite)

          -- one converter per direction: MQTT -> KNX and KNX -> MQTT
          registry:add(cfg.mqtt.topic, converter.create(from_mqtt, to_knx_msg))
          registry:add(cfg.knx, converter.create(from_knx, to_mqtt_msg))
        end

        if(cfg.mqtt_bind) then
          -- direct MQTT -> MQTT binding
          local to_bind_msg = converter.mqtt_msg(cfg.mqtt_bind, cfg.bind_conv)
          registry:add(cfg.mqtt.topic, converter.create(from_mqtt, to_bind_msg))
        end
      end

      return instance
    end

    -- Run every converter registered under key against msg and return the
    -- list of produced messages (converters returning nil add nothing).
    function m2k:convert(key, msg)
      local produced = {}
      local function collect(fn_convert)
        produced[#produced + 1] = fn_convert(msg)
      end
      self.list:each_match(key, collect)
      return produced
    end

    -- Tell whether anything is registered under key.
    function m2k:has_key(key)
      local entry = self.list[key]
      return entry ~= nil
    end

    return m2k
  end)

-- Hand the assembled 'mqtt2knx' module (registered in package.preload above)
-- back to whoever loads this script.
return require('mqtt2knx')


2) Create a resident script (the timeout doesn't matter) and update it with your MQTT broker config and MQTT topic.


Code:
local socket = require('socket')
-- create mapper for zigbee/device_name topics
local mapper = require('user.mqtt2knx').create('zigbee')
local JSON = require('json')
local eventloop = require('user.eventloop')

-- Connection settings handed to the event loop's create_mqtt() in setup().
-- The string values are placeholders: replace them with your own.
-- NOTE(review): the exact meaning of each field is defined by user.eventloop,
-- which is not shown here — confirm against that library.
local mqtt_cfg = {
  broker = 'IP',
  login = 'USERNAME',
  pwd = 'PASSWORD',
  id = 'CLIENT_ID',
  topics = 'zigbee/+',
  }
-- Assigned by setup(); send_messages() publishes through it.
local mqtt_client


-- MQTT message callback: decode the JSON payload, run it through the
-- bindings registered for the topic and send out the resulting messages.
-- Payloads that fail to decode, or that decode to something other than a
-- table, are logged and dropped.
function mqtt_onmessage(mid, topic, payload)
  local json = JSON.pdecode(payload)
  -- the bindings look up named properties in the payload, so a decoded JSON
  -- scalar (number, boolean) is as unusable here as an undecodable payload
  if(type(json) ~= 'table') then
    log('mqtt payload is not a json', payload)
    return
  end

  local messages = mapper:convert(topic, json)
  send_messages(messages)
end

-- Local-bus group-write callback: convert the event through the bindings
-- registered for its destination address and send the resulting messages.
function localbus_onmessage(event)
  -- writes tagged with sender 'mq' are skipped; send_messages() sets that
  -- tag on its own KNX writes
  if(event.sender ~= 'mq') then
    send_messages(mapper:convert(event.dst, event))
  end
end

-- Local-bus group-read callback: when the requested address belongs to a
-- registered binding, answer with the value currently stored for it.
function localbus_onread(event)
  local addr = event.dst
  if(not mapper:has_key(addr)) then
    return
  end

  local cached = grp.getvalue(addr)
  grp.response(addr, cached, event.datatype)
end

-- Deliver a list of message tables produced by the mapper.
-- 'mqtt' messages are JSON-encoded and published; 'knx' messages are written
-- to the group address, but only when forced or when the value differs from
-- the one currently stored. Other message types are ignored.
function send_messages(list)
  for _, msg in ipairs(list) do
    local kind = msg.type

    if(kind == 'mqtt') then
      mqtt_client:publish(msg.topic, JSON.encode(msg.value))
    elseif(kind == 'knx') then
      local must_write = msg.force_overwrite or grp.getvalue(msg.addr) ~= msg.value
      if(must_write) then
        -- tag the write so localbus_onmessage() skips it
        grp.sender = 'mq'
        grp.write(msg.addr, msg.value)
      end
    end
  end
end

-- Create the MQTT and local-bus workers, register both with the event loop
-- and remember the MQTT client for publishing.
function setup()
  local el = eventloop.get()

  local mqtt_worker = el.create_mqtt(mqtt_cfg, mqtt_onmessage)
  local bus_handlers = {
    groupwrite = localbus_onmessage,
    groupread = localbus_onread,
  }
  local bus_worker = el.create_localbus(bus_handlers)

  el:add(mqtt_worker)
  el:add(bus_worker)

  mqtt_client = mqtt_worker.client
end



-- One-time initialisation: setup() assigns mqtt_client, so it only runs
-- while the client is still unset.
if(not mqtt_client) then
  setup()
end
log('starting mqtt2knx loop')
-- Hand control to the event loop that setup() registered the workers with.
-- NOTE(review): presumably this call blocks while the loop services the MQTT
-- and local-bus workers — confirm in user.eventloop.
eventloop.get():run_loop()
Reply
#2
Thank you!! Smile
Reply


Forum Jump: