mirror of
https://github.com/google/bumble.git
synced 2026-04-16 00:25:31 +00:00
111 lines
3.4 KiB
Python
111 lines
3.4 KiB
Python
# Copyright 2024 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Imports
|
|
# -----------------------------------------------------------------------------
|
|
from __future__ import annotations
|
|
|
|
import struct
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
class MediaPacket:
|
|
@staticmethod
|
|
def from_bytes(data: bytes) -> MediaPacket:
|
|
version = (data[0] >> 6) & 0x03
|
|
padding = (data[0] >> 5) & 0x01
|
|
extension = (data[0] >> 4) & 0x01
|
|
csrc_count = data[0] & 0x0F
|
|
marker = (data[1] >> 7) & 0x01
|
|
payload_type = data[1] & 0x7F
|
|
sequence_number = struct.unpack_from('>H', data, 2)[0]
|
|
timestamp = struct.unpack_from('>I', data, 4)[0]
|
|
ssrc = struct.unpack_from('>I', data, 8)[0]
|
|
csrc_list = [
|
|
struct.unpack_from('>I', data, 12 + i)[0] for i in range(csrc_count)
|
|
]
|
|
payload = data[12 + csrc_count * 4 :]
|
|
|
|
return MediaPacket(
|
|
version,
|
|
padding,
|
|
extension,
|
|
marker,
|
|
sequence_number,
|
|
timestamp,
|
|
ssrc,
|
|
csrc_list,
|
|
payload_type,
|
|
payload,
|
|
)
|
|
|
|
def __init__(
|
|
self,
|
|
version: int,
|
|
padding: int,
|
|
extension: int,
|
|
marker: int,
|
|
sequence_number: int,
|
|
timestamp: int,
|
|
ssrc: int,
|
|
csrc_list: list[int],
|
|
payload_type: int,
|
|
payload: bytes,
|
|
) -> None:
|
|
self.version = version
|
|
self.padding = padding
|
|
self.extension = extension
|
|
self.marker = marker
|
|
self.sequence_number = sequence_number & 0xFFFF
|
|
self.timestamp = timestamp & 0xFFFFFFFF
|
|
self.timestamp_seconds = 0.0
|
|
self.ssrc = ssrc
|
|
self.csrc_list = csrc_list
|
|
self.payload_type = payload_type
|
|
self.payload = payload
|
|
|
|
def __bytes__(self) -> bytes:
|
|
header = bytes(
|
|
[
|
|
self.version << 6
|
|
| self.padding << 5
|
|
| self.extension << 4
|
|
| len(self.csrc_list),
|
|
self.marker << 7 | self.payload_type,
|
|
]
|
|
) + struct.pack(
|
|
'>HII',
|
|
self.sequence_number,
|
|
self.timestamp,
|
|
self.ssrc,
|
|
)
|
|
for csrc in self.csrc_list:
|
|
header += struct.pack('>I', csrc)
|
|
return header + self.payload
|
|
|
|
def __str__(self) -> str:
|
|
return (
|
|
f'RTP(v={self.version},'
|
|
f'p={self.padding},'
|
|
f'x={self.extension},'
|
|
f'm={self.marker},'
|
|
f'pt={self.payload_type},'
|
|
f'sn={self.sequence_number},'
|
|
f'ts={self.timestamp},'
|
|
f'ssrc={self.ssrc},'
|
|
f'csrcs={self.csrc_list},'
|
|
f'payload_size={len(self.payload)})'
|
|
)
|