diff --git a/examples/apple/coreml/scripts/BUCK b/examples/apple/coreml/scripts/BUCK index 164feb8d306..42a97ea893f 100644 --- a/examples/apple/coreml/scripts/BUCK +++ b/examples/apple/coreml/scripts/BUCK @@ -16,6 +16,19 @@ fbcode_target(_kind = python_binary, ], ) +fbcode_target(_kind = python_binary, + name = "coreml_compute_plan", + srcs = [ + "coreml_compute_plan.py", + ], + main_function = "executorch.examples.apple.coreml.scripts.coreml_compute_plan.main", + deps = [ + "//executorch/backends/apple/coreml:executorchcoreml", + "//executorch/exir:schema", + "//executorch/exir/_serialize:lib", + ], +) + fbcode_target(_kind = python_binary, name = "export", srcs = [ diff --git a/examples/apple/coreml/scripts/coreml_compute_plan.py b/examples/apple/coreml/scripts/coreml_compute_plan.py new file mode 100644 index 00000000000..7169ff71a58 --- /dev/null +++ b/examples/apple/coreml/scripts/coreml_compute_plan.py @@ -0,0 +1,230 @@ +# Copyright © 2026 Apple Inc. All rights reserved. +# +# Please refer to the license found in the LICENSE file in the root directory of the source tree. + +"""Report which CoreML operations would dispatch to ANE / GPU / CPU. + +The CoreML runtime decides at compile/load time which compute device each +MIL operation will run on; that decision is exposed by ``MLComputePlan`` +in coremltools 9.0+. This script wraps that API so users can answer +"why isn't my model running on the ANE?" without writing Swift. + +Usage:: + + # Analyze a CoreML model directly (mlpackage or compiled mlmodelc). + python coreml_compute_plan.py --model_path path/to/model.mlpackage + + # Analyze every Core ML partition embedded in an ExecuTorch .pte. + python coreml_compute_plan.py --model_path path/to/program.pte + + # Show ops that fell off the ANE, grouped by op type. + python coreml_compute_plan.py --model_path model.mlpackage --show_non_ane + + # Pick which devices the runtime is allowed to consider. 
def _ensure_compiled(model_path: str, tmpdir: str) -> str:
    """Return a ``.mlmodelc`` path for ``model_path``, compiling if needed.

    Args:
        model_path: Path to a ``.mlpackage`` or to an already compiled
            ``.mlmodelc``. Trailing path separators are ignored.
        tmpdir: Directory that receives the compiled model when
            ``model_path`` is a ``.mlpackage``.

    Returns:
        The input path (minus any trailing separator) when it is already a
        ``.mlmodelc``; otherwise the result of
        ``ct.models.utils.compile_model`` converted to ``str``.

    Raises:
        ValueError: If the path ends in neither ``.mlpackage`` nor
            ``.mlmodelc``.
    """
    # Paths frequently arrive with a trailing separator (e.g. from shell
    # completion). Strip it so the suffix checks and basename() below still
    # see the real name; fall back to the raw value if stripping empties it.
    separators = os.sep + (os.altsep or "")
    normalized = model_path.rstrip(separators) or model_path

    if normalized.endswith(".mlmodelc"):
        return normalized

    source_suffix = ".mlpackage"
    if normalized.endswith(source_suffix):
        # Swap only the trailing suffix; str.replace would also rewrite any
        # earlier ".mlpackage" occurrence inside the file name.
        stem = os.path.basename(normalized)[: -len(source_suffix)]
        dest = os.path.join(tmpdir, stem + ".mlmodelc")
        return str(ct.models.utils.compile_model(normalized, destination_path=dest))

    raise ValueError(
        f"Expected a .mlpackage or .mlmodelc path, got: {model_path}"
    )
+ + coremltools 9.0's ``MLComputePlan.load_from_path`` only exposes usage for + the default function of a multifunction package, so a multifunction + .mlpackage is analyzed function-by-function by projecting each function + as the ``main`` of a temp single-function copy. + """ + function_names = _mlpackage_function_names(model_path) + if len(function_names) <= 1: + return _analyze_compiled(model_path, compute_units) + rows: List[Tuple[str, str, str]] = [] + with tempfile.TemporaryDirectory() as tmpdir: + for fname in function_names: + projected = _project_to_single(model_path, fname, tmpdir) + for _, op_name, device in _analyze_compiled(projected, compute_units): + rows.append((fname, op_name, device)) + return rows + + +def _analyze_compiled( + model_path: str, compute_units: ct.ComputeUnit +) -> List[Tuple[str, str, str]]: + with tempfile.TemporaryDirectory() as tmpdir: + compiled = _ensure_compiled(model_path, tmpdir) + plan = MLComputePlan.load_from_path(compiled, compute_units=compute_units) + program = plan.model_structure.program + if program is None: + raise RuntimeError( + f"{model_path} is not an MLProgram model; this tool only supports " + "the MLProgram backend (the CoreML backend executorch produces today)." + ) + + rows: List[Tuple[str, str, str]] = [] + for fname, fn in program.functions.items(): + for op in _iter_operations(fn.block): + usage = plan.get_compute_device_usage_for_mlprogram_operation(op) + if usage is None: + # Constants and similar non-dispatched ops don't have a plan. 
def _print_report(label: str, rows: List[Tuple[str, str, str]], show_non_ane: bool) -> None:
    """Print a per-device dispatch summary for one analyzed model.

    Args:
        label: Heading printed above the summary.
        rows: ``(function, operator_name, device)`` tuples, one per
            dispatched operation.
        show_non_ane: When true, also list the operator names whose device
            is anything other than ``"ANE"``, most frequent first.
    """
    print(f"\n=== {label} ===")
    if not rows:
        print(" (no dispatched operations found)")
        return

    by_device = Counter(device for _, _, device in rows)
    total = len(rows)

    # Well-known devices come first in a fixed order. Any other device name
    # present in the rows is appended afterwards so that every counted row is
    # also reported and the printed counts add up to ``total``.
    known_devices = ("ANE", "GPU", "CPU", "unknown")
    other_devices = sorted(name for name in by_device if name not in known_devices)
    for device in (*known_devices, *other_devices):
        count = by_device.get(device, 0)
        if count == 0:
            continue
        pct = 100.0 * count / total
        print(f" {device}: {count:5d} / {total} ({pct:5.1f}%)")

    if not show_non_ane:
        return

    non_ane_ops = Counter(op_name for _, op_name, dev in rows if dev != "ANE")
    if non_ane_ops:
        print("\n Non-ANE op types:")
        for op_name, count in non_ane_ops.most_common():
            print(f" {count:5d} {op_name}")
parser.add_argument( + "--compute_units", + default="cpu_and_ne", + choices=sorted(_COMPUTE_UNIT_CHOICES), + help="Which devices the runtime may use when planning dispatch.", + ) + parser.add_argument( + "--show_non_ane", + action="store_true", + help="List op types that did not get assigned to the ANE.", + ) + args = parser.parse_args() + + compute_units = _COMPUTE_UNIT_CHOICES[args.compute_units] + model_path = args.model_path + + if model_path.endswith(".pte"): + with open(model_path, "rb") as f: + pte_data = f.read() + with tempfile.TemporaryDirectory() as out_dir: + extracted = extract_coreml_models(pte_data, out_dir=out_dir) + if not extracted: + print( + f"{model_path} does not contain any CoreML delegate partitions.", + file=sys.stderr, + ) + return 1 + for path in extracted: + rows = analyze_one(str(path), compute_units) + _print_report(path.name, rows, args.show_non_ane) + else: + rows = analyze_one(model_path, compute_units) + _print_report(os.path.basename(model_path.rstrip("/")), rows, args.show_non_ane) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/examples/apple/coreml/scripts/extract_coreml_models.py b/examples/apple/coreml/scripts/extract_coreml_models.py index 685b6b594f3..8d07ab6b1bf 100644 --- a/examples/apple/coreml/scripts/extract_coreml_models.py +++ b/examples/apple/coreml/scripts/extract_coreml_models.py @@ -9,7 +9,7 @@ import shutil from pathlib import Path -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Union from executorch.backends.apple.coreml import executorchcoreml from executorch.exir._serialize._program import deserialize_pte_binary @@ -20,17 +20,28 @@ ) COREML_BACKEND_ID = "CoreMLBackend" +# JSON references to named_data (multifunction models) are prefixed with this. 
+_NAMED_DATA_MAGIC = b"CMJR" -def extract_coreml_models(pte_data: bytes): +def extract_coreml_models( + pte_data: bytes, + out_dir: Optional[Union[str, Path]] = None, +) -> List[Path]: + """Extract every Core ML partition embedded in a .pte to ``out_dir``. + + Multifunction models share partitions across delegates via a JSON reference + into ``named_data``; duplicates are deduplicated by that key. ``out_dir`` + defaults to ``./extracted_coreml_models`` (CLI behaviour). Returns the list + of extracted model directories, suitable for passing to + ``MLComputePlan.load_from_path`` or to ``ct.models.MLModel``. + """ + out_root = Path(out_dir) if out_dir is not None else Path("extracted_coreml_models") + out_root.mkdir(parents=True, exist_ok=True) + pte_file = deserialize_pte_binary(pte_data) program = pte_file.program - # Build a map from named_data keys to their data for multifunction model support. - # Multifunction models store a JSON reference in processed_bytes that points to - # the actual model data in named_data. - # After deserialization, pte_file.named_data is a NamedDataStoreOutput containing - # buffers and pte_data (key -> DataEntry mapping). 
named_data_map: Dict[str, bytes] = {} if pte_file.named_data is not None: for key, data_entry in pte_file.named_data.pte_data.items(): @@ -43,51 +54,38 @@ def extract_coreml_models(pte_data: bytes): delegate for delegate in delegates if delegate.id == COREML_BACKEND_ID ] - # Track extracted models to avoid duplicates (multifunction models share partitions) + extracted_paths: List[Path] = [] extracted_keys: set = set() model_index: int = 1 for coreml_delegate in coreml_delegates: coreml_delegate_data: BackendDelegateDataReference = coreml_delegate.processed + if coreml_delegate_data.location != DataLocation.INLINE: + continue + + raw_bytes = program.backend_delegate_data[coreml_delegate_data.index].data coreml_processed_bytes: Optional[bytes] = None model_name: Optional[str] = None - match coreml_delegate_data.location: - case DataLocation.INLINE: - raw_bytes = program.backend_delegate_data[ - coreml_delegate_data.index - ].data - - # Check if this is a JSON reference to named_data (multifunction models) - # JSON references are prefixed with "CMJR" magic number - MAGIC_NUMBER = b"CMJR" - if raw_bytes.startswith(MAGIC_NUMBER): - # Strip magic number and parse JSON - json_bytes = raw_bytes[len(MAGIC_NUMBER) :] - try: - reference = json.loads(json_bytes.decode("utf-8")) - key = reference.get("key") - if key in extracted_keys: - # Already extracted this partition, skip - continue - if key in named_data_map: - coreml_processed_bytes = named_data_map[key] - model_name = key # Use the key as model name - extracted_keys.add(key) - else: - print( - f"Warning: Named data key '{key}' not found in program" - ) - continue - except (json.JSONDecodeError, UnicodeDecodeError) as e: - print(f"Warning: Failed to parse JSON reference: {e}") - continue - else: - # Not a JSON reference, treat as raw model data (legacy format) - coreml_processed_bytes = raw_bytes - - case _: - AssertionError("The loaded Program must have inline data.") + if raw_bytes.startswith(_NAMED_DATA_MAGIC): + try: 
+ reference = json.loads( + raw_bytes[len(_NAMED_DATA_MAGIC) :].decode("utf-8") + ) + key = reference.get("key") + except (json.JSONDecodeError, UnicodeDecodeError) as e: + print(f"Warning: Failed to parse JSON reference: {e}") + continue + if key in extracted_keys: + continue + if key not in named_data_map: + print(f"Warning: Named data key '{key}' not found in program") + continue + extracted_keys.add(key) + coreml_processed_bytes = named_data_map[key] + model_name = key + else: + coreml_processed_bytes = raw_bytes if coreml_processed_bytes is None: continue @@ -95,19 +93,19 @@ def extract_coreml_models(pte_data: bytes): if model_name is None: model_name = f"model_{model_index}" - model_path: Path = Path() / "extracted_coreml_models" / model_name + model_path = out_root / model_name if model_path.exists(): - shutil.rmtree(model_path.absolute()) - os.makedirs(model_path.absolute()) + shutil.rmtree(model_path) + model_path.mkdir(parents=True) if executorchcoreml.unflatten_directory_contents( coreml_processed_bytes, str(model_path.absolute()) ): - print(f"Core ML models are extracted and saved to path = {model_path}") + extracted_paths.append(model_path) + model_index += 1 - if len(coreml_delegates) == 0: - print("The model isn't delegated to Core ML.") + return extracted_paths def main() -> None: @@ -127,7 +125,13 @@ def main() -> None: model_path = str(args.model_path) with open(model_path, mode="rb") as pte_file: pte_data = pte_file.read() - extract_coreml_models(pte_data) + extracted_paths = extract_coreml_models(pte_data) + + if extracted_paths: + for path in extracted_paths: + print(f"Core ML models are extracted and saved to path = {path}") + else: + print("The model isn't delegated to Core ML.") if __name__ == "__main__": diff --git a/examples/apple/coreml/scripts/test_coreml_compute_plan.py b/examples/apple/coreml/scripts/test_coreml_compute_plan.py new file mode 100644 index 00000000000..dc9eb20fa99 --- /dev/null +++ 
class TestDeviceName(unittest.TestCase):
    """Unit tests for ``_device_name``'s device-object to label mapping."""

    def test_none_device(self):
        # A missing preferred device is reported as "unknown".
        self.assertEqual(_device_name(None), "unknown")

    def test_known_device_classes(self):
        from coremltools.models.compute_device import (
            MLCPUComputeDevice,
            MLGPUComputeDevice,
            MLNeuralEngineComputeDevice,
        )

        # Don't construct the device classes directly (they wrap proxies that
        # may be unavailable in some envs). Instead, subclass each one with a
        # no-op __init__ so the instance still satisfies the isinstance()
        # check that _device_name relies on.
        cases = (
            (MLNeuralEngineComputeDevice, "ANE"),
            (MLGPUComputeDevice, "GPU"),
            (MLCPUComputeDevice, "CPU"),
        )
        for base, expected in cases:

            class Fake(base):
                def __init__(self):
                    pass

            with self.subTest(device=expected):
                self.assertEqual(_device_name(Fake()), expected)

    def test_unrecognized_device_uses_type_name(self):
        # Objects that match none of the known device classes fall back to
        # the name of their own type.
        class MLMysteryDevice:
            pass

        self.assertEqual(_device_name(MLMysteryDevice()), "MLMysteryDevice")
+ + coremltools 9.0's MLComputePlan.load_from_path only exposes usage for + the default function, so analyze_one re-projects each function through + MultiFunctionDescriptor to surface plans for the rest. + """ + + @classmethod + def setUpClass(cls): + cls.tmpdir = tempfile.mkdtemp() + single = _build_small_mlpackage(cls.tmpdir) + desc = MultiFunctionDescriptor() + desc.add_function( + single, src_function_name="main", target_function_name="prefill" + ) + desc.add_function( + single, src_function_name="main", target_function_name="decode" + ) + desc.default_function_name = "prefill" + cls.multi = os.path.join(cls.tmpdir, "multi.mlpackage") + save_multifunction(desc, cls.multi) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tmpdir, ignore_errors=True) + + def test_reports_every_function(self): + rows = analyze_one(self.multi, ct.ComputeUnit.CPU_ONLY) + fnames = {fname for fname, _, _ in rows} + self.assertEqual(fnames, {"prefill", "decode"}) + + def test_each_function_lowers_the_same_ops(self): + rows = analyze_one(self.multi, ct.ComputeUnit.CPU_ONLY) + per_fn: dict = {} + for fname, op_name, _ in rows: + per_fn.setdefault(fname, set()).add(op_name.split(".")[-1]) + for fname in ("prefill", "decode"): + self.assertIn("matmul", per_fn.get(fname, set()), f"{fname} missing matmul") + self.assertIn("relu", per_fn.get(fname, set()), f"{fname} missing relu") + + +if __name__ == "__main__": + unittest.main()