btstack/tool/dump_gatt.py
2022-01-15 23:43:11 +01:00

266 lines
7.3 KiB
Python
Executable File

#!/usr/bin/env python3
# BlueKitchen GmbH (c) 2022
# parse PacketLogger and reconstruct GATT DB
import sys
import datetime
import struct
def as_hex(data):
str_list = []
for byte in data:
str_list.append("{0:02x} ".format(byte))
return ''.join(str_list)
def as_bd_addr(data):
str_list = []
for byte in data:
str_list.append("{0:02x}".format(byte))
return ':'.join(str_list)
def read_header(f):
bytes_read = f.read(13)
if bytes_read:
return struct.unpack(">IIIB", bytes_read)
else:
return (-1, 0, 0, 0)
def uuid16_at_offset(data, offset):
return "%04x" % struct.unpack_from("<H", data, offset)[0]
def uuid128_at_offset(data, offset):
uuid128 = bytes(reversed(data[offset:offset+16]))
return uuid128[0:4].hex() + "-" + uuid128[4:6].hex() + "-" + uuid128[6:8].hex() + "-" + uuid128[8:10].hex() + "-" + uuid128[10:].hex()
def handle_at_offset(data, offset):
return struct.unpack_from("<H", data, offset)[0];
def bd_addr_at_offset(data, offset):
peer_addr = reversed(data[8:8 + 6])
return as_bd_addr(peer_addr)
class gatt_characteristic:
def __init__(self, uuid, properties, characteristic_handle, value_handle):
self.uuid = uuid
self.properties = properties
self.characteristic_handle = characteristic_handle
self.value_handle = value_handle
def report(self, prefix):
print("%sUUID %-36s, Handle %04x, Properties %02x, Value Handle %04x" % (prefix, self.uuid, self.characteristic_handle, self.properties, self.value_handle))
class gatt_service:
def __init__(self, uuid, start_handle, end_handle):
self.uuid = uuid
self.start_handle = start_handle
self.end_handle = end_handle
self.characteristics = []
def report(self, prefix):
print("%sUUID: %-36s, Start Handle %04x, End Handle %04x" % (prefix, self.uuid, self.start_handle, self.end_handle))
print(" %sCharacteristics:" % prefix)
for characteristic in self.characteristics:
characteristic.report(" " + prefix)
class gatt_server:
primary_services = []
client_opcode = 0
group_type = 0
read_type = 0
mtu = 23
def __init__(self, bd_addr):
self.bd_addr = bd_addr
def service_for_handle(self, handle):
for service in self.primary_services:
if service.start_handle <= handle and handle <= service.end_handle:
return service
return None
def handle_pdu(self, pdu):
opcode = pdu[0]
if opcode == 0x01:
pass
elif opcode == 0x02:
# exchange mtu
pass
elif opcode == 0x03:
# exchange mtu
self.mtu = struct.unpack_from("<H", pdu, 1)[0]
elif opcode == 0x08:
# read by type request
if len(pdu) == 7:
(_,_,self.read_type) = struct.unpack_from("<HHH", pdu, 1)
elif opcode == 0x09:
# read by type response
if self.read_type == 0x2803:
item_len = pdu[1]
pos = 2
while pos < len(pdu):
(characteristic_handle, properties, value_handle) = struct.unpack_from("<HBH", pdu, pos)
if item_len == 11:
uuid = uuid16_at_offset(pdu, pos + 5)
elif item_len == 21:
uuid = uuid128_at_offset(pdu, pos + 5)
service = self.service_for_handle(characteristic_handle)
if service:
service.characteristics.append(gatt_characteristic(uuid, properties, characteristic_handle, value_handle))
pos += item_len
elif opcode == 0x10:
# read by group type request
if len(pdu) == 7:
(_,_,self.group_type) = struct.unpack_from("<HHH", pdu, 1)
elif opcode == 0x11:
# read by group type response
item_len = pdu[1]
pos = 2
while pos < len(pdu):
(start, end) = struct.unpack_from("<HH", pdu, pos)
if self.group_type == 0x2800:
# primary service
if item_len == 6:
uuid = uuid16_at_offset(pdu, pos+4)
elif item_len == 20:
uuid = uuid128_at_offset(pdu, pos+4)
self.primary_services.append(gatt_service(uuid, start, end))
pos += item_len
else:
# print(self.bd_addr, "ATT PDU:", as_hex(pdu))
pass
def report(self):
print("GATT Server on", self.bd_addr)
print("- MTU", self.mtu)
print("- Primary Services:")
for service in self.primary_services:
service.report(" - ")
class l2cap_reassembler:
payload_data = bytes()
payload_len = 0
channel = 0;
def handle_acl(self, pb, data):
if pb in [0, 2]:
(self.payload_len, self.channel) = struct.unpack("<HH", data[0:4])
self.payload_data = data[4:]
if pb == 0x01:
self.payload_data += data[4:]
def l2cap_complete(self):
return len(self.payload_data) == self.payload_len
def l2cap_packet(self):
return (self.channel, self.payload_data)
class hci_connection:
l2cap_in = l2cap_reassembler()
l2cap_out = l2cap_reassembler()
def __init__(self, bd_addr, con_handle):
self.bd_addr = bd_addr
self.con_handle = con_handle;
self.remote_gatt_server = gatt_server(bd_addr)
def handle_att_pdu(self, direction_in, pdu):
opcode = pdu[0]
remote_server = ((opcode & 1) == 1) == direction_in
if (remote_server):
self.remote_gatt_server.handle_pdu(pdu)
else:
local_gatt_server.handle_pdu(pdu)
def handle_acl(self, direction_in, pb, data):
if direction_in:
self.l2cap_in.handle_acl(pb, data)
if self.l2cap_in.l2cap_complete():
(channel, l2cap_data) = self.l2cap_in.l2cap_packet()
if channel == 0x004:
self.handle_att_pdu(direction_in, l2cap_data)
else:
self.l2cap_out.handle_acl(pb, data)
if self.l2cap_out.l2cap_complete():
(channel, l2cap_data) = self.l2cap_out.l2cap_packet()
if channel == 0x004:
self.handle_att_pdu(direction_in, l2cap_data)
def connection_for_handle(con_handle):
if con_handle in connections:
return connections[con_handle]
else:
return None
def handle_cmd(packet):
pass
def handle_evt(event):
if event[0] == 0x05:
# Disconnection Complete
con_handle = handle_at_offset(event, 3)
print("Disconnection Complete: handle 0x%04x" % con_handle)
connection = connections.pop(con_handle, None)
connection.remote_gatt_server.report()
if event[0] == 0x3e:
if event[2] == 0x01:
# LE Connection Complete
con_handle = handle_at_offset(event, 4);
peer_addr = bd_addr_at_offset(event, 8)
connection = hci_connection(peer_addr, con_handle)
connections[con_handle] = connection
print("LE Connection Complete: %s handle 0x%04x" % (peer_addr, con_handle))
def handle_acl(data, direction_in):
(header, hci_len) = struct.unpack("<HH", data[0:4])
pb = (header >> 12) & 0x03
con_handle = header & 0x0FFF
connection_for_handle(con_handle).handle_acl(direction_in, pb, data[4:])
# globals
connections = {}
local_gatt_server = gatt_server("00:00:00:00:00:00")
if len(sys.argv) == 1:
print ('Reconstruct GATT interactions from PacketLogger trace file')
print ('Copyright 2022, BlueKitchen GmbH')
print ('')
print ('Usage: ', sys.argv[0], 'hci_dump.pklg')
exit(0)
infile = sys.argv[1]
with open (infile, 'rb') as fin:
pos = 0
try:
while True:
(entry_len, ts_sec, ts_usec, type) = read_header(fin)
if entry_len < 0:
break
packet_len = entry_len - 9;
if (packet_len > 66000):
print ("Error parsing pklg at offset %u (%x)." % (pos, pos))
break
packet = fin.read(packet_len)
pos = pos + 4 + entry_len
if type == 0x00:
handle_cmd(packet)
elif type == 0x01:
handle_evt(packet)
elif type == 0x02:
handle_acl(packet, False)
elif type == 0x03:
handle_acl(packet, True)
except TypeError as e:
print(e)
print ("Error parsing pklg at offset %u (%x)." % (pos, pos))
for connection in connections:
connection.remote_gatt_server.report()