mirror of
https://github.com/bluekitchen/btstack.git
synced 2025-02-14 09:39:55 +00:00
198 lines
4.7 KiB
Python
Executable File
198 lines
4.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
#
|
|
# Copyright 2022 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
import numpy as np
|
|
import scipy.signal as signal
|
|
import scipy.io.wavfile as wavfile
|
|
import struct
|
|
import argparse
|
|
|
|
import lc3
|
|
import tables as T, appendix_c as C
|
|
|
|
import mdct, energy, bwdet, sns, tns, spec, ltpf
|
|
import bitstream
|
|
|
|
### ------------------------------------------------------------------------ ###
|
|
|
|
class Decoder:
|
|
|
|
def __init__(self, dt_ms, sr_hz):
|
|
|
|
dt = { 7.5: T.DT_7M5, 10: T.DT_10M }[dt_ms]
|
|
|
|
sr = { 8000: T.SRATE_8K , 16000: T.SRATE_16K, 24000: T.SRATE_24K,
|
|
32000: T.SRATE_32K, 48000: T.SRATE_48K }[sr_hz]
|
|
|
|
self.sr = sr
|
|
self.ne = T.NE[dt][sr]
|
|
self.ns = T.NS[dt][sr]
|
|
|
|
self.mdct = mdct.MdctInverse(dt, sr)
|
|
|
|
self.bwdet = bwdet.BandwidthDetector(dt, sr)
|
|
self.spec = spec.SpectrumSynthesis(dt, sr)
|
|
self.tns = tns.TnsSynthesis(dt)
|
|
self.sns = sns.SnsSynthesis(dt, sr)
|
|
self.ltpf = ltpf.LtpfSynthesis(dt, sr)
|
|
|
|
def decode(self, data):
|
|
|
|
b = bitstream.BitstreamReader(data)
|
|
|
|
bw = self.bwdet.get(b)
|
|
if bw > self.sr:
|
|
raise ValueError('Invalid bandwidth indication')
|
|
|
|
self.spec.load(b)
|
|
|
|
self.tns.load(b, bw, len(data))
|
|
|
|
pitch = b.read_bit()
|
|
|
|
self.sns.load(b)
|
|
|
|
if pitch:
|
|
self.ltpf.load(b)
|
|
else:
|
|
self.ltpf.disable()
|
|
|
|
x = self.spec.decode(b, bw, len(data))
|
|
|
|
return (x, bw, pitch)
|
|
|
|
def synthesize(self, x, bw, pitch, nbytes):
|
|
|
|
x = self.tns.run(x, bw)
|
|
|
|
x = self.sns.run(x)
|
|
|
|
x = np.append(x, np.zeros(self.ns - self.ne))
|
|
x = self.mdct.run(x)
|
|
|
|
x = self.ltpf.run(x, len(data))
|
|
|
|
return x
|
|
|
|
def run(self, data):
|
|
|
|
(x, bw, pitch) = self.decode(data)
|
|
|
|
x = self.synthesize(x, bw, pitch, len(data))
|
|
|
|
return x
|
|
|
|
### ------------------------------------------------------------------------ ###
|
|
|
|
def check_appendix_c(dt):
|
|
|
|
ok = True
|
|
|
|
dec_c = lc3.setup_decoder(int(T.DT_MS[dt] * 1000), 16000)
|
|
|
|
for i in range(len(C.BYTES_AC[dt])):
|
|
|
|
pcm = lc3.decode(dec_c, bytes(C.BYTES_AC[dt][i]))
|
|
ok = ok and np.max(np.abs(pcm - C.X_HAT_CLIP[dt][i])) < 1
|
|
|
|
return ok
|
|
|
|
def check():
|
|
|
|
ok = True
|
|
|
|
for dt in range(T.NUM_DT):
|
|
ok = ok and check_appendix_c(dt)
|
|
|
|
return ok
|
|
|
|
### ------------------------------------------------------------------------ ###
|
|
|
|
if __name__ == "__main__":
|
|
|
|
parser = argparse.ArgumentParser(description='LC3 Decoder Test Framework')
|
|
parser.add_argument('lc3_file',
|
|
help='Input bitstream file', type=argparse.FileType('r'))
|
|
parser.add_argument('--pyout',
|
|
help='Python output file', type=argparse.FileType('w'))
|
|
parser.add_argument('--cout',
|
|
help='C output file', type=argparse.FileType('w'))
|
|
args = parser.parse_args()
|
|
|
|
### File Header ###
|
|
|
|
f_lc3 = open(args.lc3_file.name, 'rb')
|
|
|
|
header = struct.unpack('=HHHHHHHI', f_lc3.read(18))
|
|
|
|
if header[0] != 0xcc1c:
|
|
raise ValueError('Invalid bitstream file')
|
|
|
|
if header[4] != 1:
|
|
raise ValueError('Unsupported number of channels')
|
|
|
|
sr_hz = header[2] * 100
|
|
bitrate = header[3] * 100
|
|
nchannels = header[4]
|
|
dt_ms = header[5] / 100
|
|
|
|
f_lc3.seek(header[1])
|
|
|
|
### Setup ###
|
|
|
|
dec = Decoder(dt_ms, sr_hz)
|
|
dec_c = lc3.setup_decoder(int(dt_ms * 1000), sr_hz)
|
|
|
|
pcm_c = np.empty(0).astype(np.int16)
|
|
pcm_py = np.empty(0).astype(np.int16)
|
|
|
|
### Decoding loop ###
|
|
|
|
nframes = 0
|
|
|
|
while True:
|
|
|
|
data = f_lc3.read(2)
|
|
if len(data) != 2:
|
|
break
|
|
|
|
(frame_nbytes,) = struct.unpack('=H', data)
|
|
|
|
print('Decoding frame %d' % nframes, end='\r')
|
|
|
|
data = f_lc3.read(frame_nbytes)
|
|
|
|
x = dec.run(data)
|
|
pcm_py = np.append(pcm_py,
|
|
np.clip(np.round(x), -32768, 32767).astype(np.int16))
|
|
|
|
x_c = lc3.decode(dec_c, data)
|
|
pcm_c = np.append(pcm_c, x_c)
|
|
|
|
nframes += 1
|
|
|
|
print('done ! %16s' % '')
|
|
|
|
### Terminate ###
|
|
|
|
if args.pyout:
|
|
wavfile.write(args.pyout.name, sr_hz, pcm_py)
|
|
if args.cout:
|
|
wavfile.write(args.cout.name, sr_hz, pcm_c)
|
|
|
|
### ------------------------------------------------------------------------ ###
|