Logging & Monitoring KNX activity
#1
Hi,

Here is some feedback on how I implemented the monitoring of my home's KNX network.
Since it was inspired by several threads on this forum, it makes sense to share it here.
As I am not a Lua / LogicMachine expert, some parts of the code could be improved, but it works...

First, I used the "tags" feature of KNX objects to define the type of logging I want for each object:
- "tobelogged" for immediate logging (for lights, for example)
- "tobelogged-tempo5" for logging with at least 5 minutes between measurement points (for power usage, for example; the delay can be adapted, duplicated, etc.)

Both types of logging are based on event-based scripts. They:
- do not log the same value twice: the LM storage keeps a trace of the last logged value per object to avoid redundant writes
- also store the pending "event information to be written to a database" in the LM storage (MQTT could possibly be used instead in the future)
-> it is possible to add new / other types of event logging functions
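
The two logging rules above boil down to one decision: log when there is no previous trace, skip when the value is unchanged, and for the "tempo" variant also skip when the last logged point is too recent. A minimal sketch of that decision as a plain Lua function; `should_log` and the `trace` fields `value` and `time` are hypothetical names used for illustration only, they do not appear in the scripts of this post:

```lua
-- Hypothetical helper: decide whether a new value should be logged.
-- trace        : nil, or { value = <last logged value>, time = <epoch seconds of last log> }
-- value        : current object value
-- now          : current time in epoch seconds
-- min_interval : minimum number of seconds between two points (nil or 0 = immediate logging)
function should_log(trace, value, now, min_interval)
  -- nothing logged yet for this object
  if trace == nil then
    return true
  end
  -- same value as the last logged one: skip
  if tostring(trace.value) == tostring(value) then
    return false
  end
  -- value changed, but the last point is too recent: skip
  if min_interval and min_interval > 0 and (now - trace.time) <= min_interval then
    return false
  end
  return true
end
```

With `min_interval = 300` this mirrors the "tobelogged-tempo5" behaviour, and with `min_interval = nil` the "tobelogged" behaviour.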

A third script scans the items in the LM storage every 30 s, sends them to a database and then deletes them from the LM storage.

Code:
--*********************************
--
--  Event-based / "Event logger immediat" triggered by tag = "tobelogged" of KNX objects
--
--*********************************

require('json')

local obj = grp.find(event.dst)
-- log("EVENT LOGGER IMMEDIAT")

-- (measureOn, name, source, type, value)
var11 = os.date("%Y-%m-%d %H:%M:%S",obj.updatetime)
var12 = string.trim(tostring(obj.name))
var13 = string.trim(tostring(obj.address))
var14 = string.trim(tostring(obj.data))
var15 = string.trim(tostring(obj.tagcache))


trace1 = storage.get('TABLE_' .. var13, '')

if trace1 == '' then
 
  --log("CREATING TRACE FOR " .. var13)
  saveDataToLocalStorage('TABLE_', var11, var12, var13, var14, var15, obj.updatetime)
 
else
 
  --log("UPDATING TRACE FOR : " .. var13)
 
  if string.trim(tostring(trace1[2])) == string.trim(tostring(var14)) then
   
    --log("NO LOG / NO VALUES CHANGE for : " .. var14)
   
  else
   
    --log("TRACE UPDATED FOR : " .. var13)
    saveDataToLocalStorage('TABLE_', var11, var12, var13, var14, var15, obj.updatetime)
 
  end

end


Code:
--*********************************
--
--  Event-based / "Event logger tempo 5" triggered by tag = "tobelogged-tempo5" of KNX objects
--
--*********************************

require('json')

local obj = grp.find(event.dst)
--log("EVENT LOGGER TEMPO 5")

-- (measureOn, name, source, type, value)
local var21 = tostring(os.date("%Y-%m-%d %H:%M:%S",obj.updatetime))
local var22 = string.trim(tostring(obj.name))
local var23 = string.trim(tostring(obj.address))
local var24 = string.trim(tostring(obj.data))
local var25 = string.trim(tostring(obj.tagcache))

local trace2 = storage.get('TABLE_TEMPO_5_' .. var23, '')

if trace2 == '' then
 
  saveDataToLocalStorage('TABLE_TEMPO_5_', var21, var22, var23, var24, var25, obj.updatetime)
 
else
 
  if string.trim(tostring(trace2[2])) == string.trim(tostring(var24)) then
   
    --log("NO LOG / NO VALUES CHANGE for : " .. var24)
   
  else
   
    local diff = math.abs(os.time() - tonumber(trace2[3]))
   
    if diff > 300 then
     
      saveDataToLocalStorage('TABLE_TEMPO_5_', var21, var22, var23, var24, var25, obj.updatetime)
     
    else
     
      --log("NO LOG / TOO EARLY FOR: " .. var23 .. " -> " .. tostring(diff) )
     
    end
   
  end
 
end


Code:
--*********************************
--
--  user function / "saveDataToLocalStorage"
--  store event (knx or other source) to the LM storage
--
--*********************************

function saveDataToLocalStorage(traceName, var1, var2, var3, var4, var5, var6)     

  require('json')
 
  local key = "LOG-" .. tostring(var1) .. "-" .. tostring(var3)
  local data = {}
  data["timestamp"] = tostring(var1)
  data["name"] = tostring(var2)
  data["source"] = tostring(var3)
  data["value"] = tostring(var4)
  data["tagcache"] = string.split(tostring(var5), ",")
  storage.set(key, json.encode(data))
  local newtrace = {}
  newtrace[1] = var1
  newtrace[2] = var4
  newtrace[3] = var6
  storage.set(traceName .. tostring(var3), newtrace)
  --log("TRACE UPDATED FOR : " .. var3)

end

Code:
--*********************************
--
--  resident script / runs every 30 s: scans the LM storage, sends LOG items to the database if any, then deletes them from the LM storage
--
--*********************************

require('luasql.mysql')
require('json')
http = require('socket.http')

key_table = storage.all()

for key, value in next, key_table do
  --log("looking at : " .. key)
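  -- plain-text search (4th argument): "-" is a magic character in Lua patterns,
  -- so the pattern "LOG-" would also match any key that merely contains "LO"
  tmp = string.find(key, "LOG-", 1, true)
  if tmp == nil then
    --log("data DO NOT match : " .. key .. " - tmp : " .. tostring(tmp) )
  else
    -- only handle keys that start with the "LOG-" prefix
    if tmp == 1 then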
      --log("data match : " .. key .. " - tmp : " .. tostring(tmp) )
      local data_tab = json.decode(value)   
      local t = data_tab.tagcache
     
      local series = ""
      local series_name = ""
      local tagss = ""
      local i = 1
      for k in next, t do
        if i == 2 then
          series = string.trim(tostring(t[k]))
        end
        if i == 3 then
          series_name = string.trim(tostring(t[k]))
        end
        if i > 3 then
          tagss = tagss .. string.trim(tostring(t[k])) .. ";"
        end
        i = i + 1
      end
      -- "not tagss == ''" is parsed as "(not tagss) == ''" and is always false
      if tagss ~= "" then
          tagss = string.sub(tagss, 1 , string.len(tagss)-1 )
      end

      local t1 = string.trim(tostring(data_tab.timestamp))
      local Y =  tonumber(string.sub(t1, 1, 4))
      local M =  tonumber(string.sub(t1, 6, 7))
      local D =  tonumber(string.sub(t1, 9, 10))
      local h =  tonumber(string.sub(t1, 12, 13))
      local m =  tonumber(string.sub(t1, 15, 16))
      local s =  tonumber(string.sub(t1, 18, 19))   
      local t2 = os.time({year=Y, month=M, day=D, hour=h, min=m, sec=s, isdst = false})
      -- txt is the body of the HTTP request sent to an InfluxDB instance to store the point
      -- (it prepares the migration I would like to do by the end of the year)
      local txt = string.trim(tostring(series)) .. ",tags=" .. string.trim(tostring(tagss)) .. ",source=" .. string.trim(tostring(data_tab.source)) .. " " .. series_name .. "=" .. string.trim(tostring(data_tab.value)) .. " " .. string.trim(tostring(t2) )
     
      -- Store data into MariaDB / MySQL
     
      errcur = nil
      local DB_NAME = 'MyDBname'
      local USERNAME = 'MyUSer'
      local PASSWORD = 'MyPwd'
      local MYSQL_SERVER_IP = '192.168.0.xxx'
      local MYSQL_SERVER_PORT = '3307'
      env = luasql.mysql()
      dbcon, errcon = env:connect(DB_NAME, USERNAME, PASSWORD, MYSQL_SERVER_IP, MYSQL_SERVER_PORT)
      if dbcon then
        cmd = 'INSERT INTO measures (measureOn, name, source, value, raw) VALUES (\"' .. string.trim(tostring(data_tab.timestamp)) .. '\", \"' .. string.trim(tostring(data_tab.name)) .. '\", \"' .. string.trim(tostring(data_tab.source)) .. '\", \"' .. string.trim(tostring(data_tab.value)) .. '\", \"' .. string.trim(tostring(txt)) .. '\")'
        --log("SENDING CMD : " .. cmd)
        cursor, errcurs = dbcon:execute ( cmd )
        -- manage errors
        if errcurs == nil then
          --log("CMD done !")
          storage.delete(key)
          --log("KEY REMOVED : " .. key)
        else
          log("CMD error : " .. tostring(errcurs))
        end
        -- close the connection, otherwise one connection per item stays open
        dbcon:close()
      else
        -- connection failed: keep the item in storage for the next run
        log("DB connection error : " .. tostring(errcon))
      end
      env:close()
     
    end
  end
end
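
The timestamp parsing and the InfluxDB line built in the resident script can also be sketched as two small helpers. This is only an illustration: `parse_timestamp` and `build_line` are hypothetical names, and the escaping follows the InfluxDB line protocol rules (commas and spaces in measurement names; commas, equals signs and spaces in tag keys, tag values and field keys). Empty tags are skipped, on the assumption that the line protocol does not accept empty tag values. String field values would additionally need double quotes, which this sketch does not handle.

```lua
-- Parse "YYYY-MM-DD HH:MM:SS" into a table usable by os.time(); returns nil on mismatch.
function parse_timestamp(text)
  local Y, M, D, h, m, s = string.match(text, "^(%d+)-(%d+)-(%d+) (%d+):(%d+):(%d+)$")
  if not Y then
    return nil
  end
  return { year = tonumber(Y), month = tonumber(M), day = tonumber(D),
           hour = tonumber(h), min = tonumber(m), sec = tonumber(s) }
end

-- Escape commas, equals signs and spaces (tag keys, tag values, field keys).
local function escape_key(text)
  return (string.gsub(tostring(text), "([,= ])", "\\%1"))
end

-- Escape commas and spaces (measurement names).
local function escape_measurement(text)
  return (string.gsub(tostring(text), "([, ])", "\\%1"))
end

-- Build one line: measurement,tag=value,... field=value timestamp
-- tags is a list of { key = ..., value = ... }; empty values are skipped.
function build_line(measurement, tags, field, value, epoch)
  local parts = { escape_measurement(measurement) }
  for _, tag in ipairs(tags) do
    if tag.value ~= nil and tostring(tag.value) ~= "" then
      parts[#parts + 1] = escape_key(tag.key) .. "=" .. escape_key(tag.value)
    end
  end
  return table.concat(parts, ",") .. " " .. escape_key(field) .. "="
    .. tostring(value) .. " " .. tostring(epoch)
end
```

The table returned by `parse_timestamp` can be passed to `os.time()` to obtain the epoch seconds expected with `precision=s`.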

The block of code dealing with MySQL can be replaced by the following one to send the data to an InfluxDB instance instead:
Code:
      -- Store data into influxDB

      local INFLUX_SERVER_IP = '192.168.0.xx'
      local INFLUX_SERVER_PORT = '8086'
      local INFLUX_DB_NAME = 'MyDBname'
      http.TIMEOUT = 5
      res, errh = http.request("http://" .. INFLUX_SERVER_IP ..":" .. INFLUX_SERVER_PORT .. "/write?db=" .. INFLUX_DB_NAME .. "&precision=s", txt)
     
      -- manage errors: on success http.request returns the response body and the HTTP status code,
      -- on failure it returns nil and an error message; InfluxDB answers 204 to a successful write
      if res and errh == 204 then
        --log("CMD done !")
        storage.delete(key)
        --log("KEY REMOVED : " .. key)
      else
        log("InfluxDB INSERT ERROR", res, errh)
      end


An MQTT server would be a good candidate to replace the LM local storage, maybe as a next improvement.
Using the LM storage can be a problem: if too many items accumulate in it (for example when the database or the network is down), your LM might become really slow.
-> this solution is not for production as is, only for personal / home use

I hope it will be helpful for some of you.

Best regards
#2
Some issues with your code:
1. Don't use var1 and similar variable names. They make the code hard to read and understand.
2. You should not use storage.all() as it's quite an expensive operation when there is a lot of data in the storage.
3. Your MySQL query does not properly escape input variables. For example, the query will fail if the object name contains double quotes.
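
A possible way to address point 3 is to pass every value through an escaping function before it goes into the query. The sketch below is an assumption-laden illustration, not a drop-in fix: `build_insert` is a hypothetical helper, and it assumes that the LuaSQL MySQL driver shipped with LM provides `conn:escape()`, which should be checked on the actual device.

```lua
-- Hypothetical helper: build the INSERT statement with every value escaped and quoted.
-- escape : function(string) -> string safe to place inside a single-quoted SQL literal
-- row    : table with the fields timestamp, name, source, value, raw
function build_insert(escape, row)
  local function quoted(v)
    return "'" .. escape(tostring(v)) .. "'"
  end
  return "INSERT INTO measures (measureOn, name, source, value, raw) VALUES ("
    .. table.concat({
      quoted(row.timestamp), quoted(row.name), quoted(row.source),
      quoted(row.value), quoted(row.raw)
    }, ", ")
    .. ")"
end

-- Possible usage with an open LuaSQL connection (assumes dbcon:escape() exists):
-- cmd = build_insert(function(s) return dbcon:escape(s) end, row)
```

Keeping the escaping function as a parameter makes the query builder easy to check without a database connection.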

For your task you can use the list data type of the storage engine. It allows you to create a queue system with multiple scripts adding data to the queue and one script reading and removing data from it.

Add item to list:
Code:
list_name = 'log_1'
data = '1234'
storage.exec('rpush', list_name, data)

Get last element of the list (or nil when list is empty):
Code:
function get_last_list_item(key)
  local items = storage.exec('lrange', key, -1, -1)
  if type(items) == 'table' then
    return items[ 1 ]
  end
end

list_name = 'log_1'
last = get_last_list_item(list_name)

Read all entries from multiple lists and remove this data. This code reads all storage lists whose names start with "log_".
Code:
-- open single connection for better performance when multiple storage commands are used
storage.openconn()

-- find all matching keys
keys = storage.keys('log_*')

for _, key in ipairs(keys) do
  -- get all elements from the list
  data = storage.exec('lrange', key, 0, -1)

  if type(data) == 'table' then
    for _, item in ipairs(data) do
      -- do something with data
    end

    -- remove read entries from the list
    storage.exec('ltrim', key, #data, -1)
  end
end

storage.closeconn()
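
When the "do something with data" step can fail (database or network down), it may be safer to remove only the entries that were actually delivered. A minimal sketch of that idea, under these assumptions: `drain_list` is a hypothetical helper, `store` is any table exposing `exec` with the same list commands as used above (on LM this would presumably be the `storage` module itself), and `send` is a user-supplied function returning true on success.

```lua
-- Hypothetical helper: send queued items in order and trim only the delivered ones.
-- store : table with an exec(cmd, key, ...) function supporting 'lrange' and 'ltrim'
-- key   : name of the list
-- send  : function(item) returning true when the item was delivered
-- Returns the number of items delivered and removed.
function drain_list(store, key, send)
  local items = store.exec('lrange', key, 0, -1)
  if type(items) ~= 'table' then
    return 0
  end

  local sent = 0
  for _, item in ipairs(items) do
    -- stop at the first failure so that the order of the queue is preserved
    if not send(item) then
      break
    end
    sent = sent + 1
  end

  if sent > 0 then
    -- keep only the entries that were not delivered
    store.exec('ltrim', key, sent, -1)
  end
  return sent
end
```

Entries that could not be sent stay in the list and are retried on the next run.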