Logging & Monitoring KNX activity - olivier - 10.04.2020
Hi,
just a feedback on haw I have implemented the monitoring of the knx network of my house
As it is also inspired from several thread of this forum, it make sense to make it
As I am not a LUA / LogicMachine expert, some part of the code can be improved for such, but it works...
First I have used "tags" feature of KNX object in order to define the type of logging I want on each object
- "tobelogged" for immediate logging (for use of light by example)
- "tobelogged-tempo5" for logging but with 5mn min between each measure point (for power usage by example, time delay can be adapted, duplicated, etc)
Both type of logging are based on "Event-based" scripts, they :
- do not log again the same value, I use the LM storage to store trace of last logging activity per object in order to optimize it
- store temporary "event information to be stored in a database" in the LM storage too (would be possible to used MQTT instead in the futur ?)
-> it is possible to add new / other type of "event logging fonction"
A third script scan items in the LM storage every 30s, send them to a database and finally delete them in the LM storage
Code: --*********************************
--
-- Event-based / "Event logger immediat" triggered by tag = "tobelogged" of KNX objects
--
--*********************************
require('json')
local obj = grp.find(event.dst)
-- log("EVENT LOGGER IMMEDIAT")
-- (measureOn, name, source, type, value)
var11 = os.date("%Y-%m-%d %H:%M:%S",obj.updatetime)
var12 = string.trim(tostring(obj.name))
var13 = string.trim(tostring(obj.address))
var14 = string.trim(tostring(obj.data))
var15 = string.trim(tostring(obj.tagcache))
trace1 = storage.get('TABLE_' .. var13, '')
if trace1 == '' then
--log("CREATING TRACE FOR " .. var13)
saveDataToLocalStorage('TABLE_', var11, var12, var13, var14, var15, obj.updatetime)
else
--log("UPDATING TRACE FOR : " .. var13)
if string.trim(tostring(trace1[2])) == string.trim(tostring(var14)) then
--log("NO LOG / NO VALUES CHANGE for : " .. var14)
else
--log("TRACE UPDATED FOR : " .. var13)
saveDataToLocalStorage('TABLE_', var11, var12, var13, var14, var15, obj.updatetime)
end
end
Code: --*********************************
--
-- Event-based / "Event logger tempo 5" triggered by tag = "tobelogged-tempo5" of KNX objects
--
--*********************************
require('json')
local obj = grp.find(event.dst)
--log("EVENT LOGGER TEMPO 5")
-- (measureOn, name, source, type, value)
local var21 = tostring(os.date("%Y-%m-%d %H:%M:%S",obj.updatetime))
local var22 = string.trim(tostring(obj.name))
local var23 = string.trim(tostring(obj.address))
local var24 = string.trim(tostring(obj.data))
local var25 = string.trim(tostring(obj.tagcache))
local trace2 = storage.get('TABLE_TEMPO_5_' .. var23, '')
if trace2 == '' then
saveDataToLocalStorage('TABLE_TEMPO_5_', var21, var22, var23, var24, var25, obj.updatetime)
else
if string.trim(tostring(trace2[2])) == string.trim(tostring(var24)) then
--log("NO LOG / NO VALUES CHANGE for : " .. var24)
else
local diff = math.abs(os.time() - tonumber(trace2[3]))
if diff > 300 then
saveDataToLocalStorage('TABLE_TEMPO_5_', var21, var22, var23, var24, var25, obj.updatetime)
else
--log("NO LOG / TOO EARLY FOR: " .. var23 .. " -> " .. tostring(diff) )
end
end
end
Code: --*********************************
--
-- user function / "saveDataToLocalStorage"
-- store event (knx or other source) to the LM storage
--
--*********************************
function saveDataToLocalStorage(traceName, var1, var2, var3, var4, var5, var6)
require('json')
local key = "LOG-" .. tostring(var1) .. "-" .. tostring(var3)
local data = {}
data["timestamp"] = tostring(var1)
data["name"] = tostring(var2)
data["source"] = tostring(var3)
data["value"] = tostring(var4)
data["tagcache"] = string.split(tostring(var5), ",")
storage.set(key, json.encode(data))
local newtrace = {}
newtrace[1] = var1
newtrace[2] = var4
newtrace[3] = var6
storage.set(traceName .. tostring(var3), newtrace)
--log("TRACE UPDATED FOR : " .. var3)
end
Code: --*********************************
--
-- resident script / triggered every 30s, scan LM storage, send LOG items to database if any and then delete them in the LM storge after
--
--*********************************
require('luasql.mysql')
require('json')
http = require('socket.http')
key_table = storage.all()
for key, value in next, key_table do
--log("looking at : " .. key)
tmp = string.find(key,"LOG-")
if tmp == nil then
--log("data DO NOT match : " .. key .. " - tmp : " .. tostring(tmp) )
else
if tmp >= 0 then
--log("data match : " .. key .. " - tmp : " .. tostring(tmp) )
local data_tab = json.decode(value)
local t = data_tab.tagcache
local series = ""
local series_name = ""
local tagss = ""
local i = 1
for k in next, t do
if i == 2 then
series = string.trim(tostring(t[k]))
end
if i == 3 then
series_name = string.trim(tostring(t[k]))
end
if i > 3 then
tagss = tagss .. string.trim(tostring(t[k])) .. ";"
end
i = i + 1
end
if not tagss == "" then
tagss = string.sub(tagss, 1 , string.len(tagss)-1 )
end
local t1 = string.trim(tostring(data_tab.timestamp))
local Y = tonumber(string.sub(t1, 1, 4))
local M = tonumber(string.sub(t1, 6, 7))
local D = tonumber(string.sub(t1, 9, 10))
local h = tonumber(string.sub(t1, 12, 13))
local m = tonumber(string.sub(t1, 15, 16))
local s = tonumber(string.sub(t1, 18, 19))
local t2 = os.time({year=Y, month=M, day=D, hour=h, min=m, sec=s, isdst = false})
-- txt is the data part of the http command to be send to an influxDb instance in order to store it
-- to prepare the migration i would like to do by the end of the year
local txt = string.trim(tostring(series)) .. ",tags=" .. string.trim(tostring(tagss)) .. ",source=" .. string.trim(tostring(data_tab.source)) .. " " .. series_name .. "=" .. string.trim(tostring(data_tab.value)) .. " " .. string.trim(tostring(t2) )
-- Store data into MariaDB / MySQL
errcur = nil
local DB_NAME = 'MyDBname'
local USERNAME = 'MyUSer'
local PASSWORD = 'MyPwd'
local MYSQL_SERVER_IP = '192.168.0.xxx'
local MYSQL_SERVER_PORT = '3307'
env = luasql.mysql()
dbcon, errcon = env:connect(DB_NAME, USERNAME, PASSWORD, MYSQL_SERVER_IP, MYSQL_SERVER_PORT)
cmd = 'INSERT INTO measures (measureOn, name, source, value, raw) VALUES (\"' .. string.trim(tostring(data_tab.timestamp)) .. '\", \"' .. string.trim(tostring(data_tab.name)) .. '\", \"' .. string.trim(tostring(data_tab.source)) .. '\", \"' .. string.trim(tostring(data_tab.value)) .. '\", \"' .. string.trim(tostring(txt)) .. '\")'
--log("SENDING CMD : " .. cmd)
cursor, errcurs = dbcon:execute ( cmd )
-- manage errors
if (tostring(errcurs) == "nil") then
--log("CMD done !")
storage.delete(key)
--log("KEY REMOVED : " .. key)
else
log("CMD error : " .. tostring(errcurs))
end
end
end
end
the block of code dealing with mysql can be replaced by the following one in order to send it to an InfluxDB instance instead
Code: -- Store data into influxDB
local INFLUX_SERVER_IP = '192.168.0.xx'
local INFLUX_SERVER_PORT = '8086'
local INFLUX_DB_NAME = 'MyDBname'
http.TIMEOUT = 5
res, errh = http.request("http://" .. INFLUX_SERVER_IP ..":" .. INFLUX_SERVER_PORT .. "/write?db=" .. INFLUX_DB_NAME .. "&precision=s", txt)
-- manage errors
if (tostring(errcurs) == "nil") then --and not errh == 204 ) then
--log("CMD done !")
storage.delete(key)
--log("KEY REMOVED : " .. key)
else
log("CMD error : " .. tostring(errcurs))
log("InfluxDB INSERT ERROR", res, errh)
end
MQTT server would be a good candidat to replace LM local storage, maybe a next improvement
the use of LM storage can be a problem : if you store too many item in it (in case of issue on you database or your network by example), your LM might be really really slow
-> this solution is not for production as is but only for personal / home use
hope it will be helpful for some of you
Best regards
RE: Logging & Monitoring KNX activity - admin - 14.04.2020
Some issues with your code:
1. Don't use var1 and similiar variable names. It makes the code hard to read and understand.
2. You should not use storage.all() as it's quite an expensive operation when there is a lot of data in the storage.
3. Your MySQL query does not properly escape input variables. For example, query will fail if object name contains double quotes.
For your task you can use list data type of the storage engine. It allows to create a queue system with multiple scripts adding data to queue and one script reading and removing data from it.
Add item to list:
Code: list_name = 'log_1'
data = '1234'
storage.exec('rpush', list_name, data)
Get last element of the list (or nil when list is empty):
Code: function get_last_list_item(key)
local items = storage.exec('lrange', key, -1, -1)
if type(items) == 'table' then
return items[ 1 ]
end
end
list_name = 'log_1'
last = get_last_list_item(list_name)
Read all entries from multiple lists and remove this data. This code read all storage lists which name starts with "log_".
Code: -- open single connection for better performance when multiple storage commands are used
storage.openconn()
-- find all matching keys
keys = storage.keys('log_*')
for _, key in ipairs(keys) do
-- get all elements from the list
data = storage.exec('lrange', key, 0, -1)
if type(data) == 'table' then
for _, item in ipairs(data) do
-- do something with data
end
-- remove read entries from the list
storage.exec('ltrim', key, #data, -1)
end
end
storage.closeconn()
|