Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions openpilot/system/webrtc/device/video.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
import time

import av
from teleoprtc.tracks import TiciVideoStreamTrack
from aiortc import MediaStreamError
from teleoprtc.tracks import MediaStreamError, TiciVideoStreamTrack

from openpilot.cereal import messaging
from openpilot.common.realtime import DT_MDL
Expand Down Expand Up @@ -55,6 +54,9 @@ def enable(self, enabled: bool):
if not enabled:
self._seen_keyframe = False

def request_keyframe(self) -> None:
self.params.put("LivestreamRequestKeyframe", True, block=False)

def _build_frame_data(self, msg) -> bytes:
encode_data = getattr(msg, msg.which())
if not self.timing_sei_enabled:
Expand Down
11 changes: 3 additions & 8 deletions openpilot/system/webrtc/tests/test_stream_session.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
import asyncio
import json
import time
# for aiortc and its dependencies
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning) # TODO: remove this when google-crc32c publish a python3.12 wheel

from aiortc import RTCDataChannel
from aiortc.mediastreams import VIDEO_CLOCK_RATE, VIDEO_TIME_BASE
import capnp
from openpilot.cereal import messaging, log
from teleoprtc.tracks import VIDEO_CLOCK_RATE, VIDEO_TIME_BASE

from openpilot.system.webrtc.webrtcd import CerealOutgoingMessageProxy, CerealIncomingMessageProxy
from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack
Expand All @@ -31,7 +26,8 @@ def test_outgoing_proxy(self, mocker):
expected_dict = {"type": "customReservedRawData0", "logMonoTime": 123, "valid": True, "data": "test"}
expected_json = json.dumps(expected_dict).encode()

