diff --git a/openpilot/tools/camerastream/compressed_vipc.py b/openpilot/tools/camerastream/compressed_vipc.py index 0b96223e8e0fbc..4d45a9ccc5aad4 100755 --- a/openpilot/tools/camerastream/compressed_vipc.py +++ b/openpilot/tools/camerastream/compressed_vipc.py @@ -7,16 +7,88 @@ import multiprocessing import time import signal - - -import openpilot.cereal.messaging as messaging +import zmq +from cereal import log from msgq.visionipc import VisionIpcServer, VisionStreamType V4L2_BUF_FLAG_KEYFRAME = 8 -# start encoderd -# also start cereal messaging bridge -# then run this "./compressed_vipc.py " + +def fnv1a_hash(s: str) -> int: + """FNV-1a hash function matching bridge_zmq.cc""" + h = 0xcbf29ce484222325 + for c in s.encode(): + h ^= c + h = (h * 0x100000001b3) & 0xFFFFFFFFFFFFFFFF + return h + + +def get_port(endpoint: str) -> int: + """Calculate port number from endpoint name, matching bridge_zmq.cc""" + return 8023 + (fnv1a_hash(endpoint) % (65535 - 8023)) + + +class ZmqSubSocket: + """ZMQ subscriber socket matching BridgeZmqSubSocket from bridge_zmq.cc""" + + def __init__(self, endpoint: str, address: str, conflate: bool = False): + self.ctx = zmq.Context() + self.sock = self.ctx.socket(zmq.SUB) + self.sock.setsockopt(zmq.SUBSCRIBE, b"") + + if conflate: + self.sock.setsockopt(zmq.CONFLATE, 1) + + reconnect_ivl = 500 + self.sock.setsockopt(zmq.RECONNECT_IVL_MAX, reconnect_ivl) + + port = get_port(endpoint) + self.endpoint = f"tcp://{address}:{port}" + self.sock.connect(self.endpoint) + + def set_timeout(self, timeout_ms: int): + self.sock.setsockopt(zmq.RCVTIMEO, timeout_ms) + + def recv(self, non_blocking: bool = False) -> bytes | None: + flags = zmq.NOBLOCK if non_blocking else 0 + try: + return self.sock.recv(flags=flags) + except zmq.Again: + return None + + def __del__(self): + self.sock.close() + self.ctx.term() + + +class ZmqPoller: + """ZMQ poller matching BridgeZmqPoller from bridge_zmq.cc""" + + def __init__(self): + self.socks = [] + + def register(self, sock: ZmqSubSocket): + self.socks.append(sock) + + def poll(self, timeout_ms: int): + """Poll registered sockets, return list of sockets with events""" + items = [sock.sock for sock in self.socks] + if not items: + return [] + + zmq_poller = zmq.Poller() + for s in items: + zmq_poller.register(s, zmq.POLLIN) + + result = zmq_poller.poll(timeout_ms) + ready_socks = [] + for s, _ in result: + for sock in self.socks: + if sock.sock == s: + ready_socks.append(sock) + break + return ready_socks + ENCODE_SOCKETS = { VisionStreamType.VISION_STREAM_ROAD: "roadEncodeData", @@ -24,6 +96,7 @@ VisionStreamType.VISION_STREAM_WIDE_ROAD: "wideRoadEncodeData", } + def decoder(addr, vipc_server, vst, nvidia, W, H, debug=False): sock_name = ENCODE_SOCKETS[vst] if debug: @@ -42,17 +115,18 @@ def decoder(addr, vipc_server, vst, nvidia, W, H, debug=False): else: codec = av.CodecContext.create("hevc", "r") - os.environ["ZMQ"] = "1" - messaging.reset_context() - sock = messaging.sub_sock(sock_name, None, addr=addr, conflate=False) + sock = ZmqSubSocket(sock_name, addr, conflate=False) cnt = 0 last_idx = -1 seen_iframe = False time_q = [] while 1: - msgs = messaging.drain_sock(sock, wait_for_one=True) - for evt in msgs: + dat = sock.recv() + if dat is None: + continue + + with log.Event.from_bytes(dat) as evt: evta = getattr(evt, evt.which()) if debug and evta.idx.encodeId != 0 and evta.idx.encodeId != (last_idx+1): print("DROP PACKET!") @@ -101,31 +175,55 @@ def decoder(addr, vipc_server, vst, nvidia, W, H, debug=False): pc_latency = (time.monotonic()-time_q[0])*1000 time_q = time_q[1:] if debug: - print(f"{len(msgs):2d} {evta.idx.encodeId:4d} {evt.logMonoTime/1e9:.3f} {evta.idx.timestampEof/1e6:.3f} \ - roll {frame_latency:6.2f} ms latency {process_latency:6.2f} ms + {network_latency:6.2f} ms + {pc_latency:6.2f} ms \ - = {process_latency+network_latency+pc_latency:6.2f} ms", len(evta.data), sock_name) - + msg = f" 1 {evta.idx.encodeId:4d} {evt.logMonoTime/1e9:.3f} {evta.idx.timestampEof/1e6:.3f} " + msg += f"roll {frame_latency:6.2f} ms latency {process_latency:6.2f} ms + {network_latency:6.2f} ms + {pc_latency:6.2f} ms " + msg += f"= {process_latency+network_latency+pc_latency:6.2f} ms [{len(evta.data)} bytes] {sock_name}" + print(msg) class CompressedVipc: def __init__(self, addr, vision_streams, server_name, nvidia=False, debug=False): print("getting frame sizes") - os.environ["ZMQ"] = "1" - messaging.reset_context() - sm = messaging.SubMaster([ENCODE_SOCKETS[s] for s in vision_streams], addr=addr) - while min(sm.recv_frame.values()) == 0: - sm.update(100) - os.environ.pop("ZMQ") - messaging.reset_context() + + # Wait for first frame to get dimensions + self.poller = ZmqPoller() + self.socks = {} + self.data = {} + + for vst in vision_streams: + sock_name = ENCODE_SOCKETS[vst] + sock = ZmqSubSocket(sock_name, addr, conflate=True) + self.poller.register(sock) + self.socks[sock_name] = sock + self.data[sock_name] = None + + # Poll until we get at least one message on each socket + waiting_for = {ENCODE_SOCKETS[vst] for vst in vision_streams} + while waiting_for: + ready = self.poller.poll(5000) + for sock in ready: + for name, s in self.socks.items(): + if s == sock: + dat = sock.recv() + if dat and self.data[name] is None: + with log.Event.from_bytes(dat) as evt: + self.data[name] = getattr(evt, name) + waiting_for.discard(name) + break + + print(222) self.vipc_server = VisionIpcServer(server_name) for vst in vision_streams: - ed = sm[ENCODE_SOCKETS[vst]] + ed = self.data[ENCODE_SOCKETS[vst]] self.vipc_server.create_buffers(vst, 4, ed.width, ed.height) self.vipc_server.start_listener() + print("start decoders") + self.procs = [] for vst in vision_streams: - ed = sm[ENCODE_SOCKETS[vst]] + ed = self.data[ENCODE_SOCKETS[vst]] + print(f"start decoder for {ENCODE_SOCKETS[vst]}, {ed.width}x{ed.height}") p = multiprocessing.Process(target=decoder, args=(addr, self.vipc_server, vst, nvidia, ed.width, ed.height, debug)) p.start() self.procs.append(p) @@ -139,6 +237,7 @@ def kill(self): p.terminate() self.join() + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Decode video streams and broadcast on VisionIPC") parser.add_argument("addr", help="Address of comma three")