btstack/test/sbc/sbc_encoder.py

236 lines
7.5 KiB
Python
Raw Normal View History

#!/usr/bin/env python
import numpy as np
import wave
import struct
import sys
from sbc import *
X = np.zeros(80, dtype = np.int16)
def fetch_samples_for_next_sbc_frame(fin, frame):
nr_audio_frames = frame.nr_blocks * frame.nr_subbands
raw_data = fin.readframes(nr_audio_frames) # Returns byte data
total_samples = nr_audio_frames * frame.nr_channels
len_raw_data = len(raw_data) / 2
padding = np.zeros(total_samples - len_raw_data, dtype=np.int16)
fmt = "%ih" % len_raw_data # read signed 2 byte shorts
frame.pcm = np.concatenate([np.array(struct.unpack(fmt, raw_data)), padding])
del raw_data
def sbc_frame_analysis(frame, ch, blk, C):
global X
M = frame.nr_subbands
L = 10 * M
M2 = 2*M
L2 = 2*L
Z = np.zeros(L)
Y = np.zeros(M2)
W = np.zeros(shape=(M, M2))
S = np.zeros(M)
for i in range(L-1, M-1, -1):
X[i] = X[i-M]
for i in range(M-1, -1, -1):
X[i] = frame.EX[M-1-i]
for i in range(L):
Z[i] = X[i] * C[i]
for i in range(M2):
for k in range(5):
Y[i] += Z[i+k*8]
for i in range(M):
for k in range(M2):
W[i][k] = np.cos((i+0.5)*(k-2)*np.pi/M)
S[i] += W[i][k] * Y[k]
for sb in range(M):
frame.sb_sample[blk][ch][sb] = S[sb]
def sbc_analysis(frame):
if frame.nr_subbands == 4:
C = Proto_4_40
elif frame.nr_subbands == 8:
C = Proto_8_80
else:
return -1
frame.sb_sample = np.ndarray(shape=(frame.nr_blocks, frame.nr_channels, frame.nr_subbands))
index = 0
for ch in range(frame.nr_channels):
for blk in range(frame.nr_blocks):
for sb in range(frame.nr_subbands):
frame.EX[sb] = np.int16(frame.pcm[index])
index+=1
sbc_frame_analysis(frame, ch, blk, C)
return 0
def sbc_encode(frame):
err = sbc_analysis(frame)
if err >= 0:
err = sbc_quantization(frame)
return err
def calculate_joint_stereo_signal(frame):
sb_sample = np.zeros(shape = (frame.nr_blocks,frame.nr_channels,frame.nr_subbands), dtype = np.uint32)
scale_factor = np.zeros(shape=(frame.nr_channels, frame.nr_subbands), dtype = np.int32)
scalefactor = np.zeros(shape=(frame.nr_channels, frame.nr_subbands), dtype = np.int32)
for sb in range(frame.nr_subbands-1):
for blk in range(frame.nr_blocks):
sb_sample[blk][0][sb] = (frame.sb_sample_f[blk][0][sb] + frame.sb_sample_f[blk][1][sb]) >> 1
sb_sample[blk][1][sb] = (frame.sb_sample_f[blk][0][sb] - frame.sb_sample_f[blk][1][sb]) >> 1
for ch in range(frame.nr_channels):
for sb in range(frame.nr_subbands-1):
frame.scale_factor[ch][sb] = 0
frame.scalefactor[ch][sb] = 2
for blk in range(frame.nr_blocks):
while frame.scalefactor[ch][sb] < abs(frame.sb_sample[blk][ch][sb]):
frame.scale_factor[ch][sb]+=1
frame.scalefactor[ch][sb] *= 2
for sb in range(frame.nr_subbands-1):
if (frame.scalefactor[0][sb] + frame.scalefactor[1][sb]) > (scalefactor[0][sb] + scalefactor[1][sb]):
frame.join[sb] = 1
frame.scale_factor[0][sb] = scale_factor[0][sb]
frame.scale_factor[1][sb] = scale_factor[1][sb]
frame.scalefactor[0][sb] = scalefactor[0][sb]
frame.scalefactor[1][sb] = scalefactor[1][sb]
for blk in range(frame.nr_blocks):
frame.sb_sample[blk][0][sb] = sb_sample[blk][0][sb]
frame.sb_sample[blk][1][sb] = sb_sample[blk][1][sb]
def calculate_scalefactor(max_subbandsample):
x = 0
while True:
y = 1 << x + 1
if y > max_subbandsample:
break
x += 1
return (x,y)
def sbc_quantization(frame):
max_subbandsample = np.zeros(shape = (frame.nr_channels, frame.nr_subbands))
for blk in range(frame.nr_blocks):
for ch in range(frame.nr_channels):
for sb in range(frame.nr_subbands):
m = abs(frame.sb_sample[blk][ch][sb])
if max_subbandsample[ch][sb] < m:
max_subbandsample[ch][sb] = m
for ch in range(frame.nr_channels):
for sb in range(frame.nr_subbands):
frame.scale_factor[ch][sb] = 0
frame.scalefactor[ch][sb] = 2
for blk in range(frame.nr_blocks):
while frame.scalefactor[ch][sb] < abs(frame.sb_sample[blk][ch][sb]):
frame.scale_factor[ch][sb]+=1
frame.scalefactor[ch][sb] *= 2
#(frame.scale_factor[ch][sb], frame.scalefactor[ch][sb]) = calculate_scalefactor(max_subbandsample[ch][sb])
frame.bits = sbc_bit_allocation(frame)
# Reconstruct the Audio Samples
frame.levels = np.zeros(shape=(frame.nr_channels, frame.nr_subbands), dtype = np.int32)
for ch in range(frame.nr_channels):
for sb in range(frame.nr_subbands):
frame.levels[ch][sb] = (1 << frame.bits[ch][sb]) - 1 #pow(2.0, frame.bits[ch][sb]) - 1
frame.syncword = 156
frame.crc_check = calculate_crc(frame)
frame.join = np.zeros(frame.nr_subbands, dtype = np.uint8)
if frame.channel_mode == JOINT_STEREO:
calculate_joint_stereo_signal(frame)
for blk in range(frame.nr_blocks):
for ch in range(frame.nr_channels):
for sb in range(frame.nr_subbands):
if frame.levels[ch][sb] > 0:
SB = frame.sb_sample[blk][ch][sb]
L = frame.levels[ch][sb]
SF = frame.scalefactor[ch][sb]
2016-04-29 17:07:59 +02:00
frame.audio_sample[blk][ch][sb] = np.uint16(((SB * L / SF + L) - 1.0)/2.0)
else:
frame.audio_sample[blk][ch][sb] = 0
return 0
def sbc_write_frame(fout, sbc_encoder_frame):
stream = frame_to_bitstream(sbc_encoder_frame)
barray = bytearray(stream)
fout.write(barray)
if __name__ == "__main__":
usage = '''
Usage: ./sbc_encoder.py input.wav blocks subbands bitpool
Example: ./sbc_encoder.py fanfare.wav 16 4 31
'''
nr_blocks = 0
nr_subbands = 0
if (len(sys.argv) < 5):
print(usage)
sys.exit(1)
try:
infile = sys.argv[1]
if not infile.endswith('.wav'):
print(usage)
sys.exit(1)
sbcfile = infile.replace('.wav', '-encoded.sbc')
nr_blocks = int(sys.argv[2])
nr_subbands = int(sys.argv[3])
bitpool = int(sys.argv[4])
fin = wave.open(infile, 'rb')
nr_channels = fin.getnchannels()
sampling_frequency = fin.getframerate()
nr_audio_frames = fin.getnframes()
subband_frame_count = 0
audio_frame_count = 0
nr_samples = nr_blocks * nr_subbands
fout = open(sbcfile, 'wb')
while audio_frame_count < nr_audio_frames:
if subband_frame_count % 200 == 0:
print("== Frame %d ==" % (subband_frame_count))
sbc_encoder_frame = SBCFrame(nr_blocks, nr_subbands, nr_channels, bitpool, sampling_frequency)
fetch_samples_for_next_sbc_frame(fin, sbc_encoder_frame)
sbc_encode(sbc_encoder_frame)
sbc_write_frame(fout, sbc_encoder_frame)
audio_frame_count += nr_samples
subband_frame_count += 1
fin.close()
fout.close()
print("DONE, WAV file %s encoded into SBC file %s " % (infile, sbcfile))
except IOError as e:
print(usage)
sys.exit(1)