test/mesh: decode segment acknowledgements

This commit is contained in:
Matthias Ringwald 2019-10-24 11:22:27 +02:00
parent a1146ab854
commit 3c061251b2

View File

@ -159,6 +159,9 @@ def read_net_16(data):
def read_net_24(data):
return data[0] << 16 | struct.unpack('>H', data[1:3])[0]
def read_net_32(data):
return struct.unpack('>I', data)[0]
# log engine - simple pretty printer
max_indent = 0
def log_pdu(pdu, indent = 0, hide_properties = []):
@ -303,6 +306,7 @@ class upper_transport_pdu(layer_pdu):
self.szmic = segment.szmic
self.seg_n = segment.seg_n
self.seq_zero = segment.seq_zero
self.direction = segment.direction
# TODO handle seq_zero overrun
self.seq_auth = segment.seq & 0xffffe000 | segment.seq_zero
self.add_property('seq_auth', self.seq_auth)
@ -320,6 +324,7 @@ class upper_transport_pdu(layer_pdu):
self.add_property('aid', self.aid)
self.add_property('akf', self.akf)
self.add_property('segment_len', self.segment_len)
self.add_property('dir', self.direction)
def add_segment(self, network_pdu):
self.origins.append(network_pdu)
@ -348,17 +353,19 @@ class upper_transport_pdu(layer_pdu):
class access_pdu(layer_pdu):
def __init__(self, lower_pdu, data):
super().__init__('Access', b'')
self.src = lower_pdu.src
self.dst = lower_pdu.dst
self.akf = lower_pdu.akf
self.aid = lower_pdu.aid
self.seq_auth = lower_pdu.seq_auth
self.data = data
self.src = lower_pdu.src
self.dst = lower_pdu.dst
self.akf = lower_pdu.akf
self.aid = lower_pdu.aid
self.seq_auth = lower_pdu.seq_auth
self.data = data
self.direction = lower_pdu.direction
self.add_property('src', self.src)
self.add_property('dst', self.dst)
self.add_property('akf', self.akf)
self.add_property('aid', self.aid)
self.add_property('seq_auth', self.seq_auth)
self.add_property('dir', self.direction)
upper_bits = data[0] >> 6
if upper_bits == 0 or upper_bits == 1:
@ -376,15 +383,20 @@ class access_pdu(layer_pdu):
if self.opcode in access_messages:
self.summary = access_messages[self.opcode]
def segmented_message_tag(src, direction):
tag = str(src) + direction
return tag
def segmented_message_for_pdu(pdu):
if pdu.src in segmented_messages:
seg_message = segmented_messages[pdu.src]
tag = segmented_message_tag(pdu.src, pdu.direction)
if tag in segmented_messages:
seg_message = segmented_messages[tag]
# check seq zero
if pdu.seq_zero == seg_message.seq_zero:
return seg_message
# print("new segmented message: src %04x, seq_zero %04x" % (pdu.src, pdu.seq_zero))
seg_message = upper_transport_pdu(pdu)
segmented_messages[pdu.src] = seg_message
segmented_messages[tag] = seg_message
return seg_message
def mesh_set_iv_index(iv_index):
@ -438,21 +450,42 @@ def mesh_upper_transport_decrypt(message, data):
trans_mic_len = 4
ciphertext = data[:-trans_mic_len]
trans_mic = data[-trans_mic_len:]
nonce = mesh_device_nonce(message)
decrypted = None
if message.akf:
nonce = mesh_application_nonce(message)
# print( as_hex(nonce))
for key in mesh_application_keys_for_aid(message.aid):
decrypted = aes_ccm_decrypt(key.appkey, nonce, ciphertext, b'', trans_mic_len, trans_mic)
if decrypted != None:
break
else:
nonce = mesh_device_nonce(message)
decrypted = aes_ccm_decrypt(devkey, nonce, ciphertext, b'', trans_mic_len, trans_mic)
return decrypted
def mesh_process_control(control_pdu):
# TODO decode control message
# TODO add Seg Ack to sender access message origins
log_pdu(control_pdu, 0, [])
control_pdu.opcode = control_pdu.data[0]
control_pdu.add_property('opcode', control_pdu.opcode)
control_pdu.obo = (control_pdu.data[1] & 0x80) >> 7
control_pdu.add_property('obo', control_pdu.obo)
temp_12 = read_net_16(control_pdu.data[1:3])
control_pdu.seq_zero = (temp_12 >> 2) & 0x1fff
control_pdu.add_property('seq_zero', control_pdu.seq_zero)
control_pdu.block_ack = read_net_32(control_pdu.data[3:7])
control_pdu.add_property('block_ack', control_pdu.block_ack)
# try to add it to access message
inverse_direction = 'RX'
if control_pdu.direction == 'RX':
inverse_direcgtion = 'TX'
tag = segmented_message_tag(control_pdu.dst, inverse_direction)
if tag in segmented_messages:
seg_message = segmented_messages[tag]
seg_message.origins.append(control_pdu)
else:
log_pdu(control_pdu, 0, [])
def mesh_process_access(access_pdu):
log_pdu(access_pdu, 0, [])
@ -472,6 +505,8 @@ def mesh_process_network_pdu_tx(network_pdu_encrypted):
# decrypted network pdu
network_pdu_decrypted = network_pdu(network_pdu_decrypted_data)
network_pdu_decrypted.direction = network_pdu_encrypted.direction
network_pdu_decrypted.add_property('dir', network_pdu_decrypted.direction)
network_pdu_decrypted.origins.append(network_pdu_encrypted)
# print("network pdu (enc)" + network_pdu_encrypted.data.hex())
@ -479,6 +514,8 @@ def mesh_process_network_pdu_tx(network_pdu_encrypted):
# lower transport - reassemble
lower_transport = lower_transport_pdu(network_pdu_decrypted)
lower_transport.direction = network_pdu_decrypted.direction
lower_transport.add_property('dir', lower_transport.direction)
lower_transport.origins.append(network_pdu_decrypted)
if lower_transport.seg:
@ -499,6 +536,7 @@ def mesh_process_network_pdu_tx(network_pdu_encrypted):
log_pdu(message, 0, [])
else:
access = access_pdu(message, access_payload)
access.direction = message.direction
access.origins.append(message)
mesh_process_access(access)
@ -506,6 +544,13 @@ def mesh_process_network_pdu_tx(network_pdu_encrypted):
# print("lower_transport.ctl = " + str(lower_transport.ctl))
if lower_transport.ctl:
control = layer_pdu('Unsegmented Control', lower_transport.data)
control.direction = lower_transport.direction
control.seq = lower_transport.seq
control.src = lower_transport.src
control.dst = lower_transport.dst
control.add_property('seq', lower_transport.seq)
control.add_property('src', lower_transport.src)
control.add_property('dst', lower_transport.dst)
control.origins.append(lower_transport)
mesh_process_control(control)
else:
@ -516,6 +561,7 @@ def mesh_process_network_pdu_tx(network_pdu_encrypted):
else:
access = access_pdu(lower_transport, access_payload)
access.add_property('seq_auth', lower_transport.seq)
access.direction = lower_transport.direction
access.origins.append(lower_transport)
mesh_process_access(access)
@ -530,6 +576,8 @@ def mesh_process_adv(adv_pdu):
network_pdu_encrypted = layer_pdu("Network(encrypted)", adv_data[2:2+ad_len])
network_pdu_encrypted.add_property('ivi', adv_data[2] >> 7)
network_pdu_encrypted.add_property('nid', adv_data[2] & 0x7f)
network_pdu_encrypted.add_property('dir', adv_pdu.direction)
network_pdu_encrypted.direction = adv_pdu.direction
network_pdu_encrypted.origins.append(adv_pdu)
mesh_process_network_pdu_tx(network_pdu_encrypted)
if ad_type == 0x2b:
@ -573,6 +621,8 @@ with open (infile, 'rb') as fin:
continue
adv_data = packet[4:]
adv_pdu = layer_pdu("ADV(TX)", adv_data)
adv_pdu.add_property('dir', 'TX')
adv_pdu.direction = 'TX'
mesh_process_adv(adv_pdu)
elif type == 1:
@ -585,6 +635,8 @@ with open (infile, 'rb') as fin:
continue
adv_data = packet[13:-1]
adv_pdu = layer_pdu("ADV(RX)", adv_data)
adv_pdu.add_property('dir', 'RX')
adv_pdu.direction = 'RX'
mesh_process_adv(adv_pdu)
elif type == 0xfc: