mirror of
https://github.com/google/bumble.git
synced 2026-04-16 00:25:31 +00:00
429 lines
9.6 KiB
Python
429 lines
9.6 KiB
Python
# Copyright 2023 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Constants
|
|
# -----------------------------------------------------------------------------
|
|
# fmt: off
|
|
|
|
WL = [-60, -30, 58, 172, 334, 538, 1198, 3042]
|
|
RL42 = [0, 7, 6, 5, 4, 3, 2, 1, 7, 6, 5, 4, 3, 2, 1, 0]
|
|
ILB = [
|
|
2048,
|
|
2093,
|
|
2139,
|
|
2186,
|
|
2233,
|
|
2282,
|
|
2332,
|
|
2383,
|
|
2435,
|
|
2489,
|
|
2543,
|
|
2599,
|
|
2656,
|
|
2714,
|
|
2774,
|
|
2834,
|
|
2896,
|
|
2960,
|
|
3025,
|
|
3091,
|
|
3158,
|
|
3228,
|
|
3298,
|
|
3371,
|
|
3444,
|
|
3520,
|
|
3597,
|
|
3676,
|
|
3756,
|
|
3838,
|
|
3922,
|
|
4008,
|
|
]
|
|
WH = [0, -214, 798]
|
|
RH2 = [2, 1, 2, 1]
|
|
# Values in QM2/QM4/QM6 left shift three bits than original g722 specification.
|
|
QM2 = [-7408, -1616, 7408, 1616]
|
|
QM4 = [
|
|
0,
|
|
-20456,
|
|
-12896,
|
|
-8968,
|
|
-6288,
|
|
-4240,
|
|
-2584,
|
|
-1200,
|
|
20456,
|
|
12896,
|
|
8968,
|
|
6288,
|
|
4240,
|
|
2584,
|
|
1200,
|
|
0,
|
|
]
|
|
QM6 = [
|
|
-136,
|
|
-136,
|
|
-136,
|
|
-136,
|
|
-24808,
|
|
-21904,
|
|
-19008,
|
|
-16704,
|
|
-14984,
|
|
-13512,
|
|
-12280,
|
|
-11192,
|
|
-10232,
|
|
-9360,
|
|
-8576,
|
|
-7856,
|
|
-7192,
|
|
-6576,
|
|
-6000,
|
|
-5456,
|
|
-4944,
|
|
-4464,
|
|
-4008,
|
|
-3576,
|
|
-3168,
|
|
-2776,
|
|
-2400,
|
|
-2032,
|
|
-1688,
|
|
-1360,
|
|
-1040,
|
|
-728,
|
|
24808,
|
|
21904,
|
|
19008,
|
|
16704,
|
|
14984,
|
|
13512,
|
|
12280,
|
|
11192,
|
|
10232,
|
|
9360,
|
|
8576,
|
|
7856,
|
|
7192,
|
|
6576,
|
|
6000,
|
|
5456,
|
|
4944,
|
|
4464,
|
|
4008,
|
|
3576,
|
|
3168,
|
|
2776,
|
|
2400,
|
|
2032,
|
|
1688,
|
|
1360,
|
|
1040,
|
|
728,
|
|
432,
|
|
136,
|
|
-432,
|
|
-136,
|
|
]
|
|
QMF_COEFFS = [3, -11, 12, 32, -210, 951, 3876, -805, 362, -156, 53, -11]
|
|
|
|
# fmt: on
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Classes
|
|
# -----------------------------------------------------------------------------
|
|
class G722Decoder(object):
|
|
"""G.722 decoder with bitrate 64kbit/s.
|
|
|
|
For the Blocks in the sub-band decoders, please refer to the G.722
|
|
specification for the required information. G722 specification:
|
|
https://www.itu.int/rec/T-REC-G.722-201209-I
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._x = [0] * 24
|
|
self._band = [Band(), Band()]
|
|
# The initial value in BLOCK 3L
|
|
self._band[0].det = 32
|
|
# The initial value in BLOCK 3H
|
|
self._band[1].det = 8
|
|
|
|
def decode_frame(self, encoded_data) -> bytearray:
|
|
result_array = bytearray(len(encoded_data) * 4)
|
|
self.g722_decode(result_array, encoded_data)
|
|
return result_array
|
|
|
|
def g722_decode(self, result_array, encoded_data) -> int:
|
|
"""Decode the data frame using g722 decoder."""
|
|
rlow = 0
|
|
rhigh = 0
|
|
xout1 = 0
|
|
xout2 = 0
|
|
code = 0
|
|
result_length = 0
|
|
j = 0
|
|
|
|
for j in range(0, len(encoded_data)):
|
|
code = encoded_data[j]
|
|
higher_bits = (code >> 6) & 0x03
|
|
lower_bits = code & 0x3F
|
|
|
|
rlow = self.lower_sub_band_decoder(lower_bits)
|
|
rhigh = self.higher_sub_band_decoder(higher_bits)
|
|
|
|
# Apply the receive QMF
|
|
for i in range(22):
|
|
self._x[i] = self._x[i + 2]
|
|
|
|
self._x[22] = rlow + rhigh
|
|
self._x[23] = rlow - rhigh
|
|
xout1 = 0
|
|
xout2 = 0
|
|
|
|
for i in range(12):
|
|
xout2 += self._x[2 * i] * QMF_COEFFS[i]
|
|
xout1 += self._x[2 * i + 1] * QMF_COEFFS[11 - i]
|
|
|
|
result_length = self.update_decoded_result(
|
|
xout1, result_length, result_array
|
|
)
|
|
result_length = self.update_decoded_result(
|
|
xout2, result_length, result_array
|
|
)
|
|
|
|
return result_length
|
|
|
|
def update_decoded_result(self, xout, byte_length, byte_array) -> int:
|
|
result = (int)(xout >> 11)
|
|
bytes_result = result.to_bytes(2, 'little', signed=True)
|
|
byte_array[byte_length] = bytes_result[0]
|
|
byte_array[byte_length + 1] = bytes_result[1]
|
|
return byte_length + 2
|
|
|
|
def lower_sub_band_decoder(self, lower_bits) -> int:
|
|
"""Lower sub-band decoder for last six bits."""
|
|
|
|
# Block 5L
|
|
# INVQBL
|
|
wd1 = lower_bits
|
|
wd2 = QM6[wd1]
|
|
wd1 >>= 2
|
|
wd2 = (self._band[0].det * wd2) >> 15
|
|
# RECONS
|
|
rlow = self._band[0].s + wd2
|
|
|
|
# Block 6L
|
|
# LIMIT
|
|
if rlow > 16383:
|
|
rlow = 16383
|
|
elif rlow < -16384:
|
|
rlow = -16384
|
|
|
|
# Block 2L
|
|
# INVQAL
|
|
wd2 = QM4[wd1]
|
|
dlowt = (self._band[0].det * wd2) >> 15
|
|
|
|
# Block 3L
|
|
# LOGSCL
|
|
wd2 = RL42[wd1]
|
|
wd1 = (self._band[0].nb * 127) >> 7
|
|
wd1 += WL[wd2]
|
|
|
|
if wd1 < 0:
|
|
wd1 = 0
|
|
elif wd1 > 18432:
|
|
wd1 = 18432
|
|
|
|
self._band[0].nb = wd1
|
|
|
|
# SCALEL
|
|
wd1 = (self._band[0].nb >> 6) & 31
|
|
wd2 = 8 - (self._band[0].nb >> 11)
|
|
|
|
if wd2 < 0:
|
|
wd3 = ILB[wd1] << -wd2
|
|
else:
|
|
wd3 = ILB[wd1] >> wd2
|
|
|
|
self._band[0].det = wd3 << 2
|
|
|
|
# Block 4L
|
|
self._band[0].block4(dlowt)
|
|
|
|
return rlow
|
|
|
|
def higher_sub_band_decoder(self, higher_bits) -> int:
|
|
"""Higher sub-band decoder for first two bits."""
|
|
|
|
# Block 2H
|
|
# INVQAH
|
|
wd2 = QM2[higher_bits]
|
|
dhigh = (self._band[1].det * wd2) >> 15
|
|
|
|
# Block 5H
|
|
# RECONS
|
|
rhigh = dhigh + self._band[1].s
|
|
|
|
# Block 6H
|
|
# LIMIT
|
|
if rhigh > 16383:
|
|
rhigh = 16383
|
|
elif rhigh < -16384:
|
|
rhigh = -16384
|
|
|
|
# Block 3H
|
|
# LOGSCH
|
|
wd2 = RH2[higher_bits]
|
|
wd1 = (self._band[1].nb * 127) >> 7
|
|
wd1 += WH[wd2]
|
|
|
|
if wd1 < 0:
|
|
wd1 = 0
|
|
elif wd1 > 22528:
|
|
wd1 = 22528
|
|
self._band[1].nb = wd1
|
|
|
|
# SCALEH
|
|
wd1 = (self._band[1].nb >> 6) & 31
|
|
wd2 = 10 - (self._band[1].nb >> 11)
|
|
|
|
if wd2 < 0:
|
|
wd3 = ILB[wd1] << -wd2
|
|
else:
|
|
wd3 = ILB[wd1] >> wd2
|
|
self._band[1].det = wd3 << 2
|
|
|
|
# Block 4H
|
|
self._band[1].block4(dhigh)
|
|
|
|
return rhigh
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
class Band(object):
|
|
"""Structure for G722 decode proccessing."""
|
|
|
|
s: int = 0
|
|
nb: int = 0
|
|
det: int = 0
|
|
|
|
def __init__(self):
|
|
self._sp = 0
|
|
self._sz = 0
|
|
self._r = [0] * 3
|
|
self._a = [0] * 3
|
|
self._ap = [0] * 3
|
|
self._p = [0] * 3
|
|
self._d = [0] * 7
|
|
self._b = [0] * 7
|
|
self._bp = [0] * 7
|
|
self._sg = [0] * 7
|
|
|
|
def saturate(self, amp: int) -> int:
|
|
if amp > 32767:
|
|
return 32767
|
|
elif amp < -32768:
|
|
return -32768
|
|
else:
|
|
return amp
|
|
|
|
def block4(self, d: int) -> None:
|
|
"""Block4 for both lower and higher sub-band decoder."""
|
|
wd1 = 0
|
|
wd2 = 0
|
|
wd3 = 0
|
|
|
|
# RECONS
|
|
self._d[0] = d
|
|
self._r[0] = self.saturate(self.s + d)
|
|
|
|
# PARREC
|
|
self._p[0] = self.saturate(self._sz + d)
|
|
|
|
# UPPOL2
|
|
for i in range(3):
|
|
self._sg[i] = (self._p[i]) >> 15
|
|
wd1 = self.saturate((self._a[1]) << 2)
|
|
wd2 = -wd1 if self._sg[0] == self._sg[1] else wd1
|
|
|
|
if wd2 > 32767:
|
|
wd2 = 32767
|
|
|
|
wd3 = 128 if self._sg[0] == self._sg[2] else -128
|
|
wd3 += wd2 >> 7
|
|
wd3 += (self._a[2] * 32512) >> 15
|
|
|
|
if wd3 > 12288:
|
|
wd3 = 12288
|
|
elif wd3 < -12288:
|
|
wd3 = -12288
|
|
self._ap[2] = wd3
|
|
|
|
# UPPOL1
|
|
self._sg[0] = (self._p[0]) >> 15
|
|
self._sg[1] = (self._p[1]) >> 15
|
|
wd1 = 192 if self._sg[0] == self._sg[1] else -192
|
|
wd2 = (self._a[1] * 32640) >> 15
|
|
|
|
self._ap[1] = self.saturate(wd1 + wd2)
|
|
wd3 = self.saturate(15360 - self._ap[2])
|
|
|
|
if self._ap[1] > wd3:
|
|
self._ap[1] = wd3
|
|
elif self._ap[1] < -wd3:
|
|
self._ap[1] = -wd3
|
|
|
|
# UPZERO
|
|
wd1 = 0 if d == 0 else 128
|
|
self._sg[0] = d >> 15
|
|
for i in range(1, 7):
|
|
self._sg[i] = (self._d[i]) >> 15
|
|
wd2 = wd1 if self._sg[i] == self._sg[0] else -wd1
|
|
wd3 = (self._b[i] * 32640) >> 15
|
|
self._bp[i] = self.saturate(wd2 + wd3)
|
|
|
|
# DELAYA
|
|
for i in range(6, 0, -1):
|
|
self._d[i] = self._d[i - 1]
|
|
self._b[i] = self._bp[i]
|
|
|
|
for i in range(2, 0, -1):
|
|
self._r[i] = self._r[i - 1]
|
|
self._p[i] = self._p[i - 1]
|
|
self._a[i] = self._ap[i]
|
|
|
|
# FILTEP
|
|
self._sp = 0
|
|
for i in range(1, 3):
|
|
wd1 = self.saturate(self._r[i] + self._r[i])
|
|
self._sp += (self._a[i] * wd1) >> 15
|
|
self._sp = self.saturate(self._sp)
|
|
|
|
# FILTEZ
|
|
self._sz = 0
|
|
for i in range(6, 0, -1):
|
|
wd1 = self.saturate(self._d[i] + self._d[i])
|
|
self._sz += (self._b[i] * wd1) >> 15
|
|
self._sz = self.saturate(self._sz)
|
|
|
|
# PREDIC
|
|
self.s = self.saturate(self._sp + self._sz)
|