mirror of
https://github.com/bluekitchen/btstack.git
synced 2025-03-28 19:20:54 +00:00
sbc: fix write wav file, read sbc file for stereo
This commit is contained in:
parent
af086c983e
commit
1522543de8
124
test/sbc/sbc.py
124
test/sbc/sbc.py
@ -124,6 +124,76 @@ offset8 = np.array([[ -2, 0, 0, 0, 0, 0, 0, 1 ],
|
|||||||
[ -4, 0, 0, 0, 0, 0, 1, 2 ]
|
[ -4, 0, 0, 0, 0, 0, 1, 2 ]
|
||||||
])
|
])
|
||||||
|
|
||||||
|
def calculate_scalefactor(max_subbandsample):
|
||||||
|
x = 0
|
||||||
|
while True:
|
||||||
|
y = 1 << x + 1
|
||||||
|
if y > max_subbandsample:
|
||||||
|
break
|
||||||
|
x += 1
|
||||||
|
return (x,y)
|
||||||
|
|
||||||
|
|
||||||
|
def calculate_max_subbandsample(nr_blocks, nr_channels, nr_subbands, sb_sample):
|
||||||
|
max_subbandsample = np.zeros(shape = (nr_channels, nr_subbands))
|
||||||
|
|
||||||
|
for blk in range(nr_blocks):
|
||||||
|
for ch in range(nr_channels):
|
||||||
|
for sb in range(nr_subbands):
|
||||||
|
m = abs(sb_sample[blk][ch][sb])
|
||||||
|
if max_subbandsample[ch][sb] < m:
|
||||||
|
max_subbandsample[ch][sb] = m
|
||||||
|
return max_subbandsample
|
||||||
|
|
||||||
|
def calculate_scalefactors(nr_blocks, nr_channels, nr_subbands, sb_sample):
|
||||||
|
scale_factor = np.zeros(shape=(nr_channels, nr_subbands), dtype = np.int32)
|
||||||
|
scalefactor = np.zeros(shape=(nr_channels, nr_subbands), dtype = np.int32)
|
||||||
|
|
||||||
|
# max_subbandsample = calculate_max_subbandsample(nr_blocks, nr_channels, nr_subbands, sb_sample)
|
||||||
|
# for ch in range(nr_channels):
|
||||||
|
# for sb in range(nr_subbands):
|
||||||
|
# (scale_factor[ch][sb], scalefactor[ch][sb]) = calculate_scalefactor(max_subbandsample[ch][sb])
|
||||||
|
|
||||||
|
for ch in range(nr_channels):
|
||||||
|
for sb in range(nr_subbands):
|
||||||
|
scale_factor[ch][sb] = 0
|
||||||
|
scalefactor[ch][sb] = 2
|
||||||
|
for blk in range(nr_blocks):
|
||||||
|
while scalefactor[ch][sb] < abs(sb_sample[blk][ch][sb]):
|
||||||
|
scale_factor[ch][sb]+=1
|
||||||
|
scalefactor[ch][sb] *= 2
|
||||||
|
|
||||||
|
return scale_factor, scalefactor
|
||||||
|
|
||||||
|
def calculate_scalefactors_and_channel_mode(frame):
|
||||||
|
frame.scale_factor, frame.scalefactor = calculate_scalefactors(frame.nr_blocks, frame.nr_channels, frame.nr_subbands, frame.sb_sample)
|
||||||
|
#print "calculate_scalefactors_and_channel_mode1 ", frame.scale_factor
|
||||||
|
|
||||||
|
if frame.nr_channels == 1:
|
||||||
|
frame.channel_mode = MONO
|
||||||
|
else:
|
||||||
|
sb_sample1 = np.zeros(shape = (frame.nr_blocks,2,frame.nr_subbands), dtype = np.uint16)
|
||||||
|
|
||||||
|
for blk in range(frame.nr_blocks):
|
||||||
|
for sb in range(frame.nr_subbands):
|
||||||
|
sb_sample1[blk][0][sb] = frame.sb_sample[blk][0][sb] + frame.sb_sample[blk][1][sb]
|
||||||
|
sb_sample1[blk][1][sb] = frame.sb_sample[blk][0][sb] - frame.sb_sample[blk][1][sb]
|
||||||
|
|
||||||
|
scale_factor, scalefactor = calculate_scalefactors(frame.nr_blocks, frame.nr_channels, frame.nr_subbands, sb_sample1)
|
||||||
|
#print "calculate_scalefactors_and_channel_mode 2", scale_factor
|
||||||
|
sumb = 0
|
||||||
|
suma = 0
|
||||||
|
for sb in range(frame.nr_subbands):
|
||||||
|
suma += frame.scale_factor[0][sb] + frame.scale_factor[1][sb]
|
||||||
|
sumb += scale_factor[0][sb] + scale_factor[1][sb]
|
||||||
|
|
||||||
|
#print "calculate_scalefactors_and_channel_mode 3", suma, sumb
|
||||||
|
if suma > sumb:
|
||||||
|
frame.channel_mode = JOINT_STEREO
|
||||||
|
else:
|
||||||
|
frame.channel_mode = STEREO
|
||||||
|
|
||||||
|
|
||||||
class SBCFrame:
|
class SBCFrame:
|
||||||
syncword = 0
|
syncword = 0
|
||||||
sampling_frequency = 0
|
sampling_frequency = 0
|
||||||
@ -142,7 +212,7 @@ class SBCFrame:
|
|||||||
sb_sample = np.zeros(shape = (16,2,8), dtype = np.uint16)
|
sb_sample = np.zeros(shape = (16,2,8), dtype = np.uint16)
|
||||||
X = np.zeros(8, dtype = np.int16)
|
X = np.zeros(8, dtype = np.int16)
|
||||||
EX = np.zeros(8)
|
EX = np.zeros(8)
|
||||||
pcm = np.array([], dtype = np.int16)
|
pcm = np.zeros(shape=(2, 8*16), dtype = np.int16)
|
||||||
bits = np.zeros(shape=(2, 8))
|
bits = np.zeros(shape=(2, 8))
|
||||||
levels = np.zeros(shape=(2, 8), dtype = np.int32)
|
levels = np.zeros(shape=(2, 8), dtype = np.int32)
|
||||||
|
|
||||||
@ -196,9 +266,10 @@ class SBCFrame:
|
|||||||
return res
|
return res
|
||||||
|
|
||||||
|
|
||||||
def sbc_bit_allocation_stereo_joint(frame, ch):
|
def sbc_bit_allocation_stereo_joint(frame):
|
||||||
bitneed = np.zeros(shape=(frame.nr_channels, frame.nr_subbands))
|
bitneed = np.zeros(shape=(frame.nr_channels, frame.nr_subbands), dtype = np.int32)
|
||||||
bits = np.zeros(shape=(frame.nr_channels, frame.nr_subbands))
|
bits = np.zeros(shape=(frame.nr_channels, frame.nr_subbands), dtype = np.int32)
|
||||||
|
|
||||||
loudness = 0
|
loudness = 0
|
||||||
|
|
||||||
if frame.allocation_method == SNR:
|
if frame.allocation_method == SNR:
|
||||||
@ -214,14 +285,12 @@ def sbc_bit_allocation_stereo_joint(frame, ch):
|
|||||||
if frame.nr_subbands == 4:
|
if frame.nr_subbands == 4:
|
||||||
loudness = scale_factor[ch][sb] - offset4[frame.sampling_frequency][sb]
|
loudness = scale_factor[ch][sb] - offset4[frame.sampling_frequency][sb]
|
||||||
else:
|
else:
|
||||||
if frame.nr_subbands == 4:
|
loudness = frame.scale_factor[ch][sb] - offset8[frame.sampling_frequency][sb]
|
||||||
loudness = frame.scale_factor[ch][sb] - offset4[frame.sampling_frequency][sb]
|
|
||||||
else:
|
if loudness > 0:
|
||||||
loudness = frame.scale_factor[ch][sb] - offset8[frame.sampling_frequency][sb]
|
bitneed[ch][sb] = loudness/2
|
||||||
if loudness > 0:
|
else:
|
||||||
bitneed[ch][sb] = loudness/2
|
bitneed[ch][sb] = loudness
|
||||||
else:
|
|
||||||
bitneed[ch][sb] = loudness
|
|
||||||
|
|
||||||
# search the maximum bitneed index
|
# search the maximum bitneed index
|
||||||
max_bitneed = 0
|
max_bitneed = 0
|
||||||
@ -236,21 +305,21 @@ def sbc_bit_allocation_stereo_joint(frame, ch):
|
|||||||
bitslice = max_bitneed + 1 #/* init just above the largest sf */
|
bitslice = max_bitneed + 1 #/* init just above the largest sf */
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
bitslice = bitslice - 1
|
bitslice -= 1
|
||||||
bitcount = bitcount + slicecount
|
bitcount += slicecount
|
||||||
slicecount = 0
|
slicecount = 0
|
||||||
for ch in range(frame.nr_channels):
|
for ch in range(frame.nr_channels):
|
||||||
for sb in range(frame.nr_subbands):
|
for sb in range(frame.nr_subbands):
|
||||||
if (bitneed[ch][sb] > bitslice+1) and (bitneed[ch][sb] < bitslice+16):
|
if (bitneed[ch][sb] > bitslice+1) and (bitneed[ch][sb] < bitslice+16):
|
||||||
slicecount = slicecount + 1
|
slicecount += 1
|
||||||
elif bitneed[ch][sb] == bitslice + 1:
|
elif bitneed[ch][sb] == bitslice + 1:
|
||||||
slicecount = slicecount + 2
|
slicecount += 2
|
||||||
if bitcount + slicecount >= frame.bitpool:
|
if bitcount + slicecount >= frame.bitpool:
|
||||||
break
|
break
|
||||||
|
|
||||||
if bitcount + slicecount == frame.bitpool:
|
if bitcount + slicecount == frame.bitpool:
|
||||||
bitcount = bitcount + slicecount
|
bitcount += slicecount
|
||||||
bitslice = bitslice - 1
|
bitslice -= 1
|
||||||
|
|
||||||
# bits are distributed until the last bitslice is reached
|
# bits are distributed until the last bitslice is reached
|
||||||
for ch in range(frame.nr_channels):
|
for ch in range(frame.nr_channels):
|
||||||
@ -264,31 +333,18 @@ def sbc_bit_allocation_stereo_joint(frame, ch):
|
|||||||
sb = 0
|
sb = 0
|
||||||
while bitcount < frame.bitpool and sb < frame.nr_subbands:
|
while bitcount < frame.bitpool and sb < frame.nr_subbands:
|
||||||
if bits[ch][sb] >= 2 and bits[ch][sb] < 16:
|
if bits[ch][sb] >= 2 and bits[ch][sb] < 16:
|
||||||
bits[ch][sb] = bits[ch][sb] + 1
|
bits[ch][sb] += 1
|
||||||
bitcount = bitcount + 1
|
bitcount += 1
|
||||||
|
|
||||||
elif (bitneed[ch][sb] == bitslice+1) and (frame.bitpool > bitcount+1):
|
elif (bitneed[ch][sb] == bitslice+1) and (frame.bitpool > bitcount+1):
|
||||||
bits[ch][sb] = 2
|
bits[ch][sb] = 2
|
||||||
bitcount += 2
|
bitcount += 2
|
||||||
|
|
||||||
if ch == 1:
|
if ch == 1:
|
||||||
ch = 0
|
ch = 0
|
||||||
sb = sb + 1
|
sb += 1
|
||||||
else:
|
else:
|
||||||
ch = 1
|
ch = 1
|
||||||
|
|
||||||
ch = 0
|
|
||||||
sb = 0
|
|
||||||
while bitcount < frame.bitpool and sb < frame.nr_subbands:
|
|
||||||
if bits[ch][sb] < 16:
|
|
||||||
bits[ch][sb] = bits[ch][sb] + 1
|
|
||||||
bitcount = bitcount + 1
|
|
||||||
if ch == 1:
|
|
||||||
ch = 0
|
|
||||||
sb = sb + 1
|
|
||||||
else:
|
|
||||||
ch = 1
|
|
||||||
|
|
||||||
return bits
|
return bits
|
||||||
|
|
||||||
|
|
||||||
|
@ -44,6 +44,7 @@ def sbc_unpack_frame(fin, available_bytes, frame):
|
|||||||
for ch in range(frame.nr_channels):
|
for ch in range(frame.nr_channels):
|
||||||
for sb in range(frame.nr_subbands):
|
for sb in range(frame.nr_subbands):
|
||||||
frame.scale_factor[ch][sb] = get_bits(fin, 4)
|
frame.scale_factor[ch][sb] = get_bits(fin, 4)
|
||||||
|
|
||||||
crc = calculate_crc(frame)
|
crc = calculate_crc(frame)
|
||||||
if crc != frame.crc_check:
|
if crc != frame.crc_check:
|
||||||
print frame
|
print frame
|
||||||
@ -65,7 +66,6 @@ def sbc_unpack_frame(fin, available_bytes, frame):
|
|||||||
frame.audio_sample[blk][ch][sb] = get_bits(fin, frame.bits[ch][sb])
|
frame.audio_sample[blk][ch][sb] = get_bits(fin, frame.bits[ch][sb])
|
||||||
# print "block %2d - audio sample: %s" % (blk, frame.audio_sample[blk][0])
|
# print "block %2d - audio sample: %s" % (blk, frame.audio_sample[blk][0])
|
||||||
|
|
||||||
# add padding
|
|
||||||
drop_remaining_bits()
|
drop_remaining_bits()
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
@ -136,11 +136,13 @@ def sbc_frame_synthesis(frame, ch, blk, proto_table):
|
|||||||
W[i] = U[i]*D
|
W[i] = U[i]*D
|
||||||
|
|
||||||
|
|
||||||
|
offset = blk*M
|
||||||
|
|
||||||
for j in range(M):
|
for j in range(M):
|
||||||
for i in range(10):
|
for i in range(10):
|
||||||
frame.X[j] += W[j+M*i]
|
frame.X[j] += W[j+M*i]
|
||||||
|
|
||||||
frame.pcm = np.concatenate([frame.pcm, np.int16(frame.X)])
|
frame.pcm[ch][offset + j] = np.int16(frame.X[j])
|
||||||
|
|
||||||
|
|
||||||
def sbc_synthesis(frame):
|
def sbc_synthesis(frame):
|
||||||
@ -166,14 +168,16 @@ def sbc_decode(frame):
|
|||||||
|
|
||||||
def write_wav_file(fout, frame):
|
def write_wav_file(fout, frame):
|
||||||
values = []
|
values = []
|
||||||
for i in range(len(frame.pcm)):
|
|
||||||
try:
|
for i in range(frame.nr_subbands * frame.nr_blocks):
|
||||||
packed_value = struct.pack('h', frame.pcm[i])
|
for ch in range(frame.nr_channels):
|
||||||
values.append(packed_value)
|
try:
|
||||||
except struct.error:
|
packed_value = struct.pack('h', frame.pcm[ch][i])
|
||||||
print frame
|
values.append(packed_value)
|
||||||
print i, frame.pcm[i], frame.pcm
|
except struct.error:
|
||||||
exit(1)
|
print frame
|
||||||
|
print i, frame.pcm[ch][i], frame.pcm[ch]
|
||||||
|
exit(1)
|
||||||
|
|
||||||
value_str = ''.join(values)
|
value_str = ''.join(values)
|
||||||
fout.writeframes(value_str)
|
fout.writeframes(value_str)
|
||||||
|
@ -12,14 +12,14 @@ max_error = -1
|
|||||||
|
|
||||||
def sbc_compare_pcm(frame_count, actual_frame, expected_frame):
|
def sbc_compare_pcm(frame_count, actual_frame, expected_frame):
|
||||||
global error, max_error
|
global error, max_error
|
||||||
|
for ch in range(actual_frame.nr_channels):
|
||||||
|
M = mse(actual_frame.pcm[ch], expected_frame.pcm[ch])
|
||||||
|
if M > max_error:
|
||||||
|
max_error = M
|
||||||
|
|
||||||
M = mse(actual_frame.pcm, expected_frame.pcm)
|
if M > error:
|
||||||
if M > max_error:
|
print "pcm error (%d, %d ) " % (frame_count, M)
|
||||||
max_error = M
|
return -1
|
||||||
|
|
||||||
if M > error:
|
|
||||||
print "pcm error (%d, %d ) " % (frame_count, M)
|
|
||||||
return -1
|
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
@ -66,6 +66,7 @@ def get_actual_frame(fin):
|
|||||||
def get_expected_frame(fin_expected, nr_blocks, nr_subbands, nr_channels, sampling_frequency, bitpool, allocation_method):
|
def get_expected_frame(fin_expected, nr_blocks, nr_subbands, nr_channels, sampling_frequency, bitpool, allocation_method):
|
||||||
expected_frame = SBCFrame(nr_blocks, nr_subbands, nr_channels, sampling_frequency, bitpool, allocation_method)
|
expected_frame = SBCFrame(nr_blocks, nr_subbands, nr_channels, sampling_frequency, bitpool, allocation_method)
|
||||||
fetch_samples_for_next_sbc_frame(fin_expected, expected_frame)
|
fetch_samples_for_next_sbc_frame(fin_expected, expected_frame)
|
||||||
|
calculate_scalefactors_and_channel_mode(expected_frame)
|
||||||
return expected_frame
|
return expected_frame
|
||||||
|
|
||||||
usage = '''
|
usage = '''
|
||||||
@ -110,7 +111,6 @@ try:
|
|||||||
actual_frame.bitpool, sampling_frequency,
|
actual_frame.bitpool, sampling_frequency,
|
||||||
actual_frame.allocation_method)
|
actual_frame.allocation_method)
|
||||||
|
|
||||||
|
|
||||||
err = sbc_compare_headers(subband_frame_count, actual_frame, expected_frame)
|
err = sbc_compare_headers(subband_frame_count, actual_frame, expected_frame)
|
||||||
if err < 0:
|
if err < 0:
|
||||||
print ("Headers differ \n%s\n%s" % (actual_frame, expected_frame))
|
print ("Headers differ \n%s\n%s" % (actual_frame, expected_frame))
|
||||||
|
@ -9,18 +9,22 @@ X = np.zeros(80, dtype = np.int16)
|
|||||||
|
|
||||||
|
|
||||||
def fetch_samples_for_next_sbc_frame(fin, frame):
|
def fetch_samples_for_next_sbc_frame(fin, frame):
|
||||||
nr_audio_frames = frame.nr_blocks * frame.nr_subbands
|
nr_samples = frame.nr_blocks * frame.nr_subbands * frame.nr_channels
|
||||||
raw_data = fin.readframes(nr_audio_frames) # Returns byte data
|
raw_data = fin.readframes(nr_samples) # Returns byte data
|
||||||
|
|
||||||
total_samples = nr_audio_frames * frame.nr_channels
|
|
||||||
len_raw_data = len(raw_data) / 2
|
len_raw_data = len(raw_data) / 2
|
||||||
|
|
||||||
padding = np.zeros(total_samples - len_raw_data, dtype=np.int16)
|
|
||||||
|
|
||||||
fmt = "%ih" % len_raw_data # read signed 2 byte shorts
|
fmt = "%ih" % len_raw_data # read signed 2 byte shorts
|
||||||
|
|
||||||
frame.pcm = np.concatenate([np.array(struct.unpack(fmt, raw_data)), padding])
|
data = struct.unpack(fmt, raw_data)
|
||||||
del raw_data
|
len_data = len(data)
|
||||||
|
|
||||||
|
for i in range(frame.nr_blocks * frame.nr_subbands):
|
||||||
|
for ch in range(frame.nr_channels):
|
||||||
|
index = i*2 + ch
|
||||||
|
if index < len_data:
|
||||||
|
frame.pcm[ch][i] = data[i*2 + ch]
|
||||||
|
else:
|
||||||
|
frame.pcm[ch][i] = 0
|
||||||
|
|
||||||
|
|
||||||
def sbc_frame_analysis(frame, ch, blk, C):
|
def sbc_frame_analysis(frame, ch, blk, C):
|
||||||
@ -69,7 +73,7 @@ def sbc_analysis(frame):
|
|||||||
for ch in range(frame.nr_channels):
|
for ch in range(frame.nr_channels):
|
||||||
for blk in range(frame.nr_blocks):
|
for blk in range(frame.nr_blocks):
|
||||||
for sb in range(frame.nr_subbands):
|
for sb in range(frame.nr_subbands):
|
||||||
frame.EX[sb] = np.int16(frame.pcm[index])
|
frame.EX[sb] = np.int16(frame.pcm[ch][index])
|
||||||
index+=1
|
index+=1
|
||||||
sbc_frame_analysis(frame, ch, blk, C)
|
sbc_frame_analysis(frame, ch, blk, C)
|
||||||
return 0
|
return 0
|
||||||
@ -110,38 +114,9 @@ def calculate_joint_stereo_signal(frame):
|
|||||||
frame.sb_sample[blk][0][sb] = sb_sample[blk][0][sb]
|
frame.sb_sample[blk][0][sb] = sb_sample[blk][0][sb]
|
||||||
frame.sb_sample[blk][1][sb] = sb_sample[blk][1][sb]
|
frame.sb_sample[blk][1][sb] = sb_sample[blk][1][sb]
|
||||||
|
|
||||||
def calculate_scalefactor(max_subbandsample):
|
|
||||||
x = 0
|
|
||||||
while True:
|
|
||||||
y = 1 << x + 1
|
|
||||||
if y > max_subbandsample:
|
|
||||||
break
|
|
||||||
x += 1
|
|
||||||
return (x,y)
|
|
||||||
|
|
||||||
|
|
||||||
def sbc_quantization(frame):
|
def sbc_quantization(frame):
|
||||||
max_subbandsample = np.zeros(shape = (frame.nr_channels, frame.nr_subbands))
|
calculate_scalefactors_and_channel_mode(frame)
|
||||||
|
|
||||||
for blk in range(frame.nr_blocks):
|
|
||||||
for ch in range(frame.nr_channels):
|
|
||||||
for sb in range(frame.nr_subbands):
|
|
||||||
m = abs(frame.sb_sample[blk][ch][sb])
|
|
||||||
if max_subbandsample[ch][sb] < m:
|
|
||||||
max_subbandsample[ch][sb] = m
|
|
||||||
|
|
||||||
|
|
||||||
for ch in range(frame.nr_channels):
|
|
||||||
for sb in range(frame.nr_subbands):
|
|
||||||
frame.scale_factor[ch][sb] = 0
|
|
||||||
frame.scalefactor[ch][sb] = 2
|
|
||||||
for blk in range(frame.nr_blocks):
|
|
||||||
while frame.scalefactor[ch][sb] < abs(frame.sb_sample[blk][ch][sb]):
|
|
||||||
frame.scale_factor[ch][sb]+=1
|
|
||||||
frame.scalefactor[ch][sb] *= 2
|
|
||||||
|
|
||||||
#(frame.scale_factor[ch][sb], frame.scalefactor[ch][sb]) = calculate_scalefactor(max_subbandsample[ch][sb])
|
|
||||||
|
|
||||||
frame.bits = sbc_bit_allocation(frame)
|
frame.bits = sbc_bit_allocation(frame)
|
||||||
|
|
||||||
# Reconstruct the Audio Samples
|
# Reconstruct the Audio Samples
|
||||||
@ -154,6 +129,8 @@ def sbc_quantization(frame):
|
|||||||
frame.crc_check = calculate_crc(frame)
|
frame.crc_check = calculate_crc(frame)
|
||||||
|
|
||||||
frame.join = np.zeros(frame.nr_subbands, dtype = np.uint8)
|
frame.join = np.zeros(frame.nr_subbands, dtype = np.uint8)
|
||||||
|
|
||||||
|
|
||||||
if frame.channel_mode == JOINT_STEREO:
|
if frame.channel_mode == JOINT_STEREO:
|
||||||
calculate_joint_stereo_signal(frame)
|
calculate_joint_stereo_signal(frame)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user