mirror of
https://github.com/bluekitchen/btstack.git
synced 2025-02-13 15:41:17 +00:00
sbc encoder draft version, analysis not working
This commit is contained in:
parent
d82cd87c8d
commit
c21a9c2f36
476
test/sbc/sbc.py
Normal file
476
test/sbc/sbc.py
Normal file
@ -0,0 +1,476 @@
|
||||
#!/usr/bin/env python
|
||||
import numpy as np
|
||||
import wave
|
||||
import struct
|
||||
import sys
|
||||
|
||||
|
||||
# channel mode
|
||||
MONO = 0
|
||||
DUAL_CHANNEL = 1
|
||||
STEREO = 2
|
||||
JOINT_STEREO = 3
|
||||
|
||||
# allocation method
|
||||
LOUDNESS = 0
|
||||
SNR = 1
|
||||
|
||||
Proto_4_40 = [
|
||||
0.00000000E+00, 5.36548976E-04, 1.49188357E-03, 2.73370904E-03,
|
||||
3.83720193E-03, 3.89205149E-03, 1.86581691E-03, -3.06012286E-03,
|
||||
1.09137620E-02, 2.04385087E-02, 2.88757392E-02, 3.21939290E-02,
|
||||
2.58767811E-02, 6.13245186E-03, -2.88217274E-02, -7.76463494E-02,
|
||||
1.35593274E-01, 1.94987841E-01, 2.46636662E-01, 2.81828203E-01,
|
||||
2.94315332E-01, 2.81828203E-01, 2.46636662E-01, 1.94987841E-01,
|
||||
-1.35593274E-01, -7.76463494E-02, -2.88217274E-02, 6.13245186E-03,
|
||||
2.58767811E-02, 3.21939290E-02, 2.88757392E-02, 2.04385087E-02,
|
||||
-1.09137620E-02, -3.06012286E-03, 1.86581691E-03, 3.89205149E-03,
|
||||
3.83720193E-03, 2.73370904E-03, 1.49188357E-03, 5.36548976E-04
|
||||
]
|
||||
|
||||
Proto_8_80 = [
|
||||
0.00000000E+00, 1.56575398E-04, 3.43256425E-04, 5.54620202E-04,
|
||||
8.23919506E-04, 1.13992507E-03, 1.47640169E-03, 1.78371725E-03,
|
||||
2.01182542E-03, 2.10371989E-03, 1.99454554E-03, 1.61656283E-03,
|
||||
9.02154502E-04, -1.78805361E-04, -1.64973098E-03, -3.49717454E-03,
|
||||
5.65949473E-03, 8.02941163E-03, 1.04584443E-02, 1.27472335E-02,
|
||||
1.46525263E-02, 1.59045603E-02, 1.62208471E-02, 1.53184106E-02,
|
||||
1.29371806E-02, 8.85757540E-03, 2.92408442E-03, -4.91578024E-03,
|
||||
-1.46404076E-02, -2.61098752E-02, -3.90751381E-02, -5.31873032E-02,
|
||||
6.79989431E-02, 8.29847578E-02, 9.75753918E-02, 1.11196689E-01,
|
||||
1.23264548E-01, 1.33264415E-01, 1.40753505E-01, 1.45389847E-01,
|
||||
1.46955068E-01, 1.45389847E-01, 1.40753505E-01, 1.33264415E-01,
|
||||
1.23264548E-01, 1.11196689E-01, 9.75753918E-02, 8.29847578E-02,
|
||||
-6.79989431E-02, -5.31873032E-02, -3.90751381E-02, -2.61098752E-02,
|
||||
-1.46404076E-02, -4.91578024E-03, 2.92408442E-03, 8.85757540E-03,
|
||||
1.29371806E-02, 1.53184106E-02, 1.62208471E-02, 1.59045603E-02,
|
||||
1.46525263E-02, 1.27472335E-02, 1.04584443E-02, 8.02941163E-03,
|
||||
-5.65949473E-03, -3.49717454E-03, -1.64973098E-03, -1.78805361E-04,
|
||||
9.02154502E-04, 1.61656283E-03, 1.99454554E-03, 2.10371989E-03,
|
||||
2.01182542E-03, 1.78371725E-03, 1.47640169E-03, 1.13992507E-03,
|
||||
8.23919506E-04, 5.54620202E-04, 3.43256425E-04, 1.56575398E-04
|
||||
]
|
||||
|
||||
crc_table = [
|
||||
0x00, 0x1D, 0x3A, 0x27, 0x74, 0x69, 0x4E, 0x53,
|
||||
0xE8, 0xF5, 0xD2, 0xCF, 0x9C, 0x81, 0xA6, 0xBB,
|
||||
0xCD, 0xD0, 0xF7, 0xEA, 0xB9, 0xA4, 0x83, 0x9E,
|
||||
0x25, 0x38, 0x1F, 0x02, 0x51, 0x4C, 0x6B, 0x76,
|
||||
0x87, 0x9A, 0xBD, 0xA0, 0xF3, 0xEE, 0xC9, 0xD4,
|
||||
0x6F, 0x72, 0x55, 0x48, 0x1B, 0x06, 0x21, 0x3C,
|
||||
0x4A, 0x57, 0x70, 0x6D, 0x3E, 0x23, 0x04, 0x19,
|
||||
0xA2, 0xBF, 0x98, 0x85, 0xD6, 0xCB, 0xEC, 0xF1,
|
||||
0x13, 0x0E, 0x29, 0x34, 0x67, 0x7A, 0x5D, 0x40,
|
||||
0xFB, 0xE6, 0xC1, 0xDC, 0x8F, 0x92, 0xB5, 0xA8,
|
||||
0xDE, 0xC3, 0xE4, 0xF9, 0xAA, 0xB7, 0x90, 0x8D,
|
||||
0x36, 0x2B, 0x0C, 0x11, 0x42, 0x5F, 0x78, 0x65,
|
||||
0x94, 0x89, 0xAE, 0xB3, 0xE0, 0xFD, 0xDA, 0xC7,
|
||||
0x7C, 0x61, 0x46, 0x5B, 0x08, 0x15, 0x32, 0x2F,
|
||||
0x59, 0x44, 0x63, 0x7E, 0x2D, 0x30, 0x17, 0x0A,
|
||||
0xB1, 0xAC, 0x8B, 0x96, 0xC5, 0xD8, 0xFF, 0xE2,
|
||||
0x26, 0x3B, 0x1C, 0x01, 0x52, 0x4F, 0x68, 0x75,
|
||||
0xCE, 0xD3, 0xF4, 0xE9, 0xBA, 0xA7, 0x80, 0x9D,
|
||||
0xEB, 0xF6, 0xD1, 0xCC, 0x9F, 0x82, 0xA5, 0xB8,
|
||||
0x03, 0x1E, 0x39, 0x24, 0x77, 0x6A, 0x4D, 0x50,
|
||||
0xA1, 0xBC, 0x9B, 0x86, 0xD5, 0xC8, 0xEF, 0xF2,
|
||||
0x49, 0x54, 0x73, 0x6E, 0x3D, 0x20, 0x07, 0x1A,
|
||||
0x6C, 0x71, 0x56, 0x4B, 0x18, 0x05, 0x22, 0x3F,
|
||||
0x84, 0x99, 0xBE, 0xA3, 0xF0, 0xED, 0xCA, 0xD7,
|
||||
0x35, 0x28, 0x0F, 0x12, 0x41, 0x5C, 0x7B, 0x66,
|
||||
0xDD, 0xC0, 0xE7, 0xFA, 0xA9, 0xB4, 0x93, 0x8E,
|
||||
0xF8, 0xE5, 0xC2, 0xDF, 0x8C, 0x91, 0xB6, 0xAB,
|
||||
0x10, 0x0D, 0x2A, 0x37, 0x64, 0x79, 0x5E, 0x43,
|
||||
0xB2, 0xAF, 0x88, 0x95, 0xC6, 0xDB, 0xFC, 0xE1,
|
||||
0x5A, 0x47, 0x60, 0x7D, 0x2E, 0x33, 0x14, 0x09,
|
||||
0x7F, 0x62, 0x45, 0x58, 0x0B, 0x16, 0x31, 0x2C,
|
||||
0x97, 0x8A, 0xAD, 0xB0, 0xE3, 0xFE, 0xD9, 0xC4
|
||||
]
|
||||
# offset table for 4-subbands
|
||||
offset4 = np.array([[ -1, 0, 0, 0 ],
|
||||
[ -2, 0, 0, 1 ],
|
||||
[ -2, 0, 0, 1 ],
|
||||
[ -2, 0, 0, 1 ]
|
||||
])
|
||||
|
||||
# offset tables for 8-subbands
|
||||
offset8 = np.array([[ -2, 0, 0, 0, 0, 0, 0, 1 ],
|
||||
[ -3, 0, 0, 0, 0, 0, 1, 2 ],
|
||||
[ -4, 0, 0, 0, 0, 0, 1, 2 ],
|
||||
[ -4, 0, 0, 0, 0, 0, 1, 2 ]
|
||||
])
|
||||
|
||||
nr_blocks = [4, 8, 12, 16]
|
||||
nr_subbands = [4, 8]
|
||||
sampling_frequency =[16000, 32000, 44100, 48000]
|
||||
|
||||
class SBCFrame:
|
||||
syncword = 0
|
||||
sampling_frequency = 0
|
||||
nr_blocks = 0
|
||||
channel_mode = 0
|
||||
nr_channels = 0
|
||||
allocation_method = 0
|
||||
nr_subbands = 0
|
||||
bitpool = 0
|
||||
crc_check = 0
|
||||
# pro subband - 1
|
||||
join = np.zeros(8, dtype = np.uint8)
|
||||
scale_factor = np.zeros(shape=(2, 8), dtype = np.int32)
|
||||
scalefactor = np.zeros(shape=(2, 8), dtype = np.int32)
|
||||
audio_sample = np.zeros(shape = (16,2,8), dtype = np.uint16)
|
||||
sb_sample = np.zeros(shape = (16,2,8), dtype = np.uint16)
|
||||
X = np.zeros(8, dtype = np.int16)
|
||||
EX = np.zeros(8)
|
||||
pcm = np.array([], dtype = np.int16)
|
||||
bits = np.zeros(shape=(2, 8))
|
||||
levels = np.zeros(shape=(2, 8), dtype = np.int32)
|
||||
|
||||
def __init__(self, nr_blocks, nr_subbands, nr_channels, sampling_frequency, bitpool):
|
||||
self.nr_blocks = nr_blocks
|
||||
self.nr_subbands = nr_subbands
|
||||
self.nr_channels = nr_channels
|
||||
self.sampling_frequency = sampling_frequency
|
||||
self.bitpool = bitpool
|
||||
self.scale_factor = np.zeros(shape=(nr_channels, nr_subbands), dtype = np.int32)
|
||||
self.scalefactor = np.zeros(shape=(nr_channels, nr_subbands), dtype = np.int32)
|
||||
self.audio_sample = np.zeros(shape=(nr_blocks, nr_channels, nr_subbands), dtype = np.uint16)
|
||||
self.sb_sample = np.zeros(shape=(nr_blocks, nr_channels, nr_subbands), dtype = np.uint16)
|
||||
self.levels = np.zeros(shape=(nr_channels, nr_subbands), dtype = np.int32)
|
||||
self.EX = np.zeros(nr_subbands)
|
||||
return
|
||||
|
||||
def __str__(self):
|
||||
res = "SBCFrameHeader:"
|
||||
res = res + "\n - syncword %d" % self.syncword
|
||||
res = res + "\n - sample frequency %d" % self.sampling_frequency
|
||||
res = res + "\n - nr blocks %d" % self.nr_blocks
|
||||
|
||||
if self.channel_mode == MONO:
|
||||
res = res + "\n - channel mode MONO"
|
||||
elif self.channel_mode == DUAL_CHANNEL:
|
||||
res = res + "\n - channel mode DUAL CHANNEL"
|
||||
elif self.channel_mode == STEREO:
|
||||
res = res + "\n - channel mode STEREO"
|
||||
elif self.channel_mode == JOINT_STEREO:
|
||||
res = res + "\n - channel mode JOINT STEREO"
|
||||
else:
|
||||
res = res + "\n - channel mode %d" % self.channel_mode
|
||||
|
||||
res = res + "\n - nr channels %d" % self.nr_channels
|
||||
|
||||
if self.allocation_method == 1:
|
||||
res = res + "\n - allocation method SNR"
|
||||
elif self.allocation_method == 0:
|
||||
res = res + "\n - allocation method LOUNDNESS"
|
||||
else:
|
||||
res = res + "\n - allocation method %d" % self.allocation_method
|
||||
|
||||
res = res + "\n - nr subbands %d" % self.nr_subbands
|
||||
res = res + "\n - bitpool %d" % self.bitpool
|
||||
res = res + "\n - crc check %d" % self.crc_check
|
||||
|
||||
return res
|
||||
|
||||
|
||||
|
||||
def sbc_bit_allocation_stereo_joint(frame, ch):
|
||||
bitneed = np.zeros(shape=(frame.nr_channels, frame.nr_subbands))
|
||||
bits = np.zeros(shape=(frame.nr_channels, frame.nr_subbands))
|
||||
loudness = 0
|
||||
|
||||
if frame.allocation_method == SNR:
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
bitneed[ch][sb] = frame.scale_factor[ch][sb]
|
||||
else:
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
if frame.scale_factor[ch][sb] == 0:
|
||||
bitneed[ch][sb] = -5
|
||||
else:
|
||||
if frame.nr_subbands == 4:
|
||||
loudness = scale_factor[ch][sb] - offset4[frame.sampling_frequency][sb]
|
||||
else:
|
||||
if frame.nr_subbands == 4:
|
||||
loudness = frame.scale_factor[ch][sb] - offset4[frame.sampling_frequency][sb]
|
||||
else:
|
||||
loudness = frame.scale_factor[ch][sb] - offset8[frame.sampling_frequency][sb]
|
||||
if loudness > 0:
|
||||
bitneed[ch][sb] = loudness/2
|
||||
else:
|
||||
bitneed[ch][sb] = loudness
|
||||
|
||||
# search the maximum bitneed index
|
||||
max_bitneed = 0
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
if bitneed[ch][sb] > max_bitneed:
|
||||
max_bitneed = bitneed[ch][sb]
|
||||
|
||||
# # print "max_bitneed: ", max_bitneed
|
||||
|
||||
# calculate how many bitslices fit into the bitpool
|
||||
bitcount = 0
|
||||
slicecount = 0
|
||||
bitslice = max_bitneed + 1 #/* init just above the largest sf */
|
||||
|
||||
while True:
|
||||
bitslice = bitslice - 1
|
||||
bitcount = bitcount + slicecount
|
||||
slicecount = 0
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
if (bitneed[ch][sb] > bitslice+1) and (bitneed[ch][sb] < bitslice+16):
|
||||
slicecount = slicecount + 1
|
||||
elif bitneed[ch][sb] == bitslice + 1:
|
||||
slicecount = slicecount + 2
|
||||
if bitcount + slicecount >= frame.bitpool:
|
||||
break
|
||||
|
||||
# print "bitcount %d, slicecount %d" % (bitcount, slicecount)
|
||||
|
||||
if bitcount + slicecount == frame.bitpool:
|
||||
bitcount = bitcount + slicecount
|
||||
bitslice = bitslice - 1
|
||||
|
||||
# bits are distributed until the last bitslice is reached
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
if bitneed[ch][sb] < bitslice+2 :
|
||||
bits[ch][sb]=0;
|
||||
else:
|
||||
bits[ch][sb] = min(bitneed[ch][sb]-bitslice,16)
|
||||
|
||||
|
||||
ch = 0
|
||||
sb = 0
|
||||
while bitcount < frame.bitpool and sb < frame.nr_subbands:
|
||||
if bits[ch][sb] >= 2 and bits[ch][sb] < 16:
|
||||
bits[ch][sb] = bits[ch][sb] + 1
|
||||
bitcount = bitcount + 1
|
||||
|
||||
elif (bitneed[ch][sb] == bitslice+1) and (frame.bitpool > bitcount+1):
|
||||
bits[ch][sb] = 2
|
||||
bitcount += 2
|
||||
|
||||
if ch == 1:
|
||||
ch = 0
|
||||
sb = sb + 1
|
||||
else:
|
||||
ch = 1
|
||||
|
||||
ch = 0
|
||||
sb = 0
|
||||
while bitcount < frame.bitpool and sb < frame.nr_subbands:
|
||||
if bits[ch][sb] < 16:
|
||||
bits[ch][sb] = bits[ch][sb] + 1
|
||||
bitcount = bitcount + 1
|
||||
if ch == 1:
|
||||
ch = 0
|
||||
sb = sb + 1
|
||||
else:
|
||||
ch = 1
|
||||
|
||||
return bits
|
||||
|
||||
|
||||
def sbc_bit_allocation_mono_dual(frame):
|
||||
#print "Bit allocation for mono/dual channel"
|
||||
bitneed = np.zeros(shape=(frame.nr_channels, frame.nr_subbands), dtype = np.int32)
|
||||
bits = np.zeros(shape=(frame.nr_channels, frame.nr_subbands), dtype = np.int32)
|
||||
loudness = 0
|
||||
|
||||
for ch in range(frame.nr_channels):
|
||||
# bitneed values are derived from the scale factors
|
||||
if frame.allocation_method == SNR:
|
||||
for sb in range(frame.nr_subbands):
|
||||
bitneed[ch][sb] = frame.scale_factor[ch][sb]
|
||||
else:
|
||||
for sb in range(frame.nr_subbands):
|
||||
if frame.scale_factor[ch][sb] == 0:
|
||||
bitneed[ch][sb] = -5
|
||||
else:
|
||||
if frame.nr_subbands == 4:
|
||||
loudness = frame.scale_factor[ch][sb] - offset4[frame.sampling_frequency][sb]
|
||||
else:
|
||||
loudness = frame.scale_factor[ch][sb] - offset8[frame.sampling_frequency][sb]
|
||||
if loudness > 0:
|
||||
bitneed[ch][sb] = loudness/2
|
||||
else:
|
||||
bitneed[ch][sb] = loudness
|
||||
|
||||
# search the maximum bitneed index
|
||||
max_bitneed = 0
|
||||
for sb in range(frame.nr_subbands):
|
||||
if bitneed[ch][sb] > max_bitneed:
|
||||
max_bitneed = bitneed[ch][sb]
|
||||
|
||||
#print "max_bitneed: ", max_bitneed
|
||||
|
||||
# calculate how many bitslices fit into the bitpool
|
||||
bitcount = 0
|
||||
slicecount = 0
|
||||
bitslice = max_bitneed + 1 #/* init just above the largest sf */
|
||||
|
||||
while True:
|
||||
bitslice = bitslice - 1
|
||||
bitcount = bitcount + slicecount
|
||||
slicecount = 0
|
||||
for sb in range(frame.nr_subbands):
|
||||
if (bitneed[ch][sb] > bitslice+1) and (bitneed[ch][sb] < bitslice+16):
|
||||
slicecount = slicecount + 1
|
||||
elif bitneed[ch][sb] == bitslice + 1:
|
||||
slicecount = slicecount + 2
|
||||
if bitcount + slicecount >= frame.bitpool:
|
||||
break
|
||||
|
||||
#print "bitcount %d, slicecount %d" % (bitcount, slicecount)
|
||||
|
||||
if bitcount + slicecount == frame.bitpool:
|
||||
bitcount = bitcount + slicecount
|
||||
bitslice = bitslice - 1
|
||||
|
||||
# bits are distributed until the last bitslice is reached
|
||||
for sb in range(frame.nr_subbands):
|
||||
if bitneed[ch][sb] < bitslice+2 :
|
||||
bits[ch][sb]=0;
|
||||
else:
|
||||
bits[ch][sb] = min(bitneed[ch][sb]-bitslice,16)
|
||||
|
||||
# The remaining bits are allocated starting at subband 0.
|
||||
sb = 0
|
||||
while bitcount < frame.bitpool and sb < frame.nr_subbands:
|
||||
if bits[ch][sb] >= 2 and bits[ch][sb] < 16:
|
||||
bits[ch][sb] = bits[ch][sb] + 1
|
||||
bitcount = bitcount + 1
|
||||
|
||||
elif (bitneed[ch][sb] == bitslice+1) and (frame.bitpool > bitcount+1):
|
||||
bits[ch][sb] = 2
|
||||
bitcount += 2
|
||||
|
||||
sb = sb + 1
|
||||
|
||||
|
||||
sb = 0
|
||||
while bitcount < frame.bitpool and sb < frame.nr_subbands:
|
||||
if bits[ch][sb] < 16:
|
||||
bits[ch][sb] = bits[ch][sb] + 1
|
||||
bitcount = bitcount + 1
|
||||
sb = sb + 1
|
||||
|
||||
return bits
|
||||
|
||||
def sbc_bit_allocation(frame):
|
||||
if frame.channel_mode == MONO or frame.channel_mode == DUAL_CHANNEL:
|
||||
return sbc_bit_allocation_mono_dual(frame)
|
||||
elif frame.channel_mode == STEREO or frame.channel_mode == JOINT_STEREO:
|
||||
return sbc_bit_allocation_stereo_joint(frame)
|
||||
else:
|
||||
print "Wrong channel mode ", frame.channel_mode
|
||||
return -1
|
||||
|
||||
def sbc_sampling_frequency_index(sample_rate):
|
||||
sbc_sampling_frequency_index = 0
|
||||
for i in range(len(sampling_frequency)):
|
||||
if sample_rate == sampling_frequency[i]:
|
||||
sbc_sampling_frequency_index = i
|
||||
break
|
||||
return sbc_sampling_frequency_index
|
||||
|
||||
# static uint8_t sbc_crc8(const uint8_t * data, size_t len)
|
||||
# 158 {
|
||||
# 159 uint8_t crc = 0x0f;
|
||||
# 160 size_t i;
|
||||
# 161 uint8_t octet;
|
||||
# 162
|
||||
# 163 for (i = 0; i < len / 8; i++)
|
||||
# 164 crc = crc_table[crc ^ data[i]];
|
||||
# 165
|
||||
# 166 octet = data[i];
|
||||
# 167 for (i = 0; i < len % 8; i++) {
|
||||
# 168 char bit = ((octet ^ crc) & 0x80) >> 7;
|
||||
# 169
|
||||
# 170 crc = ((crc & 0x7f) << 1) ^ (bit ? 0x1d : 0);
|
||||
# 171
|
||||
# 172 octet = octet << 1;
|
||||
# 173 }
|
||||
# 174
|
||||
# 175 return crc;
|
||||
# 176 }
|
||||
|
||||
def sbc_crc8(data, data_len):
|
||||
crc = 0x0f
|
||||
|
||||
j = 0
|
||||
for i in range(data_len / 8):
|
||||
crc = crc_table[crc ^ data[i]]
|
||||
j = i + 1
|
||||
|
||||
bits_left = data_len%8
|
||||
if bits_left:
|
||||
octet = data[j]
|
||||
for i in range(data_len%8):
|
||||
bit = ((octet ^ crc) & 0x80) >> 7
|
||||
if bit:
|
||||
bit = 0x1d
|
||||
crc = ((crc & 0x7f) << 1) ^ bit
|
||||
octet = octet << 1
|
||||
return crc
|
||||
|
||||
|
||||
bitstream = []
|
||||
bitstream_index = -1
|
||||
bitstream_bits_available = 0
|
||||
|
||||
def init_bitstream():
|
||||
global bitstream, bitstream_bits_available, bitstream_index
|
||||
bitstream = []
|
||||
bitstream_index = -1
|
||||
bitstream_bits_available = 0
|
||||
|
||||
def add_bit(bit):
|
||||
global bitstream, bitstream_bits_available, bitstream_index
|
||||
if bitstream_bits_available == 0:
|
||||
bitstream.append(0)
|
||||
bitstream_bits_available = 8
|
||||
bitstream_index += 1
|
||||
|
||||
bitstream[bitstream_index] |= bit << (bitstream_bits_available - 1)
|
||||
bitstream_bits_available -= 1
|
||||
|
||||
|
||||
def add_bits(bits, len):
|
||||
global bitstream, bitstream_bits_available
|
||||
for i in range(len):
|
||||
add_bit((bits >> (len-1-i)) & 1)
|
||||
|
||||
|
||||
def calculate_crc(frame):
|
||||
global bitstream, bitstream_bits_available, bitstream_index
|
||||
init_bitstream()
|
||||
|
||||
add_bits(frame.sampling_frequency, 2)
|
||||
add_bits(frame.nr_blocks/4-1, 2)
|
||||
add_bits(frame.channel_mode, 2)
|
||||
add_bits(frame.allocation_method, 1)
|
||||
add_bits(frame.nr_subbands/4-1, 1)
|
||||
add_bits(frame.bitpool, 8)
|
||||
|
||||
if frame.channel_mode == JOINT_STEREO:
|
||||
for sb in range(frame.nr_subbands):
|
||||
add_bits(frame.join[sb],1)
|
||||
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
add_bits(frame.scale_factor[ch][sb], 4)
|
||||
# bitstream_len = 16 + frame.nr_subbands + frame.nr_channels * frame.nr_subbands * 4
|
||||
bitstream_len = (bitstream_index + 1) * 8
|
||||
if bitstream_bits_available:
|
||||
bitstream_len += (8-bitstream_bits_available)
|
||||
return sbc_crc8(bitstream, bitstream_len)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -3,131 +3,14 @@ import numpy as np
|
||||
import wave
|
||||
import struct
|
||||
import sys
|
||||
from sbc import *
|
||||
|
||||
# channel mode
|
||||
MONO = 0
|
||||
DUAL_CHANNEL = 1
|
||||
STEREO = 2
|
||||
JOINT_STEREO = 3
|
||||
|
||||
# allocation method
|
||||
LOUDNESS = 0
|
||||
SNR = 1
|
||||
|
||||
# offset table for 4-subbands
|
||||
offset4 = np.array([[ -1, 0, 0, 0 ],
|
||||
[ -2, 0, 0, 1 ],
|
||||
[ -2, 0, 0, 1 ],
|
||||
[ -2, 0, 0, 1 ]
|
||||
])
|
||||
|
||||
# offset tables for 8-subbands
|
||||
offset8 = np.array([[ -2, 0, 0, 0, 0, 0, 0, 1 ],
|
||||
[ -3, 0, 0, 0, 0, 0, 1, 2 ],
|
||||
[ -4, 0, 0, 0, 0, 0, 1, 2 ],
|
||||
[ -4, 0, 0, 0, 0, 0, 1, 2 ]
|
||||
])
|
||||
|
||||
nr_blocks = [4, 8, 12, 16]
|
||||
nr_subbands = [4, 8]
|
||||
sampling_frequency =[16000, 32000, 44100, 48000]
|
||||
V = np.zeros(shape = (2, 10*2*8))
|
||||
|
||||
ibuffer = None
|
||||
ibuffer_count = 0
|
||||
|
||||
sb_sample = np.ndarray(shape=(16, 2, 8), dtype = np.int32)
|
||||
V = np.zeros(shape = (2, 10*2*8))
|
||||
|
||||
Proto_4_40 = [
|
||||
0.00000000E+00, 5.36548976E-04, 1.49188357E-03, 2.73370904E-03,
|
||||
3.83720193E-03, 3.89205149E-03, 1.86581691E-03, -3.06012286E-03,
|
||||
1.09137620E-02, 2.04385087E-02, 2.88757392E-02, 3.21939290E-02,
|
||||
2.58767811E-02, 6.13245186E-03, -2.88217274E-02, -7.76463494E-02,
|
||||
1.35593274E-01, 1.94987841E-01, 2.46636662E-01, 2.81828203E-01,
|
||||
2.94315332E-01, 2.81828203E-01, 2.46636662E-01, 1.94987841E-01,
|
||||
-1.35593274E-01, -7.76463494E-02, -2.88217274E-02, 6.13245186E-03,
|
||||
2.58767811E-02, 3.21939290E-02, 2.88757392E-02, 2.04385087E-02,
|
||||
-1.09137620E-02, -3.06012286E-03, 1.86581691E-03, 3.89205149E-03,
|
||||
3.83720193E-03, 2.73370904E-03, 1.49188357E-03, 5.36548976E-04
|
||||
]
|
||||
|
||||
Proto_8_80 = [
|
||||
0.00000000E+00, 1.56575398E-04, 3.43256425E-04, 5.54620202E-04,
|
||||
8.23919506E-04, 1.13992507E-03, 1.47640169E-03, 1.78371725E-03,
|
||||
2.01182542E-03, 2.10371989E-03, 1.99454554E-03, 1.61656283E-03,
|
||||
9.02154502E-04, -1.78805361E-04, -1.64973098E-03, -3.49717454E-03,
|
||||
5.65949473E-03, 8.02941163E-03, 1.04584443E-02, 1.27472335E-02,
|
||||
1.46525263E-02, 1.59045603E-02, 1.62208471E-02, 1.53184106E-02,
|
||||
1.29371806E-02, 8.85757540E-03, 2.92408442E-03, -4.91578024E-03,
|
||||
-1.46404076E-02, -2.61098752E-02, -3.90751381E-02, -5.31873032E-02,
|
||||
6.79989431E-02, 8.29847578E-02, 9.75753918E-02, 1.11196689E-01,
|
||||
1.23264548E-01, 1.33264415E-01, 1.40753505E-01, 1.45389847E-01,
|
||||
1.46955068E-01, 1.45389847E-01, 1.40753505E-01, 1.33264415E-01,
|
||||
1.23264548E-01, 1.11196689E-01, 9.75753918E-02, 8.29847578E-02,
|
||||
-6.79989431E-02, -5.31873032E-02, -3.90751381E-02, -2.61098752E-02,
|
||||
-1.46404076E-02, -4.91578024E-03, 2.92408442E-03, 8.85757540E-03,
|
||||
1.29371806E-02, 1.53184106E-02, 1.62208471E-02, 1.59045603E-02,
|
||||
1.46525263E-02, 1.27472335E-02, 1.04584443E-02, 8.02941163E-03,
|
||||
-5.65949473E-03, -3.49717454E-03, -1.64973098E-03, -1.78805361E-04,
|
||||
9.02154502E-04, 1.61656283E-03, 1.99454554E-03, 2.10371989E-03,
|
||||
2.01182542E-03, 1.78371725E-03, 1.47640169E-03, 1.13992507E-03,
|
||||
8.23919506E-04, 5.54620202E-04, 3.43256425E-04, 1.56575398E-04
|
||||
]
|
||||
|
||||
class SBCFrame:
|
||||
syncword = 0
|
||||
sampling_frequency = 0
|
||||
nr_blocks = 0
|
||||
channel_mode = 0
|
||||
nr_channels = 0
|
||||
allocation_method = 0
|
||||
nr_subbands = 0
|
||||
bitpool = 0
|
||||
crc_check = 0
|
||||
# pro subband - 1
|
||||
join = np.zeros(8, dtype = np.uint8)
|
||||
scale_factor = np.zeros(shape=(2, 8), dtype = np.int32)
|
||||
scalefactor = np.zeros(shape=(2, 8), dtype = np.int32)
|
||||
audio_sample = np.ndarray(shape = (16,2,8), dtype = np.uint16)
|
||||
X = np.zeros(8, dtype = np.int16)
|
||||
pcm = np.array([], dtype = np.int16)
|
||||
|
||||
def __init__(self):
|
||||
return
|
||||
|
||||
def __str__(self):
|
||||
res = "SBCFrameHeader:"
|
||||
res = res + "\n - syncword %d" % self.syncword
|
||||
res = res + "\n - sample frequency %d" % self.sampling_frequency
|
||||
res = res + "\n - nr blocks %d" % self.nr_blocks
|
||||
|
||||
if self.channel_mode == MONO:
|
||||
res = res + "\n - channel mode MONO"
|
||||
elif self.channel_mode == DUAL_CHANNEL:
|
||||
res = res + "\n - channel mode DUAL CHANNEL"
|
||||
elif self.channel_mode == STEREO:
|
||||
res = res + "\n - channel mode STEREO"
|
||||
elif self.channel_mode == JOINT_STEREO:
|
||||
res = res + "\n - channel mode JOINT STEREO"
|
||||
else:
|
||||
res = res + "\n - channel mode %d" % self.channel_mode
|
||||
|
||||
res = res + "\n - nr channels %d" % self.nr_channels
|
||||
|
||||
if self.allocation_method == 1:
|
||||
res = res + "\n - allocation method SNR"
|
||||
elif self.allocation_method == 0:
|
||||
res = res + "\n - allocation method LOUNDNESS"
|
||||
else:
|
||||
res = res + "\n - allocation method %d" % self.allocation_method
|
||||
|
||||
res = res + "\n - nr subbands %d" % self.nr_subbands
|
||||
res = res + "\n - bitpool %d" % self.bitpool
|
||||
res = res + "\n - crc check %d" % self.crc_check
|
||||
|
||||
return res
|
||||
|
||||
|
||||
def get_bit(fin):
|
||||
global ibuffer, ibuffer_count
|
||||
if ibuffer_count == 0:
|
||||
@ -156,207 +39,7 @@ def get_frame_sample_frequences(fin, bit_count):
|
||||
for i in range(bit_count):
|
||||
get_bit(fin)
|
||||
|
||||
|
||||
def sbc_bit_allocation_stereo_joint(fin, frame, ch):
|
||||
bitneed = np.zeros(shape=(frame.nr_channels, frame.nr_subbands))
|
||||
bits = np.zeros(shape=(frame.nr_channels, frame.nr_subbands))
|
||||
loudness = 0
|
||||
|
||||
if frame.allocation_method == SNR:
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
bitneed[ch][sb] = frame.scale_factor[ch][sb]
|
||||
else:
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
if frame.scale_factor[ch][sb] == 0:
|
||||
bitneed[ch][sb] = -5
|
||||
else:
|
||||
if frame.nr_subbands == 4:
|
||||
loudness = scale_factor[ch][sb] - offset4[frame.sampling_frequency][sb]
|
||||
|
||||
else:
|
||||
if frame.nr_subbands == 4:
|
||||
loudness = frame.scale_factor[ch][sb] - offset4[frame.sampling_frequency][sb]
|
||||
else:
|
||||
loudness = frame.scale_factor[ch][sb] - offset8[frame.sampling_frequency][sb]
|
||||
if loudness > 0:
|
||||
bitneed[ch][sb] = loudness/2
|
||||
else:
|
||||
bitneed[ch][sb] = loudness
|
||||
|
||||
# search the maximum bitneed index
|
||||
max_bitneed = 0
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
if bitneed[ch][sb] > max_bitneed:
|
||||
max_bitneed = bitneed[ch][sb]
|
||||
|
||||
# # print "max_bitneed: ", max_bitneed
|
||||
|
||||
# calculate how many bitslices fit into the bitpool
|
||||
bitcount = 0
|
||||
slicecount = 0
|
||||
bitslice = max_bitneed + 1 #/* init just above the largest sf */
|
||||
|
||||
while True:
|
||||
bitslice = bitslice - 1
|
||||
bitcount = bitcount + slicecount
|
||||
slicecount = 0
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
if (bitneed[ch][sb] > bitslice+1) and (bitneed[ch][sb] < bitslice+16):
|
||||
slicecount = slicecount + 1
|
||||
elif bitneed[ch][sb] == bitslice + 1:
|
||||
slicecount = slicecount + 2
|
||||
if bitcount + slicecount >= frame.bitpool:
|
||||
break
|
||||
|
||||
# print "bitcount %d, slicecount %d" % (bitcount, slicecount)
|
||||
|
||||
if bitcount + slicecount == frame.bitpool:
|
||||
bitcount = bitcount + slicecount
|
||||
bitslice = bitslice - 1
|
||||
|
||||
# bits are distributed until the last bitslice is reached
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
if bitneed[ch][sb] < bitslice+2 :
|
||||
bits[ch][sb]=0;
|
||||
else:
|
||||
bits[ch][sb] = min(bitneed[ch][sb]-bitslice,16)
|
||||
|
||||
|
||||
ch = 0
|
||||
sb = 0
|
||||
while bitcount < frame.bitpool and sb < frame.nr_subbands:
|
||||
if bits[ch][sb] >= 2 and bits[ch][sb] < 16:
|
||||
bits[ch][sb] = bits[ch][sb] + 1
|
||||
bitcount = bitcount + 1
|
||||
|
||||
elif (bitneed[ch][sb] == bitslice+1) and (frame.bitpool > bitcount+1):
|
||||
bits[ch][sb] = 2
|
||||
bitcount += 2
|
||||
|
||||
if ch == 1:
|
||||
ch = 0
|
||||
sb = sb + 1
|
||||
else:
|
||||
ch = 1
|
||||
|
||||
ch = 0
|
||||
sb = 0
|
||||
while bitcount < frame.bitpool and sb < frame.nr_subbands:
|
||||
if bits[ch][sb] < 16:
|
||||
bits[ch][sb] = bits[ch][sb] + 1
|
||||
bitcount = bitcount + 1
|
||||
if ch == 1:
|
||||
ch = 0
|
||||
sb = sb + 1
|
||||
else:
|
||||
ch = 1
|
||||
|
||||
return bits
|
||||
|
||||
|
||||
def sbc_bit_allocation_mono_dual(fin, frame):
|
||||
#print "Bit allocation for mono/dual channel"
|
||||
bitneed = np.zeros(shape=(frame.nr_channels, frame.nr_subbands), dtype = np.int32)
|
||||
bits = np.zeros(shape=(frame.nr_channels, frame.nr_subbands), dtype = np.int32)
|
||||
loudness = 0
|
||||
|
||||
for ch in range(frame.nr_channels):
|
||||
# bitneed values are derived from the scale factors
|
||||
if frame.allocation_method == SNR:
|
||||
for sb in range(frame.nr_subbands):
|
||||
bitneed[ch][sb] = frame.scale_factor[ch][sb]
|
||||
else:
|
||||
for sb in range(frame.nr_subbands):
|
||||
if frame.scale_factor[ch][sb] == 0:
|
||||
bitneed[ch][sb] = -5
|
||||
else:
|
||||
if frame.nr_subbands == 4:
|
||||
loudness = frame.scale_factor[ch][sb] - offset4[frame.sampling_frequency][sb]
|
||||
else:
|
||||
loudness = frame.scale_factor[ch][sb] - offset8[frame.sampling_frequency][sb]
|
||||
if loudness > 0:
|
||||
bitneed[ch][sb] = loudness/2
|
||||
else:
|
||||
bitneed[ch][sb] = loudness
|
||||
|
||||
# search the maximum bitneed index
|
||||
max_bitneed = 0
|
||||
for sb in range(frame.nr_subbands):
|
||||
if bitneed[ch][sb] > max_bitneed:
|
||||
max_bitneed = bitneed[ch][sb]
|
||||
|
||||
#print "max_bitneed: ", max_bitneed
|
||||
|
||||
# calculate how many bitslices fit into the bitpool
|
||||
bitcount = 0
|
||||
slicecount = 0
|
||||
bitslice = max_bitneed + 1 #/* init just above the largest sf */
|
||||
|
||||
while True:
|
||||
bitslice = bitslice - 1
|
||||
bitcount = bitcount + slicecount
|
||||
slicecount = 0
|
||||
for sb in range(frame.nr_subbands):
|
||||
if (bitneed[ch][sb] > bitslice+1) and (bitneed[ch][sb] < bitslice+16):
|
||||
slicecount = slicecount + 1
|
||||
elif bitneed[ch][sb] == bitslice + 1:
|
||||
slicecount = slicecount + 2
|
||||
if bitcount + slicecount >= frame.bitpool:
|
||||
break
|
||||
|
||||
#print "bitcount %d, slicecount %d" % (bitcount, slicecount)
|
||||
|
||||
if bitcount + slicecount == frame.bitpool:
|
||||
bitcount = bitcount + slicecount
|
||||
bitslice = bitslice - 1
|
||||
|
||||
# bits are distributed until the last bitslice is reached
|
||||
for sb in range(frame.nr_subbands):
|
||||
if bitneed[ch][sb] < bitslice+2 :
|
||||
bits[ch][sb]=0;
|
||||
else:
|
||||
bits[ch][sb] = min(bitneed[ch][sb]-bitslice,16)
|
||||
|
||||
# The remaining bits are allocated starting at subband 0.
|
||||
sb = 0
|
||||
while bitcount < frame.bitpool and sb < frame.nr_subbands:
|
||||
if bits[ch][sb] >= 2 and bits[ch][sb] < 16:
|
||||
bits[ch][sb] = bits[ch][sb] + 1
|
||||
bitcount = bitcount + 1
|
||||
|
||||
elif (bitneed[ch][sb] == bitslice+1) and (frame.bitpool > bitcount+1):
|
||||
bits[ch][sb] = 2
|
||||
bitcount += 2
|
||||
|
||||
sb = sb + 1
|
||||
|
||||
|
||||
sb = 0
|
||||
while bitcount < frame.bitpool and sb < frame.nr_subbands:
|
||||
if bits[ch][sb] < 16:
|
||||
bits[ch][sb] = bits[ch][sb] + 1
|
||||
bitcount = bitcount + 1
|
||||
sb = sb + 1
|
||||
|
||||
return bits
|
||||
|
||||
def sbc_bit_allocation(fin, frame):
|
||||
if frame.channel_mode == MONO or frame.channel_mode == DUAL_CHANNEL:
|
||||
return sbc_bit_allocation_mono_dual(fin,frame)
|
||||
elif frame.channel_mode == STEREO or frame.channel_mode == JOINT_STEREO:
|
||||
return sbc_bit_allocation_stereo_joint(fin, frame)
|
||||
else:
|
||||
print "Wrong channel mode ", frame.channel_mode
|
||||
return -1
|
||||
|
||||
def sbc_process_frame(fin, frame):
|
||||
global sb_sample
|
||||
|
||||
def sbc_unpack_frame(fin, frame):
|
||||
frame.syncword = get_bits(fin,8)
|
||||
if frame.syncword != 156:
|
||||
print "incorrect syncword ", frame.syncword
|
||||
@ -372,7 +55,6 @@ def sbc_process_frame(fin, frame):
|
||||
|
||||
frame.allocation_method = get_bits(fin,1)
|
||||
frame.nr_subbands = nr_subbands[get_bits(fin,1)]
|
||||
|
||||
frame.bitpool = get_bits(fin,8)
|
||||
frame.crc_check = get_bits(fin,8)
|
||||
|
||||
@ -387,6 +69,7 @@ def sbc_process_frame(fin, frame):
|
||||
# print frame
|
||||
|
||||
frame.scale_factor = np.zeros(shape=(frame.nr_channels, frame.nr_subbands), dtype = np.int32)
|
||||
frame.scalefactor = np.zeros(shape=(frame.nr_channels, frame.nr_subbands), dtype = np.int32)
|
||||
frame.audio_sample = np.ndarray(shape=(frame.nr_blocks, frame.nr_channels, frame.nr_subbands), dtype = np.uint16)
|
||||
|
||||
# print frame.audio_sample
|
||||
@ -394,19 +77,24 @@ def sbc_process_frame(fin, frame):
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
frame.scale_factor[ch][sb] = get_bits(fin, 4)
|
||||
|
||||
crc = calculate_crc(frame)
|
||||
if crc != frame.crc_check:
|
||||
print "error, crc not equal: ", crc, frame.crc_check
|
||||
exit(1)
|
||||
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
frame.scalefactor[ch][sb] = 1 << (frame.scale_factor[ch][sb] + 1)
|
||||
|
||||
bits = sbc_bit_allocation(fin, frame)
|
||||
frame.bits = sbc_bit_allocation(frame)
|
||||
# print "bits: ", bits
|
||||
#print "Nr blocks ", frame.nr_blocks, frame.nr_channels, frame.nr_subbands
|
||||
|
||||
for blk in range(frame.nr_blocks):
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
frame.audio_sample[blk][ch][sb] = get_bits(fin, bits[ch][sb])
|
||||
frame.audio_sample[blk][ch][sb] = get_bits(fin, frame.bits[ch][sb])
|
||||
# print "block %2d - audio sample: %s" % (blk, frame.audio_sample[blk][0])
|
||||
|
||||
# add padding
|
||||
@ -414,46 +102,54 @@ def sbc_process_frame(fin, frame):
|
||||
|
||||
# Reconstruct the Subband Samples
|
||||
|
||||
levels = np.zeros(shape=(frame.nr_channels, frame.nr_subbands), dtype = np.int32)
|
||||
sb_sample = np.ndarray(shape=(frame.nr_blocks, frame.nr_channels, frame.nr_subbands))
|
||||
frame.levels = np.zeros(shape=(frame.nr_channels, frame.nr_subbands), dtype = np.int32)
|
||||
frame.sb_sample = np.zeros(shape=(frame.nr_blocks, frame.nr_channels, frame.nr_subbands))
|
||||
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
levels[ch][sb] = pow(2.0, bits[ch][sb]) - 1
|
||||
# print "Levels: ", levels
|
||||
|
||||
frame.levels[ch][sb] = pow(2.0, frame.bits[ch][sb]) - 1
|
||||
|
||||
for blk in range(frame.nr_blocks):
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
if levels[ch][sb] > 0:
|
||||
sb_sample[blk][ch][sb] = frame.scalefactor[ch][sb] * ((frame.audio_sample[blk][ch][sb]*2.0+1.0) / levels[ch][sb] -1.0 )
|
||||
if frame.levels[ch][sb] > 0:
|
||||
AS = frame.audio_sample[blk][ch][sb]
|
||||
SF = frame.scalefactor[ch][sb]
|
||||
L = frame.levels[ch][sb]
|
||||
|
||||
SB = SF * ((AS*2.0+1.0) / L -1.0 )
|
||||
# if sb == 3:
|
||||
# print "decoder SF, L, AS ", SF, L, AS
|
||||
frame.sb_sample[blk][ch][sb] = SB
|
||||
# tmpa = (((frame.audio_sample[blk][ch][sb] << 16) | 0x8000) / levels[ch][sb] ) - 0x8000
|
||||
# tmpb = tmpa >> 3
|
||||
# sb_sample[blk][ch][sb] = tmpb * frame.scalefactor[ch][sb]
|
||||
# frame.sb_sample[blk][ch][sb] = tmpb * frame.scalefactor[ch][sb]
|
||||
|
||||
else:
|
||||
sb_sample[blk][ch][sb] = 0
|
||||
frame.sb_sample[blk][ch][sb] = 0
|
||||
|
||||
# sythesis filter
|
||||
if frame.channel_mode == JOINT_STEREO:
|
||||
for blk in range(frame.nr_blocks):
|
||||
for sb in range(frame.nr_subbands):
|
||||
if frame.join[sb]==1:
|
||||
ch_a = sb_sample[blk][0][sb] + sb_sample[blk][1][sb]
|
||||
ch_b = sb_sample[blk][0][sb] - sb_sample[blk][1][sb]
|
||||
sb_sample[blk][0][sb] = ch_a
|
||||
sb_sample[blk][1][sb] = ch_b
|
||||
ch_a = frame.sb_sample[blk][0][sb] + frame.sb_sample[blk][1][sb]
|
||||
ch_b = frame.sb_sample[blk][0][sb] - frame.sb_sample[blk][1][sb]
|
||||
frame.sb_sample[blk][0][sb] = ch_a
|
||||
frame.sb_sample[blk][1][sb] = ch_b
|
||||
|
||||
|
||||
# print "Scale factors ", frame.scale_factor[0]
|
||||
|
||||
# print "\nReconstructed subband samples: "
|
||||
# for blk in range(frame.nr_blocks):
|
||||
# print "block %2d - recon. sample: %s" % (blk, sb_sample[blk][0])
|
||||
# print "block %2d - recon. sample: %s" % (blk, frame.sb_sample[blk][0])
|
||||
# print
|
||||
return 0
|
||||
|
||||
|
||||
def sbc_synthesis(frame, ch, blk, proto_table):
|
||||
global V, sb_sample
|
||||
global V
|
||||
M = frame.nr_subbands
|
||||
L = 10 * M
|
||||
M2 = 2*M
|
||||
@ -465,7 +161,7 @@ def sbc_synthesis(frame, ch, blk, proto_table):
|
||||
frame.X = np.zeros(M)
|
||||
|
||||
for i in range(M):
|
||||
S[i] = sb_sample[blk][ch][i]
|
||||
S[i] = frame.sb_sample[blk][ch][i]
|
||||
|
||||
for i in range(L2-1, M2-1,-1):
|
||||
V[ch][i] = V[ch][i-M2]
|
||||
@ -518,59 +214,59 @@ def write_wav_file(fout, sample):
|
||||
fout.writeframes(value_str)
|
||||
|
||||
|
||||
usage = '''
|
||||
Usage: ./sbc_decoder.py input.sbc
|
||||
'''
|
||||
# usage = '''
|
||||
# Usage: ./sbc_decoder.py input.sbc
|
||||
# '''
|
||||
|
||||
if (len(sys.argv) < 2):
|
||||
print(usage)
|
||||
sys.exit(1)
|
||||
try:
|
||||
infile = sys.argv[1]
|
||||
if not infile.endswith('.sbc'):
|
||||
print(usage)
|
||||
sys.exit(1)
|
||||
# if (len(sys.argv) < 2):
|
||||
# print(usage)
|
||||
# sys.exit(1)
|
||||
# try:
|
||||
# infile = sys.argv[1]
|
||||
# if not infile.endswith('.sbc'):
|
||||
# print(usage)
|
||||
# sys.exit(1)
|
||||
|
||||
wavfile = infile.replace('.sbc', '-decoded.wav')
|
||||
# wavfile = infile.replace('.sbc', '-decoded.wav')
|
||||
|
||||
with open (infile, 'rb') as fin:
|
||||
try:
|
||||
frame_count = 0
|
||||
while True:
|
||||
sbc_frame = SBCFrame()
|
||||
if frame_count % 200 == 0:
|
||||
print "== Frame %d ==" % (frame_count)
|
||||
err = sbc_process_frame(fin, sbc_frame)
|
||||
if err:
|
||||
print "error, frame_count: ", frame_count
|
||||
break
|
||||
# with open (infile, 'rb') as fin:
|
||||
# try:
|
||||
# frame_count = 0
|
||||
# while True:
|
||||
# sbc_frame = SBCFrame(0,0,0,0,0)
|
||||
# if frame_count % 200 == 0:
|
||||
# print "== Frame %d ==" % (frame_count)
|
||||
# err = sbc_unpack_frame(fin, sbc_frame)
|
||||
# if err:
|
||||
# print "error, frame_count: ", frame_count
|
||||
# break
|
||||
|
||||
sbc_decode(sbc_frame)
|
||||
frame_count += 1
|
||||
# print sbc_frame.pcm
|
||||
# sbc_decode(sbc_frame)
|
||||
# # print sbc_frame.pcm
|
||||
|
||||
if frame_count == 1:
|
||||
fout = wave.open(wavfile, 'w')
|
||||
fout.setnchannels(sbc_frame.nr_channels)
|
||||
fout.setsampwidth(2)
|
||||
fout.setframerate(sampling_frequency[sbc_frame.sampling_frequency])
|
||||
fout.setnframes(0)
|
||||
fout.setcomptype = 'NONE'
|
||||
# if frame_count == 0:
|
||||
# fout = wave.open(wavfile, 'w')
|
||||
# fout.setnchannels(sbc_frame.nr_channels)
|
||||
# fout.setsampwidth(2)
|
||||
# fout.setframerate(sampling_frequency[sbc_frame.sampling_frequency])
|
||||
# fout.setnframes(0)
|
||||
# fout.setcomptype = 'NONE'
|
||||
|
||||
write_wav_file(fout, sbc_frame.pcm)
|
||||
# write_wav_file(fout, sbc_frame.pcm)
|
||||
# frame_count += 1
|
||||
|
||||
# # if frame_count == 8:
|
||||
# # fout.close()
|
||||
# # break
|
||||
|
||||
# if frame_count == 8:
|
||||
# fout.close()
|
||||
# break
|
||||
# except TypeError:
|
||||
# fout.close()
|
||||
# print "DONE, SBC file %s decoded into WAV file %s ", (infile, wavfile)
|
||||
# exit(0)
|
||||
|
||||
except TypeError:
|
||||
fout.close()
|
||||
print "DONE, SBC file %s decoded into WAV file %s ", (infile, wavfile)
|
||||
exit(0)
|
||||
|
||||
except IOError as e:
|
||||
print(usage)
|
||||
sys.exit(1)
|
||||
# except IOError as e:
|
||||
# print(usage)
|
||||
# sys.exit(1)
|
||||
|
||||
|
||||
|
||||
|
238
test/sbc/sbc_encoder.py
Executable file
238
test/sbc/sbc_encoder.py
Executable file
@ -0,0 +1,238 @@
|
||||
#!/usr/bin/env python
|
||||
import numpy as np
|
||||
import wave
|
||||
import struct
|
||||
import sys
|
||||
from sbc import *
|
||||
|
||||
X = np.zeros(40)
|
||||
|
||||
def fetch_samples_for_next_sbc_frame(fin, nr_audio_frames, frame):
|
||||
raw_data = fin.readframes(nr_audio_frames) # Returns byte data
|
||||
|
||||
total_samples = nr_audio_frames * frame.nr_channels
|
||||
fmt = "%ih" % total_samples # read signed 2 byte shorts
|
||||
|
||||
frame.pcm = np.array(struct.unpack(fmt, raw_data))
|
||||
del raw_data
|
||||
|
||||
# channels = [ [] for ch in range(frame.nr_channels) ]
|
||||
# for index, value in enumerate(integer_data):
|
||||
# bucket = index % nr_channels
|
||||
# channels[bucket].append(value)
|
||||
|
||||
|
||||
def sbc_analyse(frame, ch, blk, C, debug):
|
||||
global X
|
||||
M = frame.nr_subbands
|
||||
L = 10 * M
|
||||
M2 = 2*M
|
||||
L2 = 2*L
|
||||
|
||||
Z = np.zeros(L)
|
||||
Y = np.zeros(M2)
|
||||
W = np.zeros(shape=(M, M2))
|
||||
S = np.zeros(M)
|
||||
|
||||
for i in range(L-1, M-1, -1):
|
||||
X[i] = X[i-M]
|
||||
for i in range(M-1, -1, -1):
|
||||
X[i] = frame.EX[M-1-i]
|
||||
|
||||
for i in range(L):
|
||||
Z[i] = X[i] * C[i]
|
||||
|
||||
for i in range(M2):
|
||||
for k in range(5):
|
||||
Y[i] += Z[i+k*8]
|
||||
|
||||
for i in range(M):
|
||||
for k in range(M2):
|
||||
W[i][k] = np.cos((i+0.5)*(k-2)*np.pi/M)
|
||||
S[i] += W[i][k] * Y[k]
|
||||
|
||||
if debug:
|
||||
#print "EX:", frame.EX
|
||||
print "X:", X
|
||||
print "Z:"
|
||||
print "Y:", Y
|
||||
print "W:", W
|
||||
print "S:", S
|
||||
|
||||
for sb in range(M):
|
||||
frame.sb_sample[blk][ch][sb] = S[sb]
|
||||
|
||||
|
||||
def sbc_encode(frame,debug):
|
||||
if frame.nr_subbands == 4:
|
||||
proto_table = Proto_4_40
|
||||
elif frame.nr_subbands == 8:
|
||||
proto_table = Proto_8_80
|
||||
else:
|
||||
return -1
|
||||
|
||||
frame.sb_sample = np.ndarray(shape=(frame.nr_blocks, frame.nr_channels, frame.nr_subbands))
|
||||
|
||||
# channels = [ [] for ch in range(frame.nr_channels) ]
|
||||
# for index, value in enumerate(frame.pcm):
|
||||
# bucket = index % frame.nr_channels
|
||||
# channels[bucket].append(value)
|
||||
|
||||
# print "encoder pcm ", frame.pcm
|
||||
|
||||
index = 0
|
||||
for ch in range(frame.nr_channels):
|
||||
for blk in range(frame.nr_blocks):
|
||||
for sb in range(frame.nr_subbands):
|
||||
frame.EX[sb] = frame.pcm[index] #channels[ch][blk * frame.nr_subbands + sb]
|
||||
index+=1
|
||||
sbc_analyse(frame, ch, blk, proto_table,debug)
|
||||
sbc_quantization(frame)
|
||||
|
||||
|
||||
def should_use_joint_coding(frame):
|
||||
return False
|
||||
|
||||
def calculate_scalefactor(max_subbandsample):
|
||||
x = 0
|
||||
while True:
|
||||
y = 1 << x + 1
|
||||
if y > max_subbandsample:
|
||||
break
|
||||
x += 1
|
||||
return (x,y)
|
||||
|
||||
|
||||
def frame_to_bitstream(frame):
|
||||
global bitstream, bitstream_bits_available
|
||||
init_bitstream()
|
||||
|
||||
add_bits(frame.syncword, 8)
|
||||
add_bits(frame.sampling_frequency, 2)
|
||||
add_bits(frame.nr_blocks/4-1, 2)
|
||||
add_bits(frame.channel_mode, 2)
|
||||
add_bits(frame.allocation_method, 1)
|
||||
add_bits(frame.nr_subbands/4-1, 1)
|
||||
add_bits(frame.bitpool, 8)
|
||||
add_bits(frame.crc_check, 8)
|
||||
|
||||
for sb in range(frame.nr_subbands):
|
||||
add_bits(frame.join[sb],1)
|
||||
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
add_bits(frame.scale_factor[ch][sb], 4)
|
||||
|
||||
for blk in range(frame.nr_blocks):
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
add_bits(frame.audio_sample[blk][ch][sb], frame.bits[ch][sb])
|
||||
|
||||
# bitstream_len = 16 + frame.nr_subbands + frame.nr_channels * frame.nr_subbands * 4
|
||||
return bitstream
|
||||
|
||||
def sbc_quantization(frame):
|
||||
|
||||
frame.join = np.zeros(frame.nr_subbands, dtype = np.uint8)
|
||||
if should_use_joint_coding(frame):
|
||||
return
|
||||
|
||||
max_subbandsample = np.zeros(shape = (frame.nr_channels, frame.nr_subbands))
|
||||
|
||||
for blk in range(frame.nr_blocks):
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
m = abs(frame.sb_sample[blk][ch][sb])
|
||||
if max_subbandsample[ch][sb] < m:
|
||||
max_subbandsample[ch][sb] = m
|
||||
|
||||
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
(frame.scale_factor[ch][sb], frame.scalefactor[ch][sb]) = calculate_scalefactor(max_subbandsample[ch][sb])
|
||||
|
||||
frame.bits = sbc_bit_allocation(frame)
|
||||
|
||||
# Reconstruct the Audio Samples
|
||||
frame.levels = np.zeros(shape=(frame.nr_channels, frame.nr_subbands), dtype = np.int32)
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
frame.levels[ch][sb] = pow(2.0, frame.bits[ch][sb]) - 1
|
||||
|
||||
frame.syncword = 156
|
||||
frame.crc_check = calculate_crc(frame)
|
||||
|
||||
for blk in range(frame.nr_blocks):
|
||||
for ch in range(frame.nr_channels):
|
||||
for sb in range(frame.nr_subbands):
|
||||
if frame.levels[ch][sb] > 0:
|
||||
SB = frame.sb_sample[blk][ch][sb]
|
||||
SF = frame.scalefactor[ch][sb]
|
||||
L = frame.levels[ch][sb]
|
||||
AS = np.uint16(((SB * L / SF + L) - 1.0)/2.0)
|
||||
frame.audio_sample[blk][ch][sb] = AS
|
||||
else:
|
||||
frame.audio_sample[blk][ch][sb] = 0
|
||||
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
# usage = '''
|
||||
# Usage: ./sbc_encoder.py input.wav block_size nr_subbands bitpool
|
||||
# '''
|
||||
# nr_blocks = 0
|
||||
# nr_subbands = 0
|
||||
|
||||
# if (len(sys.argv) < 5):
|
||||
# print(usage)
|
||||
# sys.exit(1)
|
||||
# try:
|
||||
# infile = sys.argv[1]
|
||||
# if not infile.endswith('.wav'):
|
||||
# print(usage)
|
||||
# sys.exit(1)
|
||||
# nr_blocks = int(sys.argv[2])
|
||||
# nr_subbands = int(sys.argv[3])
|
||||
# bitpool = int(sys.argv[4])
|
||||
# sbcfile = infile.replace('.wav', '-encoded.sbc')
|
||||
|
||||
# fin = wave.open(infile, 'rb')
|
||||
|
||||
# wav_nr_channels = fin.getnchannels()
|
||||
# wav_sample_rate = fin.getframerate()
|
||||
# wav_nr_frames = fin.getnframes()
|
||||
# sbc_sampling_frequency = sbc_sampling_frequency_index(wav_sample_rate)
|
||||
|
||||
# sbc_frame_count = 0
|
||||
# audio_frame_count = 0
|
||||
|
||||
# while audio_frame_count < wav_nr_frames:
|
||||
# if sbc_frame_count % 200 == 0:
|
||||
# print "== Frame %d ==" % (sbc_frame_count)
|
||||
|
||||
# sbc_encoder_frame = SBCFrame(nr_blocks, nr_subbands, wav_nr_channels, sbc_sampling_frequency, bitpool)
|
||||
|
||||
# wav_nr_audio_frames = sbc_encoder_frame.nr_blocks * sbc_encoder_frame.nr_subbands
|
||||
# fetch_samples_for_next_sbc_frame(fin, wav_nr_audio_frames, sbc_encoder_frame)
|
||||
# sbc_encode(sbc_encoder_frame)
|
||||
|
||||
# # stream = frame_to_bitstream(frame)
|
||||
# audio_frame_count += wav_nr_audio_frames
|
||||
# sbc_frame_count += 1
|
||||
|
||||
# # except TypeError:
|
||||
# # fin.close()
|
||||
# # print "DONE, WAV file %s encoded into SBC file %s ", (infile, sbcfile)
|
||||
|
||||
# #channels, num_audio_frames, wav_nr_channels, wav_sample_rate = read_waw_file(wavfile)
|
||||
|
||||
|
||||
# except IOError as e:
|
||||
# print(usage)
|
||||
# sys.exit(1)
|
||||
|
||||
|
||||
|
||||
|
||||
|
184
test/sbc/sbc_test.py
Executable file
184
test/sbc/sbc_test.py
Executable file
@ -0,0 +1,184 @@
|
||||
#!/usr/bin/env python
|
||||
import numpy as np
|
||||
import wave
|
||||
import struct
|
||||
import sys
|
||||
from sbc import *
|
||||
from sbc_encoder import *
|
||||
from sbc_decoder import *
|
||||
|
||||
error = 100
|
||||
max_error = -1
|
||||
|
||||
def dump_samples(a,b,nr_blocks):
|
||||
for blk in range(nr_blocks):
|
||||
print "D%02d %s" %(blk, b[blk][0])
|
||||
print "E%02d %s" %(blk, a[blk][0])
|
||||
print "F%02d %s" %(blk, a[blk][0] - b[blk][0])
|
||||
print
|
||||
|
||||
def mse(a,b):
|
||||
count = 1
|
||||
for i in a.shape:
|
||||
count *= i
|
||||
delta = a - b
|
||||
sqr = delta ** 2
|
||||
# print "count", count
|
||||
# print "sqr", sqr
|
||||
# print "sum", sqr.sum()
|
||||
res = sqr.sum()*1.0/count
|
||||
# print "mse", res
|
||||
return res
|
||||
|
||||
def check_equal(encoder, decoder, frame_count):
|
||||
global max_error
|
||||
M = mse(encoder.sb_sample, decoder.sb_sample)
|
||||
# print "sb_sample error", M
|
||||
# dump_samples(encoder.sb_sample, decoder.sb_sample, encoder.nr_blocks)
|
||||
if M > max_error:
|
||||
max_error = M
|
||||
print "sb_sample error ", frame_count, max_error, M
|
||||
if M > error:
|
||||
#print "sb_sample error ", frame_count, M
|
||||
#dump_samples(encoder.sb_sample, decoder.sb_sample, encoder.nr_blocks)
|
||||
return -1
|
||||
|
||||
return
|
||||
|
||||
if mse(encoder.pcm, decoder.pcm) > error:
|
||||
print "pcm wrong ", encoder.pcm, decoder.pcm
|
||||
return -1
|
||||
|
||||
if encoder.syncword != decoder.syncword:
|
||||
print "syncword wrong ", encoder.syncword
|
||||
return -1
|
||||
|
||||
if encoder.sampling_frequency != decoder.sampling_frequency:
|
||||
print "sampling_frequency wrong ", encoder.sampling_frequency
|
||||
return -1
|
||||
|
||||
if encoder.nr_blocks != decoder.nr_blocks:
|
||||
print "nr_blocks wrong ", encoder.nr_blocks
|
||||
return -1
|
||||
|
||||
if encoder.channel_mode != decoder.channel_mode:
|
||||
print "channel_mode wrong ", encoder.channel_mode
|
||||
return -1
|
||||
|
||||
if encoder.nr_channels != decoder.nr_channels:
|
||||
print "nr_channels wrong ", encoder.nr_channels
|
||||
return -1
|
||||
|
||||
if encoder.allocation_method != decoder.allocation_method:
|
||||
print "allocation_method wrong ", encoder.allocation_method
|
||||
return -1
|
||||
|
||||
if encoder.nr_subbands != decoder.nr_subbands:
|
||||
print "nr_subbands wrong ", encoder.nr_subbands
|
||||
return -1
|
||||
|
||||
if encoder.bitpool != decoder.bitpool:
|
||||
print "bitpool wrong (E: %d, D: %d)" % (encoder.bitpool, decoder.bitpool)
|
||||
return -1
|
||||
|
||||
|
||||
if mse(encoder.join, decoder.join) > 0:
|
||||
print "join error \nE:\n %s \nD:\n %s" % (encoder.join, decoder.join)
|
||||
return -1
|
||||
|
||||
if mse(encoder.scale_factor, decoder.scale_factor) > 0:
|
||||
print "scale_factor error \nE:\n %s \nD:\n %s" % (encoder.scale_factor, decoder.scale_factor)
|
||||
return -1
|
||||
|
||||
if mse(encoder.scalefactor, decoder.scalefactor) > 0:
|
||||
print "scalefactor error \nE:\n %s \nD:\n %s" % (encoder.scalefactor, decoder.scalefactor)
|
||||
return -1
|
||||
|
||||
if mse(encoder.bits, decoder.bits) > 0:
|
||||
print "bits error \nE:\n %s \nD:\n %s" % (encoder.bits, decoder.bits)
|
||||
return -1
|
||||
|
||||
if mse(encoder.levels, decoder.levels) > 0:
|
||||
print "levels error \nE:\n %s \nD:\n %s" % (encoder.levels, decoder.levels)
|
||||
return -1
|
||||
if mse(encoder.audio_sample, decoder.audio_sample) > error:
|
||||
#print "audio_sample diff %s" % (encoder.audio_sample-decoder.audio_sample)
|
||||
#print "audio_sample error \nE:\n %s \nD:\n %s" % (encoder.audio_sample, decoder.audio_sample)
|
||||
print "audio_sample error"
|
||||
dump_samples(encoder.audio_sample, decoder.audio_sample, encoder.nr_blocks)
|
||||
return -1
|
||||
|
||||
if encoder.crc_check != decoder.crc_check:
|
||||
print "crc_check wrong (E: %d, D: %d)" % (encoder.crc_check, decoder.crc_check)
|
||||
return -1
|
||||
|
||||
return
|
||||
|
||||
|
||||
usage = '''
|
||||
Usage: ./sbc_test.py input.sbc
|
||||
'''
|
||||
|
||||
if (len(sys.argv) < 2):
|
||||
print(usage)
|
||||
sys.exit(1)
|
||||
try:
|
||||
infile = sys.argv[1]
|
||||
if not infile.endswith('.sbc'):
|
||||
print(usage)
|
||||
sys.exit(1)
|
||||
|
||||
with open (infile, 'rb') as fin:
|
||||
try:
|
||||
frame_count = 0
|
||||
while True:
|
||||
sbc_decoder_frame = SBCFrame(0,0,0,0,0)
|
||||
if frame_count % 200 == 0:
|
||||
print "== Frame %d ==" % (frame_count)
|
||||
|
||||
err = sbc_unpack_frame(fin, sbc_decoder_frame)
|
||||
if err:
|
||||
print "error, frame_count: ", frame_count
|
||||
break
|
||||
|
||||
sbc_decode(sbc_decoder_frame) # => pcm
|
||||
|
||||
|
||||
sbc_encoder_frame = SBCFrame( sbc_decoder_frame.nr_blocks,
|
||||
sbc_decoder_frame.nr_subbands,
|
||||
sbc_decoder_frame.nr_channels,
|
||||
sbc_decoder_frame.sampling_frequency,
|
||||
sbc_decoder_frame.bitpool)
|
||||
|
||||
sbc_encoder_frame.pcm = np.array(sbc_decoder_frame.pcm)
|
||||
# TODO: joi field
|
||||
# TODO: clear memory
|
||||
|
||||
# sbc_encoder_frame.sb_sample = np.array(sbc_decoder_frame.sb_sample)
|
||||
# sbc_quantization(sbc_encoder_frame)
|
||||
|
||||
# print "encoder pcm ", sbc_encoder_frame.pcm
|
||||
sbc_encode(sbc_encoder_frame,frame_count >86)
|
||||
|
||||
# test
|
||||
err = check_equal(sbc_encoder_frame, sbc_decoder_frame, frame_count)
|
||||
if err:
|
||||
print "error, frames not equal, frame_count: ", frame_count
|
||||
break
|
||||
|
||||
# if frame_count == 0:
|
||||
# break
|
||||
frame_count += 1
|
||||
|
||||
except TypeError:
|
||||
print "DONE"
|
||||
exit(0)
|
||||
|
||||
except IOError as e:
|
||||
print(usage)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user