17.04.2020, 03:55
(14.04.2020, 07:57) admin wrote: Here's Python code converted to Lua:
Thanks @admin
I have it working now
![Smile Smile](https://forum.logicmachine.net/images/smilies/smile.png)
To display a PIN on the TV, call the function request_pin_code().
Once the PIN is displayed on the TV, call the function authorise_pin_code('0080') with the PIN shown on the TV. That completes the pairing process; the TV will show the message "pairing process complete".
After that you can send encrypted commands with the function send_key('NRC_POWER-ONOFF') etc.
Code:
encdec = require('encdec')
aes = require('user.aes')
require('json')
-- Address of the TV; every request in this script is sent to this host on port 55000.
ip = '192.168.1.100'
-- Device name sent to the TV in the X_DisplayPinCode pairing request.
name = 'My Remote'
-- Control paths appended after 'http://<ip>:55000/' when a request is built.
URL_CONTROL_NRC = 'nrc/control_0'
URL_CONTROL_DMR = 'dmr/control_0'
-- NOTE(review): not referenced by any function visible in this script.
URL_CONTROL_NRC_DEF = 'nrc/sdd_0.xml'
-- Service URNs; the 'urn:' prefix is added when the SOAP request is assembled.
URN_RENDERING_CONTROL = 'schemas-upnp-org:service:RenderingControl:1'
URN_REMOTE_CONTROL = 'panasonic-com:service:p00NetworkControl:1'
--- Send a SOAP action to the TV and return the raw response body.
-- @param host   host name or IP address (port 55000 is always used)
-- @param url    control path appended after the port, e.g. 'nrc/control_0'
-- @param urn    service URN without the leading 'urn:'
-- @param action SOAP action name
-- @param args   XML fragment placed inside the action element
-- @return the response body as a string; or nil plus an error message when
--         the HTTP request itself could not be completed
function request(host, url, urn, action, args)
  local ltn12 = require('ltn12')
  local http = require('socket.http')

  local body = [[<?xml version="1.0" encoding="utf-8"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body><u:]] .. action .. [[ xmlns:u="urn:]] .. urn .. [[">]] .. args .. [[</u:]] .. action .. [[></s:Body>
</s:Envelope>]]

  local chunks = {}
  local res, err = http.request({
    url = 'http://' .. host .. ':55000/' .. url,
    method = 'POST',
    headers = {
      ['soapaction'] = '"urn:' .. urn .. '#' .. action .. '"',
      ['Content-Type'] = 'text/xml; charset=utf-8',
      ['Content-Length'] = #body,
    },
    sink = ltn12.sink.table(chunks),
    source = ltn12.source.string(body),
  })

  -- The sink table always exists, so it cannot signal failure; only the
  -- first result of http.request tells whether the transfer happened.
  if not res then
    return nil, err
  end

  -- The body is returned whatever the HTTP status is, so callers can still
  -- look for a SOAP fault element in it.
  return table.concat(chunks)
end
--- Encrypt an XML fragment for use in an encrypted SOAP element.
-- The plaintext is a 16-byte header (twelve ASCII '0' characters followed by
-- the length of `data` as four big-endian bytes) followed by `data`. The
-- ciphertext and its HMAC are concatenated and base64-encoded.
-- @param data     XML fragment to encrypt
-- @param key      AES key
-- @param hmac_key key passed to encdec.hmacsha256
-- @param iv       AES initialisation vector
-- @return base64 string of ciphertext followed by its signature
-- Raises an error when the AES context cannot be created.
function encrypt_soap_payload(data, key, hmac_key, iv)
  local n = #data
  local header = '000000000000' .. string.char(
    bit.band(bit.rshift(n, 24), 0xFF),
    bit.band(bit.rshift(n, 16), 0xFF),
    bit.band(bit.rshift(n, 8), 0xFF),
    bit.band(n, 0xFF)
  )
  local payload = header .. data

  -- NOTE(review): the trailing 1 presumably selects the padding mode of the
  -- user.aes library - confirm against that library.
  local aes_cbc, err = aes:new(key, nil, aes.cipher(128, 'cbc'), { iv = iv }, nil, 1)
  if not aes_cbc then
    error('encrypt_soap_payload: cannot initialise AES: ' .. tostring(err))
  end

  local ciphertext = aes_cbc:encrypt(payload)
  local sig = encdec.hmacsha256(ciphertext, hmac_key, true)
  local encrypted_payload = encdec.base64enc(ciphertext .. sig)
  return encrypted_payload
end
--- Decrypt a base64-encoded encrypted SOAP element.
-- @param data     base64 text taken from the response
-- @param key      AES key
-- @param hmac_key accepted for symmetry with encrypt_soap_payload; it is not
--                 used here, so the signature is not verified
-- @param iv       AES initialisation vector
-- @return the decrypted bytes with the first 16 bytes (the header) removed
-- Raises an error when the AES context cannot be created.
function decrypt_soap_payload(data, key, hmac_key, iv)
  -- NOTE(review): the trailing 0 presumably selects the padding mode of the
  -- user.aes library - confirm against that library.
  local aes_cbc, err = aes:new(key, nil, aes.cipher(128, 'cbc'), { iv = iv }, nil, 0)
  if not aes_cbc then
    error('decrypt_soap_payload: cannot initialise AES: ' .. tostring(err))
  end

  local plain = aes_cbc:decrypt(encdec.base64dec(data))

  -- Skip the 16-byte header directly instead of converting to hex and back.
  return (string.sub(plain, 17))
end
--- Derive the session keys from the base64 keyword returned by the TV.
-- The AES key is the first 16 decoded bytes with the two halves of every
-- 4-byte group swapped; the HMAC key is the decoded value repeated twice;
-- the IV is the decoded value itself.
-- @param enc_key base64 text of the X_Keyword element
-- @return session AES key, session HMAC key, session IV
-- Raises an error when fewer than 16 bytes can be decoded.
function get_session_keys(enc_key)
  -- All working variables are local: callers keep their own `iv`, `iv_vals`
  -- and `key_vals`, which this function must not overwrite.
  local iv = encdec.base64dec(enc_key)
  if type(iv) ~= 'string' or #iv < 16 then
    error('get_session_keys: keyword must decode to at least 16 bytes')
  end

  local iv_vals = { iv:byte(1, -1) }
  local key_vals = {}
  for i = 1, 16, 4 do
    key_vals[i] = iv_vals[i + 2]
    key_vals[i + 1] = iv_vals[i + 3]
    key_vals[i + 2] = iv_vals[i]
    key_vals[i + 3] = iv_vals[i + 1]
  end

  local sesh_key = string.char(unpack(key_vals))
  local sesh_hmac_key = iv .. iv
  return sesh_key, sesh_hmac_key, iv
end
--- Ask the TV for an encrypted session id.
-- Sends X_GetEncryptSessionId with the application id in clear text and the
-- same element encrypted inside X_EncInfo.
-- @param app_id        application id obtained during pairing
-- @param sesh_key      session AES key
-- @param sesh_hmac_key session HMAC key
-- @param sesh_iv       session IV
-- @return whatever request() returns for the call (the raw response)
function request_session_id(app_id, sesh_key, sesh_hmac_key, sesh_iv)
  local app_xml = '<X_ApplicationId>' .. app_id .. '</X_ApplicationId>'
  local encinfo = encrypt_soap_payload(app_xml, sesh_key, sesh_hmac_key, sesh_iv)
  local params = app_xml .. '<X_EncInfo>' .. encinfo .. '</X_EncInfo>'
  return request(ip, URL_CONTROL_NRC, URN_REMOTE_CONTROL, 'X_GetEncryptSessionId', params)
end
--- Ask the TV to display a pairing PIN and remember the challenge it returns.
-- The decoded X_ChallengeKey is saved in storage under 'IV'.
-- @return true when the challenge was stored; otherwise nil plus an error
--         message (request failed, or no X_ChallengeKey in the response)
function request_pin_code()
  local pairing_res, err = request(ip, URL_CONTROL_NRC, URN_REMOTE_CONTROL, 'X_DisplayPinCode', '<X_DeviceName>' .. name .. '</X_DeviceName>')
  if not pairing_res then
    return nil, err
  end

  local challenge_key = string.match(pairing_res, '<X_ChallengeKey>(.-)</X_ChallengeKey>')
  if not challenge_key then
    -- Nothing to decode; leave any previously stored challenge untouched.
    return nil, 'no X_ChallengeKey in response'
  end

  storage.set('IV', encdec.base64dec(challenge_key))
  return true
end
-- Derive the AES key and HMAC key used for the pairing exchange from the
-- 16-byte challenge IV.
local function derive_pairing_keys(iv)
  local iv_vals = { iv:byte(1, -1) }

  -- AES key: each 4-byte group of the IV reversed and bitwise inverted.
  local key_vals = {}
  for i = 1, 16, 4 do
    key_vals[i] = bit.band(bit.bnot(iv_vals[i + 3]), 0xFF)
    key_vals[i + 1] = bit.band(bit.bnot(iv_vals[i + 2]), 0xFF)
    key_vals[i + 2] = bit.band(bit.bnot(iv_vals[i + 1]), 0xFF)
    key_vals[i + 3] = bit.band(bit.bnot(iv_vals[i]), 0xFF)
  end

  -- HMAC key: a fixed 32-byte mask XORed with IV bytes; the IV index wraps
  -- at 16 through the `band(..., 0xF) + 1` arithmetic (1-based tables).
  local hmac_key_mask = lmcore.hextostr('15C95AC2B08AA7EB4E228F811E34D04FA54BA7DCAC9879FA8ACDA3FC244F3854', true)
  local hmac_key_mask_vals = { hmac_key_mask:byte(1, -1) }
  local hmac_vals = {}
  for i = 1, 32, 4 do
    hmac_vals[i] = bit.bxor(hmac_key_mask_vals[i], iv_vals[bit.band(i + 1, 0xF) + 1])
    hmac_vals[i + 1] = bit.bxor(hmac_key_mask_vals[i + 1], iv_vals[bit.band(i + 2, 0xF) + 1])
    hmac_vals[i + 2] = bit.bxor(hmac_key_mask_vals[i + 2], iv_vals[bit.band(i - 1, 0xF) + 1])
    hmac_vals[i + 3] = bit.bxor(hmac_key_mask_vals[i + 3], iv_vals[bit.band(i, 0xF) + 1])
  end

  return string.char(unpack(key_vals)), string.char(unpack(hmac_vals))
end

--- Complete pairing with the PIN shown on the TV and open an encrypted session.
-- Reads the challenge saved under 'IV' in storage, sends X_RequestAuth with
-- the encrypted PIN, derives the session keys from the reply, requests a
-- session id, and saves everything as JSON under 'session_keys' in storage.
-- @param pincode PIN displayed on the TV
-- @return session id, sequence number (1) and session IV on success;
--         nil plus an error message when any step fails
function authorise_pin_code(pincode)
  -- Everything below is local so that helper functions assigning globals of
  -- the same name cannot overwrite the pairing values before they are stored.
  local iv = storage.get('IV')
  if type(iv) ~= 'string' or #iv < 16 then
    return nil, 'no pairing challenge stored; call request_pin_code() first'
  end

  local key, hmac_key = derive_pairing_keys(iv)

  local params = '<X_AuthInfo>' .. encrypt_soap_payload("<X_PinCode>" .. pincode .. "</X_PinCode>", key, hmac_key, iv) .. '</X_AuthInfo>'
  local authorise_res, err = request(ip, URL_CONTROL_NRC, URN_REMOTE_CONTROL, 'X_RequestAuth', params)
  if not authorise_res then
    return nil, err
  end

  local auth_res = string.match(authorise_res, '<X_AuthResult>(.-)</X_AuthResult>')
  if not auth_res then
    return nil, 'no X_AuthResult in response'
  end

  local decrypted = decrypt_soap_payload(auth_res, key, hmac_key, iv)
  local app_id = string.match(decrypted, '<X_ApplicationId>(.-)</X_ApplicationId>')
  local enc_key = string.match(decrypted, '<X_Keyword>(.-)</X_Keyword>')
  if not (app_id and enc_key) then
    return nil, 'X_ApplicationId or X_Keyword missing from decrypted X_AuthResult'
  end

  -- get AES and HMAC keys from X_Keyword
  local sesh_key, sesh_hmac_key, sesh_iv = get_session_keys(enc_key)

  -- request a session id to be able to send encrypted commands
  local sesh_id_res, sesh_err = request_session_id(app_id, sesh_key, sesh_hmac_key, sesh_iv)
  if not sesh_id_res then
    return nil, sesh_err or 'session id request failed'
  end

  local enc_result = string.match(sesh_id_res, '<X_EncResult>(.-)</X_EncResult>')
  if not enc_result then
    return nil, 'no X_EncResult in response'
  end
  enc_result = decrypt_soap_payload(enc_result, sesh_key, sesh_hmac_key, sesh_iv)

  local sesh_id = string.match(enc_result, '<X_SessionId>(.-)</X_SessionId>')
  if not sesh_id then
    return nil, 'no X_SessionId in decrypted X_EncResult'
  end

  -- The sequence number starts at 1 and has to be incremented after each
  -- successful encrypted command.
  local sesh_seq_num = 1

  storage.set('session_keys', json.encode({
    SESSION_ID = sesh_id,
    SESSION_SEQUENCE_NUMBER = sesh_seq_num,
    SESSION_IV = sesh_iv,
    SESSION_HMAC_KEY = sesh_hmac_key,
    SESSION_KEY = sesh_key,
    APP_ID = app_id,
    KEY = key,
    HMAC_KEY = hmac_key,
    IV = iv
  }))

  return sesh_id, sesh_seq_num, sesh_iv
end
--- Send an action to the TV wrapped in an X_EncryptedCommand request.
-- Uses the session saved under 'session_keys' in storage. The stored sequence
-- number is advanced only when a non-empty response without a SOAP fault
-- comes back.
-- @param action SOAP action name of the wrapped command
-- @param urn    service URN of the wrapped command (without 'urn:')
-- @param params XML fragment placed inside the wrapped action element
-- @return the parameter string that was sent with X_EncryptedCommand;
--         nil plus an error message when no session is stored
function send_enc_cmd(action, urn, params)
  local stored = storage.get('session_keys')
  local S_K = stored and json.decode(stored)
  if type(S_K) ~= 'table' then
    return nil, 'no session keys stored; pair with the TV first'
  end

  -- the sequence number is sent zero-padded to 8 digits
  local seq_num = string.format("%08d", S_K.SESSION_SEQUENCE_NUMBER)

  local encrypted_command = [[<X_SessionId>]] .. S_K.SESSION_ID .. [[</X_SessionId>]] ..
    [[<X_SequenceNumber>]] .. seq_num .. [[</X_SequenceNumber>]] ..
    [[<X_OriginalCommand>]] .. [[<u:]] .. action .. [[ xmlns:u="urn:]] .. urn .. [[">]] .. params .. [[</u:]] .. action .. [[>]] .. [[</X_OriginalCommand>]]

  local enc_cmd = encrypt_soap_payload(encrypted_command, S_K.SESSION_KEY, S_K.SESSION_HMAC_KEY, S_K.SESSION_IV)
  local outer_params = '<X_ApplicationId>' .. S_K.APP_ID .. '</X_ApplicationId>' .. '<X_EncInfo>' .. enc_cmd .. '</X_EncInfo>'

  local response = request(ip, URL_CONTROL_NRC, URN_REMOTE_CONTROL, 'X_EncryptedCommand', outer_params)

  -- A missing or empty response means the command never reached the TV, so
  -- it must not consume a sequence number any more than a SOAP fault does.
  local succeeded = response ~= nil
    and response ~= ''
    and not string.find(response, '<s:Fault>', 1, true)

  if succeeded then
    S_K.SESSION_SEQUENCE_NUMBER = S_K.SESSION_SEQUENCE_NUMBER + 1
    storage.set('session_keys', json.encode(S_K))
  end

  return outer_params
end
--- Send a remote-control key press as an encrypted X_SendKey command.
-- @param key key code, e.g. 'NRC_POWER-ONOFF'
-- @return whatever send_enc_cmd() returns for the command
function send_key(key)
  return send_enc_cmd('X_SendKey', URN_REMOTE_CONTROL, '<X_KeyEvent>' .. key .. '</X_KeyEvent>')
end
--- Query the current mute state with the GetMute action.
-- @return text content of the CurrentMute element (nil when the element is
--         absent); nil plus an error message when the request fails
function get_mute()
  local req_res, err = request(ip, URL_CONTROL_DMR, URN_RENDERING_CONTROL, 'GetMute', '<InstanceID>0</InstanceID><Channel>Master</Channel>')
  if not req_res then
    return nil, err
  end
  -- parentheses keep the result to a single value
  return (string.match(req_res, '<CurrentMute>(.-)</CurrentMute>'))
end
--- Query the current volume with the GetVolume action.
-- @return text content of the CurrentVolume element (nil when the element
--         is absent); nil plus an error message when the request fails
function get_volume()
  local req_res, err = request(ip, URL_CONTROL_DMR, URN_RENDERING_CONTROL, 'GetVolume', '<InstanceID>0</InstanceID><Channel>Master</Channel>')
  if not req_res then
    return nil, err
  end
  -- parentheses keep the result to a single value
  return (string.match(req_res, '<CurrentVolume>(.-)</CurrentVolume>'))
end
--- Set the mute state with the SetMute action.
-- @param state value placed in the DesiredMute element (string or number)
-- @return whatever request() returns, so a failed request can be detected
function set_mute(state)
  local args = '<InstanceID>0</InstanceID><Channel>Master</Channel><DesiredMute>' .. state .. '</DesiredMute>'
  return request(ip, URL_CONTROL_DMR, URN_RENDERING_CONTROL, 'SetMute', args)
end
--- Set the volume with the SetVolume action.
-- @param level value placed in the DesiredVolume element (string or number)
-- @return whatever request() returns, so a failed request can be detected
function set_volume(level)
  local args = '<InstanceID>0</InstanceID><Channel>Master</Channel><DesiredVolume>' .. level .. '</DesiredVolume>'
  return request(ip, URL_CONTROL_DMR, URN_RENDERING_CONTROL, 'SetVolume', args)
end