Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions common/file_chunker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
import sys
import io
import math
import os
from pathlib import Path
Expand Down Expand Up @@ -39,14 +40,33 @@ def get_existing_chunks(path):
return _chunk_paths(path, num_chunks)
raise FileNotFoundError(path)

def read_file_chunked(path):
class ChunkStream(io.RawIOBase):
def __init__(self, paths):
self._files = (open(p, 'rb') for p in paths)
self._cur = next(self._files, None)

def readable(self):
return True

def readinto(self, b):
while self._cur is not None:
n = self._cur.readinto(b)
if n:
return n
self._cur.close() # chunk drained (or empty) -> advance to next
self._cur = next(self._files, None)
return 0 # no chunks left -> EOF

def open_file_chunked(path):
manifest_path = get_manifest_path(path)
if os.path.isfile(manifest_path):
num_chunks = int(Path(manifest_path).read_text().strip())
return b''.join(Path(get_chunk_name(path, i, num_chunks)).read_bytes() for i in range(num_chunks))
if os.path.isfile(path):
return Path(path).read_bytes()
raise FileNotFoundError(path)
paths = [get_chunk_name(path, i, num_chunks) for i in range(num_chunks)]
elif os.path.isfile(path):
paths = [path]
else:
raise FileNotFoundError(path)
return io.BufferedReader(ChunkStream(paths))


if __name__ == "__main__":
Expand Down
118 changes: 81 additions & 37 deletions selfdrive/modeld/compile_modeld.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,25 @@

import numpy as np


class StreamPickler(pickle._Pickler):
# tinygrad's Buffer.__reduce__ copies each device buffer out into a fresh host bytearray.
# those bytearrays are never shared, but pickle memoizes them and keeps every one alive
# until dump() finishes -> peak host RAM = the whole model (OOM on a big model). skipping
# the memoize() frees each copyout right after it's written (peak ~= one buffer). the
# Buffer objects themselves (incl. view/base sharing) are still memoized, and no
# back-reference ever points at a bytearray, so the output is a standard pickle that
# loads with unmodified tinygrad. requires protocol 5 (BYTEARRAY8).
dispatch = dict(pickle._Pickler.dispatch)
def save_bytearray(self, obj):
n = len(obj)
if n >= self.framer._FRAME_SIZE_TARGET:
self._write_large_bytes(pickle.BYTEARRAY8 + n.to_bytes(8, "little"), obj)
else:
self.write(pickle.BYTEARRAY8 + n.to_bytes(8, "little") + obj)
dispatch[bytearray] = save_bytearray


def _patch_tinygrad_fetch_fw():
import hashlib
import pathlib
Expand All @@ -32,8 +51,8 @@ def fetch_fw(path, name, sha256):


NV12Frame = namedtuple("NV12Frame", ['width', 'height', 'stride', 'y_height', 'uv_height', 'size'])
WARP_INPUTS = ['img_q', 'big_img_q', 'tfm', 'big_tfm']
POLICY_INPUTS = ['feat_q', 'desire_q', 'desire', 'traffic_convention', 'action_t']
WARP_INPUTS = ['tfm', 'big_tfm']
POLICY_INPUTS = ['img_q', 'big_img_q', 'feat_q', 'desire_q', 'all_npy_inputs']