channel = mocker.Mock(spec=RTCDataChannel)
channel = mocker.Mock()
channel.is_open.return_value = True
proxy = CerealOutgoingMessageProxy(["customReservedRawData0"])
def mocked_update(t):
proxy.sm.update_msgs(0, [test_msg])
Expand Down Expand Up @@ -83,4 +79,3 @@ def test_livestream_track(self, mocker):
start_pts = packet.pts
assert abs(i + packet.pts - (start_pts + (((time.monotonic_ns() - start_ns) * VIDEO_CLOCK_RATE) // 1_000_000_000))) < 450 #5ms
assert packet.size == 0

59 changes: 21 additions & 38 deletions openpilot/system/webrtc/webrtcd.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,9 @@
from urllib.parse import urlparse, parse_qs
from typing import Any, TYPE_CHECKING

# aiortc and its dependencies have lots of internal warnings :(
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning) # TODO: remove this when google-crc32c publish a python3.12 wheel

import capnp
if TYPE_CHECKING:
from aiortc.rtcdatachannel import RTCDataChannel
import aioice.ice
from teleoprtc.stream import RTCDataChannelAdapter

from openpilot.system.webrtc.helpers import StreamRequestBody
from openpilot.system.webrtc.schema import generate_field
Expand All @@ -44,17 +38,6 @@ def _default_route_ip() -> str | None:
finally:
s.close()

# aioice patch: gather ICE candidates only on the default-route interface
_get_host_addresses = aioice.ice.get_host_addresses
def _primary_host_addresses(use_ipv4: bool, use_ipv6: bool) -> list[str]:
addresses = _get_host_addresses(use_ipv4, use_ipv6)
primary = _default_route_ip()
if primary not in addresses:
return addresses
return [primary, ]
aioice.ice.get_host_addresses = _primary_host_addresses


class AsyncTaskRunner:
def __init__(self):
self.is_running = False
Expand Down Expand Up @@ -86,10 +69,10 @@ def __init__(self, services: list[str], enabled: bool = True):
super().__init__()
self.services = list(services)
self.sm = messaging.SubMaster(self.services)
self.channels: list[RTCDataChannel] = []
self.channels: list[RTCDataChannelAdapter] = []
self._enabled = enabled

def add_channel(self, channel: 'RTCDataChannel'):
def add_channel(self, channel: 'RTCDataChannelAdapter'):
self.channels.append(channel)

def enable(self, enable: bool):
Expand Down Expand Up @@ -118,20 +101,17 @@ def update(self):
outgoing_msg = {"type": service, "logMonoTime": mono_time, "valid": valid, "data": msg_dict}
encoded_msg = json.dumps(outgoing_msg).encode()
for channel in self.channels:
if hasattr(channel, "is_open") and not channel.is_open():
continue
channel.send(encoded_msg)

async def run(self):
from aiortc.exceptions import InvalidStateError

while True:
if not self._enabled:
await asyncio.sleep(0.01)
continue
try:
self.update()
except InvalidStateError:
self.logger.warning("Cereal outgoing proxy invalid state (connection closed)")
break
except Exception:
self.logger.exception("Cereal outgoing proxy failure")
await asyncio.sleep(0.01)
Expand Down Expand Up @@ -217,6 +197,8 @@ async def run(self):
self._publish(self.bitrates[self.level])

async def _sample(self) -> float | None:
if not hasattr(self.pc, "getStats"):
return None
report = await self.pc.getStats()
packets_lost = packets_sent = 0
for s in report.values():
Expand Down Expand Up @@ -248,17 +230,15 @@ class StreamSession:
shared_pub_master = DynamicPubMaster([])

def __init__(self, body: StreamRequestBody, debug_mode: bool = False):
if debug_mode:
from aiortc.mediastreams import VideoStreamTrack
from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack
from teleoprtc.builder import WebRTCAnswerBuilder

self.identifier = str(uuid.uuid4())
self.params = Params()
builder = WebRTCAnswerBuilder(body.sdp)
builder = WebRTCAnswerBuilder(body.sdp, bind_address=_default_route_ip())

self.enabled = body.enabled
self.video_track = LiveStreamVideoStreamTrack(body.init_camera, self.enabled) if not debug_mode else VideoStreamTrack()
self.video_track = LiveStreamVideoStreamTrack(body.init_camera, self.enabled)
builder.add_video_stream(body.init_camera, self.video_track)
self.stream = builder.stream()

Expand Down Expand Up @@ -305,13 +285,16 @@ def message_handler(self, message: bytes):
case "livestreamCameraSwitch":
self.video_track.switch_camera(payload["data"]["camera"])
case "livestreamSettings":
self.bitrate_controller.set_quality(payload["data"]["quality"])
if self.bitrate_controller is not None:
self.bitrate_controller.set_quality(payload["data"]["quality"])
case "livestreamVideoEnable":
enabled = payload["data"]["enabled"]
self.enabled = enabled
self.video_track.enable(enabled)
self.outgoing_bridge.enable(enabled)
self.bitrate_controller.enable(enabled)
if self.outgoing_bridge is not None:
self.outgoing_bridge.enable(enabled)
if self.bitrate_controller is not None:
self.bitrate_controller.enable(enabled)
if not enabled:
self.params.put("LivestreamRequestKeyframe", True)
case "clockSync":
Expand All @@ -325,7 +308,8 @@ def message_handler(self, message: bytes):
case _:
if payload.get("type") not in self.incoming_bridge_services:
return
self.incoming_bridge.send(message)
if self.incoming_bridge is not None:
self.incoming_bridge.send(message)
except Exception:
self.logger.exception("Cereal incoming proxy failure")

Expand All @@ -341,7 +325,8 @@ async def run(self):
channel = self.stream.get_messaging_channel()
self.outgoing_bridge.add_channel(channel)
self.outgoing_bridge.start()
self.bitrate_controller.start()
if self.bitrate_controller is not None:
self.bitrate_controller.start()

self.logger.info("Stream session (%s) connected", self.identifier)
await self.stream.wait_for_disconnection()
Expand All @@ -357,7 +342,8 @@ async def post_run_cleanup(self):
return
self._cleanup_done = True
self.params.put("LivestreamRequestKeyframe", False)
await self.bitrate_controller.stop()
if self.bitrate_controller is not None:
await self.bitrate_controller.stop()
if self.outgoing_bridge is not None:
await self.outgoing_bridge.stop()
if self.video_track is not None:
Expand Down Expand Up @@ -558,9 +544,6 @@ async def _shutdown(server: WebrtcdHTTPServer, state: ServerState, loop: asyncio


def prewarm_stream_session_imports(debug_mode: bool = False) -> None:
if debug_mode:
from aiortc.mediastreams import VideoStreamTrack
assert VideoStreamTrack
from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack
from teleoprtc.builder import WebRTCAnswerBuilder
assert LiveStreamVideoStreamTrack
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ dependencies = [

# body / webrtcd
"av",
"aiortc",
"libdatachannel-py>=2026.1.0.dev2",

# panda
"libusb1",
Expand Down
Loading
Loading