mirror of
https://github.com/bluekitchen/btstack.git
synced 2025-02-14 18:40:59 +00:00
186 lines
4.8 KiB
Python
186 lines
4.8 KiB
Python
#
|
|
# Copyright 2022 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
import numpy as np
|
|
|
|
import lc3
|
|
import tables as T, appendix_c as C
|
|
|
|
|
|
### ------------------------------------------------------------------------ ###
|
|
|
|
class AttackDetector:
|
|
|
|
def __init__(self, dt, sr):
|
|
|
|
self.dt = dt
|
|
self.sr = sr
|
|
self.ms = T.DT_MS[dt]
|
|
|
|
self.xn1 = 0
|
|
self.xn2 = 0
|
|
self.en1 = 0
|
|
self.an1 = 0
|
|
self.p_att = 0
|
|
|
|
def is_enabled(self, nbytes):
|
|
|
|
c1 = self.dt == T.DT_10M and \
|
|
self.sr == T.SRATE_32K and nbytes > 80
|
|
|
|
c2 = self.dt == T.DT_10M and \
|
|
self.sr >= T.SRATE_48K and nbytes >= 100
|
|
|
|
c3 = self.dt == T.DT_7M5 and \
|
|
self.sr == T.SRATE_32K and nbytes >= 61 and nbytes < 150
|
|
|
|
c4 = self.dt == T.DT_7M5 and \
|
|
self.sr >= T.SRATE_48K and nbytes >= 75 and nbytes < 150
|
|
|
|
return c1 or c2 or c3 or c4
|
|
|
|
def run(self, nbytes, x):
|
|
|
|
### 3.3.6.2 Downsampling and filtering input
|
|
|
|
mf = int(16 * self.ms)
|
|
|
|
r = len(x) // mf
|
|
x_att = np.array([ np.sum(x[i*r:(i+1)*r]) for i in range(mf) ])
|
|
|
|
x_hp = np.empty(mf)
|
|
x_hp[0 ] = 0.375 * x_att[0 ] - 0.5 * self.xn1 + 0.125 * self.xn2
|
|
x_hp[1 ] = 0.375 * x_att[1 ] - 0.5 * x_att[0 ] + 0.125 * self.xn1
|
|
x_hp[2:] = 0.375 * x_att[2:] - 0.5 * x_att[1:-1] + 0.125 * x_att[0:-2]
|
|
self.xn2 = x_att[-2]
|
|
self.xn1 = x_att[-1]
|
|
|
|
### 3.3.6.3 Energy calculation
|
|
|
|
nb = int(self.ms / 2.5)
|
|
|
|
e_att = np.array([ np.sum(np.square(x_hp[40*i:40*(i+1)]))
|
|
for i in range(nb) ])
|
|
|
|
a_att = np.empty(nb)
|
|
a_att[0] = np.maximum(0.25 * self.an1, self.en1)
|
|
for i in range(1,nb):
|
|
a_att[i] = np.maximum(0.25 * a_att[i-1], e_att[i-1])
|
|
self.en1 = e_att[-1]
|
|
self.an1 = a_att[-1]
|
|
|
|
### 3.3.6.4 Attack Detection
|
|
|
|
p_att = -1
|
|
flags = [ (e_att[i] > 8.5 * a_att[i]) for i in range(nb) ]
|
|
|
|
for (i, f) in enumerate(flags):
|
|
if f: p_att = i
|
|
|
|
f_att = p_att >= 0 or self.p_att - 1 >= nb // 2
|
|
self.p_att = 1 + p_att
|
|
|
|
return self.is_enabled(nbytes) and f_att
|
|
|
|
|
|
def initial_state():
|
|
return { 'en1': 0.0, 'an1': 0.0, 'p_att': 0 }
|
|
|
|
### ------------------------------------------------------------------------ ###
|
|
|
|
def check_enabling(rng, dt):
|
|
|
|
ok = True
|
|
|
|
for sr in range(T.SRATE_16K, T.NUM_SRATE):
|
|
|
|
attdet = AttackDetector(dt, sr)
|
|
|
|
for nbytes in [ 61, 61-1, 75-1, 75, 80, 80+1, 100-1, 100, 150-1, 150 ]:
|
|
|
|
f_att = lc3.attdet_run(dt, sr, nbytes,
|
|
initial_state(), 2 * rng.random(T.NS[dt][sr]+6) - 1)
|
|
|
|
ok = ok and f_att == attdet.is_enabled(nbytes)
|
|
|
|
return ok
|
|
|
|
def check_unit(rng, dt, sr):
|
|
|
|
ns = T.NS[dt][sr]
|
|
ok = True
|
|
|
|
attdet = AttackDetector(dt, sr)
|
|
|
|
state_c = initial_state()
|
|
x_c = np.zeros(ns+6)
|
|
|
|
for run in range(100):
|
|
|
|
### Generate noise, and an attack at random point
|
|
|
|
x = ((2 * rng.random(ns)) - 1) * (2 ** 8 - 1)
|
|
x[(ns * rng.random()).astype(int)] *= 2 ** 7
|
|
|
|
### Check Implementation
|
|
|
|
f_att = attdet.run(100, x)
|
|
|
|
x_c = np.append(x_c[-6:], x)
|
|
f_att_c = lc3.attdet_run(dt, sr, 100, state_c, x_c)
|
|
|
|
ok = ok and f_att_c == f_att
|
|
ok = ok and np.amax(np.abs(1 - state_c['en1']/attdet.en1)) < 2
|
|
ok = ok and np.amax(np.abs(1 - state_c['an1']/attdet.an1)) < 2
|
|
ok = ok and state_c['p_att'] == attdet.p_att
|
|
|
|
return ok
|
|
|
|
def check_appendix_c(dt):
|
|
|
|
sr = T.SRATE_48K
|
|
|
|
state = initial_state()
|
|
|
|
x = np.append(np.zeros(6), C.X_PCM_ATT[dt][0])
|
|
f_att = lc3.attdet_run(dt, sr, C.NBYTES_ATT[dt], state, x)
|
|
ok = f_att == C.F_ATT[dt][0]
|
|
|
|
x = np.append(x[-6:], C.X_PCM_ATT[dt][1])
|
|
f_att = lc3.attdet_run(dt, sr, C.NBYTES_ATT[dt], state, x)
|
|
ok = f_att == C.F_ATT[dt][1]
|
|
|
|
return ok
|
|
|
|
def check():
|
|
|
|
rng = np.random.default_rng(1234)
|
|
ok = True
|
|
|
|
for dt in range(T.NUM_DT):
|
|
ok and check_enabling(rng, dt)
|
|
|
|
for dt in range(T.NUM_DT):
|
|
for sr in range(T.SRATE_32K, T.NUM_SRATE):
|
|
ok = ok and check_unit(rng, dt, sr)
|
|
|
|
for dt in range(T.NUM_DT):
|
|
ok = ok and check_appendix_c(dt)
|
|
|
|
return ok
|
|
|
|
### ------------------------------------------------------------------------ ###
|