test/mesh: fix parsing received advs, create new segmented message on new seq_zero

This commit is contained in:
Matthias Ringwald 2019-10-22 19:11:35 +02:00
parent 32ee5dd707
commit eed276207a

View File

@ -162,7 +162,7 @@ class lower_transport_pdu(layer_pdu):
self.akf = self.data[0] & 0x40 == 0x040
self.add_property('akf', self.akf)
if self.seg:
if self.ctl:
if not self.ctl:
self.szmic = self.data[1] & 0x80 == 0x80
self.add_property('szmic', self.szmic)
temp_12 = struct.unpack('>H', self.data[1:3])[0]
@ -176,10 +176,11 @@ class lower_transport_pdu(layer_pdu):
self.segment = self.data[4:]
self.add_property('segment', self.segment)
else:
self.seq_auth = self.seq
self.upper_transport = self.data[1:]
self.add_property('upper_transport', self.upper_transport)
class uppert_transport_pdu(layer_pdu):
class upper_transport_pdu(layer_pdu):
def __init__(self, segment):
if segment.ctl:
super().__init__('Segmented Control', b'')
@ -208,6 +209,8 @@ class uppert_transport_pdu(layer_pdu):
self.add_property('src', self.src)
self.add_property('dst', self.dst)
self.add_property('aid', self.aid)
self.add_property('akf', self.akf)
self.add_property('segment_len', self.segment_len)
def add_segment(self, network_pdu):
@ -241,19 +244,23 @@ class access_pdu(layer_pdu):
self.dst = lower_pdu.dst
self.akf = lower_pdu.akf
self.aid = lower_pdu.aid
self.seq_auth = lower_pdu.seq_auth
self.data = data
self.add_property('src', self.src)
self.add_property('dst', self.dst)
self.add_property('akf', self.akf)
self.add_property('aid', self.aid)
self.add_property('seq_auth', self.seq_auth)
def segmented_message_for_pdu(pdu):
if pdu.src in segmented_messages:
seg_message = segmented_messages[pdu.src]
# check seq zero
else:
seg_message = uppert_transport_pdu(pdu)
segmented_messages[pdu.src] = seg_message
if pdu.seq_zero == seg_message.seq_zero:
return seg_message
# print("new segmented message: src %04x, seq_zero %04x" % (pdu.src, pdu.seq_zero))
seg_message = upper_transport_pdu(pdu)
segmented_messages[pdu.src] = seg_message
return seg_message
def mesh_set_iv_index(iv_index):
@ -323,7 +330,7 @@ def mesh_process_control(control_pdu):
# TODO add Seg Ack to sender access message origins
log_pdu(control_pdu, 0, [])
def mesh_proess_access(access_pdu):
def mesh_process_access(access_pdu):
log_pdu(access_pdu, 0, [])
def mesh_process_network_pdu_tx(network_pdu_encrypted):
@ -343,6 +350,9 @@ def mesh_process_network_pdu_tx(network_pdu_encrypted):
network_pdu_decrypted = network_pdu(network_pdu_decrypted_data)
network_pdu_decrypted.origins.append(network_pdu_encrypted)
# print("network pdu (enc)" + network_pdu_encrypted.data.hex())
# print("network pdu (dec)" + network_pdu_decrypted_data.hex())
# lower transport - reassemble
lower_transport = lower_transport_pdu(network_pdu_decrypted)
lower_transport.origins.append(network_pdu_decrypted)
@ -366,12 +376,13 @@ def mesh_process_network_pdu_tx(network_pdu_encrypted):
else:
access = access_pdu(message, access_payload)
access.origins.append(message)
mesh_proess_access(access)
mesh_process_access(access)
else:
# print("lower_transport.ctl = " + str(lower_transport.ctl))
if lower_transport.ctl:
control = layer_pdu('Unsegmented Control', lower_transport.data)
control.origins.add(lower_transport)
control.origins.append(lower_transport)
mesh_process_control(control)
else:
access_payload = mesh_upper_transport_decrypt(lower_transport, lower_transport.upper_transport)
@ -382,22 +393,23 @@ def mesh_process_network_pdu_tx(network_pdu_encrypted):
access = access_pdu(lower_transport, access_payload)
access.add_property('seq_auth', lower_transport.seq)
access.origins.append(lower_transport)
mesh_proess_access(access)
mesh_process_access(access)
def mesh_process_beacon_pdu(adv_pdu):
log_pdu(adv_pdu, 0, [])
def mesh_process_adv(adv_pdu):
ad_len = adv_pdu.data[0] - 1
ad_type = adv_pdu.data[1]
if ad_type == 0x2A:
network_pdu_encrypted = layer_pdu("Network(encrypted)", adv_data[2:])
network_pdu_encrypted = layer_pdu("Network(encrypted)", adv_data[2:2+ad_len])
network_pdu_encrypted.add_property('ivi', adv_data[2] >> 7)
network_pdu_encrypted.add_property('nid', adv_data[2] & 0x7f)
network_pdu_encrypted.origins.append(adv_pdu)
mesh_process_network_pdu_tx(network_pdu_encrypted)
if ad_type == 0x2b:
beacon_pdu = layer_pdu("Beacon", adv_data[2:])
beacon_pdu = layer_pdu("Beacon", adv_data[2:2+ad_len])
beacon_pdu.origins.append(adv_pdu)
mesh_process_beacon_pdu(beacon_pdu)