daemon/binding/python: poc

This commit is contained in:
Matthias Ringwald 2018-11-10 22:16:56 +01:00
parent b85d692e71
commit 0c4cc577b2
4 changed files with 132 additions and 0 deletions

View File

@ -0,0 +1 @@
__pycache__

View File

@ -0,0 +1,69 @@
#!/usr/bin/env python3
import socket
import struct
BTSTACK_SERVER_HOST = "localhost"
BTSTACK_SERVER_TCP_PORT = 13333
# temp
OGF_BTSTACK = 0x3d;
BTSTACK_SET_POWER_MODE = 0x02
# utils
def print_hex(data):
print(" ".join("{:02x}".format(c) for c in data))
def opcode(ogf, ocf):
return ocf | (ogf << 10)
# CommandBuilder - will be auto-generated later
class CommandBuilder(object):
def __init__(self):
pass
class BTstackClient(CommandBuilder):
#
btstack_server_socket = None
#
packet_handler = None
def __init__(self):
pass
def connect(self):
global BTSTACK_SERVER_TCP_PORT
global BTSTACK_SERVER_HOST
print("[+] Connect to server on port %u" % BTSTACK_SERVER_TCP_PORT)
self.btstack_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.btstack_server_socket.connect((BTSTACK_SERVER_HOST, BTSTACK_SERVER_TCP_PORT))
# TODO: handle connection failure
def register_packet_handler(self, callback):
print("[+] Register packet handler")
self.packet_handler = callback
def send_hci_command(self, command):
packet_type = 1
channel = 0
length = len(command)
header = struct.pack("<HHH", packet_type, channel, length)
self.btstack_server_socket.sendall(header + command)
def btstack_set_power_mode(self, on):
# - send: BTstackPowerUp
cmd = struct.pack("<HBB", opcode(OGF_BTSTACK, BTSTACK_SET_POWER_MODE) ,1, 1);
self.send_hci_command(cmd)
def run(self):
print("[+] Run")
while True:
# receive packet header: packet type, channel, len
header = self.btstack_server_socket.recv(6)
(packet_type, channel, length) = struct.unpack("<HHH", header)
print_hex(header)
payload = self.btstack_server_socket.recv(length)
print_hex(payload)

View File

@ -0,0 +1,37 @@
#!/usr/bin/env python3
from threading import Thread
from ctypes import *
# global variables
btstack_server_library = None
class btstackServerThread (Thread):
def run(self):
global btstack_server_library
print ("[+] BTstack Server thread started")
btstack_server_library.btstack_server_run_tcp()
print ("[+] BTstack Server thread quit")
class BTstackServer(object):
def __init__(self):
pass
def load(self, path = None):
global btstack_server_library
if path == None:
path = "../../../../port/daemon/src/"
print("[+] Load BTstack Server from %s" % path)
path += "libBTstackServer.dylib"
# TODO: check OS (mac,linux,windows)
# TODO: construct path to load library from
btstack_server_library = cdll.LoadLibrary("../../../../port/daemon/src/libBTstackServer.dylib")
def set_storage_path(self, path):
global btstack_server_library
print("[+] Set storage path to %s" % path)
btstack_server_library.btstack_server_set_storage_path("/tmp")
def run_tcp(self):
print("[+] Run TCP server")
btstackServerThread(name="BTstackServerThread").start()

View File

@ -0,0 +1,25 @@
#!/usr/bin/env python3
from btstack import btstack_server, btstack_client
def packet_handler(packet):
print("received packet")
print(packet)
# Conrtrol for BTstack Server
btstack_server = btstack_server.BTstackServer()
# start BTstack Server from .dll
btstack_server.load()
# btstack_server.set_storage_path("/tmp")
btstack_server.run_tcp()
# Client for BTstack Server
btstack_client = btstack_client.BTstackClient()
# connect to slient, register for HCI packets and power up
btstack_client.connect()
btstack_client.register_packet_handler(packet_handler)
btstack_client.btstack_set_power_mode(1)
btstack_client.run()