Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 123 additions & 24 deletions openpilot/tools/camerastream/compressed_vipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,96 @@
import multiprocessing
import time
import signal


import openpilot.cereal.messaging as messaging
import zmq
from cereal import log
from msgq.visionipc import VisionIpcServer, VisionStreamType

V4L2_BUF_FLAG_KEYFRAME = 8

# start encoderd
# also start cereal messaging bridge
# then run this "./compressed_vipc.py <ip>"

def fnv1a_hash(s: str) -> int:
"""FNV-1a hash function matching bridge_zmq.cc"""
h = 0xcbf29ce484222325
for c in s.encode():
h ^= c
h = (h * 0x100000001b3) & 0xFFFFFFFFFFFFFFFF
return h


def get_port(endpoint: str) -> int:
"""Calculate port number from endpoint name, matching bridge_zmq.cc"""
return 8023 + (fnv1a_hash(endpoint) % (65535 - 8023))


class ZmqSubSocket:
"""ZMQ subscriber socket matching BridgeZmqSubSocket from bridge_zmq.cc"""

def __init__(self, endpoint: str, address: str, conflate: bool = False):
self.ctx = zmq.Context()
self.sock = self.ctx.socket(zmq.SUB)
self.sock.setsockopt(zmq.SUBSCRIBE, b"")

if conflate:
self.sock.setsockopt(zmq.CONFLATE, 1)

reconnect_ivl = 500
self.sock.setsockopt(zmq.RECONNECT_IVL_MAX, reconnect_ivl)

port = get_port(endpoint)
self.endpoint = f"tcp://{address}:{port}"
self.sock.connect(self.endpoint)

def set_timeout(self, timeout_ms: int):
self.sock.setsockopt(zmq.RCVTIMEO, timeout_ms)

def recv(self, non_blocking: bool = False) -> bytes | None:
flags = zmq.NOBLOCK if non_blocking else 0
try:
return self.sock.recv(flags=flags)
except zmq.Again:
return None

def __del__(self):
self.sock.close()
self.ctx.term()


class ZmqPoller:
"""ZMQ poller matching BridgeZmqPoller from bridge_zmq.cc"""

def __init__(self):
self.socks = []

def register(self, sock: ZmqSubSocket):
self.socks.append(sock)

def poll(self, timeout_ms: int):
"""Poll registered sockets, return list of sockets with events"""
items = [sock.sock for sock in self.socks]
if not items:
return []

zmq_poller = zmq.Poller()
for s in items:
zmq_poller.register(s, zmq.POLLIN)

result = zmq_poller.poll(timeout_ms)
ready_socks = []
for s, _ in result:
for sock in self.socks:
if sock.sock == s:
ready_socks.append(sock)
break
return ready_socks


ENCODE_SOCKETS = {
VisionStreamType.VISION_STREAM_ROAD: "roadEncodeData",
VisionStreamType.VISION_STREAM_DRIVER: "driverEncodeData",
VisionStreamType.VISION_STREAM_WIDE_ROAD: "wideRoadEncodeData",
}


def decoder(addr, vipc_server, vst, nvidia, W, H, debug=False):
sock_name = ENCODE_SOCKETS[vst]
if debug:
Expand All @@ -42,17 +115,18 @@ def decoder(addr, vipc_server, vst, nvidia, W, H, debug=False):
else:
codec = av.CodecContext.create("hevc", "r")

os.environ["ZMQ"] = "1"
messaging.reset_context()
sock = messaging.sub_sock(sock_name, None, addr=addr, conflate=False)
sock = ZmqSubSocket(sock_name, addr, conflate=False)
cnt = 0
last_idx = -1
seen_iframe = False

time_q = []
while 1:
msgs = messaging.drain_sock(sock, wait_for_one=True)
for evt in msgs:
dat = sock.recv()
if dat is None:
continue

with log.Event.from_bytes(dat) as evt:
evta = getattr(evt, evt.which())
if debug and evta.idx.encodeId != 0 and evta.idx.encodeId != (last_idx+1):
print("DROP PACKET!")
Expand Down Expand Up @@ -101,31 +175,55 @@ def decoder(addr, vipc_server, vst, nvidia, W, H, debug=False):
pc_latency = (time.monotonic()-time_q[0])*1000
time_q = time_q[1:]
if debug:
print(f"{len(msgs):2d} {evta.idx.encodeId:4d} {evt.logMonoTime/1e9:.3f} {evta.idx.timestampEof/1e6:.3f} \
roll {frame_latency:6.2f} ms latency {process_latency:6.2f} ms + {network_latency:6.2f} ms + {pc_latency:6.2f} ms \
= {process_latency+network_latency+pc_latency:6.2f} ms", len(evta.data), sock_name)

msg = f" 1 {evta.idx.encodeId:4d} {evt.logMonoTime/1e9:.3f} {evta.idx.timestampEof/1e6:.3f} "
msg += f"roll {frame_latency:6.2f} ms latency {process_latency:6.2f} ms + {network_latency:6.2f} ms + {pc_latency:6.2f} ms "
msg += f"= {process_latency+network_latency+pc_latency:6.2f} ms [{len(evta.data)} bytes] {sock_name}"
print(msg)

class CompressedVipc:
def __init__(self, addr, vision_streams, server_name, nvidia=False, debug=False):
print("getting frame sizes")
os.environ["ZMQ"] = "1"
messaging.reset_context()
sm = messaging.SubMaster([ENCODE_SOCKETS[s] for s in vision_streams], addr=addr)
while min(sm.recv_frame.values()) == 0:
sm.update(100)
os.environ.pop("ZMQ")
messaging.reset_context()

# Wait for first frame to get dimensions
self.poller = ZmqPoller()
self.socks = {}
self.data = {}

for vst in vision_streams:
sock_name = ENCODE_SOCKETS[vst]
sock = ZmqSubSocket(sock_name, addr, conflate=True)
self.poller.register(sock)
self.socks[sock_name] = sock
self.data[sock_name] = None

# Poll until we get at least one message on each socket
waiting_for = {ENCODE_SOCKETS[vst] for vst in vision_streams}
while waiting_for:
ready = self.poller.poll(5000)
for sock in ready:
for name, s in self.socks.items():
if s == sock:
dat = sock.recv()
if dat and self.data[name] is None:
with log.Event.from_bytes(dat) as evt:
self.data[name] = getattr(evt, name)
waiting_for.discard(name)
break

print(222)

self.vipc_server = VisionIpcServer(server_name)
for vst in vision_streams:
ed = sm[ENCODE_SOCKETS[vst]]
ed = self.data[ENCODE_SOCKETS[vst]]
self.vipc_server.create_buffers(vst, 4, ed.width, ed.height)
self.vipc_server.start_listener()

print("start decoders")

self.procs = []
for vst in vision_streams:
ed = sm[ENCODE_SOCKETS[vst]]
ed = self.data[ENCODE_SOCKETS[vst]]
print(f"start decoder for {ENCODE_SOCKETS[vst]}, {ed.width}x{ed.height}")
p = multiprocessing.Process(target=decoder, args=(addr, self.vipc_server, vst, nvidia, ed.width, ed.height, debug))
p.start()
self.procs.append(p)
Expand All @@ -139,6 +237,7 @@ def kill(self):
p.terminate()
self.join()


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Decode video streams and broadcast on VisionIPC")
parser.add_argument("addr", help="Address of comma three")
Expand Down
Loading