From b65bb4898faaaf464c115755e12ef846088dec90 Mon Sep 17 00:00:00 2001 From: zhangxu Date: Wed, 20 May 2026 22:10:17 +0800 Subject: [PATCH 1/5] 2026.05.20 fix compressed_vipc.py by using pyzmq for zmq client --- tools/camerastream/compressed_vipc.py | 145 ++++++++++++++++++++++---- 1 file changed, 123 insertions(+), 22 deletions(-) diff --git a/tools/camerastream/compressed_vipc.py b/tools/camerastream/compressed_vipc.py index 4dc74272ea50be..a3695682e116b6 100755 --- a/tools/camerastream/compressed_vipc.py +++ b/tools/camerastream/compressed_vipc.py @@ -7,16 +7,90 @@ import multiprocessing import time import signal +import zmq - -import cereal.messaging as messaging +import capnp +from cereal import log from msgq.visionipc import VisionIpcServer, VisionStreamType V4L2_BUF_FLAG_KEYFRAME = 8 -# start encoderd -# also start cereal messaging bridge -# then run this "./compressed_vipc.py " + +def fnv1a_hash(s: str) -> int: + """FNV-1a hash function matching bridge_zmq.cc""" + h = 0xcbf29ce484222325 + for c in s.encode(): + h ^= c + h = (h * 0x100000001b3) & 0xFFFFFFFFFFFFFFFF + return h + + +def get_port(endpoint: str) -> int: + """Calculate port number from endpoint name, matching bridge_zmq.cc""" + return 8023 + (fnv1a_hash(endpoint) % (65535 - 8023)) + + +class ZmqSubSocket: + """ZMQ subscriber socket matching BridgeZmqSubSocket from bridge_zmq.cc""" + + def __init__(self, endpoint: str, address: str, conflate: bool = False): + self.ctx = zmq.Context() + self.sock = self.ctx.socket(zmq.SUB) + self.sock.setsockopt(zmq.SUBSCRIBE, b"") + + if conflate: + self.sock.setsockopt(zmq.CONFLATE, 1) + + reconnect_ivl = 500 + self.sock.setsockopt(zmq.RECONNECT_IVL_MAX, reconnect_ivl) + + port = get_port(endpoint) + self.endpoint = f"tcp://{address}:{port}" + self.sock.connect(self.endpoint) + + def set_timeout(self, timeout_ms: int): + self.sock.setsockopt(zmq.RCVTIMEO, timeout_ms) + + def recv(self, non_blocking: bool = False) -> bytes: + flags = zmq.NOBLOCK if non_blocking else 0 + try: + return self.sock.recv(flags=flags) + except zmq.Again: + return None + + def __del__(self): + self.sock.close() + self.ctx.term() + + +class ZmqPoller: + """ZMQ poller matching BridgeZmqPoller from bridge_zmq.cc""" + + def __init__(self): + self.socks = [] + + def register(self, sock: ZmqSubSocket): + self.socks.append(sock) + + def poll(self, timeout_ms: int): + """Poll registered sockets, return list of sockets with events""" + items = [sock.sock for sock in self.socks] + if not items: + return [] + + zmq_poller = zmq.Poller() + for s in items: + zmq_poller.register(s, zmq.POLLIN) + + result = zmq_poller.poll(timeout_ms) + ready_socks = [] + for s, _ in result: + for sock in self.socks: + if sock.sock == s: + ready_socks.append(sock) + break + return ready_socks + ENCODE_SOCKETS = { VisionStreamType.VISION_STREAM_ROAD: "roadEncodeData", @@ -24,6 +98,7 @@ VisionStreamType.VISION_STREAM_WIDE_ROAD: "wideRoadEncodeData", } + def decoder(addr, vipc_server, vst, nvidia, W, H, debug=False): sock_name = ENCODE_SOCKETS[vst] if debug: @@ -42,17 +117,18 @@ def decoder(addr, vipc_server, vst, nvidia, W, H, debug=False): else: codec = av.CodecContext.create("hevc", "r") - os.environ["ZMQ"] = "1" - messaging.reset_context() - sock = messaging.sub_sock(sock_name, None, addr=addr, conflate=False) + sock = ZmqSubSocket(sock_name, addr, conflate=False) cnt = 0 last_idx = -1 seen_iframe = False time_q = [] while 1: - msgs = messaging.drain_sock(sock, wait_for_one=True) - for evt in msgs: + dat = sock.recv() + if dat is None: + continue + + with log.Event.from_bytes(dat) as evt: evta = getattr(evt, evt.which()) if debug and evta.idx.encodeId != 0 and evta.idx.encodeId != (last_idx+1): print("DROP PACKET!") @@ -101,31 +177,55 @@ def decoder(addr, vipc_server, vst, nvidia, W, H, debug=False): pc_latency = (time.monotonic()-time_q[0])*1000 time_q = time_q[1:] if debug: - print(f"{len(msgs):2d} {evta.idx.encodeId:4d} {evt.logMonoTime/1e9:.3f} {evta.idx.timestampEof/1e6:.3f} \ - roll {frame_latency:6.2f} ms latency {process_latency:6.2f} ms + {network_latency:6.2f} ms + {pc_latency:6.2f} ms \ - = {process_latency+network_latency+pc_latency:6.2f} ms", len(evta.data), sock_name) + print(f" 1 {evta.idx.encodeId:4d} {evt.logMonoTime/1e9:.3f} {evta.idx.timestampEof/1e6:.3f} " + f"roll {frame_latency:6.2f} ms latency {process_latency:6.2f} ms + {network_latency:6.2f} ms + {pc_latency:6.2f} ms " + f"= {process_latency+network_latency+pc_latency:6.2f} ms [{len(evta.data)} bytes] {sock_name}") class CompressedVipc: def __init__(self, addr, vision_streams, server_name, nvidia=False, debug=False): print("getting frame sizes") - os.environ["ZMQ"] = "1" - messaging.reset_context() - sm = messaging.SubMaster([ENCODE_SOCKETS[s] for s in vision_streams], addr=addr) - while min(sm.recv_frame.values()) == 0: - sm.update(100) - os.environ.pop("ZMQ") - messaging.reset_context() + + # Wait for first frame to get dimensions + self.poller = ZmqPoller() + self.socks = {} + self.data = {} + + for vst in vision_streams: + sock_name = ENCODE_SOCKETS[vst] + sock = ZmqSubSocket(sock_name, addr, conflate=True) + self.poller.register(sock) + self.socks[sock_name] = sock + self.data[sock_name] = None + + # Poll until we get at least one message on each socket + waiting_for = set(ENCODE_SOCKETS[vst] for vst in vision_streams) + while waiting_for: + ready = self.poller.poll(5000) + for sock in ready: + for name, s in self.socks.items(): + if s == sock: + dat = sock.recv() + if dat and self.data[name] is None: + with log.Event.from_bytes(dat) as evt: + self.data[name] = getattr(evt, name) + waiting_for.discard(name) + break + + print(222) self.vipc_server = VisionIpcServer(server_name) for vst in vision_streams: - ed = sm[ENCODE_SOCKETS[vst]] + ed = self.data[ENCODE_SOCKETS[vst]] self.vipc_server.create_buffers(vst, 4, ed.width, ed.height) self.vipc_server.start_listener() + print("start decoders") + self.procs = [] for vst in vision_streams: - ed = sm[ENCODE_SOCKETS[vst]] + ed = self.data[ENCODE_SOCKETS[vst]] + print(f"start decoder for {ENCODE_SOCKETS[vst]}, {ed.width}x{ed.height}") p = multiprocessing.Process(target=decoder, args=(addr, self.vipc_server, vst, nvidia, ed.width, ed.height, debug)) p.start() self.procs.append(p) @@ -139,6 +239,7 @@ def kill(self): p.terminate() self.join() + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Decode video streams and broadcast on VisionIPC") parser.add_argument("addr", help="Address of comma three") From d3daace43f81adbad0cc564c33247f385be5d273 Mon Sep 17 00:00:00 2001 From: zhangxu Date: Thu, 21 May 2026 14:25:10 +0800 Subject: [PATCH 2/5] 2026.05.21 Clean up compressed_vipc.py by removing unused import, fixing return type annotation, and using idiomatic Python syntax - Remove unused 'capnp' import - Update ZmqSubSocket.recv() return type to bytes | None (can return None on timeout) - Wrap multi-line print statement with parentheses for cleaner formatting - Use set comprehension syntax instead of set() with generator --- tools/camerastream/compressed_vipc.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tools/camerastream/compressed_vipc.py b/tools/camerastream/compressed_vipc.py index a3695682e116b6..f226677211d1e6 100755 --- a/tools/camerastream/compressed_vipc.py +++ b/tools/camerastream/compressed_vipc.py @@ -8,8 +8,6 @@ import time import signal import zmq - -import capnp from cereal import log from msgq.visionipc import VisionIpcServer, VisionStreamType @@ -51,7 +49,7 @@ def __init__(self, endpoint: str, address: str, conflate: bool = False): def set_timeout(self, timeout_ms: int): self.sock.setsockopt(zmq.RCVTIMEO, timeout_ms) - def recv(self, non_blocking: bool = False) -> bytes: + def recv(self, non_blocking: bool = False) -> bytes | None: flags = zmq.NOBLOCK if non_blocking else 0 try: return self.sock.recv(flags=flags) @@ -177,9 +175,11 @@ def decoder(addr, vipc_server, vst, nvidia, W, H, debug=False): pc_latency = (time.monotonic()-time_q[0])*1000 time_q = time_q[1:] if debug: - print(f" 1 {evta.idx.encodeId:4d} {evt.logMonoTime/1e9:.3f} {evta.idx.timestampEof/1e6:.3f} " - f"roll {frame_latency:6.2f} ms latency {process_latency:6.2f} ms + {network_latency:6.2f} ms + {pc_latency:6.2f} ms " - f"= {process_latency+network_latency+pc_latency:6.2f} ms [{len(evta.data)} bytes] {sock_name}") + print(( + f" 1 {evta.idx.encodeId:4d} {evt.logMonoTime/1e9:.3f} {evta.idx.timestampEof/1e6:.3f} " + f"roll {frame_latency:6.2f} ms latency {process_latency:6.2f} ms + {network_latency:6.2f} ms + {pc_latency:6.2f} ms " + f"= {process_latency+network_latency+pc_latency:6.2f} ms [{len(evta.data)} bytes] {sock_name}" + )) class CompressedVipc: @@ -199,7 +199,7 @@ def __init__(self, addr, vision_streams, server_name, nvidia=False, debug=False) self.data[sock_name] = None # Poll until we get at least one message on each socket - waiting_for = set(ENCODE_SOCKETS[vst] for vst in vision_streams) + waiting_for = {ENCODE_SOCKETS[vst] for vst in vision_streams} while waiting_for: ready = self.poller.poll(5000) for sock in ready: From e6e71f9c646e05e00d9a6bfbf311a167d420f5db Mon Sep 17 00:00:00 2001 From: zhangxu Date: Thu, 21 May 2026 14:31:27 +0800 Subject: [PATCH 3/5] - Remove extraneous parentheses around print() statement --- tools/camerastream/compressed_vipc.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tools/camerastream/compressed_vipc.py b/tools/camerastream/compressed_vipc.py index f226677211d1e6..c213217e358ecc 100755 --- a/tools/camerastream/compressed_vipc.py +++ b/tools/camerastream/compressed_vipc.py @@ -175,11 +175,9 @@ def decoder(addr, vipc_server, vst, nvidia, W, H, debug=False): pc_latency = (time.monotonic()-time_q[0])*1000 time_q = time_q[1:] if debug: - print(( - f" 1 {evta.idx.encodeId:4d} {evt.logMonoTime/1e9:.3f} {evta.idx.timestampEof/1e6:.3f} " - f"roll {frame_latency:6.2f} ms latency {process_latency:6.2f} ms + {network_latency:6.2f} ms + {pc_latency:6.2f} ms " - f"= {process_latency+network_latency+pc_latency:6.2f} ms [{len(evta.data)} bytes] {sock_name}" - )) + print(f" 1 {evta.idx.encodeId:4d} {evt.logMonoTime/1e9:.3f} {evta.idx.timestampEof/1e6:.3f} " + f"roll {frame_latency:6.2f} ms latency {process_latency:6.2f} ms + {network_latency:6.2f} ms + {pc_latency:6.2f} ms " + f"= {process_latency+network_latency+pc_latency:6.2f} ms [{len(evta.data)} bytes] {sock_name}") class CompressedVipc: From ac9c25002cdec5b0b1177a2b243cb3afa63ed184 Mon Sep 17 00:00:00 2001 From: zhangxu Date: Thu, 21 May 2026 14:38:07 +0800 Subject: [PATCH 4/5] - Merge multi-line formatted log print statement into single line to resolve ISC002 implicit string concatenation and UP034 extraneous parentheses lint errors. --- tools/camerastream/compressed_vipc.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tools/camerastream/compressed_vipc.py b/tools/camerastream/compressed_vipc.py index c213217e358ecc..0f6ef0e942881d 100755 --- a/tools/camerastream/compressed_vipc.py +++ b/tools/camerastream/compressed_vipc.py @@ -175,10 +175,7 @@ def decoder(addr, vipc_server, vst, nvidia, W, H, debug=False): pc_latency = (time.monotonic()-time_q[0])*1000 time_q = time_q[1:] if debug: - print(f" 1 {evta.idx.encodeId:4d} {evt.logMonoTime/1e9:.3f} {evta.idx.timestampEof/1e6:.3f} " - f"roll {frame_latency:6.2f} ms latency {process_latency:6.2f} ms + {network_latency:6.2f} ms + {pc_latency:6.2f} ms " - f"= {process_latency+network_latency+pc_latency:6.2f} ms [{len(evta.data)} bytes] {sock_name}") - + print(f" 1 {evta.idx.encodeId:4d} {evt.logMonoTime/1e9:.3f} {evta.idx.timestampEof/1e6:.3f} roll {frame_latency:6.2f} ms latency {process_latency:6.2f} ms + {network_latency:6.2f} ms + {pc_latency:6.2f} ms = {process_latency+network_latency+pc_latency:6.2f} ms [{len(evta.data)} bytes] {sock_name}") class CompressedVipc: def __init__(self, addr, vision_streams, server_name, nvidia=False, debug=False): From cd5e6c725960c94c95c02395a5fd5e70154c987d Mon Sep 17 00:00:00 2001 From: zhangxu Date: Thu, 21 May 2026 14:41:49 +0800 Subject: [PATCH 5/5] - Fix lint errors including line length, implicit string concatenation, and extraneous parentheses in compressed_vipc by using explicit string concatenation and splitting debug print message into multiple lines. --- tools/camerastream/compressed_vipc.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tools/camerastream/compressed_vipc.py b/tools/camerastream/compressed_vipc.py index 0f6ef0e942881d..4d45a9ccc5aad4 100755 --- a/tools/camerastream/compressed_vipc.py +++ b/tools/camerastream/compressed_vipc.py @@ -175,7 +175,10 @@ def decoder(addr, vipc_server, vst, nvidia, W, H, debug=False): pc_latency = (time.monotonic()-time_q[0])*1000 time_q = time_q[1:] if debug: - print(f" 1 {evta.idx.encodeId:4d} {evt.logMonoTime/1e9:.3f} {evta.idx.timestampEof/1e6:.3f} roll {frame_latency:6.2f} ms latency {process_latency:6.2f} ms + {network_latency:6.2f} ms + {pc_latency:6.2f} ms = {process_latency+network_latency+pc_latency:6.2f} ms [{len(evta.data)} bytes] {sock_name}") + msg = f" 1 {evta.idx.encodeId:4d} {evt.logMonoTime/1e9:.3f} {evta.idx.timestampEof/1e6:.3f} " + msg += f"roll {frame_latency:6.2f} ms latency {process_latency:6.2f} ms + {network_latency:6.2f} ms + {pc_latency:6.2f} ms " + msg += f"= {process_latency+network_latency+pc_latency:6.2f} ms [{len(evta.data)} bytes] {sock_name}" + print(msg) class CompressedVipc: def __init__(self, addr, vision_streams, server_name, nvidia=False, debug=False):