mirror of
https://github.com/bluekitchen/btstack.git
synced 2025-04-16 08:42:28 +00:00
add test files for sbc encoder and decoder, fix encoder and decoder
This commit is contained in:
parent
ba114a9841
commit
5f21c38a78
BIN
test/sbc/fanfare-4sb-decoded.wav
Normal file
BIN
test/sbc/fanfare-4sb-decoded.wav
Normal file
Binary file not shown.
BIN
test/sbc/fanfare-8sb-decoded.wav
Normal file
BIN
test/sbc/fanfare-8sb-decoded.wav
Normal file
Binary file not shown.
132
test/sbc/sbc_decoder_test.py
Executable file
132
test/sbc/sbc_decoder_test.py
Executable file
@ -0,0 +1,132 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
import numpy as np
|
||||||
|
import wave
|
||||||
|
import struct
|
||||||
|
import sys
|
||||||
|
from sbc import *
|
||||||
|
from sbc_encoder import *
|
||||||
|
from sbc_decoder import *
|
||||||
|
|
||||||
|
error = 0.99
|
||||||
|
max_error = -1
|
||||||
|
|
||||||
|
def sbc_compare_pcm(frame_count, actual_frame, expected_frame):
|
||||||
|
global error, max_error
|
||||||
|
|
||||||
|
M = mse(actual_frame.pcm, expected_frame.pcm)
|
||||||
|
if M > max_error:
|
||||||
|
max_error = M
|
||||||
|
|
||||||
|
if M > error:
|
||||||
|
print "pcm error (%d, %d ) " % (frame_count, M)
|
||||||
|
return -1
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
def sbc_compare_headers(frame_count, actual_frame, expected_frame):
|
||||||
|
if actual_frame.sampling_frequency != expected_frame.sampling_frequency:
|
||||||
|
print "sampling_frequency wrong ", actual_frame.sampling_frequency
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if actual_frame.nr_blocks != expected_frame.nr_blocks:
|
||||||
|
print "nr_blocks wrong ", actual_frame.nr_blocks
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if actual_frame.channel_mode != expected_frame.channel_mode:
|
||||||
|
print "channel_mode wrong ", actual_frame.channel_mode
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if actual_frame.nr_channels != expected_frame.nr_channels:
|
||||||
|
print "nr_channels wrong ", actual_frame.nr_channels
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if actual_frame.allocation_method != expected_frame.allocation_method:
|
||||||
|
print "allocation_method wrong ", actual_frame.allocation_method
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if actual_frame.nr_subbands != expected_frame.nr_subbands:
|
||||||
|
print "nr_subbands wrong ", actual_frame.nr_subbands
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if actual_frame.bitpool != expected_frame.bitpool:
|
||||||
|
print "bitpool wrong (E: %d, D: %d)" % (actual_frame.bitpool, expected_frame.bitpool)
|
||||||
|
return -1
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
def get_actual_frame(fin):
|
||||||
|
actual_frame = SBCFrame()
|
||||||
|
sbc_unpack_frame(fin, actual_frame)
|
||||||
|
sbc_reconstruct_subband_samples(actual_frame)
|
||||||
|
sbc_synthesis(actual_frame)
|
||||||
|
return actual_frame
|
||||||
|
|
||||||
|
def get_expected_frame(fin_expected, nr_blocks, nr_subbands, nr_channels, sampling_frequency, bitpool):
|
||||||
|
expected_frame = SBCFrame(nr_blocks, nr_subbands, nr_channels, sampling_frequency, bitpool)
|
||||||
|
fetch_samples_for_next_sbc_frame(fin_expected, expected_frame)
|
||||||
|
return expected_frame
|
||||||
|
|
||||||
|
usage = '''
|
||||||
|
Usage: ./sbc_decoder_test.py decoder_input.sbc decoder_expected_output.wav
|
||||||
|
Example: ./sbc_decoder_test.py fanfare-4sb.sbc fanfare-4sb-decoded.wav
|
||||||
|
'''
|
||||||
|
|
||||||
|
if (len(sys.argv) < 3):
|
||||||
|
print(usage)
|
||||||
|
sys.exit(1)
|
||||||
|
try:
|
||||||
|
decoder_input_sbc = sys.argv[1]
|
||||||
|
decoder_expected_wav = sys.argv[2]
|
||||||
|
|
||||||
|
if not decoder_input_sbc.endswith('.sbc'):
|
||||||
|
print(usage)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
if not decoder_expected_wav.endswith('.wav'):
|
||||||
|
print(usage)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
fin_expected = wave.open(decoder_expected_wav, 'rb')
|
||||||
|
nr_channels, sampwidth, sampling_frequency, nr_audio_frames, comptype, compname = fin_expected.getparams()
|
||||||
|
|
||||||
|
# print nr_channels, sampwidth, sampling_frequency, nr_audio_frames, comptype, compname
|
||||||
|
|
||||||
|
with open(decoder_input_sbc, 'rb') as fin:
|
||||||
|
try:
|
||||||
|
subband_frame_count = 0
|
||||||
|
while True:
|
||||||
|
if subband_frame_count % 200 == 0:
|
||||||
|
print "== Frame %d ==" % (subband_frame_count)
|
||||||
|
|
||||||
|
actual_frame = get_actual_frame(fin)
|
||||||
|
|
||||||
|
expected_frame = get_expected_frame(fin_expected, actual_frame.nr_blocks,
|
||||||
|
actual_frame.nr_subbands, nr_channels,
|
||||||
|
actual_frame.bitpool, sampling_frequency)
|
||||||
|
|
||||||
|
|
||||||
|
err = sbc_compare_headers(subband_frame_count, actual_frame, expected_frame)
|
||||||
|
if err < 0:
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
err = sbc_compare_pcm(subband_frame_count, actual_frame, expected_frame)
|
||||||
|
if err < 0:
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
subband_frame_count += 1
|
||||||
|
|
||||||
|
except TypeError:
|
||||||
|
fin_expected.close()
|
||||||
|
fin.close()
|
||||||
|
print "DONE, max MSE PCM error %d", max_error
|
||||||
|
exit(0)
|
||||||
|
|
||||||
|
except IOError as e:
|
||||||
|
print(usage)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
155
test/sbc/sbc_encoder_test.py
Executable file
155
test/sbc/sbc_encoder_test.py
Executable file
@ -0,0 +1,155 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
import numpy as np
|
||||||
|
import wave
|
||||||
|
import struct
|
||||||
|
import sys
|
||||||
|
from sbc import *
|
||||||
|
from sbc_encoder import *
|
||||||
|
from sbc_decoder import *
|
||||||
|
|
||||||
|
error = 0.99
|
||||||
|
max_error = -1
|
||||||
|
|
||||||
|
def sbc_compare_audio_frames(frame_count, actual_frame, expected_frame):
|
||||||
|
global error, max_error
|
||||||
|
|
||||||
|
M = mse(actual_frame.audio_sample, expected_frame.audio_sample)
|
||||||
|
if M > max_error:
|
||||||
|
max_error = M
|
||||||
|
|
||||||
|
if M > error:
|
||||||
|
print "audio_sample error (%d, %d ) " % (frame_count, M)
|
||||||
|
return -1
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
def sbc_compare_headers(frame_count, actual_frame, expected_frame):
|
||||||
|
if actual_frame.syncword != expected_frame.syncword:
|
||||||
|
print "syncword wrong ", actual_frame.syncword
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if actual_frame.sampling_frequency != expected_frame.sampling_frequency:
|
||||||
|
print "sampling_frequency wrong ", actual_frame.sampling_frequency
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if actual_frame.nr_blocks != expected_frame.nr_blocks:
|
||||||
|
print "nr_blocks wrong ", actual_frame.nr_blocks
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if actual_frame.channel_mode != expected_frame.channel_mode:
|
||||||
|
print "channel_mode wrong ", actual_frame.channel_mode
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if actual_frame.nr_channels != expected_frame.nr_channels:
|
||||||
|
print "nr_channels wrong ", actual_frame.nr_channels
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if actual_frame.allocation_method != expected_frame.allocation_method:
|
||||||
|
print "allocation_method wrong ", actual_frame.allocation_method
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if actual_frame.nr_subbands != expected_frame.nr_subbands:
|
||||||
|
print "nr_subbands wrong ", actual_frame.nr_subbands
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if actual_frame.bitpool != expected_frame.bitpool:
|
||||||
|
print "bitpool wrong (E: %d, D: %d)" % (actual_frame.bitpool, expected_frame.bitpool)
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if mse(actual_frame.join, expected_frame.join) > 0:
|
||||||
|
print "join error \nE:\n %s \nD:\n %s" % (actual_frame.join, expected_frame.join)
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if mse(actual_frame.scale_factor, expected_frame.scale_factor) > 0:
|
||||||
|
print "scale_factor error \nE:\n %s \nD:\n %s" % (actual_frame.scale_factor, expected_frame.scale_factor)
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if mse(actual_frame.scalefactor, expected_frame.scalefactor) > 0:
|
||||||
|
print "scalefactor error \nE:\n %s \nD:\n %s" % (actual_frame.scalefactor, expected_frame.scalefactor)
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if mse(actual_frame.bits, expected_frame.bits) > 0:
|
||||||
|
print "bits error \nE:\n %s \nD:\n %s" % (actual_frame.bits, expected_frame.bits)
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if actual_frame.crc_check != expected_frame.crc_check:
|
||||||
|
print "crc_check wrong (E: %d, D: %d)" % (actual_frame.crc_check, expected_frame.crc_check)
|
||||||
|
return -1
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
def get_actual_frame(fin, nr_blocks, nr_subbands, nr_channels, sampling_frequency, bitpool):
|
||||||
|
actual_frame = SBCFrame(nr_blocks, nr_subbands, nr_channels, sampling_frequency, bitpool)
|
||||||
|
fetch_samples_for_next_sbc_frame(fin, actual_frame)
|
||||||
|
sbc_encode(actual_frame)
|
||||||
|
return actual_frame
|
||||||
|
|
||||||
|
def get_expected_frame(fin_expected):
|
||||||
|
expected_frame = SBCFrame()
|
||||||
|
sbc_unpack_frame(fin_expected, expected_frame)
|
||||||
|
return expected_frame
|
||||||
|
|
||||||
|
usage = '''
|
||||||
|
Usage: ./sbc_encoder_test.py encoder_input.wav blocks subbands bitpool encoder_expected_output.sbc
|
||||||
|
Example: ./sbc_encoder_test.py fanfare.wav 16 4 31 fanfare-4sb.sbc
|
||||||
|
'''
|
||||||
|
|
||||||
|
if (len(sys.argv) < 6):
|
||||||
|
print(usage)
|
||||||
|
sys.exit(1)
|
||||||
|
try:
|
||||||
|
encoder_input_wav = sys.argv[1]
|
||||||
|
nr_blocks = int(sys.argv[2])
|
||||||
|
nr_subbands = int(sys.argv[3])
|
||||||
|
bitpool = int(sys.argv[4])
|
||||||
|
encoder_expected_sbc = sys.argv[5]
|
||||||
|
sampling_frequency = 44100
|
||||||
|
|
||||||
|
if not encoder_input_wav.endswith('.wav'):
|
||||||
|
print(usage)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
if not encoder_expected_sbc.endswith('.sbc'):
|
||||||
|
print(usage)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
fin = wave.open(encoder_input_wav, 'rb')
|
||||||
|
nr_channels = fin.getnchannels()
|
||||||
|
sampling_frequency = fin.getframerate()
|
||||||
|
nr_audio_frames = fin.getnframes()
|
||||||
|
|
||||||
|
fin_expected = open(encoder_expected_sbc, 'rb')
|
||||||
|
subband_frame_count = 0
|
||||||
|
audio_frame_count = 0
|
||||||
|
nr_samples = nr_blocks * nr_subbands
|
||||||
|
|
||||||
|
while audio_frame_count < nr_audio_frames:
|
||||||
|
if subband_frame_count % 200 == 0:
|
||||||
|
print("== Frame %d ==" % (subband_frame_count))
|
||||||
|
|
||||||
|
actual_frame = get_actual_frame(fin, nr_blocks, nr_subbands, nr_channels, bitpool, sampling_frequency)
|
||||||
|
expected_frame = get_expected_frame(fin_expected)
|
||||||
|
|
||||||
|
err = sbc_compare_headers(subband_frame_count, actual_frame, expected_frame)
|
||||||
|
if err < 0:
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
err = sbc_compare_audio_frames(subband_frame_count, actual_frame, expected_frame)
|
||||||
|
if err < 0:
|
||||||
|
exit(1)
|
||||||
|
audio_frame_count += nr_samples
|
||||||
|
subband_frame_count += 1
|
||||||
|
|
||||||
|
print "DONE, max MSE audio sample error %d", max_error
|
||||||
|
fin.close()
|
||||||
|
fin_expected.close()
|
||||||
|
|
||||||
|
except IOError:
|
||||||
|
print(usage)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -1,184 +0,0 @@
|
|||||||
#!/usr/bin/env python
|
|
||||||
import numpy as np
|
|
||||||
import wave
|
|
||||||
import struct
|
|
||||||
import sys
|
|
||||||
from sbc import *
|
|
||||||
from sbc_encoder import *
|
|
||||||
from sbc_decoder import *
|
|
||||||
|
|
||||||
error = 100
|
|
||||||
max_error = -1
|
|
||||||
|
|
||||||
def dump_samples(a,b,nr_blocks):
|
|
||||||
for blk in range(nr_blocks):
|
|
||||||
print "D%02d %s" %(blk, b[blk][0])
|
|
||||||
print "E%02d %s" %(blk, a[blk][0])
|
|
||||||
print "F%02d %s" %(blk, a[blk][0] - b[blk][0])
|
|
||||||
print
|
|
||||||
|
|
||||||
def mse(a,b):
|
|
||||||
count = 1
|
|
||||||
for i in a.shape:
|
|
||||||
count *= i
|
|
||||||
delta = a - b
|
|
||||||
sqr = delta ** 2
|
|
||||||
# print "count", count
|
|
||||||
# print "sqr", sqr
|
|
||||||
# print "sum", sqr.sum()
|
|
||||||
res = sqr.sum()*1.0/count
|
|
||||||
# print "mse", res
|
|
||||||
return res
|
|
||||||
|
|
||||||
def check_equal(encoder, decoder, frame_count):
|
|
||||||
global max_error
|
|
||||||
M = mse(encoder.sb_sample, decoder.sb_sample)
|
|
||||||
# print "sb_sample error", M
|
|
||||||
# dump_samples(encoder.sb_sample, decoder.sb_sample, encoder.nr_blocks)
|
|
||||||
if M > max_error:
|
|
||||||
max_error = M
|
|
||||||
print "sb_sample error ", frame_count, max_error, M
|
|
||||||
if M > error:
|
|
||||||
#print "sb_sample error ", frame_count, M
|
|
||||||
#dump_samples(encoder.sb_sample, decoder.sb_sample, encoder.nr_blocks)
|
|
||||||
return -1
|
|
||||||
|
|
||||||
return
|
|
||||||
|
|
||||||
if mse(encoder.pcm, decoder.pcm) > error:
|
|
||||||
print "pcm wrong ", encoder.pcm, decoder.pcm
|
|
||||||
return -1
|
|
||||||
|
|
||||||
if encoder.syncword != decoder.syncword:
|
|
||||||
print "syncword wrong ", encoder.syncword
|
|
||||||
return -1
|
|
||||||
|
|
||||||
if encoder.sampling_frequency != decoder.sampling_frequency:
|
|
||||||
print "sampling_frequency wrong ", encoder.sampling_frequency
|
|
||||||
return -1
|
|
||||||
|
|
||||||
if encoder.nr_blocks != decoder.nr_blocks:
|
|
||||||
print "nr_blocks wrong ", encoder.nr_blocks
|
|
||||||
return -1
|
|
||||||
|
|
||||||
if encoder.channel_mode != decoder.channel_mode:
|
|
||||||
print "channel_mode wrong ", encoder.channel_mode
|
|
||||||
return -1
|
|
||||||
|
|
||||||
if encoder.nr_channels != decoder.nr_channels:
|
|
||||||
print "nr_channels wrong ", encoder.nr_channels
|
|
||||||
return -1
|
|
||||||
|
|
||||||
if encoder.allocation_method != decoder.allocation_method:
|
|
||||||
print "allocation_method wrong ", encoder.allocation_method
|
|
||||||
return -1
|
|
||||||
|
|
||||||
if encoder.nr_subbands != decoder.nr_subbands:
|
|
||||||
print "nr_subbands wrong ", encoder.nr_subbands
|
|
||||||
return -1
|
|
||||||
|
|
||||||
if encoder.bitpool != decoder.bitpool:
|
|
||||||
print "bitpool wrong (E: %d, D: %d)" % (encoder.bitpool, decoder.bitpool)
|
|
||||||
return -1
|
|
||||||
|
|
||||||
|
|
||||||
if mse(encoder.join, decoder.join) > 0:
|
|
||||||
print "join error \nE:\n %s \nD:\n %s" % (encoder.join, decoder.join)
|
|
||||||
return -1
|
|
||||||
|
|
||||||
if mse(encoder.scale_factor, decoder.scale_factor) > 0:
|
|
||||||
print "scale_factor error \nE:\n %s \nD:\n %s" % (encoder.scale_factor, decoder.scale_factor)
|
|
||||||
return -1
|
|
||||||
|
|
||||||
if mse(encoder.scalefactor, decoder.scalefactor) > 0:
|
|
||||||
print "scalefactor error \nE:\n %s \nD:\n %s" % (encoder.scalefactor, decoder.scalefactor)
|
|
||||||
return -1
|
|
||||||
|
|
||||||
if mse(encoder.bits, decoder.bits) > 0:
|
|
||||||
print "bits error \nE:\n %s \nD:\n %s" % (encoder.bits, decoder.bits)
|
|
||||||
return -1
|
|
||||||
|
|
||||||
if mse(encoder.levels, decoder.levels) > 0:
|
|
||||||
print "levels error \nE:\n %s \nD:\n %s" % (encoder.levels, decoder.levels)
|
|
||||||
return -1
|
|
||||||
if mse(encoder.audio_sample, decoder.audio_sample) > error:
|
|
||||||
#print "audio_sample diff %s" % (encoder.audio_sample-decoder.audio_sample)
|
|
||||||
#print "audio_sample error \nE:\n %s \nD:\n %s" % (encoder.audio_sample, decoder.audio_sample)
|
|
||||||
print "audio_sample error"
|
|
||||||
dump_samples(encoder.audio_sample, decoder.audio_sample, encoder.nr_blocks)
|
|
||||||
return -1
|
|
||||||
|
|
||||||
if encoder.crc_check != decoder.crc_check:
|
|
||||||
print "crc_check wrong (E: %d, D: %d)" % (encoder.crc_check, decoder.crc_check)
|
|
||||||
return -1
|
|
||||||
|
|
||||||
return
|
|
||||||
|
|
||||||
|
|
||||||
usage = '''
|
|
||||||
Usage: ./sbc_test.py input.sbc
|
|
||||||
'''
|
|
||||||
|
|
||||||
if (len(sys.argv) < 2):
|
|
||||||
print(usage)
|
|
||||||
sys.exit(1)
|
|
||||||
try:
|
|
||||||
infile = sys.argv[1]
|
|
||||||
if not infile.endswith('.sbc'):
|
|
||||||
print(usage)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
with open (infile, 'rb') as fin:
|
|
||||||
try:
|
|
||||||
frame_count = 0
|
|
||||||
while True:
|
|
||||||
sbc_decoder_frame = SBCFrame(0,0,0,0,0)
|
|
||||||
if frame_count % 200 == 0:
|
|
||||||
print "== Frame %d ==" % (frame_count)
|
|
||||||
|
|
||||||
err = sbc_unpack_frame(fin, sbc_decoder_frame)
|
|
||||||
if err:
|
|
||||||
print "error, frame_count: ", frame_count
|
|
||||||
break
|
|
||||||
|
|
||||||
sbc_decode(sbc_decoder_frame) # => pcm
|
|
||||||
|
|
||||||
|
|
||||||
sbc_encoder_frame = SBCFrame( sbc_decoder_frame.nr_blocks,
|
|
||||||
sbc_decoder_frame.nr_subbands,
|
|
||||||
sbc_decoder_frame.nr_channels,
|
|
||||||
sbc_decoder_frame.sampling_frequency,
|
|
||||||
sbc_decoder_frame.bitpool)
|
|
||||||
sbc_encoder_frame.pcm = np.array(sbc_decoder_frame.pcm)
|
|
||||||
# TODO: join field
|
|
||||||
# TODO: clear memory
|
|
||||||
|
|
||||||
# sbc_encoder_frame.sb_sample = np.array(sbc_decoder_frame.sb_sample)
|
|
||||||
# sbc_quantization(sbc_encoder_frame)
|
|
||||||
|
|
||||||
# print "encoder pcm ", sbc_encoder_frame.pcm
|
|
||||||
# if frame_count > 86:
|
|
||||||
print sbc_decoder_frame.sb_sample[0][0]
|
|
||||||
sbc_encode(sbc_encoder_frame,frame_count >= 0)
|
|
||||||
# test
|
|
||||||
err = check_equal(sbc_encoder_frame, sbc_decoder_frame, frame_count)
|
|
||||||
if err:
|
|
||||||
print "error, frames not equal, frame_count: ", frame_count
|
|
||||||
break
|
|
||||||
|
|
||||||
# if frame_count == 0:
|
|
||||||
# break
|
|
||||||
frame_count += 1
|
|
||||||
|
|
||||||
except TypeError:
|
|
||||||
print "DONE"
|
|
||||||
exit(0)
|
|
||||||
|
|
||||||
except IOError as e:
|
|
||||||
print(usage)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
x
Reference in New Issue
Block a user