fix concurrent mass storage test conflict, use pyfatfs to access disk dev by usb id instead of mounted in /media/

This commit is contained in:
hathach 2024-08-13 13:16:46 +07:00
parent 45f50ebaa8
commit 61725a5263
No known key found for this signature in database
GPG Key ID: 26FAB84F615C3C52

View File

@ -33,8 +33,8 @@ import serial
import subprocess
import json
import glob
import platform
from multiprocessing import Pool
import fs
ENUM_TIMEOUT = 30
@ -53,9 +53,8 @@ def get_serial_dev(id, vendor_str, product_str, ifnum):
return port_list[0]
# Currently not used, left as reference
# get usb disk by id
def get_disk_dev(id, vendor_str, lun):
# get usb disk by id
return f'/dev/disk/by-id/usb-{vendor_str}_Mass_Storage_{id}-0:{lun}'
@ -72,34 +71,34 @@ def open_serial_dev(port):
# slight delay since kernel may occupy the port briefly
time.sleep(0.5)
timeout = timeout - 0.5
ser = serial.Serial(port, timeout=1)
ser = serial.Serial(port, timeout=2)
break
except serial.SerialException:
pass
time.sleep(0.5)
timeout = timeout - 0.5
assert timeout, 'Device not available or Cannot open port'
assert timeout, f'Cannot open port f{port}' if os.path.exists(port) else f'Port {port} not existed'
return ser
def read_disk_file(id, fname):
# on different self-hosted, the mount point is different
file_list = [
f'/media/blkUSB_{id[-8:]}.02/{fname}',
f'/media/{os.getenv("USER")}/TinyUSB MSC/{fname}'
]
def read_disk_file(uid, lun, fname):
# open_fs("fat://{dev}) require 'pip install pyfatfs'
dev = get_disk_dev(uid, 'TinyUSB', lun)
timeout = ENUM_TIMEOUT
while timeout:
for file in file_list:
if os.path.isfile(file):
with open(file, 'rb') as f:
data = f.read()
return data
if os.path.exists(dev):
fat = fs.open_fs(f'fat://{dev}')
try:
assert fat.exists(fname), f'File {fname} not found in {dev}'
with fat.open(fname, 'rb') as f:
return f.read()
finally:
fat.close()
time.sleep(1)
timeout = timeout - 1
assert timeout, 'Device not available'
assert timeout, f'Storage {dev} not existed'
return None
@ -238,7 +237,7 @@ def test_cdc_msc(board):
assert ser.read(100) == str, 'CDC wrong data'
# Block test
data = read_disk_file(uid, 'README.TXT')
data = read_disk_file(uid,0,'README.TXT')
readme = \
b"This is tinyusb's MassStorage Class demo.\r\n\r\n\
If you find any bugs or get any questions, feel free to file an\r\n\
@ -397,6 +396,7 @@ def test_board(board):
return err_count
def main():
"""
Hardware test on specified boards