UV_SCALE_MATRIX = np.array([[0.5, 0, 0], [0, 0.5, 0], [0, 0, 1]], dtype=np.float32)
UV_SCALE_MATRIX_INV = np.linalg.inv(UV_SCALE_MATRIX)
Expand Down Expand Up @@ -139,16 +158,22 @@ def make_input_queues(vision_input_shapes, policy_input_shapes, frame_skip, devi
#TODO action_t is hardcoded to match tc for future compatibility
at = tc

# pack desire, traffic_convention and action_t into one contiguous host buffer so they copy to the
# device in a single transfer (per-tensor copies are expensive on the usbgpu path). the npy entries
# are views into the shared buffer, so modeld's in-place writes still land in it; run_policy slices
# it back out in the same order. see POLICY_INPUTS / make_run_policy.
d_len, tc_len, at_len = dp[2], int(np.prod(tc)), int(np.prod(at))
all_npy_inputs = np.zeros(d_len + tc_len + at_len, dtype=np.float32)
policy_npy = {
'desire': np.zeros(dp[2], dtype=np.float32),
'traffic_convention': np.zeros(tc, dtype=np.float32),
'action_t': np.zeros(at, dtype=np.float32),
'desire': all_npy_inputs[:d_len],
'traffic_convention': all_npy_inputs[d_len:d_len + tc_len].reshape(tc),
'action_t': all_npy_inputs[d_len + tc_len:].reshape(at),
}
npy.update(policy_npy)
input_queues.update({
'feat_q': Tensor(np.zeros((frame_skip * (fb[1] - 1) + 1, fb[0], fb[2]), dtype=np.float32), device=device).contiguous().realize(),
'desire_q': Tensor(np.zeros((frame_skip * dp[1], dp[0], dp[2]), dtype=np.float32), device=device).contiguous().realize(),
**{k: Tensor(v, device='NPY').realize() for k, v in policy_npy.items()},
'all_npy_inputs': Tensor(all_npy_inputs, device='NPY').realize(),
})
return input_queues, npy

Expand All @@ -168,18 +193,16 @@ def sample_desire(buf, frame_skip):

def make_warp(nv12, model_w, model_h, frame_skip):
frame_prepare = make_frame_prepare(nv12, model_w, model_h)
sample_skip_fn = partial(sample_skip, frame_skip=frame_skip)

def warp_enqueue(img_q, big_img_q, tfm, big_tfm, frame, big_frame):
def warp_enqueue(tfm, big_tfm, frame, big_frame):
tfm = tfm.to(WARP_DEV)
big_tfm = big_tfm.to(WARP_DEV)
Tensor.realize(tfm, big_tfm)

warped_frame = frame_prepare(frame, tfm).unsqueeze(0).to(Device.DEFAULT)
warped_big_frame = frame_prepare(big_frame, big_tfm).unsqueeze(0).to(Device.DEFAULT)
img = shift_and_sample(img_q, warped_frame, sample_skip_fn)
big_img = shift_and_sample(big_img_q, warped_big_frame, sample_skip_fn)
return img, big_img
warped_frame = frame_prepare(frame, tfm).unsqueeze(0)
warped_big_frame = frame_prepare(big_frame, big_tfm).unsqueeze(0)
warped = Tensor.cat(warped_frame, warped_big_frame, dim=0)
return warped
return warp_enqueue


Expand All @@ -188,11 +211,26 @@ def make_run_policy(model_runners, model_metadata, frame_skip):
sample_skip_fn = partial(sample_skip, frame_skip=frame_skip)
vision_features_slice = model_metadata['vision']['output_slices']['hidden_state']

def run_policy(img, big_img, feat_q, desire_q, desire, traffic_convention, action_t):
desire = desire.to(Device.DEFAULT)
traffic_convention = traffic_convention.to(Device.DEFAULT)
action_t = action_t.to(Device.DEFAULT)
Tensor.realize(desire, traffic_convention, action_t)
policy_input_shapes = model_metadata['on_policy']['input_shapes']
dp = policy_input_shapes['desire_pulse'] # (1, 25, 8)
tc = policy_input_shapes['traffic_convention'] # (1, 2)
at = tc
d_len, tc_len = dp[2], int(np.prod(tc))

# desire, traffic_convention and action_t arrive packed into one NPY tensor (see make_input_queues),
# so the host->device copy happens once; slice them back out in the same order they were packed.
def run_policy(warped, img_q, big_img_q, feat_q, desire_q, all_npy_inputs):
all_npy_inputs = all_npy_inputs.to(Device.DEFAULT)
warped = warped.to(Device.DEFAULT)
Tensor.realize(all_npy_inputs, warped)

img = shift_and_sample(img_q, warped[0:1], sample_skip_fn)
big_img = shift_and_sample(big_img_q, warped[1:2], sample_skip_fn)

desire = all_npy_inputs[:d_len]
traffic_convention = all_npy_inputs[d_len:d_len + tc_len].reshape(tc)
action_t = all_npy_inputs[d_len + tc_len:].reshape(at)

desire_buf = shift_and_sample(desire_q, desire.reshape(1, 1, -1), sample_desire_fn)
vision_out = next(iter(model_runners['vision']({'img': img, 'big_img': big_img}).values())).cast('float32')

Expand All @@ -207,7 +245,7 @@ def run_policy(img, big_img, feat_q, desire_q, desire, traffic_convention, actio
}
on_policy_out = next(iter(model_runners['on_policy'](inputs).values())).cast('float32')
off_policy_out = next(iter(model_runners['off_policy'](inputs).values())).cast('float32')
return vision_out, on_policy_out, off_policy_out
return Tensor.cat(vision_out[0], on_policy_out[0], off_policy_out[0])
return run_policy


Expand All @@ -234,8 +272,8 @@ def random_inputs_run(fn, seed, test_val=None, test_buffers=None, expect_match=T
print(f" [{i+1}/{n_runs}] enqueue {(mt-st)*1e3:6.2f} ms -- total {(et-st)*1e3:6.2f} ms")

if i == 0:
val = [np.copy(v.numpy()) for v in outs]
buffers = [np.copy(v.numpy().copy()) for v in input_queues.values()]
val = np.copy(outs.numpy())
buffers = [np.copy(v.numpy()) for v in input_queues.values()]

if test_val is not None:
match = all(np.array_equal(a, b) for a, b in zip(val, test_val, strict=True))
Expand All @@ -248,9 +286,10 @@ def random_inputs_run(fn, seed, test_val=None, test_buffers=None, expect_match=T
print('capture + replay')
test_val, test_buffers = random_inputs_run(jit, SEED)
print('pickle round trip')
jit = pickle.loads(pickle.dumps(jit))
random_inputs_run(jit, SEED, test_val, test_buffers, expect_match=True)
random_inputs_run(jit, SEED+1, test_val, test_buffers, expect_match=False)
# TODO need to round trip on disk
# jit = pickle.loads(pickle.dumps(jit))
# random_inputs_run(jit, SEED, test_val, test_buffers, expect_match=True)
# random_inputs_run(jit, SEED+1, test_val, test_buffers, expect_match=False)
return jit


Expand All @@ -259,14 +298,19 @@ def _parse_size(s):
return int(w), int(h)


def read_file_chunked_to_shm(path):
from openpilot.common.file_chunker import read_file_chunked
from openpilot.system.hardware.hw import Paths
shm_path = os.path.join(Paths.shm_path(), os.path.basename(path))
atexit.register(lambda: os.path.exists(shm_path) and os.remove(shm_path))
with open(shm_path, 'wb') as f:
f.write(read_file_chunked(path))
return shm_path
def read_file_chunked_to_disk(path):
import shutil
import tempfile
from openpilot.common.file_chunker import open_file_chunked
# Stream the chunks into one file on the same (real) filesystem as the chunks -- NOT /dev/shm --
# so models too big to fit in RAM/shm still load. OnnxRunner mmaps the result and pages it in
# lazily, so peak memory stays small regardless of model size.
fd, dst = tempfile.mkstemp(prefix=os.path.basename(path) + '.', suffix='.assembled',
dir=os.path.dirname(os.path.abspath(path)))
atexit.register(lambda: os.path.exists(dst) and os.remove(dst))
with os.fdopen(fd, 'wb') as f, open_file_chunked(path) as src:
shutil.copyfileobj(src, f)
return dst


if __name__ == "__main__":
Expand All @@ -285,9 +329,9 @@ def read_file_chunked_to_shm(path):
args = p.parse_args()

model_paths = {
'vision': read_file_chunked_to_shm(args.vision_onnx),
'off_policy': read_file_chunked_to_shm(args.off_policy_onnx),
'on_policy': read_file_chunked_to_shm(args.on_policy_onnx),
'vision': read_file_chunked_to_disk(args.vision_onnx),
'off_policy': read_file_chunked_to_disk(args.off_policy_onnx),
'on_policy': read_file_chunked_to_disk(args.on_policy_onnx),
}
model_w, model_h = args.model_size

Expand All @@ -300,7 +344,7 @@ def read_file_chunked_to_shm(path):

make_policy_queues = partial(make_input_queues, out['metadata']['vision']['input_shapes'],
out['metadata']['on_policy']['input_shapes'], args.frame_skip)
make_random_model_inputs = partial(make_random_images, keys=['img', 'big_img'], shape=out['metadata']['vision']['input_shapes']['img'])
make_random_model_inputs = partial(make_random_images, keys=['warped'], shape=(2, 6, 128, 256), device=WARP_DEV)
out['run_policy'] = compile_jit(run_policy_jit, make_random_model_inputs, POLICY_INPUTS,
make_policy_queues)

Expand All @@ -312,5 +356,5 @@ def read_file_chunked_to_shm(path):
out[(cam_w,cam_h)] = compile_jit(warp_enqueue, make_random_warp_inputs, WARP_INPUTS, make_warp_queues)

with open(args.output, "wb") as f:
pickle.dump(out, f)
StreamPickler(f, protocol=5).dump(out)
print(f"Saved JITs to {args.output} ({os.path.getsize(args.output) / 1e6:.2f} MB)")
5 changes: 3 additions & 2 deletions selfdrive/modeld/dmonitoringmodeld.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from openpilot.common.transformations.model import dmonitoringmodel_intrinsics
from openpilot.common.transformations.camera import _ar_ox_fisheye, _os_fisheye
from openpilot.system.camerad.cameras.nv12_info import get_nv12_info
from openpilot.common.file_chunker import read_file_chunked
from openpilot.common.file_chunker import open_file_chunked
from openpilot.selfdrive.modeld.parse_model_outputs import sigmoid, safe_exp

PROCESS_NAME = "selfdrive.modeld.dmonitoringmodeld"
Expand Down Expand Up @@ -43,7 +43,8 @@ def __init__(self, cam_w: int, cam_h: int):
self.frame_buf_params = get_nv12_info(cam_w, cam_h)
self.tensor_inputs = {k: Tensor(v, device='NPY').realize() for k,v in self.numpy_inputs.items()}
self._blob_cache : dict[int, Tensor] = {}
self.model_run = pickle.loads(read_file_chunked(str(MODEL_PKL_PATH)))
with open_file_chunked(str(MODEL_PKL_PATH)) as f:
self.model_run = pickle.load(f)
with open(MODELS_DIR / f'dm_warp_{cam_w}x{cam_h}_tinygrad.pkl', "rb") as f:
self.image_warp = pickle.load(f)

Expand Down
28 changes: 16 additions & 12 deletions selfdrive/modeld/modeld.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
import os
os.environ['GMMU'] = '0' # for usbgpu fast loading, noop for qcom
os.environ['AM_POWER_LIMIT'] = '100'
from tinygrad.tensor import Tensor
import time
import pickle
Expand All @@ -22,7 +23,7 @@
from openpilot.selfdrive.modeld.parse_model_outputs import Parser
from openpilot.selfdrive.modeld.compile_modeld import make_input_queues, WARP_INPUTS, POLICY_INPUTS
from openpilot.selfdrive.modeld.fill_model_msg import fill_model_msg, fill_pose_msg, PublishState
from openpilot.common.file_chunker import read_file_chunked, get_manifest_path
from openpilot.common.file_chunker import open_file_chunked, get_manifest_path
from openpilot.selfdrive.modeld.constants import ModelConstants, Plan
from openpilot.selfdrive.modeld.helpers import usbgpu_present, modeld_pkl_path, get_tg_input_devices

Expand Down Expand Up @@ -78,7 +79,8 @@ class ModelState:
def __init__(self, cam_w: int, cam_h: int, usbgpu: bool):
input_devices = get_tg_input_devices(PROCESS_NAME, usbgpu)
self.WARP_DEV, self.QUEUE_DEV = input_devices['WARP_DEV'], input_devices['QUEUE_DEV']
jits = pickle.loads(read_file_chunked(modeld_pkl_path(usbgpu)))
with open_file_chunked(modeld_pkl_path(usbgpu)) as f:
jits = pickle.load(f)
vision_metadata = jits['metadata']['vision']
self.vision_input_shapes = vision_metadata['input_shapes']
self.vision_input_names = list(self.vision_input_shapes.keys())
Expand Down Expand Up @@ -126,18 +128,20 @@ def run(self, bufs: dict[str, VisionBuf], transforms: dict[str, np.ndarray],
self.npy['tfm'][:,:] = transforms['img'][:,:]
self.npy['big_tfm'][:,:] = transforms['big_img'][:,:]

img, big_img = self.warp_enqueue(**{k: self.input_queues[k] for k in WARP_INPUTS}, frame=self.full_frames['img'], big_frame=self.full_frames['big_img'])
warped = self.warp_enqueue(**{k: self.input_queues[k] for k in WARP_INPUTS}, frame=self.full_frames['img'], big_frame=self.full_frames['big_img'])

if prepare_only:
return None

vision_output, on_policy_output, off_policy_output = self.run_policy(
**{k: self.input_queues[k] for k in POLICY_INPUTS if k in self.input_queues}, img=img, big_img=big_img
outputs = self.run_policy(
**{k: self.input_queues[k] for k in POLICY_INPUTS if k in self.input_queues}, warped=warped
)

vision_output = vision_output.numpy().flatten()
off_policy_output = off_policy_output.numpy().flatten()
on_policy_output = on_policy_output.numpy().flatten()
outputs = outputs.numpy()
vision_end = max(s.stop for s in self.vision_output_slices.values() if s.stop)
on_policy_end = vision_end + max(s.stop for s in self.policy_output_slices.values() if s.stop)
off_policy_end = on_policy_end + max(s.stop for s in self.off_policy_output_slices.values() if s.stop)
vision_output, on_policy_output, off_policy_output = outputs[:vision_end][None], outputs[vision_end:on_policy_end][None], outputs[on_policy_end:off_policy_end][None]

vision_output = vision_output.flatten()
off_policy_output = off_policy_output.flatten()
on_policy_output = on_policy_output.flatten()
vision_outputs_dict = self.parser.parse_vision_outputs(self.slice_outputs(vision_output, self.vision_output_slices))
off_policy_outputs_dict = self.parser.parse_off_policy_outputs(self.slice_outputs(off_policy_output, self.off_policy_output_slices))
policy_outputs_dict = self.parser.parse_policy_outputs(self.slice_outputs(on_policy_output, self.policy_output_slices))
Expand Down
Loading