diff --git a/.github/workflows/mps_cli_py_build.yaml b/.github/workflows/mps_cli_py_build.yaml index 908b5bb..78c16a5 100644 --- a/.github/workflows/mps_cli_py_build.yaml +++ b/.github/workflows/mps_cli_py_build.yaml @@ -25,8 +25,9 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip + cd mps-cli-py + pip install -e . pip install parameterized - # pip install -r requirements.txt - name: Test with unittest run: | cd mps-cli-py diff --git a/mps-cli-py/Readme.md b/mps-cli-py/README.md similarity index 87% rename from mps-cli-py/Readme.md rename to mps-cli-py/README.md index 7553859..317dcc7 100644 --- a/mps-cli-py/Readme.md +++ b/mps-cli-py/README.md @@ -2,12 +2,18 @@ This project provides a Python library which parses MPS files and builds the object model. +### Requirements + +- numpy >= 1.20 + ### Features The following features are available: - load MPS files (*.mpsr, *.mps, *.mpb, *.jar) and expose their content as Python object model - solutions, models, root nodes, nodes, children, references, properties - extract the meta-information and expose it as Python object model - list of languages, their concepts with information about properties, references, children +- JAR files are read directly without extracting to disk +- Parsed models are cached in '~/.mps_cli_cache' directory and invalidated automatically when JARs change on disk The core of the Python object model is given by the following classes: - `SNode` - represents a node diff --git a/mps-cli-py/pyproject.toml b/mps-cli-py/pyproject.toml index cc959c2..0f3ff74 100644 --- a/mps-cli-py/pyproject.toml +++ b/mps-cli-py/pyproject.toml @@ -18,6 +18,12 @@ classifiers = [ "License :: OSI Approved :: Eclipse Public License 2.0 (EPL-2.0)", "Operating System :: OS Independent", ] +dependencies = [ + "numpy>=1.20", +] + +[project.optional-dependencies] +fast = ["lxml>=4.0"] [project.urls] "Homepage" = "https://github.com/mbeddr/mps-cli" diff --git a/mps-cli-py/src/mpscli/demo.py b/mps-cli-py/src/mpscli/demo.py index a11f6f2..219b99b 100644 --- a/mps-cli-py/src/mpscli/demo.py +++ b/mps-cli-py/src/mpscli/demo.py @@ -9,6 +9,7 @@ MODE = "test_project": parse a single mps_test_projects directory (original behaviour) """ +import logging import sys import time from datetime import datetime @@ -283,6 +284,7 @@ def main() -> None: for c in lan.concepts: c.print_concept_details() + if __name__ == "__main__": import multiprocessing @@ -293,4 +295,13 @@ def main() -> None: sys.stdout = open(log_file, "w", encoding="utf-8", buffering=1) print(f"Logging to: {log_file}") + # route library log output (each parse phasee timings and cache progress) to stderr so they appear on + # console alongside the summary lines... + # format leaves out timestamps and level names to keep the output clean + logging.basicConfig( + stream=sys.stderr, + level=logging.INFO, + format="%(message)s", + ) + main() diff --git a/mps-cli-py/src/mpscli/demo_language_extraction.py b/mps-cli-py/src/mpscli/demo_language_extraction.py index 00e7193..191d19b 100644 --- a/mps-cli-py/src/mpscli/demo_language_extraction.py +++ b/mps-cli-py/src/mpscli/demo_language_extraction.py @@ -5,13 +5,14 @@ """ import sys +import time from datetime import datetime from pathlib import Path sys.path.insert(1, "..") MODE = "plugins" -PLUGINS_PATH = r"C:\Temp\plugins" +PLUGINS_PATH = r"C:\Users\emb-venkpri\vemb\arch" TEST_PROJECT = r"..\..\mps_test_projects\mps_cli_binary_persistency_language" OUTPUT_FILE = Path(f"language_concepts_{datetime.now().strftime('%H%M%S')}.md") @@ -224,16 +225,27 @@ def build_markdown(languages_with_structure: list) -> str: def main(): path = PLUGINS_PATH if MODE == "plugins" else TEST_PROJECT - print(f"Parsing ({MODE}): {path}", flush=True) + msg = f"Parsing ({MODE}): {path}" + print(msg, flush=True) + sys.stderr.write(msg + "\n") + sys.stderr.flush() + + t0 = time.perf_counter() builder = SSolutionsRepositoryBuilder() - builder.USE_CACHE = False builder.build(path) + elapsed = time.perf_counter() - t0 + + timing_msg = f"Parsing complete in {elapsed:.1f}s" + print(f"\n{timing_msg}") + sys.stderr.write(timing_msg + "\n") + sys.stderr.flush() languages_with_structure = collect_languages_with_structure() - print( - f"Languages with structure aspect: {len(languages_with_structure)}", flush=True - ) + lang_msg = f"Languages with structure aspect: {len(languages_with_structure)}" + print(lang_msg, flush=True) + sys.stderr.write(lang_msg + "\n") + sys.stderr.flush() for lang, structure in languages_with_structure: print(f"\n{'=' * 60}") @@ -258,8 +270,22 @@ def main(): md = build_markdown(languages_with_structure) OUTPUT_FILE.write_text(md, encoding="utf-8") - print(f"\nmarkdown file written: {OUTPUT_FILE}", flush=True) + done_msg = f"markdown file written: {OUTPUT_FILE}" + print(f"\n{done_msg}", flush=True) + sys.stderr.write(done_msg + "\n") + sys.stderr.flush() if __name__ == "__main__": + import multiprocessing + + multiprocessing.freeze_support() + + # redirect stdout to a log file so all output goess to disk.. + log_file = f"lang_extraction_{datetime.now().strftime('%H%M%S')}.log" + sys.stdout = open(log_file, "w", encoding="utf-8", buffering=1) + print(f"Logging to: {log_file}") + sys.stderr.write(f"Logging to: {log_file}\n") + sys.stderr.flush() + main() diff --git a/mps-cli-py/src/mpscli/model/SConcept.py b/mps-cli-py/src/mpscli/model/SConcept.py index e132b22..bd56164 100644 --- a/mps-cli-py/src/mpscli/model/SConcept.py +++ b/mps-cli-py/src/mpscli/model/SConcept.py @@ -1,3 +1,8 @@ +import logging + +_log = logging.getLogger(__name__) + + class SConcept: def __init__(self, name, uuid): @@ -8,14 +13,14 @@ def __init__(self, name, uuid): self.references = [] def print_concept_details(self): - print("concept: " + self.name) - print("\tproperties: ") + _log.debug("concept: %s", self.name) + _log.debug("\tproperties: ") for property in self.properties: - print("\t\t" + property) - print("\tchildren: ") + _log.debug("\t\t%s", property) + _log.debug("\tchildren: ") for child in self.children: - print("\t\t" + child) - print("\treferences: ") + _log.debug("\t\t%s", child) + _log.debug("\treferences: ") for reference in self.references: - print("\t\t" + reference) - print("<<<") + _log.debug("\t\t%s", reference) + _log.debug("<<<") diff --git a/mps-cli-py/src/mpscli/model/SLanguage.py b/mps-cli-py/src/mpscli/model/SLanguage.py index 099ff1b..0a7ad1e 100644 --- a/mps-cli-py/src/mpscli/model/SLanguage.py +++ b/mps-cli-py/src/mpscli/model/SLanguage.py @@ -5,6 +5,9 @@ def __init__(self, name, uuid): self.uuid = uuid self.concepts = [] + # internal dict for concept lookup by name + self._concepts_by_name = {} + # version number from the languageVersion attribute in the .mpl file.. # stays 0 if this language was only seen via registry (never had its .mpl read) self.language_version = 0 @@ -15,10 +18,8 @@ def __init__(self, name, uuid): self.models = [] def find_concept_by_name(self, name): - for c in self.concepts: - if c.name == name: - return c - return None + # lookupp via internal dict + return self._concepts_by_name.get(name) def find_model_by_name(self, suffix): # find a specific aspect model by the last segment of its name. diff --git a/mps-cli-py/src/mpscli/model/SModel.py b/mps-cli-py/src/mpscli/model/SModel.py index d54ab04..e4961d4 100644 --- a/mps-cli-py/src/mpscli/model/SModel.py +++ b/mps-cli-py/src/mpscli/model/SModel.py @@ -1,24 +1,326 @@ +import numpy as np + +from mpscli.model.SNode import SNode +from mpscli.model.builder.utils.FlatModelPacker import ( + pack_strings, + pack_properties, + pack_references, +) class SModel: + # Two-phase designn: build then finalize + # + # During parsing, _add_node/_set_property/_set_reference fill plain Python list buffers (one entry per node). + # Once parsing is done, _finalize() converts those buffers into flat numpy arrays and frees the buffers. + # + # The DFS pre-order guarante: every parser allocates a parent node before its children meaning + # parent_buf[child_idx] < child_idx always holds true and _finalize() exploits this to build + # first_child/next_sibling in a single forward pass without sorting or recursion. + # + # After finalize, SNode objects are thin views meaning they hold (model, idx) and read everything from the arrays + # below via _get_node/_read_bytes. SNode objects are created on demand and cached by index so that any two + # lookups for the same node always return the same Python object so this means the code that stores a + # node reference and later compares it with another lookup will get the correct result. + # + # All arrays are saved to disk as .npz and loaded with mmap_mode='r' by ParseCache, so warm runs map the + # file into virtual memory without reading bytes into RAM. Strings (uuids, property values) are packed as a flat + # bytes blob with an int32 offset array: _read_bytes decodes a slice. + # Properties and references use offset array (prop_start, ref_start) so so variable-length per-node data fits in + # fixed-shape arrays. + # + # See storage layout below: + # _np_concept_idxs: index into _concepts_table + # _np_role_idxs: index into _roles_table, -1 = no role + # _np_parent_idxs: parent node index, -1 = root + # _np_first_child: first child index, -1 = leaf + # _np_next_sibling: next sibling index, -1 = last child + # _uuid_offsets: byte offsets into _uuid_data + # _uuid_data: packed UTF-8 uuid strings + # _prop_start: slice bounds into property arrays + # _prop_key_idxs: index into _prop_keys per property + # _prop_val_offsets: byte offsets into _prop_val_data + # _prop_val_data: packed UTF-8 property values + # _prop_keys: deduplicated key strings (100-200 entries) + # _ref_start: slice bounds into reference arrays + # _ref_key_idxs: index into _ref_keys per reference + # _ref_model_idxs: index into _ref_model_uuids per reference + # _ref_node_uuid_offsets: byte offsets into _ref_node_uuid_data + # _ref_node_uuid_data: packed node uuid strings + # _ref_resolve_offsets: byte offsets into _ref_resolve_data + # _ref_resolve_data: packed resolve_info strings + # _ref_keys: deduplicated role name strings + # _ref_model_uuids: deduplicated model uuid strings def __init__(self, name, uuid, is_do_not_generate): self.name = name self.uuid = uuid - self.root_nodes = [] - self.path_to_model_file = "" self.is_do_not_generate = is_do_not_generate - self.uuid_2_nodes = {} + self.path_to_model_file = "" + + # one entry per node and is grown during parsing then freed after _finalize() + self._uuids = [] + self._concept_buf = [] + self._role_buf = [] + self._parent_buf = [] + self._properties = [] + self._references = [] + + # concept and role dedup tables + # each unique concept/role is stored once and nodes store an int32 index.. + # _concepts_map keys are id(concept) so we do not rely on __eq__ + self._concepts_table = [] + self._concepts_map = {} + self._roles_table = [] + self._roles_map = {} + + # root node indices collected during parse and used by _build_root_nodes() + self._root_idxs = [] + + # {idx: SNode}: ensures the same object is returned for the same node + self._node_cache = {} + + # these three dicts are built lazily on first use and never serialized + self._uuid_index = None + self._prop_key_lookup = None + self._ref_key_lookup = None + + # flat numpy arrays, all None until _finalize() is called + self._np_concept_idxs = None + self._np_role_idxs = None + self._np_parent_idxs = None + self._np_first_child = None + self._np_next_sibling = None + self._uuid_offsets = None + self._uuid_data = None + self._prop_start = None + self._prop_key_idxs = None + self._prop_val_offsets = None + self._prop_val_data = None + self._prop_keys = None + self._ref_start = None + self._ref_key_idxs = None + self._ref_model_idxs = None + self._ref_node_uuid_offsets = None + self._ref_node_uuid_data = None + self._ref_resolve_offsets = None + self._ref_resolve_data = None + self._ref_keys = None + self._ref_model_uuids = None + + self._finalized = False + + # populatedd by _build_root_nodes() at the end of _finalize() + self.root_nodes = [] + + # ------------------------------------------------------------------ + # builder API - called by parsers during the build phase + # ------------------------------------------------------------------ + + def _add_node(self, uuid, concept, role, parent_idx): + # allocates a new node slot and returns its integer index where parent_idx is an int or None (None means this is a root node).. + # + # some low-level tests call read_node() on raw bytes after build() has already finalized the model and they + # need a builder context but do not really maybe care about the full model and to support this + # re-entering build phase after _finalize() is allowed so tat buffers are restored and the new nodes + # are not reflected in the finalized arrays + if self._concept_buf is None: + self._concept_buf = [] + self._role_buf = [] + self._parent_buf = [] + self._properties = [] + self._references = [] + self._node_cache = {} + if self._uuids is None: + self._uuids = [] + idx = len(self._uuids) + self._uuids.append(uuid) + + # store concept by index into _concepts_table to avoid duplicating the same SConcept object + # across all nodes that share a concept type + key = id(concept) + if key not in self._concepts_map: + self._concepts_map[key] = len(self._concepts_table) + self._concepts_table.append(concept) + self._concept_buf.append(self._concepts_map[key]) + + # same dedup approach for role strings + if role is None: + self._role_buf.append(-1) + else: + if role not in self._roles_map: + self._roles_map[role] = len(self._roles_table) + self._roles_table.append(role) + self._role_buf.append(self._roles_map[role]) + + self._parent_buf.append(-1 if parent_idx is None else parent_idx) + self._properties.append({}) + self._references.append({}) + return idx + + def _set_property(self, idx, key, value): + self._properties[idx][key] = value + + def _set_reference(self, idx, key, ref): + self._references[idx][key] = ref + + # ------------------------------------------------------------------ + # finalize - called once by each parser after all nodes are built + # ------------------------------------------------------------------ + def _finalize(self): + if self._finalized: + return + n = len(self._uuids) + empty = np.empty(0, dtype=np.int32) + + # models with no nodes still need valid (empty) arrays so that _read_bytes and slice logic work + # without None checks + if n == 0: + self._np_concept_idxs = empty + self._np_role_idxs = empty + self._np_parent_idxs = empty + self._np_first_child = empty + self._np_next_sibling = empty + self._uuid_offsets = np.zeros(1, dtype=np.int32) + self._uuid_data = b"" + self._prop_start = np.zeros(1, dtype=np.int32) + self._prop_key_idxs = empty + self._prop_val_offsets = np.zeros(1, dtype=np.int32) + self._prop_val_data = b"" + self._prop_keys = [] + self._ref_start = np.zeros(1, dtype=np.int32) + self._ref_key_idxs = empty + self._ref_model_idxs = empty + self._ref_node_uuid_offsets = np.zeros(1, dtype=np.int32) + self._ref_node_uuid_data = b"" + self._ref_resolve_offsets = np.zeros(1, dtype=np.int32) + self._ref_resolve_data = b"" + self._ref_keys = [] + self._ref_model_uuids = [] + self._finalized = True + self._build_root_nodes() + return + + # structural int32 arrays - one value per node + self._np_concept_idxs = np.array(self._concept_buf, dtype=np.int32) + self._np_role_idxs = np.array(self._role_buf, dtype=np.int32) + self._np_parent_idxs = np.array(self._parent_buf, dtype=np.int32) + + # build first_child/next_sibling linked lists in one forward pass and also parsers allocate a parent + # before any of its children + first_child = np.full(n, -1, dtype=np.int32) + next_sibling = np.full(n, -1, dtype=np.int32) + last_child = np.full(n, -1, dtype=np.int32) + + parent_buf = self._parent_buf + for ci in range(n): + pi = parent_buf[ci] + if pi == -1: + continue + if first_child[pi] == -1: + first_child[pi] = ci + else: + next_sibling[last_child[pi]] = ci + last_child[pi] = ci + + self._np_first_child = first_child + self._np_next_sibling = next_sibling + + # pack strings, properties and references - see FlatModelPacker for format details + self._uuid_offsets, self._uuid_data = pack_strings( + [u or "" for u in self._uuids] + ) + ( + self._prop_start, + self._prop_key_idxs, + self._prop_val_offsets, + self._prop_val_data, + self._prop_keys, + ) = pack_properties(self._properties) + + ( + self._ref_start, + self._ref_key_idxs, + self._ref_model_idxs, + self._ref_node_uuid_offsets, + self._ref_node_uuid_data, + self._ref_resolve_offsets, + self._ref_resolve_data, + self._ref_keys, + self._ref_model_uuids, + ) = pack_references(self._references) + + # all data is now in flat arrays so free the build buffers + self._uuids = None + self._concept_buf = None + self._role_buf = None + self._parent_buf = None + self._properties = None + self._references = None + self._concepts_map = {} + self._roles_map = {} + + self._finalized = True + self._build_root_nodes() + + def _build_root_nodes(self): + # SNode objects are created on demand where root_nodes are the entry points so that callers hold + # onto directly and we create them here rather than waiting for a traversal to trigger lazy creation + self.root_nodes = [self._get_node(i) for i in self._root_idxs] + + def _get_node(self, idx): + # returns the same SNode instance every time for a given index so that node identity is stable - two lookups for node 42 always give back + # the same Python object since the first call creates and caches it.. + node = self._node_cache.get(idx) + if node is None: + + node = SNode(self, idx) + self._node_cache[idx] = node + return node + + def _read_bytes(self, data, offsets, idx): + # reads one packed string entry from a bytes blob and offsets[idx] and offsets[idx+1] give the + # start and end byte positions. numpy uint8 slices need .tobytes() before decode but plain Python bytes + # slices decode directly. + start = int(offsets[idx]) + end = int(offsets[idx + 1]) + if start == end: + return "" + chunk = data[start:end] + if hasattr(chunk, "tobytes"): + return chunk.tobytes().decode() + return chunk.decode() + + def _get_prop_key_idx(self, key): + # builds the reverse lookup dict on first call, then returns the index of the given key in _prop_keys, + # or -1 if not found + if self._prop_key_lookup is None: + self._prop_key_lookup = {k: i for i, k in enumerate(self._prop_keys)} + return self._prop_key_lookup.get(key, -1) + + def _get_ref_key_idx(self, role): + # same as _get_prop_key_idx but for reference role names + if self._ref_key_lookup is None: + self._ref_key_lookup = {k: i for i, k in enumerate(self._ref_keys)} + return self._ref_key_lookup.get(role, -1) def get_nodes(self): res = [] - for r in self.root_nodes: - res.append(r) - res.extend(r.get_descendants()) + for root in self.root_nodes: + res.append(root) + res.extend(root.get_descendants()) return res - def get_node_by_uuid(self, uuid): - if len(self.uuid_2_nodes) == 0: - for n in self.get_nodes(): - self.uuid_2_nodes[n.uuid] = n - return self.uuid_2_nodes[uuid] \ No newline at end of file + def get_node_by_uuid(self, target_uuid): + # builds a uuid -> index dict on first call is most likely O(n) and then O(1) for every subsequent call I + # guess. + if self._uuid_index is None: + n = len(self._uuid_offsets) - 1 + self._uuid_index = { + self._read_bytes(self._uuid_data, self._uuid_offsets, i): i + for i in range(n) + } + idx = self._uuid_index.get(target_uuid) + if idx is None: + raise KeyError(target_uuid) + return self._get_node(idx) diff --git a/mps-cli-py/src/mpscli/model/SNode.py b/mps-cli-py/src/mpscli/model/SNode.py index 1c0ac59..ba24bc0 100644 --- a/mps-cli-py/src/mpscli/model/SNode.py +++ b/mps-cli-py/src/mpscli/model/SNode.py @@ -1,31 +1,141 @@ - class SNode: + # A node view that holds only a reference to its model and an integer index.. + # All data (uuid, concept, properties, children) lives in SModel's flat numpy arrays and is read on demand. + # So basicallyy nothing is stored in the node itself + __slots__ = ("_model", "_idx") + + def __init__(self, model, idx): + self._model = model + self._idx = idx + + def __eq__(self, other): + if other is None: + return False + if not isinstance(other, SNode): + return NotImplemented + return self._model is other._model and self._idx == other._idx + + def __hash__(self): + # model identity via id() is stable for the lifetime of a parse run + return hash((id(self._model), self._idx)) + + @property + def uuid(self): + m = self._model + return m._read_bytes(m._uuid_data, m._uuid_offsets, self._idx) + + @property + def concept(self): + return self._model._concepts_table[int(self._model._np_concept_idxs[self._idx])] + + @property + def role_in_parent(self): + r = int(self._model._np_role_idxs[self._idx]) + return None if r == -1 else self._model._roles_table[r] - def __init__(self, uuid, concept, role_in_parent, parent): - self.uuid = uuid - self.concept = concept - self.role_in_parent = role_in_parent - self.properties = {} - self.references = {} - self.children = [] - self.parent = parent + @property + def parent(self): + p = int(self._model._np_parent_idxs[self._idx]) + return None if p == -1 else self._model._get_node(p) + + @property + def properties(self): + # builds a fresh dict from this node's property slice in the model arrays + # if we need only one property then I guess get_property() is faster because it scans the slice directly + # without building the full dict + m = self._model + i = self._idx + start = int(m._prop_start[i]) + end = int(m._prop_start[i + 1]) + result = {} + for j in range(start, end): + key = m._prop_keys[int(m._prop_key_idxs[j])] + val = m._read_bytes(m._prop_val_data, m._prop_val_offsets, j) + result[key] = val + return result + + @property + def references(self): + # builds a fresh dict from this node's reference slice in the model arrays + from mpscli.model.SNodeRef import SNodeRef + + m = self._model + i = self._idx + start = int(m._ref_start[i]) + end = int(m._ref_start[i + 1]) + result = {} + for j in range(start, end): + role = m._ref_keys[int(m._ref_key_idxs[j])] + model_uuid = m._ref_model_uuids[int(m._ref_model_idxs[j])] + node_uuid = m._read_bytes( + m._ref_node_uuid_data, m._ref_node_uuid_offsets, j + ) + resolve = m._read_bytes(m._ref_resolve_data, m._ref_resolve_offsets, j) + result[role] = SNodeRef( + model_uuid if model_uuid else None, + node_uuid if node_uuid else None, + resolve if resolve else None, + ) + return result + + @property + def children(self): + model = self._model + result = [] + ci = int(model._np_first_child[self._idx]) + while ci != -1: + result.append(model._get_node(ci)) + ci = int(model._np_next_sibling[ci]) + return result def get_property(self, name): - return self.properties.get(name) + # looks up the key index once via the model's dedup table andd then scans only this node's property + # slice so I think typically 0-3 entries + m = self._model + i = self._idx + key_idx = m._get_prop_key_idx(name) + if key_idx == -1: + return None + start = int(m._prop_start[i]) + end = int(m._prop_start[i + 1]) + for j in range(start, end): + if int(m._prop_key_idxs[j]) == key_idx: + return m._read_bytes(m._prop_val_data, m._prop_val_offsets, j) + return None def get_reference(self, name): - return self.references[name] + from mpscli.model.SNodeRef import SNodeRef + + m = self._model + i = self._idx + key_idx = m._get_ref_key_idx(name) + if key_idx == -1: + raise KeyError(name) + start = int(m._ref_start[i]) + end = int(m._ref_start[i + 1]) + for j in range(start, end): + if int(m._ref_key_idxs[j]) == key_idx: + model_uuid = m._ref_model_uuids[int(m._ref_model_idxs[j])] + node_uuid = m._read_bytes( + m._ref_node_uuid_data, m._ref_node_uuid_offsets, j + ) + resolve = m._read_bytes(m._ref_resolve_data, m._ref_resolve_offsets, j) + return SNodeRef( + model_uuid if model_uuid else None, + node_uuid if node_uuid else None, + resolve if resolve else None, + ) + raise KeyError(name) def get_children(self, role): - return list(filter(lambda c : c.role_in_parent == role, self.children)) + return [c for c in self.children if c.role_in_parent == role] def get_descendants(self): res = [] - self.__do_collect_descendants(self, res) + self._collect_descendants(res) return res - def __do_collect_descendants(self, node, res): - res.extend(node.children) - for c in node.children: - self.__do_collect_descendants(c, res) - + def _collect_descendants(self, res): + for child in self.children: + res.append(child) + child._collect_descendants(res) diff --git a/mps-cli-py/src/mpscli/model/builder/SLanguageBuilder.py b/mps-cli-py/src/mpscli/model/builder/SLanguageBuilder.py index 158ddf8..c9beda3 100644 --- a/mps-cli-py/src/mpscli/model/builder/SLanguageBuilder.py +++ b/mps-cli-py/src/mpscli/model/builder/SLanguageBuilder.py @@ -1,5 +1,7 @@ +import warnings import xml.etree.ElementTree as ET from pathlib import Path +from threading import Lock from mpscli.model.SLanguage import SLanguage from mpscli.model.SConcept import SConcept @@ -7,21 +9,36 @@ class SLanguageBuilder: languages = {} + # protects languages dict and per-language _concepts_by_name dicts.. + # needed because get_concept is called from multiple threads during warm cache loads so.. + _lock = Lock() @classmethod def get_language(cls, name, uuid): - lan = cls.languages.get(name, None) - if lan is None: - lan = SLanguage(name, uuid) - cls.languages[name] = lan + # dict reads are GIL atomic I think so no lock needed on the fast path + lan = cls.languages.get(name) + if lan is not None: + return lan + with cls._lock: + lan = cls.languages.get(name) + if lan is None: + lan = SLanguage(name, uuid) + cls.languages[name] = lan return lan @classmethod def get_concept(cls, language, concept_name, concept_uuid): - concept = next((c for c in language.concepts if c.name == concept_name), None) - if concept is None: - concept = SConcept(concept_name, concept_uuid) - language.concepts.append(concept) + if not hasattr(language, "_concepts_by_name"): + language._concepts_by_name = {c.name: c for c in language.concepts} + concept = language._concepts_by_name.get(concept_name) + if concept is not None: + return concept + with cls._lock: + concept = language._concepts_by_name.get(concept_name) + if concept is None: + concept = SConcept(concept_name, concept_uuid) + language.concepts.append(concept) + language._concepts_by_name[concept_name] = concept return concept @classmethod @@ -54,39 +71,34 @@ def get_reference(cls, concept, reference_name): @classmethod def load_from_mpl(cls, mpl_path: Path) -> SLanguage: - # read a .mpl file and populayte the matching SLanguage with version and aspect models - # if the language was already registered via registry (from .mpb parsing), we - # update that same object so basically no duplicates are created.. + # reads a .mpl file and improves the matching SLanguage with its version number and aspect models.. + # If the language was already registered via a registry section in a .mpb file, get_language() returns + # the same object so no duplicate entries are created I guess.. try: return cls._read_and_enrich(mpl_path) except Exception as exc: - import warnings - warnings.warn(f"Failed to read language from {mpl_path.name}: {exc}") return None @classmethod def _read_and_enrich(cls, mpl_path: Path) -> SLanguage: root = ET.parse(mpl_path).getroot() - namespace = root.get("namespace", "") uuid = root.get("uuid", "") version = int(root.get("languageVersion", "0")) - - # get_language does get or create so this safely merges with any already registered entry lang = cls.get_language(namespace, uuid) lang.language_version = version lang.models = cls._load_aspect_models(mpl_path.parent / "models") - return lang @classmethod def _load_aspect_models(cls, models_dir: Path) -> list: - # parse every .mpb in the models directory next to the .mpl file - # these are the language aspects which is structure,, behavior, editor, constraints, typesystem, etc.. + # parses every .mpb in the models directory next to the .mpl file. + # these are the language aspect models such as the structure, behavior, editor, constraints, typesystem etc. + # import is kept local to avoid pulling SModelBuilderBinaryPersistency and all its binary parsing + # dependencies into the module at load time.. if not models_dir.exists(): return [] - from mpscli.model.builder.SModelBuilderBinaryPersistency import ( SModelBuilderBinaryPersistency, ) @@ -98,8 +110,5 @@ def _load_aspect_models(cls, models_dir: Path) -> list: if model is not None: loaded.append(model) except Exception as exc: - import warnings - warnings.warn(f"Failed to parse aspect model {mpb_file.name}: {exc}") - return loaded diff --git a/mps-cli-py/src/mpscli/model/builder/SModelBuilderBase.py b/mps-cli-py/src/mpscli/model/builder/SModelBuilderBase.py index 1230b90..2f66bd0 100644 --- a/mps-cli-py/src/mpscli/model/builder/SModelBuilderBase.py +++ b/mps-cli-py/src/mpscli/model/builder/SModelBuilderBase.py @@ -13,20 +13,24 @@ def __init__(self): self.index_2_reference_role = {} self.index_2_imported_model_uuid = {} - def extract_node(self, my_model, node_xml, parent): - root_node_id = node_xml.get("id") - root_node_concept = self.index_2_concept[node_xml.get("concept")] + def extract_node(self, my_model, node_xml, parent_node): + # allocate node in model's flat arrays - no SNode object constructed here. + # returns an SNode view (2-slot wrapperr around model+idx) + node_id = node_xml.get("id") + concept = self.index_2_concept[node_xml.get("concept")] child_role_index = node_xml.get("role") - if child_role_index is None: - child_role = None - else: - child_role = self.index_2_child_role_in_parent[child_role_index] - s_node = SNode(root_node_id, root_node_concept, child_role, parent) + role = ( + None + if child_role_index is None + else self.index_2_child_role_in_parent[child_role_index] + ) + parent_idx = None if parent_node is None else parent_node._idx + idx = my_model._add_node(node_id, concept, role, parent_idx) + for property_xml_node in node_xml.findall("property"): - property_role = property_xml_node.get("role") - property_value = property_xml_node.get("value") - property_name = self.index_2_property[property_role] - s_node.properties[property_name] = property_value + prop_name = self.index_2_property[property_xml_node.get("role")] + my_model._set_property(idx, prop_name, property_xml_node.get("value")) + for ref_xml_node in node_xml.findall("ref"): ref_role = ref_xml_node.get("role") ref_to = ref_xml_node.get("to") @@ -34,20 +38,30 @@ def extract_node(self, my_model, node_xml, parent): ref_node_uuid = ref_xml_node.get("node") s_node_ref = SNodeRef(my_model.uuid, ref_node_uuid) else: - ref_model_index = ref_to[0 : ref_to.find(":")] - ref_node_uuid = ref_to[ref_to.find(":") + 1 : len(ref_to)] - s_node_ref = SNodeRef(self.index_2_imported_model_uuid[ref_model_index], ref_node_uuid) - ref_name = self.index_2_reference_role[ref_role] - s_node.references[ref_name] = s_node_ref + sep = ref_to.find(":") + ref_model_index = ref_to[:sep] + ref_node_uuid = ref_to[sep + 1 :] + s_node_ref = SNodeRef( + self.index_2_imported_model_uuid[ref_model_index], ref_node_uuid + ) + my_model._set_reference( + idx, self.index_2_reference_role[ref_role], s_node_ref + ) + + node = SNode(my_model, idx) for child_node_xml in node_xml.findall("node"): - child_node = self.extract_node(my_model, child_node_xml, s_node) - s_node.children.append(child_node) + # recursive call - child registers its parent_idx via _add_node + # no explicit parent.children.append needed I think cuz parent-child links are derived from parent_idxs + # during model._finalize().. + self.extract_node(my_model, child_node_xml, node) + + return node - return s_node @staticmethod def is_model_generatable(model_xml_node): return any( - attribute.get("name") == "doNotGenerate" and attribute.get("value") == "true" + attribute.get("name") == "doNotGenerate" + and attribute.get("value") == "true" for attribute in model_xml_node.findall("attribute") ) @@ -64,7 +78,7 @@ def extract_imports_and_registry(self, model_xml_node): for import_xml_node in imports_xml_node.findall("import"): import_index = import_xml_node.get("index") imported_model_ref = import_xml_node.get("ref") - imported_model_uuid = imported_model_ref[0: imported_model_ref.find("(")] + imported_model_uuid = imported_model_ref[0 : imported_model_ref.find("(")] self.index_2_imported_model_uuid[import_index] = imported_model_uuid registry_xml_node = model_xml_node.find("registry") for language_xml_node in registry_xml_node.findall("language"): @@ -74,13 +88,17 @@ def extract_imports_and_registry(self, model_xml_node): for concept_xml_node in language_xml_node.findall("concept"): concept_id = concept_xml_node.get("id") concept_name = concept_xml_node.get("name") - concept = SLanguageBuilder.get_concept(language, concept_name, concept_id) + concept = SLanguageBuilder.get_concept( + language, concept_name, concept_id + ) concept_index = concept_xml_node.get("index") self.index_2_concept[concept_index] = concept for property_xml_node in concept_xml_node.findall("property"): property_name = property_xml_node.get("name") property_index = property_xml_node.get("index") - node_property = SLanguageBuilder.get_property(concept, property_name) + node_property = SLanguageBuilder.get_property( + concept, property_name + ) self.index_2_property[property_index] = node_property for child_xml_node in concept_xml_node.findall("child"): child_name = child_xml_node.get("name") diff --git a/mps-cli-py/src/mpscli/model/builder/SModelBuilderBinaryPersistency.py b/mps-cli-py/src/mpscli/model/builder/SModelBuilderBinaryPersistency.py index 794a7bd..44f322d 100644 --- a/mps-cli-py/src/mpscli/model/builder/SModelBuilderBinaryPersistency.py +++ b/mps-cli-py/src/mpscli/model/builder/SModelBuilderBinaryPersistency.py @@ -114,6 +114,7 @@ def build(self, path_to_model: str): # and abort entirely but our Python implementation does lenient partial extraction where the node tree # is usually intact even when an unusual import reference sub-kind is encountered advance_until_after(reader, MODEL_START) + model._finalize() return model # 4. MODEL_START @@ -127,6 +128,40 @@ def build(self, path_to_model: str): # 5. Node tree - recursive read_children populates model.root_nodes read_children(reader, self, model, None) + model._finalize() + return model + + def build_from_bytes(self, data: bytes, path_hint: str = ""): + # same as build() but accepts pre-read bytes instead of a file path.. and is used by MpbBatchParser workers + # which read bytes from ZipFile before passing to subprocess which in turn avoids + # opening the JAR again in the worker. + reader = ModelInputStream(data) + version, model_uuid, model_name = self._load_header(reader) + self.stream_version = version + uuid_str = model_uuid or "r:unknown" + name_str = model_name or "unknown.model" + model = SModel(name_str, uuid_str, False) + self.index_2_imported_model_uuid["0"] = uuid_str + load_registry(reader, self) + try: + self._load_model_properties(reader, version) + except _UnknownSubKind as e: + import warnings + + warnings.warn( + f"[build_from_bytes] {path_hint}: {e} - skipped to MODEL_START" + ) + advance_until_after(reader, MODEL_START) + model._finalize() + return model + token = reader.read_u32() + if token != MODEL_START: + raise RuntimeError( + f"Expected MODEL_START (0x{MODEL_START:08X}), " + f"got 0x{token:08X} at pos {reader.tell() - 4}" + ) + read_children(reader, self, model, None) + model._finalize() return model def _load_header(self, reader: ModelInputStream): diff --git a/mps-cli-py/src/mpscli/model/builder/SModelBuilderDefaultPersistency.py b/mps-cli-py/src/mpscli/model/builder/SModelBuilderDefaultPersistency.py index bb530ae..32b9907 100644 --- a/mps-cli-py/src/mpscli/model/builder/SModelBuilderDefaultPersistency.py +++ b/mps-cli-py/src/mpscli/model/builder/SModelBuilderDefaultPersistency.py @@ -1,4 +1,3 @@ - import xml.etree.ElementTree as ET from mpscli.model.builder.SModelBuilderBase import SModelBuilderBase @@ -14,6 +13,7 @@ def build(self, path): for node_xml_node in model_xml_node.findall("node"): root_node = self.extract_node(model, node_xml_node, None) - model.root_nodes.append(root_node) + model._root_idxs.append(root_node._idx) + model._finalize() return model diff --git a/mps-cli-py/src/mpscli/model/builder/SModelBuilderFilePerRootPersistency.py b/mps-cli-py/src/mpscli/model/builder/SModelBuilderFilePerRootPersistency.py index 2ba25fc..de461ec 100644 --- a/mps-cli-py/src/mpscli/model/builder/SModelBuilderFilePerRootPersistency.py +++ b/mps-cli-py/src/mpscli/model/builder/SModelBuilderFilePerRootPersistency.py @@ -1,4 +1,3 @@ - from mpscli.model.builder.SModelBuilderBase import SModelBuilderBase import xml.etree.ElementTree as ET @@ -6,17 +5,18 @@ class SModelBuilderFilePerRootPersistency(SModelBuilderBase): def build(self, path): - model_file = path / '.model' + model_file = path / ".model" tree = ET.parse(model_file) model_xml_node = tree.getroot() model = self.extract_model_core_info(model_xml_node) model.path_to_model_file = model_file for file in path.iterdir(): - if file.suffix == '.mpsr': + if file.suffix == ".mpsr": root_node = self.extract_root_node(model, file) - model.root_nodes.append(root_node) + model._root_idxs.append(root_node._idx) + model._finalize() return model def extract_root_node(self, model, mpsr_file): @@ -25,6 +25,3 @@ def extract_root_node(self, model, mpsr_file): self.extract_imports_and_registry(model_xml_node) root_node = model_xml_node.find("node") return self.extract_node(model, root_node, None) - - - diff --git a/mps-cli-py/src/mpscli/model/builder/SSolutionBuilder.py b/mps-cli-py/src/mpscli/model/builder/SSolutionBuilder.py index 950bae7..a94e629 100644 --- a/mps-cli-py/src/mpscli/model/builder/SSolutionBuilder.py +++ b/mps-cli-py/src/mpscli/model/builder/SSolutionBuilder.py @@ -1,27 +1,26 @@ -import os +import logging +import warnings import xml.etree.ElementTree as ET from pathlib import Path from typing import List, Optional, Tuple +_log = logging.getLogger(__name__) + from mpscli.model.SSolution import SSolution -from mpscli.model.builder.SModelBuilderDefaultPersistency import ( - SModelBuilderDefaultPersistency, -) -from mpscli.model.builder.SModelBuilderFilePerRootPersistency import ( - SModelBuilderFilePerRootPersistency, -) +from mpscli.model.builder.utils.DiskModelLoader import parse_fpr, parse_mps from mpscli.model.builder.utils.MpbBatchParser import MpbBatchParser class SSolutionBuilder: - # builds SSolution objects from .msd files and their associated model directories. - # Callers that need caching or parallelism construct a MpbBatchParser with the desired settings and - # pass it to build_all() + # builds SSolution objects from .msd files and their associated model directories... + # callers that need caching shouldd construct an MpbBatchParser with the desired settings and pass it to build_all(). + def build_solution(self, path_to_msd_file: Path) -> Optional[SSolution]: models_dir = path_to_msd_file.parent / "models" if not models_dir.exists(): - print( - f"ERROR: 'models' directory not found - no model is loaded from: {path_to_msd_file.parent}" + _log.warning( + "'models' directory not found - no model loaded from: %s", + path_to_msd_file.parent, ) return None solutions = self.build_all([path_to_msd_file]) @@ -40,12 +39,10 @@ def build_all( for msd_path in msd_paths: solution = self._extract_solution_info(msd_path) solution.path_to_solution_file = msd_path - models_dir = msd_path.parent / "models" if not models_dir.exists(): solution_infos.append((solution, [], [], [])) continue - mpb, fpr, mps = self._collect_model_paths(models_dir) solution_infos.append((solution, mpb, fpr, mps)) all_mpb_paths.extend(str(p) for p in mpb) @@ -61,20 +58,18 @@ def build_all( for model_path in fpr_dirs: try: - model = SModelBuilderFilePerRootPersistency().build(model_path) - solution.models.append(model) + model = parse_fpr(model_path) + if model is not None: + solution.models.append(model) except Exception as exc: - import warnings - warnings.warn(f"Failed to parse {model_path}: {exc}") for model_path in mps_paths: try: - model = SModelBuilderDefaultPersistency().build(model_path) - solution.models.append(model) + model = parse_mps(model_path) + if model is not None: + solution.models.append(model) except Exception as exc: - import warnings - warnings.warn(f"Failed to parse {model_path}: {exc}") solutions.append(solution) diff --git a/mps-cli-py/src/mpscli/model/builder/SSolutionsRepositoryBuilder.py b/mps-cli-py/src/mpscli/model/builder/SSolutionsRepositoryBuilder.py index 5e667a1..9f9edaa 100644 --- a/mps-cli-py/src/mpscli/model/builder/SSolutionsRepositoryBuilder.py +++ b/mps-cli-py/src/mpscli/model/builder/SSolutionsRepositoryBuilder.py @@ -2,125 +2,267 @@ SSolutionsRepositoryBuilder - builds an SRepository from a directory tree. """ -from timeit import default_timer as timer +import logging import os import sys -import zipfile -import shutil -import warnings import threading -from concurrent.futures import ThreadPoolExecutor, as_completed +import warnings +from timeit import default_timer as timer +from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path +from typing import Dict from mpscli.model.SRepository import SRepository from mpscli.model.builder.SLanguageBuilder import SLanguageBuilder -from mpscli.model.builder.SSolutionBuilder import SSolutionBuilder -from mpscli.model.builder.utils.MpbBatchParser import MpbBatchParser +from mpscli.model.builder.utils.MpbBatchParser import ( + MpbBatchParser, + _parse_jar_members_batch_worker, + _register_lang_pairs, +) from mpscli.model.builder.utils.ModelCache import ModelCache -from mpscli.model.builder.utils.JarUtils import jar_is_relevant +from mpscli.model.builder.utils.JarScanner import scan_all_jars +from mpscli.model.builder.utils.JarModelLoader import load_jar_solutions +from mpscli.model.builder.utils.ParseCache import ParseCache + +_log = logging.getLogger(__name__) + +# minimum number of JAR batches before spawning a ProcessPool... +_JAR_POOL_THRESHOLD = 8 class SSolutionsRepositoryBuilder: - # number of threads for parallel jar processing JAR_THREADS: int | None = None - - # Set False to disable the persistent cache and always reparse from disk USE_CACHE: bool = True def __init__(self): self.repo = SRepository() self._repo_lock = threading.Lock() - self._cache = ModelCache() + self._disk_cache = ModelCache() if self.USE_CACHE else None + self._parse_cache = ( + ParseCache(workers=min(os.cpu_count() or 4, 16)) if self.USE_CACHE else None + ) + self._mpb_parser = MpbBatchParser( + use_cache=self.USE_CACHE, + cache_load_fn=self._disk_cache.load if self._disk_cache else None, + cache_save_fn=self._disk_cache.save if self._disk_cache else None, + parse_cache=self._parse_cache, + ) + # workers=1 so no ProcessPool is spawned I think during disk mpb parsing.. + # also I guess this prevents GIL contention with load_sync ThreadPool... + self._disk_mpb_parser = MpbBatchParser( + workers=1, + use_cache=self.USE_CACHE, + cache_load_fn=self._disk_cache.load if self._disk_cache else None, + cache_save_fn=self._disk_cache.save if self._disk_cache else None, + ) def build(self, paths): if isinstance(paths, str): paths = [paths] elif not isinstance(paths, list): - print("ERROR: paths should be either a string or a list of strings!") + _log.error("paths should be either a string or a list of strings") sys.exit(1) start = timer() - for path in paths: - if not os.path.exists(path): - warnings.warn(f"Path not found: {path}") - continue - if not os.path.isdir(path): - print("ERROR: path", path, "is not a directory!") - continue - - self.collect_solutions_from_sources(path) - self.collect_solutions_from_jars(path) - + valid_paths = [p for p in paths if self._is_valid_path(p)] + if valid_paths: + self.collect_solutions_from_jars(valid_paths) self.repo.languages = list(SLanguageBuilder.languages.values()) - stop = timer() - print(f"duration for parsing modules: {stop - start:.2f} seconds") + _log.info("duration for parsing modules: %.2f seconds", stop - start) + if self._parse_cache: + self._parse_cache.flush() return self.repo - def collect_solutions_from_sources(self, path): - msd_paths = list(Path(path).rglob("*.msd")) + def collect_solutions_from_sources(self, paths, msd_paths=None, preread_bytes=None): + # disk .msd files are always serial for FPR/MPS so SLanguageBuilder.get_concept is called in this thread + # and concepts are populated correctly for test projects + if msd_paths is None: + msd_paths = [p for path in paths for p in Path(path).rglob("*.msd")] if not msd_paths: return - mpb_parser = MpbBatchParser( - use_cache=self.USE_CACHE, - cache_load_fn=self._cache.load if self.USE_CACHE else None, - cache_save_fn=self._cache.save if self.USE_CACHE else None, - ) - - builder = SSolutionBuilder() - solutions = builder.build_all(msd_paths, mpb_parser=mpb_parser) + t0 = timer() + from mpscli.model.builder.utils.DiskModelLoader import load_disk_solutions + solutions = load_disk_solutions( + msd_paths, + mpb_parser=self._disk_mpb_parser, + preread_bytes=preread_bytes, + cache_load_fn=self._disk_cache.load if self._disk_cache else None, + cache_save_fn=self._disk_cache.save if self._disk_cache else None, + ) with self._repo_lock: for solution in solutions: if solution is not None and not self.repo.find_solution_by_name( solution.name ): self.repo.solutions.append(solution) + _log.info( + "[diag] phase3 disk: %.1fs -- %d msd files", timer() - t0, len(msd_paths) + ) - def collect_solutions_from_jars(self, path): - """ - For every JAR under 'path': - 1. Peek inside the zip central directory (no extraction) and skip if no .msd file - 2. Extract to a private temp directory. - 3. collect_solutions_from_sources() on the temp dir - 4. Delete the temp dir unconditionally. - Now all jars are processed concurrently via a ThreadPoolExecutor - """ - jar_paths = list(Path(path).rglob("*.jar")) - if not jar_paths: - return - + def collect_solutions_from_jars(self, paths): workers = self.JAR_THREADS or min(os.cpu_count() or 4, 16) - with ThreadPoolExecutor(max_workers=workers) as pool: - futures = {pool.submit(self._process_jar, jp): jp for jp in jar_paths} - for future in as_completed(futures): - exc = future.exception() - if exc: - warnings.warn(f"Error processing {futures[future]}: {exc}") + # phase1: scan ZIP central directories. + t0 = timer() + jar_paths = [jp for path in paths for jp in Path(path).rglob("*.jar")] + scan = scan_all_jars(jar_paths, workers) + t1 = timer() + _log.info( + "[diag] phase1 scan: %.1fs -- %d jar solutions, %d languages, %d mpb members", + t1 - t0, + len(scan.solution_infos), + len(scan.language_infos), + len(scan.all_mpb_members), + ) + + if self._parse_cache: + t_load = timer() + self._parse_cache.load_sync(scan.jar_stats) + _log.info("[diag] cache load: %.1fs", timer() - t_load) + + jar_cached, uncached_by_jar = self._mpb_parser.get_cached_and_uncached_by_jar( + scan.all_mpb_members + ) + jar_results = dict(jar_cached) + hits = len(jar_cached) + misses = sum(len(v) for v in uncached_by_jar.values()) + _log.info("[diag] cache: %d hits, %d misses", hits, misses) + + # scan disk msd files and start I/O pre-read before phase2. preread is a ThreadPool reading model/.mpsr bytes which is + # pure I/O. This also runs concurrently with phase2's ProcessPool (so basically separate processes, no conflict).. + msd_paths = sorted([p for path in paths for p in Path(path).rglob("*.msd")]) + preread_bytes: dict = {} + preread_thread = None + if msd_paths: + from mpscli.model.builder.utils.DiskModelLoader import preread_disk_bytes + + def _do_preread(): + preread_bytes.update(preread_disk_bytes(msd_paths)) + + preread_thread = threading.Thread( + target=_do_preread, daemon=True, name="disk-preread" + ) + preread_thread.start() + + # phase2: parse uncached JAR .mpb members.. + # single pool spanning phase2 and phase4 - spawn happens once, before disk_thread, so no second SPAWN event + # most probably occurs after disk_thread starts. + jar_new: Dict[str, dict] = {} + t2_start = timer() + use_pool = len(uncached_by_jar) >= _JAR_POOL_THRESHOLD + pool = ProcessPoolExecutor(max_workers=workers) if use_pool else None - def _process_jar(self, jar_path: Path) -> None: - # cheap peek - skip jars with neither a .msd nor .mpl file - if not jar_is_relevant(jar_path): - return - # extract to an isolated temp directory - extract_dir = jar_path.parent / jar_path.name.replace(".", "_") - extract_dir.mkdir(parents=True, exist_ok=True) try: - with zipfile.ZipFile(jar_path) as jar: - jar.extractall(extract_dir) - # print just the jar name - print(f"extracting: {jar_path.name}") - # scan and also parse solutions (.msd and models) - self.collect_solutions_from_sources(extract_dir) - # read any .mpl files to populate SLanguage objects with version and aspect models - for mpl_file in extract_dir.rglob("*.mpl"): - SLanguageBuilder.load_from_mpl(mpl_file) + if pool is not None: + futures = { + pool.submit(_parse_jar_members_batch_worker, jar, members): jar + for jar, members in uncached_by_jar.items() + } + for future in as_completed(futures): + jar = futures[future] + try: + batch, lang_pairs = future.result() + jar_new[jar] = batch + for member, model in batch.items(): + jar_results[(jar, member)] = model + if lang_pairs: + _register_lang_pairs(lang_pairs) + except Exception as exc: + warnings.warn(f"Failed to parse JAR {jar}: {exc}") + else: + for jar, members in uncached_by_jar.items(): + try: + batch, lang_pairs = _parse_jar_members_batch_worker( + jar, members + ) + jar_new[jar] = batch + for member, model in batch.items(): + jar_results[(jar, member)] = model + if lang_pairs: + _register_lang_pairs(lang_pairs) + except Exception as exc: + warnings.warn(f"Failed to parse JAR {jar}: {exc}") + + self._mpb_parser.flush_jar_cache(jar_new, scan.jar_stats) + t2 = timer() + _log.info( + "[diag] phase2 jar mpb: %.1fs -- %d models", + t2 - t2_start, + len(jar_results), + ) + + # join preread which should already be done + if preread_thread is not None: + preread_thread.join() + + # disk_thread starts here and pool already spawned so no second SPAWN event.. This starts after phase2 + # so disk does not compete with phase2 workerss + disk_thread = threading.Thread( + target=self.collect_solutions_from_sources, + args=(paths,), + kwargs={ + "msd_paths": msd_paths, + "preread_bytes": preread_bytes or None, + }, + daemon=False, + name="disk-build", + ) + disk_thread.start() + + # phase4: JAR FPR/MPS solutions + disk_solution_names = {s.name for s in self.repo.solutions} + jar_solutions = load_jar_solutions( + scan, + jar_results, + disk_solution_names, + workers, + parse_cache=self._parse_cache, + use_cache=self.USE_CACHE, + pool=pool, + prereaded_fpr=None, + jar_stats=scan.jar_stats, + ) + t3 = timer() + + disk_thread.join() finally: - # always clean up - try: - shutil.rmtree(extract_dir) - except OSError as e: - print(f"Error removing {extract_dir}: {e}") + if pool is not None: + # wait=False: disk already joined above so the workers idle since phase4.. + pool.shutdown(wait=False) + + with self._repo_lock: + for solution in jar_solutions: + if not self.repo.find_solution_by_name(solution.name): + self.repo.solutions.append(solution) + + total = sum(len(s.models) for s in self.repo.solutions) + _log.info( + "[diag] phase4 fpr/mps: %.1fs -- %d solutions, %d models", + t3 - t2, + len(self.repo.solutions), + total, + ) + + t4 = timer() + for namespace, uuid, version, mpb_members in scan.language_infos: + lang = SLanguageBuilder.get_language(namespace, uuid) + lang.language_version = version + lang.models = [ + jar_results[key] + for key in mpb_members + if jar_results.get(key) is not None + ] + _log.info("[diag] phase5 lang: %.1fs", timer() - t4) + + def _is_valid_path(self, path: str) -> bool: + if not os.path.exists(path): + warnings.warn(f"Path not found: {path}") + return False + if not os.path.isdir(path): + _log.error("path %s is not a directory", path) + return False + return True diff --git a/mps-cli-py/src/mpscli/model/builder/binary/nodes.py b/mps-cli-py/src/mpscli/model/builder/binary/nodes.py index 8c0128a..d50a06d 100644 --- a/mps-cli-py/src/mpscli/model/builder/binary/nodes.py +++ b/mps-cli-py/src/mpscli/model/builder/binary/nodes.py @@ -51,11 +51,11 @@ def read_children(reader, builder, model, parent=None): node = read_node(reader, builder, model, parent) if parent is None: - model.root_nodes.append(node) - else: - parent.children.append(node) + # register as root.. parent-child links are derived from parent_idxs during finalize + model._root_idxs.append(node._idx) - return model.root_nodes if parent is None else parent.children + # no return value so callers do not use the result. + # parent-child links are derived from _parent_buf during model._finalize() call.. def read_node(reader, builder, model, parent=None): @@ -88,7 +88,10 @@ def read_node(reader, builder, model, parent=None): f"Expected '{{' (0x{NODE_OPEN_BRACE:02X}) at pos {reader.tell() - 1}, got 0x{brace:02X}" ) - node = SNode(node_id, concept, role_in_parent, parent) + # allocate node in models flat arrays so basically no SNode object constructed here + parent_idx = None if parent is None else parent._idx + idx = model._add_node(node_id, concept, role_in_parent, parent_idx) + node = SNode(model, idx) props_count = reader.read_u16() for _ in range(props_count): @@ -96,7 +99,7 @@ def read_node(reader, builder, model, parent=None): value = reader.read_string() prop_key = builder.index_2_property.get(prop_index) if prop_key is not None: - node.properties[prop_key] = value + model._set_property(idx, prop_key, value) user_obj_count = reader.read_u16() for _ in range(user_obj_count): @@ -106,9 +109,9 @@ def read_node(reader, builder, model, parent=None): for _ in range(refs_count): ref_name, ref = _read_reference(reader, builder, model) if ref_name is not None: - node.references[ref_name] = ref + model._set_reference(idx, ref_name, ref) - # read children recursively + # read children recursively - parent-child links built from parent_idxs during finalize read_children(reader, builder, model, node) brace = reader.read_u8() diff --git a/mps-cli-py/src/mpscli/model/builder/utils/DiskModelLoader.py b/mps-cli-py/src/mpscli/model/builder/utils/DiskModelLoader.py new file mode 100644 index 0000000..3afa487 --- /dev/null +++ b/mps-cli-py/src/mpscli/model/builder/utils/DiskModelLoader.py @@ -0,0 +1,249 @@ +# mpscli/model/builder/utils/DiskModelLoader.py +# +# Builds SSolution objects from disk .msd files. +# MPB: parsed serially via MpbBatchParser (workers=1 avoids GIL contention with load_sync's ThreadPool). +# FPR/MPS: preread bytes supplied from a background I/O thread (preread_disk_bytes) that runs during phase2. +# disk_thread then parses from memory with mostly zero file I/O which eliminates competition with phase4 +# ProcessPool workers. +# And on warm runs the ModelCache supplies models directly so basicallyy preread bytes are unused. + +import threading +import warnings +import xml.etree.ElementTree as ET +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import List, Optional, Tuple + +from mpscli.model.SSolution import SSolution +from mpscli.model.builder.utils.MpbBatchParser import MpbBatchParser + + +def preread_disk_bytes(msd_paths: List[Path], io_workers: int = 32) -> dict: + # preread all FPR .model/.mpsr and MPS bytes and this is pure I/O and no parsing.. + # This is called from a background thread that starts before phase2 + # key: ('fpr', idx, path) -> (model_bytes, [(name, mpsr_bytes), ...]) + # ('mps', idx, path) -> data_bytes + tasks = [] + for idx, msd_path in enumerate(msd_paths): + models_dir = msd_path.parent / "models" + if not models_dir.exists(): + continue + _, fpr, mps = _collect_model_paths(models_dir) + for fp in fpr: + tasks.append(("fpr", idx, fp)) + for mp in mps: + tasks.append(("mps", idx, mp)) + + if not tasks: + return {} + + preread: dict = {} + lock = threading.Lock() + + def read_task(task): + kind, idx, path = task + try: + if kind == "fpr": + model_bytes = (path / ".model").read_bytes() + mpsr_data = [ + (str(f), f.read_bytes()) for f in sorted(path.glob("*.mpsr")) + ] + with lock: + preread[(kind, idx, path)] = (model_bytes, mpsr_data) + else: + with lock: + preread[(kind, idx, path)] = path.read_bytes() + except Exception as exc: + warnings.warn(f"Failed to preread {path}: {exc}") + + with ThreadPoolExecutor(max_workers=min(len(tasks), io_workers)) as pool: + list(pool.map(read_task, tasks)) + + return preread + + +def load_disk_solutions( + msd_paths: List[Path], + mpb_parser: Optional[MpbBatchParser] = None, + preread_bytes: Optional[dict] = None, + cache_load_fn=None, + cache_save_fn=None, +) -> List[SSolution]: + if mpb_parser is None: + mpb_parser = MpbBatchParser() + + solution_infos: List[Tuple[SSolution, List[Path], List[Path], List[Path]]] = [] + all_mpb_paths: List[str] = [] + + for msd_path in msd_paths: + solution = _extract_solution_info(msd_path) + solution.path_to_solution_file = msd_path + models_dir = msd_path.parent / "models" + if not models_dir.exists(): + solution_infos.append((solution, [], [], [])) + continue + mpb, fpr, mps = _collect_model_paths(models_dir) + solution_infos.append((solution, mpb, fpr, mps)) + all_mpb_paths.extend(str(p) for p in mpb) + + mpb_results = mpb_parser.parse(all_mpb_paths) + + solutions = [] + for idx, (solution, mpb_paths, fpr_dirs, mps_paths) in enumerate(solution_infos): + for p in mpb_paths: + model = mpb_results.get(str(p)) + if model is not None: + solution.models.append(model) + + for fp in fpr_dirs: + try: + model = None + model_file = fp / ".model" + # warm path: check ModelCache before parsing + if cache_load_fn is not None: + model = cache_load_fn(model_file) + if model is None: + key = ("fpr", idx, fp) + if preread_bytes is not None and key in preread_bytes: + # cold path: parse from preread bytes - zero file I/O + model_bytes, mpsr_data = preread_bytes[key] + model = _parse_fpr_bytes(model_bytes, mpsr_data, fpr_path=fp) + if model is not None and cache_save_fn is not None: + cache_save_fn(model_file, model) + else: + # fallback: when preread is not available then parse from file directly + model = parse_fpr(fp) + if model is not None: + solution.models.append(model) + except Exception as exc: + warnings.warn(f"Failed to parse {fp}: {exc}") + + for mp in mps_paths: + try: + model = None + # warm run (cache folder exist on disk after first run): check ModelCache before parsing + if cache_load_fn is not None: + model = cache_load_fn(mp) + if model is None: + key = ("mps", idx, mp) + if preread_bytes is not None and key in preread_bytes: + # cold path: parse from preread bytes - zero file I/O + model = _parse_mps_bytes(preread_bytes[key], mps_path=mp) + if model is not None and cache_save_fn is not None: + cache_save_fn(mp, model) + else: + # fallback: preread unavailable - parse from file directly + model = parse_mps(mp) + if model is not None: + solution.models.append(model) + except Exception as exc: + warnings.warn(f"Failed to parse {mp}: {exc}") + + solutions.append(solution) + + return solutions + + +def _parse_fpr_bytes(model_bytes: bytes, mpsr_data: list, fpr_path: Path = None): + # parse FPR from pre-read bytes in-process so basically no file I/O.. This is called from disk_thread so basically + # no shared mutable state between callers. + # fpr_path is the FPR directory used to set model.path_to_model_file so it matches + # what SModelBuilderFilePerRootPersistency.build() would actually set.. + try: + from lxml import etree as ET + except ImportError: + import xml.etree.ElementTree as ET + from mpscli.model.builder.SModelBuilderFilePerRootPersistency import ( + SModelBuilderFilePerRootPersistency, + ) + + builder = SModelBuilderFilePerRootPersistency() + model_xml = ET.fromstring(model_bytes) + model = builder.extract_model_core_info(model_xml) + if fpr_path is not None: + model.path_to_model_file = fpr_path / ".model" + for _name, mpsr_bytes in mpsr_data: + try: + mpsr_xml = ET.fromstring(mpsr_bytes) + builder.extract_imports_and_registry(mpsr_xml) + root = mpsr_xml.find("node") + if root is not None: + root_node = builder.extract_node(model, root, None) + model._root_idxs.append(root_node._idx) + except Exception: + pass + model._finalize() + return model + + +def _parse_mps_bytes(data: bytes, mps_path: Path = None): + # parse MPS from preread bytes in-process so basically no file I/O. Also, mps_path is the source .mps file used + # to set model.path_to_model_file so it matches what SModelBuilderDefaultPersistency.build() would set + try: + from lxml import etree as ET + except ImportError: + import xml.etree.ElementTree as ET + from mpscli.model.builder.SModelBuilderDefaultPersistency import ( + SModelBuilderDefaultPersistency, + ) + + builder = SModelBuilderDefaultPersistency() + model_xml = ET.fromstring(data) + model = builder.extract_model_core_info(model_xml) + if mps_path is not None: + model.path_to_model_file = mps_path + builder.extract_imports_and_registry(model_xml) + for node_xml in model_xml.findall("node"): + root_node = builder.extract_node(model, node_xml, None) + model._root_idxs.append(root_node._idx) + model._finalize() + return model + + +def _collect_model_paths( + models_dir: Path, +) -> Tuple[List[Path], List[Path], List[Path]]: + mpb, fpr, mps = [], [], [] + for p in models_dir.rglob("*"): + if p.is_file() and p.suffix == ".mpb": + mpb.append(p) + elif p.is_dir() and (p / ".model").exists(): + fpr.append(p) + elif p.is_file() and p.suffix == ".mps": + mps.append(p) + return mpb, fpr, mps + + +def _extract_solution_info(solution_file: Path) -> SSolution: + tree = ET.parse(solution_file) + root = tree.getroot() + return SSolution(root.get("name"), root.get("uuid")) + + +def parse_fpr(model_path: Path, cache_load_fn=None, cache_save_fn=None): + # parse one FPR model directory with optional ModelCache integration. This is used by SSolutionBuilder + # for disk-resident FPR models + model_file = model_path / ".model" + if cache_load_fn is not None: + cached = cache_load_fn(model_file) + if cached is not None: + return cached + model_bytes = model_file.read_bytes() + mpsr_data = [(str(f), f.read_bytes()) for f in sorted(model_path.glob("*.mpsr"))] + model = _parse_fpr_bytes(model_bytes, mpsr_data, fpr_path=model_path) + if model is not None and cache_save_fn is not None: + cache_save_fn(model_file, model) + return model + + +def parse_mps(model_path: Path, cache_load_fn=None, cache_save_fn=None): + # parse one .mps model file with optional ModelCache integration.. + # This is used by SSolutionBuilder for disk-resident MPS models. + if cache_load_fn is not None: + cached = cache_load_fn(model_path) + if cached is not None: + return cached + model = _parse_mps_bytes(model_path.read_bytes(), mps_path=model_path) + if model is not None and cache_save_fn is not None: + cache_save_fn(model_path, model) + return model diff --git a/mps-cli-py/src/mpscli/model/builder/utils/FlatModelPacker.py b/mps-cli-py/src/mpscli/model/builder/utils/FlatModelPacker.py new file mode 100644 index 0000000..59a65d3 --- /dev/null +++ b/mps-cli-py/src/mpscli/model/builder/utils/FlatModelPacker.py @@ -0,0 +1,138 @@ +# mpscli/model/builder/utils/FlatModelPacker.py +# +# Serialization helpers for SModel's flat array storage. This is called by SModel._finalize() to +# convert Python dicts into compact numpy arrays suitable for memory mapping on warm cache loads. +# + +import numpy as np + + +def pack_strings(strings): + """ + Pack a list of strings into a bytes blob with int32 byte offsets + + offsets[i] to offsets[i+1] is the UTF-8 encoded slice for string i. The resulting arrays are + numpy-memmappable so on warm cache load they are mapped into virtual memory with np.load(mmap_mode='r') and + individual strings are decoded on first access only. + + Returns (offsets int32[n+1], data bytes) + """ + encoded = [s.encode() for s in strings] + n = len(encoded) + offsets = np.zeros(n + 1, dtype=np.int32) + for i, b in enumerate(encoded): + offsets[i + 1] = offsets[i] + len(b) + return offsets, b"".join(encoded) + + +def pack_properties(properties): + """ + Pack a list of per node property dicts into flat arrayss + + layout explanation: + prop_start[i] to prop_start[i+1] is the slice of property entries for node i. Property keys are deduplicated + across all nodes because the same key (ex: 'name','abstract', 'visibility') appears on millions of nodes + so storing a single int32 index per occurrence instead of the full string saves significant memory in my opinion. + Property values are stored as a packed bytes blob with int32 byte offsets because values vary significantly + in length and cannot fit in a fixed-width array right? + + Returns: + prop_start: int32[n+1] which is per-node slice bounds + prop_key_idxs: int32[p] - key index into prop_keys + prop_val_offsets: int32[p+1] - byte offsets into prop_val_data + prop_val_data: bytes - packed UTF-8 property values + prop_keys: list[str] - deduplicated key names (small, around 100-200 entries maybe) + """ + n = len(properties) + prop_keys = [] + prop_key_map = {} + prop_start = np.zeros(n + 1, dtype=np.int32) + key_idxs = [] + val_encoded = [] + total = 0 + for node_idx, d in enumerate(properties): + prop_start[node_idx] = total + for k, v in d.items(): + if k not in prop_key_map: + prop_key_map[k] = len(prop_keys) + prop_keys.append(k) + key_idxs.append(prop_key_map[k]) + val_encoded.append((str(v) if v is not None else "").encode()) + total += 1 + prop_start[n] = total + empty = np.empty(0, dtype=np.int32) + prop_key_idxs = np.array(key_idxs, dtype=np.int32) if key_idxs else empty + val_offsets = np.zeros(total + 1, dtype=np.int32) + for i, b in enumerate(val_encoded): + val_offsets[i + 1] = val_offsets[i] + len(b) + return prop_start, prop_key_idxs, val_offsets, b"".join(val_encoded), prop_keys + + +def pack_references(references): + """ + Pack a list of per-node reference dicts into flat arrays.. + + Each SNodeRef has three string fields: model_uuid, node_uuid, and resolve_info. model_uuid is deduplicated + because a model typically references a small set of other modelss and the same model UUID appears on many + references so an int32 index into a lookup table is much cheaper I think than repeating the full string.. + node_uuid and resolve_info are stored as separate packed bytes blobs because each reference points to a + distinct node and these values are rarely repeated + + Returns: + ref_start: int32[n+1] - per-node slice bounds + ref_key_idxs: int32[r] - role name index into ref_keys + ref_model_idxs: int32[r] - model UUID index into ref_model_uuids + ref_node_uuid_offsets: int32[r+1] - byte offsets into ref_node_uuid_data + ref_node_uuid_data: bytes + ref_resolve_offsets: int32[r+1] - byte offsets into ref_resolve_data + ref_resolve_data: bytes + ref_keys: list[str] - deduplicated role names (small) + ref_model_uuids: list[str] - deduplicated model UUIDs (small) + """ + n = len(references) + ref_keys = [] + ref_key_map = {} + ref_model_uuids = [] + ref_model_uuid_map = {} + ref_start = np.zeros(n + 1, dtype=np.int32) + key_idxs = [] + model_idxs = [] + node_uuid_encoded = [] + resolve_encoded = [] + total = 0 + for node_idx, d in enumerate(references): + ref_start[node_idx] = total + for role, ref in d.items(): + if role not in ref_key_map: + ref_key_map[role] = len(ref_keys) + ref_keys.append(role) + key_idxs.append(ref_key_map[role]) + model_uuid = ref.model_uuid or "" + if model_uuid not in ref_model_uuid_map: + ref_model_uuid_map[model_uuid] = len(ref_model_uuids) + ref_model_uuids.append(model_uuid) + model_idxs.append(ref_model_uuid_map[model_uuid]) + node_uuid_encoded.append((ref.node_uuid or "").encode()) + resolve_encoded.append((ref.resolve_info or "").encode()) + total += 1 + ref_start[n] = total + empty = np.empty(0, dtype=np.int32) + ref_key_idxs = np.array(key_idxs, dtype=np.int32) if key_idxs else empty + ref_model_idxs_arr = np.array(model_idxs, dtype=np.int32) if model_idxs else empty + node_uuid_offsets = np.zeros(total + 1, dtype=np.int32) + for i, b in enumerate(node_uuid_encoded): + node_uuid_offsets[i + 1] = node_uuid_offsets[i] + len(b) + resolve_offsets = np.zeros(total + 1, dtype=np.int32) + for i, b in enumerate(resolve_encoded): + resolve_offsets[i + 1] = resolve_offsets[i] + len(b) + return ( + ref_start, + ref_key_idxs, + ref_model_idxs_arr, + node_uuid_offsets, + b"".join(node_uuid_encoded), + resolve_offsets, + b"".join(resolve_encoded), + ref_keys, + ref_model_uuids, + ) diff --git a/mps-cli-py/src/mpscli/model/builder/utils/JarModelLoader.py b/mps-cli-py/src/mpscli/model/builder/utils/JarModelLoader.py new file mode 100644 index 0000000..4d25dc0 --- /dev/null +++ b/mps-cli-py/src/mpscli/model/builder/utils/JarModelLoader.py @@ -0,0 +1,316 @@ +# mpscli/model/builder/utils/JarModelLoader.py +# +# Builds SSolution objects from JAR scan results. For FPR/MPS: ThreadPool preread bytes + ProcessPool compute +# Also, uses ParseCache (one file loaded once) instead of per JAR files + +import warnings +import zipfile +from collections import defaultdict +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed +from contextlib import nullcontext +from pathlib import Path +from typing import Dict, List, Optional, Set, Tuple +import threading + +from mpscli.model.SSolution import SSolution +from mpscli.model.builder.utils.JarScanner import JarScanResult, JarSolutionInfo + +# minimum number of FPR/MPS tasks before spawning a ProcessPool for phase 4 +# below this threshold tasks run serially to avoid Windows process spawn overhead.. +_FPR_POOL_THRESHOLD = 8 + + +def _parse_fpr_bytes_worker(model_bytes: bytes, mpsr_data: list): + # top-level picklable which is pure compute so basically no ZIP I/O + # returns (model, new_lang_pairs) with (name, uuid) only and not the concept data. see + # _parse_jar_members_batch_worker for explanation on why concept data is not returned from pool workers + try: + from lxml import etree as ET + except ImportError: + import xml.etree.ElementTree as ET + from mpscli.model.builder.SModelBuilderFilePerRootPersistency import ( + SModelBuilderFilePerRootPersistency, + ) + from mpscli.model.builder.SLanguageBuilder import SLanguageBuilder + + existing_names = set(SLanguageBuilder.languages.keys()) + + builder = SModelBuilderFilePerRootPersistency() + model_xml = ET.fromstring(model_bytes) + model = builder.extract_model_core_info(model_xml) + for _name, mpsr_bytes in mpsr_data: + try: + mpsr_xml = ET.fromstring(mpsr_bytes) + builder.extract_imports_and_registry(mpsr_xml) + root = mpsr_xml.find("node") + if root is not None: + root_node = builder.extract_node(model, root, None) + model._root_idxs.append(root_node._idx) + except Exception: + pass + model._finalize() + + new_lang_pairs = [ + (name, lang.uuid) + for name, lang in SLanguageBuilder.languages.items() + if name not in existing_names + ] + return model, new_lang_pairs + + +def _parse_mps_bytes_worker(data: bytes): + # top-level picklable so again pure compute and no ZIP I/O. + # returns (model, new_lang_pairs) with (name, uuid) only, see _parse_fpr_bytes_worker comment + try: + from lxml import etree as ET + except ImportError: + import xml.etree.ElementTree as ET + from mpscli.model.builder.SModelBuilderDefaultPersistency import ( + SModelBuilderDefaultPersistency, + ) + from mpscli.model.builder.SLanguageBuilder import SLanguageBuilder + + existing_names = set(SLanguageBuilder.languages.keys()) + + builder = SModelBuilderDefaultPersistency() + model_xml = ET.fromstring(data) + model = builder.extract_model_core_info(model_xml) + builder.extract_imports_and_registry(model_xml) + for node_xml in model_xml.findall("node"): + root_node = builder.extract_node(model, node_xml, None) + model._root_idxs.append(root_node._idx) + model._finalize() + + new_lang_pairs = [ + (name, lang.uuid) + for name, lang in SLanguageBuilder.languages.items() + if name not in existing_names + ] + return model, new_lang_pairs + + +def _preread_fpr_bytes( + infos: List[JarSolutionInfo], workers: int +) -> Dict[Tuple[str, str], Tuple[bytes, list]]: + # this is I/O bound: ThreadPoolExecutor opens each unique JAR exactly once + by_jar: Dict[str, List[str]] = defaultdict(list) + for info in infos: + for prefix in info.fpr_prefixes: + by_jar[info.jar_path_str].append(prefix) + + if not by_jar: + return {} + + results: Dict[Tuple[str, str], Tuple[bytes, list]] = {} + lock = threading.Lock() + + def read_jar(jar_path_str: str, prefixes: List[str]) -> None: + jar_data = {} + try: + with zipfile.ZipFile(jar_path_str) as zf: + names = zf.namelist() + for prefix in prefixes: + try: + model_bytes = zf.read(prefix + ".model") + mpsr_data = [ + (n, zf.read(n)) + for n in sorted( + m + for m in names + if m.startswith(prefix) and m.endswith(".mpsr") + ) + ] + jar_data[(jar_path_str, prefix)] = (model_bytes, mpsr_data) + except Exception: + pass + except Exception as exc: + warnings.warn(f"Failed to pre-read fpr from {jar_path_str}: {exc}") + with lock: + results.update(jar_data) + + # use many threads to overlap delay across all jars.. + io_threads = min(len(by_jar) * 2, 200) + with ThreadPoolExecutor(max_workers=io_threads) as pool: + futures = { + pool.submit(read_jar, jar, prefixes): jar + for jar, prefixes in by_jar.items() + } + for future in as_completed(futures): + exc = future.exception() + if exc: + warnings.warn(f"Error pre-reading {futures[future]}: {exc}") + + return results + + +def preread_all_fpr_bytes( + scan: JarScanResult, workers: int +) -> Dict[Tuple[str, str], Tuple[bytes, list]]: + # preread FPR bytes for all solution_infos in the scan result. + return _preread_fpr_bytes(scan.solution_infos, workers) + + +def load_jar_solutions( + scan: JarScanResult, + jar_results: Dict[Tuple[str, str], object], + disk_solution_names: Set[str], + workers: int, + parse_cache=None, + use_cache: bool = True, + pool: Optional[ProcessPoolExecutor] = None, + prereaded_fpr: Optional[Dict] = None, + jar_stats: Optional[Dict[str, Tuple[float, int]]] = None, +) -> List[SSolution]: + from mpscli.model.builder.SLanguageBuilder import SLanguageBuilder + from mpscli.model.builder.utils.MpbBatchParser import _register_lang_pairs + + seen = set(disk_solution_names) + winners: List[JarSolutionInfo] = [] + for info in scan.solution_infos: + if info.solution.name not in seen: + seen.add(info.solution.name) + winners.append(info) + + if not winners: + return [] + + solution_by_name: Dict[str, SSolution] = {} + for info in winners: + solution_by_name[info.solution.name] = info.solution + for key in info.mpb_members: + model = jar_results.get(key) + if model is not None: + info.solution.models.append(model) + + # check ParseCache + by_jar_fpr: Dict[str, List[Tuple[str, str]]] = defaultdict(list) + by_jar_mps: Dict[str, List[Tuple[str, str]]] = defaultdict(list) + + for info in winners: + jar = info.jar_path_str + cached = parse_cache.get_xml(jar) if (use_cache and parse_cache) else None + if cached is not None: + for prefix in info.fpr_prefixes: + model = cached.get(("fpr", prefix)) + if model is not None: + info.solution.models.append(model) + for member in info.mps_members: + model = cached.get(("mps", member)) + if model is not None: + info.solution.models.append(model) + else: + for prefix in info.fpr_prefixes: + by_jar_fpr[jar].append((info.solution.name, prefix)) + for member in info.mps_members: + by_jar_mps[jar].append((info.solution.name, member)) + + has_fpr = any(by_jar_fpr.values()) + has_mps = any(by_jar_mps.values()) + + if not has_fpr and not has_mps: + return list(solution_by_name.values()) + + # use prereaded bytes if provided otherwise read now + if prereaded_fpr is not None: + fpr_bytes = prereaded_fpr + elif has_fpr: + fpr_bytes = _preread_fpr_bytes(winners, workers) + else: + fpr_bytes = {} + + mps_bytes: Dict[Tuple[str, str], bytes] = {} + if has_mps: + # use many concurrent threads to overlap delay across all MPS JARs. + mps_lock = threading.Lock() + + def read_mps_jar(jar_entries): + jar, entries = jar_entries + jar_data = {} + try: + with zipfile.ZipFile(jar) as zf: + for _sol_name, member in entries: + try: + jar_data[(jar, member)] = zf.read(member) + except Exception: + pass + except Exception as exc: + warnings.warn(f"Failed to pre-read mps from {jar}: {exc}") + with mps_lock: + mps_bytes.update(jar_data) + + io_threads = min(len(by_jar_mps) * 2, 200) + with ThreadPoolExecutor(max_workers=io_threads) as mps_pool: + list(mps_pool.map(read_mps_jar, by_jar_mps.items())) + + all_tasks = [] + for jar, entries in by_jar_fpr.items(): + for sol_name, prefix in entries: + byte_data = fpr_bytes.get((jar, prefix)) + if byte_data is not None: + model_bytes, mpsr_data = byte_data + all_tasks.append(("fpr", sol_name, jar, prefix, model_bytes, mpsr_data)) + for jar, entries in by_jar_mps.items(): + for sol_name, member in entries: + data = mps_bytes.get((jar, member)) + if data is not None: + all_tasks.append(("mps", sol_name, jar, member, data, None)) + + if not all_tasks: + return list(solution_by_name.values()) + + jar_cache_pending: Dict[str, Dict] = defaultdict(dict) + + def _handle_result(model, lang_pairs, kind, sol_name, jar, key): + sol = solution_by_name.get(sol_name) + if sol is not None: + sol.models.append(model) + jar_cache_pending[jar][(kind, key)] = model + # register language shells from subprocess workers + if lang_pairs: + _register_lang_pairs(lang_pairs) + + # only spawn workers when there are enough tasks to justify the overhead. The passed pool is used when + # above threshold and serial path is usd when below, task count drives the decision not pool presence so this + # ensures small test projects always parse in the main process so SLanguageBuilder gets populated... + if len(all_tasks) >= _FPR_POOL_THRESHOLD: + pool_ctx = ( + nullcontext(pool) + if pool is not None + else ProcessPoolExecutor(max_workers=workers) + ) + with pool_ctx as _pool: + future_to_info = {} + for task in all_tasks: + kind, sol_name, jar, key = task[0], task[1], task[2], task[3] + f = ( + _pool.submit(_parse_fpr_bytes_worker, task[4], task[5]) + if kind == "fpr" + else _pool.submit(_parse_mps_bytes_worker, task[4]) + ) + future_to_info[f] = (kind, sol_name, jar, key) + + for future in as_completed(future_to_info): + kind, sol_name, jar, key = future_to_info[future] + try: + model, lang_pairs = future.result() + _handle_result(model, lang_pairs, kind, sol_name, jar, key) + except Exception as exc: + warnings.warn(f"Failed to parse {jar}!{key}: {exc}") + else: + # serial path for small batches so no pool spawn overhead and languages are populated in-process + for task in all_tasks: + kind, sol_name, jar, key = task[0], task[1], task[2], task[3] + try: + if kind == "fpr": + model, lang_pairs = _parse_fpr_bytes_worker(task[4], task[5]) + else: + model, lang_pairs = _parse_mps_bytes_worker(task[4]) + _handle_result(model, lang_pairs, kind, sol_name, jar, key) + except Exception as exc: + warnings.warn(f"Failed to parse {jar}!{key}: {exc}") + + # update ParseCache synchronously so basically just dict assignments and no I/O + if use_cache and parse_cache and jar_cache_pending: + parse_cache.put_xml_many(dict(jar_cache_pending), jar_stats) + + return list(solution_by_name.values()) diff --git a/mps-cli-py/src/mpscli/model/builder/utils/JarScanner.py b/mps-cli-py/src/mpscli/model/builder/utils/JarScanner.py new file mode 100644 index 0000000..1661a4f --- /dev/null +++ b/mps-cli-py/src/mpscli/model/builder/utils/JarScanner.py @@ -0,0 +1,166 @@ +# mpscli/model/builder/utils/JarScanner.py +# +# Discovers MPS model content inside JAR files without extracting them to disk. Reads only the ZIP central +# directory (the table of contents at the end of the ZIP file) plus small .msd and .mpl XML members. +# No model bytes are actually readd here. +# +# Produces JarScanResult which SSolutionsRepositoryBuilder uses to drive phases 2-4 so basically the mpb member +# list feeds MpbBatchParser, solution_infos feeds.. + +import threading +import warnings +import xml.etree.ElementTree as ET +import zipfile +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path +from typing import Dict, List, Optional, Tuple + +from mpscli.model.SSolution import SSolution +from mpscli.model.builder.utils.JarUtils import jar_is_relevant + + +class JarSolutionInfo: + # all MPS content discovered for one solution inside one JAR + def __init__(self, solution, jar_path_str, mpb_members, mps_members, fpr_prefixes): + self.solution = solution + self.jar_path_str = jar_path_str + # [(jar_path_str, member_name)] given directly to MpbBatchParser.parse_from_jars + self.mpb_members = mpb_members + # [member_name] for .mps files within this jar + self.mps_members = mps_members + # [prefix] for FPR directories within this JAR, ex: 'module/models/MyModel/' + self.fpr_prefixes = fpr_prefixes + + +class JarScanResult: + + def __init__(self): + # one entry per (.msd, jar) pair so may have duplicates across jars.. + self.solution_infos: List[JarSolutionInfo] = [] + # (namespace, uuid, version, [(jar_path_str, member_name)]) + self.language_infos: List[Tuple[str, str, int, List[Tuple[str, str]]]] = [] + # flat deduplicated list of all (jar_path_str, member_name) for .mpb files + self.all_mpb_members: List[Tuple[str, str]] = [] + # {jar_path_str: (mtime, size)} collected during scan. ParseCache.load_sync() uses these to validate + # cache staleness without extra calls where each JAR is already open so stat() is quite cheap here I think.. + self.jar_stats: Dict[str, Tuple[float, int]] = {} + + +def scan_jar(jar_path: Path) -> Optional[JarScanResult]: + if not jar_is_relevant(jar_path): + return None + + jar_path_str = str(jar_path) + result = JarScanResult() + seen_mpbs = set() + + try: + with zipfile.ZipFile(jar_path_str) as zf: + try: + stat = jar_path.stat() + result.jar_stats[jar_path_str] = (stat.st_mtime, stat.st_size) + except Exception: + pass + + names = zf.namelist() + mpb_names = [n for n in names if n.endswith(".mpb")] + mps_names = [n for n in names if n.endswith(".mps")] + fpr_prefixes = [ + n[: -len(".model")] + for n in names + if n.endswith("/.model") and "/models/" in n + ] + + for msd_name in (n for n in names if n.endswith(".msd")): + try: + root = ET.fromstring(zf.read(msd_name)) + sol_name = root.get("name", "") + sol_uuid = root.get("uuid", "") + if not sol_name: + continue + prefix = _models_prefix(msd_name) + solution = SSolution(sol_name, sol_uuid) + solution.path_to_solution_file = jar_path + + mpb_members = [ + (jar_path_str, n) for n in mpb_names if n.startswith(prefix) + ] + mps_members = [n for n in mps_names if n.startswith(prefix)] + sol_fpr = [p for p in fpr_prefixes if p.startswith(prefix)] + + result.solution_infos.append( + JarSolutionInfo( + solution, jar_path_str, mpb_members, mps_members, sol_fpr + ) + ) + for m in mpb_members: + if m not in seen_mpbs: + seen_mpbs.add(m) + result.all_mpb_members.append(m) + + except Exception as exc: + warnings.warn( + f"Failed to read {msd_name} in {jar_path.name}: {exc}" + ) + + for mpl_name in (n for n in names if n.endswith(".mpl")): + try: + root = ET.fromstring(zf.read(mpl_name)) + namespace = root.get("namespace", "") + uuid = root.get("uuid", "") + version = int(root.get("languageVersion", "0")) + if not namespace: + continue + prefix = _models_prefix(mpl_name) + members = [ + (jar_path_str, n) for n in mpb_names if n.startswith(prefix) + ] + result.language_infos.append((namespace, uuid, version, members)) + for m in members: + if m not in seen_mpbs: + seen_mpbs.add(m) + result.all_mpb_members.append(m) + except Exception as exc: + warnings.warn( + f"Failed to read {mpl_name} in {jar_path.name}: {exc}" + ) + + except Exception as exc: + warnings.warn(f"Failed to scan {jar_path.name}: {exc}") + return None + + return result + + +def scan_all_jars(jar_paths: List[Path], workers: int) -> JarScanResult: + combined = JarScanResult() + seen_mpbs = set() + lock = threading.Lock() + + def process_one(jar_path: Path): + result = scan_jar(jar_path) + if result is None: + return + with lock: + combined.solution_infos.extend(result.solution_infos) + combined.language_infos.extend(result.language_infos) + combined.jar_stats.update(result.jar_stats) + for m in result.all_mpb_members: + if m not in seen_mpbs: + seen_mpbs.add(m) + combined.all_mpb_members.append(m) + + with ThreadPoolExecutor(max_workers=workers) as pool: + futures = {pool.submit(process_one, jp): jp for jp in jar_paths} + for future in as_completed(futures): + exc = future.exception() + if exc: + warnings.warn(f"Error scanning {futures[future]}: {exc}") + + return combined + + +def _models_prefix(member_name: str) -> str: + if "/" in member_name: + return member_name.rsplit("/", 1)[0] + "/models/" + return "models/" diff --git a/mps-cli-py/src/mpscli/model/builder/utils/MpbBatchParser.py b/mps-cli-py/src/mpscli/model/builder/utils/MpbBatchParser.py index beccfb5..e0d81a4 100644 --- a/mps-cli-py/src/mpscli/model/builder/utils/MpbBatchParser.py +++ b/mps-cli-py/src/mpscli/model/builder/utils/MpbBatchParser.py @@ -1,34 +1,69 @@ -# mpscli/model/builder/MpbBatchParser.py +# mpscli/model/builder/utils/MpbBatchParser.py # -# Parallel and cached batch parser for .mpb model files. -# -# Usage: -# parser = MpbBatchParser(workers, use_cache, cache_load_fn, cache_save_fn) -# results = parser.parse(list_of_path_strings) -# results is a dict - {path_str - SModel | None} +# Parallel batch parser for .mpb binary model files. Handles caching via ModelCache call backs and parallel +# execution via ProcessPoolExecutor. +# parse_from_jars handles .mpb files inside JARs and reads bytes directly without extraction.. +# FPR and MPS parsing logic are in in DiskModelLoader import os import warnings +from collections import defaultdict from concurrent.futures import ProcessPoolExecutor, as_completed +from contextlib import nullcontext from pathlib import Path -from typing import Callable, Dict, List, Optional +from typing import Callable, Dict, List, Optional, Tuple def _parse_mpb_worker(path_str: str): - # top-level function (not a method) so it is picklable for ProcessPoolExecutor. Each worker process constructs - # its own SModelBuilderBinaryPersistency instance and the builder is stateful (index maps, string table) - # so it must not be shared.. + # top-level so it is picklable for ProcessPoolExecutor. Each worker constructs its own builder so the builder + # is stateful (index maps, string tables..) and should not be shared across processes + from mpscli.model.builder.SModelBuilderBinaryPersistency import ( + SModelBuilderBinaryPersistency, + ) + + return SModelBuilderBinaryPersistency().build(path_str) + + +def _parse_jar_members_batch_worker(jar_path_str: str, member_names: List[str]): + # top-level so it is picklable for ProcessPoolExecutor. Eachh worker opens its own JAR directly. + import zipfile from mpscli.model.builder.SModelBuilderBinaryPersistency import ( SModelBuilderBinaryPersistency, ) + from mpscli.model.builder.SLanguageBuilder import SLanguageBuilder + + existing_names = set(SLanguageBuilder.languages.keys()) + results = {} + with zipfile.ZipFile(jar_path_str) as zf: + for member_name in member_names: + try: + data = zf.read(member_name) + results[ + member_name + ] = SModelBuilderBinaryPersistency().build_from_bytes( + data, path_hint=f"{jar_path_str}!{member_name}" + ) + except Exception as exc: + warnings.warn(f"Failed to parse {jar_path_str}!{member_name}: {exc}") + + new_lang_pairs = [ + (name, lang.uuid) + for name, lang in SLanguageBuilder.languages.items() + if name not in existing_names + ] + return results, new_lang_pairs - builder = SModelBuilderBinaryPersistency() - return builder.build(path_str) + +def _register_lang_pairs(lang_pairs: list) -> None: + from mpscli.model.builder.SLanguageBuilder import SLanguageBuilder + + for lang_name, lang_uuid in lang_pairs: + SLanguageBuilder.get_language(lang_name, lang_uuid) class MpbBatchParser: - # threshold below which a process pool is not used - spawning workers has fixed overhead of around 100 or 200ms - # that is good for small batches + # threshold below which a ProcessPool is not used and the reason being is that spawning workers has fixed + # overhead of around 100-200ms so serial parsing is faster for small batches.. PARALLEL_THRESHOLD: int = 4 def __init__( @@ -37,36 +72,25 @@ def __init__( use_cache: bool = False, cache_load_fn: Optional[Callable[[Path], object]] = None, cache_save_fn: Optional[Callable[[Path, object], None]] = None, + parse_cache=None, ): - # workers: number of parallel processes for large batches. - # if use_cache is set to True then cache_load_fn and cache_save_fn are called before/after each parse to - # avoid reparsing unchanged files.. self._workers = workers self._use_cache = use_cache self._cache_load_fn = cache_load_fn self._cache_save_fn = cache_save_fn + self._parse_cache = parse_cache - def parse(self, path_strings: List[str]) -> Dict[str, object]: - # Parse all .mpb files in path_strings, returning a dict of {path_str - SModel | None} - # - # workflow explained below: - # 1. Check cache for each path - # 2. For the remaining (uncached) paths: - # :if batch is small (<= PARALLEL_THRESHOLD) or workers == 1 then parse serially - # :otherwise dispatch to a ProcessPoolExecutor - # 3. Save newly parsed models to cache. - # - # We use both ProcessPoolExecutor and ThreadPoolExecutor since .mpb parsing is - # cpu-bound (binary decode + struct unpacking), so processes avoid the GIL and give true parallelism - # where each worker imports the builder fresh so no shared state. - + def parse( + self, + path_strings: List[str], + pool: Optional[ProcessPoolExecutor] = None, + ) -> Dict[str, object]: + # parse a list of .mpb file paths and return {path_str: SModel}. This checks ModelCache first and + # then parses uncached paths either serially or in parallel depending on the batch size. if not path_strings: return {} - results: Dict[str, object] = {} workers = self._workers or min(os.cpu_count() or 4, 16) - - # phase 1: cache check.. uncached: List[str] = [] for ps in path_strings: cached = ( @@ -78,32 +102,23 @@ def parse(self, path_strings: List[str]) -> Dict[str, object]: results[ps] = cached else: uncached.append(ps) - if not uncached: return results - - # phase2 - parse uncached paths - use_pool = workers > 1 and len(uncached) > self.PARALLEL_THRESHOLD - - if not use_pool: - # serial path - simple and no spawn overhead so it's sufficient for small batches - for ps in uncached: + if pool is not None: + future_to_path = {pool.submit(_parse_mpb_worker, ps): ps for ps in uncached} + for future in as_completed(future_to_path): + ps = future_to_path[future] try: - model = _parse_mpb_worker(ps) + model = future.result() results[ps] = model self._maybe_save_cache(ps, model) except Exception as exc: warnings.warn(f"Failed to parse {ps}: {exc}") results[ps] = None - else: - # parallel path - one process per worker and each parses independently - import multiprocessing - - multiprocessing.freeze_support() - - with ProcessPoolExecutor(max_workers=workers) as pool: + elif workers > 1 and len(uncached) > self.PARALLEL_THRESHOLD: + with ProcessPoolExecutor(max_workers=workers) as own_pool: future_to_path = { - pool.submit(_parse_mpb_worker, ps): ps for ps in uncached + own_pool.submit(_parse_mpb_worker, ps): ps for ps in uncached } for future in as_completed(future_to_path): ps = future_to_path[future] @@ -114,7 +129,97 @@ def parse(self, path_strings: List[str]) -> Dict[str, object]: except Exception as exc: warnings.warn(f"Failed to parse {ps}: {exc}") results[ps] = None + else: + for ps in uncached: + try: + model = _parse_mpb_worker(ps) + results[ps] = model + self._maybe_save_cache(ps, model) + except Exception as exc: + warnings.warn(f"Failed to parse {ps}: {exc}") + results[ps] = None + return results + + def get_cached_and_uncached_by_jar( + self, jar_members: List[Tuple[str, str]] + ) -> Tuple[Dict[Tuple[str, str], object], Dict[str, List[str]]]: + # returns warm cache hits and cold misses grouped by jar.. + # ParseCache.load_sync() must be called before this. + # Returns: + # cached_results that contains (jar_path_str, member_name): SModel and + # uncached_by_jar that contains jar_path_str: [member_name] + cached_results: Dict[Tuple[str, str], object] = {} + uncached_by_jar: Dict[str, List[str]] = defaultdict(list) + + by_jar: Dict[str, List[str]] = defaultdict(list) + for jar_path_str, member_name in jar_members: + by_jar[jar_path_str].append(member_name) + + for jar_path_str, members in by_jar.items(): + jar_cache = ( + self._parse_cache.get_mpb(jar_path_str) + if self._use_cache and self._parse_cache + else None + ) + if jar_cache is not None: + for member in members: + model = jar_cache.get(member) + if model is not None: + cached_results[(jar_path_str, member)] = model + else: + uncached_by_jar[jar_path_str].append(member) + else: + uncached_by_jar[jar_path_str].extend(members) + return cached_results, uncached_by_jar + + def flush_jar_cache( + self, + jar_new: Dict[str, dict], + jar_stats: Optional[Dict[str, Tuple[float, int]]] = None, + ) -> None: + # stores parsed models in ParseCache with mtime and size information from the scan. + if self._use_cache and self._parse_cache and jar_new: + self._parse_cache.put_mpb_many(jar_new, jar_stats) + + def parse_from_jars( + self, + jar_members: List[Tuple[str, str]], + pool: Optional[ProcessPoolExecutor] = None, + ) -> Dict[Tuple[str, str], object]: + # parse .mpb members from jars so basically it reads bytes directly without extraction + if not jar_members: + return {} + workers = self._workers or min(os.cpu_count() or 4, 16) + cached_results, uncached_by_jar = self.get_cached_and_uncached_by_jar( + jar_members + ) + results = dict(cached_results) + if not uncached_by_jar: + return results + pool_ctx = ( + nullcontext(pool) + if pool is not None + else ProcessPoolExecutor(max_workers=workers) + ) + jar_new: Dict[str, dict] = {} + with pool_ctx as _pool: + futures = { + _pool.submit(_parse_jar_members_batch_worker, jar, members): jar + for jar, members in uncached_by_jar.items() + } + for future in as_completed(futures): + jar = futures[future] + try: + batch, lang_pairs = future.result() + jar_new[jar] = batch + for member, model in batch.items(): + results[(jar, member)] = model + if lang_pairs: + _register_lang_pairs(lang_pairs) + except Exception as exc: + warnings.warn(f"Failed to parse JAR {jar}: {exc}") + self.flush_jar_cache(jar_new) return results def _maybe_save_cache(self, path_str: str, model) -> None: diff --git a/mps-cli-py/src/mpscli/model/builder/utils/ParseCache.py b/mps-cli-py/src/mpscli/model/builder/utils/ParseCache.py new file mode 100644 index 0000000..8abbf14 --- /dev/null +++ b/mps-cli-py/src/mpscli/model/builder/utils/ParseCache.py @@ -0,0 +1,417 @@ +# mpscli/model/builder/utils/ParseCache.py +# +# Persistent per-jar cache for parsed SModel objects. +# +# Loadingg is done this way: synchronous and called explicitly after scan_all_jars completes... +# This seems to be the only correct approach since loading too many files concurrently with +# scan_all_jars I think causes I/O competition that most likely degrades scan time significantly regardless of +# whether we cache bytes or SModel.. +# +# Contents are basically SModel objects +# +# Writing works this way: non-daemon background thread so Python waits before exit +# +# Cache format explained below for per JAR entry: +# {md5}.npz - all numpy arrays for all models in this JAR, saved with np.savez() so np.load(mmap_mode='r') maps +# them directly into virtual memory without reading bytes. +# {md5}_meta.pkl - small Python objects per model: name, uuid, path, concepts_table, roles_table, prop_keys, +# ref_keys,ref_model_uuids and root_idxs. +# +# Format migration explained: if an old format file (no extension, legacy pickle) is found then it is evicted +# silently and treated as a cache miss and the next cold run rebuilds it in the new format automaticallyy.. + +import hashlib +import logging +import os +import pickle +import threading +import warnings +from pathlib import Path +from typing import Dict, Optional, Tuple + +import numpy as np + +_log = logging.getLogger(__name__) + + +# arrays stored per model in the npz using m{i}_ as prefix for model index i. bytes blobs are stored as +# uint8 arrays so np.savez can handle them without setting allow_pickle=True and so they are directly memmappable.. +_ARRAY_KEYS = [ + "np_concept_idxs", + "np_role_idxs", + "np_parent_idxs", + "np_first_child", + "np_next_sibling", + "uuid_offsets", + "uuid_data", + "prop_start", + "prop_key_idxs", + "prop_val_offsets", + "prop_val_data", + "ref_start", + "ref_key_idxs", + "ref_model_idxs", + "ref_node_uuid_offsets", + "ref_node_uuid_data", + "ref_resolve_offsets", + "ref_resolve_data", +] + + +def _model_to_arrays(model): + # extractt the small Python objects that cannot be stored as numpy arrays + def as_uint8(b): + if isinstance(b, (bytes, bytearray)): + return np.frombuffer(b, dtype=np.uint8) + # already numpy uint8 array + return b + + return { + "np_concept_idxs": model._np_concept_idxs, + "np_role_idxs": model._np_role_idxs, + "np_parent_idxs": model._np_parent_idxs, + "np_first_child": model._np_first_child, + "np_next_sibling": model._np_next_sibling, + "uuid_offsets": model._uuid_offsets, + "uuid_data": as_uint8(model._uuid_data), + "prop_start": model._prop_start, + "prop_key_idxs": model._prop_key_idxs, + "prop_val_offsets": model._prop_val_offsets, + "prop_val_data": as_uint8(model._prop_val_data), + "ref_start": model._ref_start, + "ref_key_idxs": model._ref_key_idxs, + "ref_model_idxs": model._ref_model_idxs, + "ref_node_uuid_offsets": model._ref_node_uuid_offsets, + "ref_node_uuid_data": as_uint8(model._ref_node_uuid_data), + "ref_resolve_offsets": model._ref_resolve_offsets, + "ref_resolve_data": as_uint8(model._ref_resolve_data), + } + + +def _model_meta(model, member_name): + # extractt the small Python objects that cannot be stored as numpy arrays + return { + "member_name": member_name, + "name": model.name, + "uuid": model.uuid, + "is_do_not_generate": model.is_do_not_generate, + "path_to_model_file": str(model.path_to_model_file), + "root_idxs": model._root_idxs, + "concepts_table": model._concepts_table, + "roles_table": model._roles_table, + "prop_keys": model._prop_keys, + "ref_keys": model._ref_keys, + "ref_model_uuids": model._ref_model_uuids, + } + + +def _restore_model(arrays, meta): + # reconstruct a finalized SModel from npz arrays and pkl metadata. arrays is an NpzFile opened with + # mmap_mode='r' so that array access is lazy I guess.. + from mpscli.model.SModel import SModel + + m = SModel.__new__(SModel) + m.name = meta["name"] + m.uuid = meta["uuid"] + m.is_do_not_generate = meta["is_do_not_generate"] + raw_path = meta.get("path_to_model_file", "") + m.path_to_model_file = Path(raw_path) if raw_path else "" + + # building buffers are not needed after restore so mark it as finalized + m._uuids = None + m._concept_buf = None + m._role_buf = None + m._parent_buf = None + m._properties = None + m._references = None + m._concepts_map = {} + m._roles_map = {} + + # small Python objects from pkl + m._concepts_table = meta["concepts_table"] + m._roles_table = meta["roles_table"] + m._prop_keys = meta["prop_keys"] + m._ref_keys = meta["ref_keys"] + m._ref_model_uuids = meta["ref_model_uuids"] + m._root_idxs = meta["root_idxs"] + + # lazy caches so not serialized + m._node_cache = {} + m._uuid_index = None + m._prop_key_lookup = None + m._ref_key_lookup = None + + # numpy arrays from npz (these are mmaped on warm load so no bytes read yet) + m._np_concept_idxs = arrays["np_concept_idxs"] + m._np_role_idxs = arrays["np_role_idxs"] + m._np_parent_idxs = arrays["np_parent_idxs"] + m._np_first_child = arrays["np_first_child"] + m._np_next_sibling = arrays["np_next_sibling"] + m._uuid_offsets = arrays["uuid_offsets"] + m._uuid_data = arrays["uuid_data"] + m._prop_start = arrays["prop_start"] + m._prop_key_idxs = arrays["prop_key_idxs"] + m._prop_val_offsets = arrays["prop_val_offsets"] + m._prop_val_data = arrays["prop_val_data"] + m._ref_start = arrays["ref_start"] + m._ref_key_idxs = arrays["ref_key_idxs"] + m._ref_model_idxs = arrays["ref_model_idxs"] + m._ref_node_uuid_offsets = arrays["ref_node_uuid_offsets"] + m._ref_node_uuid_data = arrays["ref_node_uuid_data"] + m._ref_resolve_offsets = arrays["ref_resolve_offsets"] + m._ref_resolve_data = arrays["ref_resolve_data"] + + m._finalized = True + m._build_root_nodes() + return m + + +class _FileCache: + + def __init__(self, cache_dir: Path, workers: int): + self._dir = cache_dir + self._workers = workers + self._data: Dict[str, dict] = {} + self._dirty: Dict[str, Tuple[float, int, dict]] = {} + self._lock = threading.Lock() + self._dir.mkdir(parents=True, exist_ok=True) + + def _npz_path(self, jar_path_str: str) -> Path: + return self._dir / (hashlib.md5(jar_path_str.encode()).hexdigest() + ".npz") + + def _meta_path(self, jar_path_str: str) -> Path: + return self._dir / ( + hashlib.md5(jar_path_str.encode()).hexdigest() + "_meta.pkl" + ) + + def _load_one( + self, meta_file: Path, jar_stats: Dict[str, Tuple[float, int]] + ) -> Optional[Tuple[str, dict]]: + # load one jar's cached models from npz + pkl pair. npz is memory-mapped so arrays are nott read into RAM + # until first access and warm cache_load could most likely be instant I guess.. + npz_file = meta_file.with_name(meta_file.name.replace("_meta.pkl", ".npz")) + if not npz_file.exists(): + # meta without npz means incomplete write so remove + try: + meta_file.unlink() + except FileNotFoundError: + pass + return None + try: + with meta_file.open("rb") as f: + entry = pickle.load(f) + jar_path_str = entry.get("jar_path_str", "") + if not jar_path_str: + return None + stored_mtime = entry.get("mtime") + stored_size = entry.get("size") + if stored_mtime is None or stored_size is None: + try: + meta_file.unlink() + except FileNotFoundError: + pass + try: + npz_file.unlink() + except FileNotFoundError: + pass + return None + current = jar_stats.get(jar_path_str) + if current is None: + # jar no longer in scan so remove.. + try: + meta_file.unlink() + except FileNotFoundError: + pass + try: + npz_file.unlink() + except FileNotFoundError: + pass + return None + cur_mtime, cur_size = current + if stored_mtime != cur_mtime or stored_size != cur_size: + # jar is rebuilt so remove any possible stale entries + try: + meta_file.unlink() + except FileNotFoundError: + pass + try: + npz_file.unlink() + except FileNotFoundError: + pass + return None + + # mmap the npz so most likely no bytes read until arrays are actually accessed.. + # this mmap concept is really awesome! + npz = np.load(str(npz_file), mmap_mode="r", allow_pickle=False) + content = {} + for i, model_meta in enumerate(entry["models"]): + prefix = f"m{i}_" + model_arrays = {k: npz[prefix + k] for k in _ARRAY_KEYS} + content[model_meta["member_name"]] = _restore_model( + model_arrays, model_meta + ) + return jar_path_str, content + except Exception: + return None + + def load_sync(self, jar_stats: Dict[str, Tuple[float, int]]) -> None: + # synchronous parallel load called after scan_all_jars so no I/O competition with scan and also + # no GIL competition likely from concurrent threads + meta_files = [ + f + for f in self._dir.iterdir() + if f.is_file() and f.name.endswith("_meta.pkl") + ] + # remove legacy format files (no extension and old pickle-only cache formatss) + for f in self._dir.iterdir(): + if f.is_file() and f.suffix == "" and not f.name.endswith("_meta"): + try: + f.unlink() + except FileNotFoundError: + pass + if not meta_files: + return + from concurrent.futures import ThreadPoolExecutor + + loaded = 0 + with ThreadPoolExecutor(max_workers=self._workers) as pool: + for result in pool.map( + lambda mf: self._load_one(mf, jar_stats), meta_files + ): + if result is not None: + jar_path_str, content = result + self._data[jar_path_str] = content + loaded += 1 + if loaded: + _log.info( + "[cache] loaded %d/%d entries from %s", + loaded, + len(meta_files), + self._dir.name, + ) + + def get(self, jar_path_str: str) -> Optional[dict]: + return self._data.get(jar_path_str) + + def put_many( + self, + jar_map: Dict[str, dict], + jar_stats: Optional[Dict[str, Tuple[float, int]]] = None, + ) -> None: + with self._lock: + for jar_path_str, content in jar_map.items(): + self._data[jar_path_str] = content + if jar_stats: + stats = jar_stats.get(jar_path_str) + if stats: + mtime, size = stats + self._dirty[jar_path_str] = (mtime, size, content) + + def _save_all(self, to_write: Dict) -> None: + from timeit import default_timer as timer + + t0 = timer() + for jar_path_str, (mtime, size, content) in to_write.items(): + try: + # [(member_name, SModel)] + models = list(content.items()) + # build combined npz dict with m{i}_ prefixed array names + npz_data = {} + models_meta = [] + for i, (member_name, model) in enumerate(models): + prefix = f"m{i}_" + for k, v in _model_to_arrays(model).items(): + npz_data[prefix + k] = v + models_meta.append(_model_meta(model, member_name)) + npz_path = self._npz_path(jar_path_str) + meta_path = self._meta_path(jar_path_str) + # write npz first and in case if interrupted then I think meta load will fail because _load_one + # checks npz existence first and np.savez appends .npz to names that do not already end in .npz and + # also use _tmp.npz as the temp name so the rename is mostly not ambiguous on Windows.. + tmp_npz = npz_path.with_name(npz_path.stem + "_tmp.npz") + np.savez(str(tmp_npz), **npz_data) + tmp_npz.replace(npz_path) + # write meta pkl + entry = { + "jar_path_str": jar_path_str, + "mtime": mtime, + "size": size, + "models": models_meta, + } + tmp_meta = meta_path.with_name(meta_path.stem + "_tmp.pkl") + with tmp_meta.open("wb") as f: + pickle.dump(entry, f, protocol=pickle.HIGHEST_PROTOCOL) + tmp_meta.replace(meta_path) + except Exception as exc: + warnings.warn(f"Cache save failed for {jar_path_str}: {exc}") + _log.info( + "[cache] saved %d entries to %s in %.1fs", + len(to_write), + self._dir.name, + timer() - t0, + ) + + def flush(self) -> None: + with self._lock: + to_write = dict(self._dirty) + self._dirty.clear() + if not to_write: + return + _log.info("[cache] saving %d entries to %s...", len(to_write), self._dir.name) + threading.Thread( + target=self._save_all, + args=(to_write,), + daemon=False, + name="cache-flush", + ).start() + + +class ParseCache: + _CACHE_ROOT = Path.home() / ".mps_cli_cache" + + def __init__(self, workers: int = None): + w = workers or min(os.cpu_count() or 4, 16) + self._mpb = _FileCache(self._CACHE_ROOT / "jar_mpb", w) + self._xml = _FileCache(self._CACHE_ROOT / "jar_xml", w) + + def load_sync(self, jar_stats: Dict[str, Tuple[float, int]]) -> None: + # call once after scan_all_jars, this validates staleness using jar_stats + self._mpb.load_sync(jar_stats) + self._xml.load_sync(jar_stats) + + def get_mpb(self, jar_path_str: str) -> Optional[dict]: + return self._mpb.get(jar_path_str) + + def put_mpb_many( + self, + jar_map: Dict[str, dict], + jar_stats: Optional[Dict[str, Tuple[float, int]]] = None, + ) -> None: + self._mpb.put_many(jar_map, jar_stats) + + def get_xml(self, jar_path_str: str) -> Optional[dict]: + return self._xml.get(jar_path_str) + + def put_xml_many( + self, + jar_map: Dict[str, dict], + jar_stats: Optional[Dict[str, Tuple[float, int]]] = None, + ) -> None: + self._xml.put_many(jar_map, jar_stats) + + def flush(self) -> None: + self._mpb.flush() + self._xml.flush() + + def stats(self) -> dict: + def dir_stats(d: Path) -> dict: + if not d.exists(): + return {"entries": 0, "size_mb": 0.0} + files = list(d.iterdir()) + size = sum(f.stat().st_size for f in files if f.is_file()) + return {"entries": len(files), "size_mb": round(size / 1024 / 1024, 1)} + + return { + "jar_mpb": dir_stats(self._CACHE_ROOT / "jar_mpb"), + "jar_xml": dir_stats(self._CACHE_ROOT / "jar_xml"), + } diff --git a/mps-cli-py/tests/binary/test_binary_format_parity.py b/mps-cli-py/tests/binary/test_binary_format_parity.py index 7680653..734c720 100644 --- a/mps-cli-py/tests/binary/test_binary_format_parity.py +++ b/mps-cli-py/tests/binary/test_binary_format_parity.py @@ -9,8 +9,9 @@ def _repo(location): + # set as class variable before construction so __init__ sees it and skips ParseCache + SSolutionsRepositoryBuilder.USE_CACHE = False builder = SSolutionsRepositoryBuilder() - builder.USE_CACHE = False return builder.build(f"../mps_test_projects/{location}") diff --git a/mps-cli-py/tests/binary/test_binary_language_from_mpl.py b/mps-cli-py/tests/binary/test_binary_language_from_mpl.py index cd93ae8..31735a3 100644 --- a/mps-cli-py/tests/binary/test_binary_language_from_mpl.py +++ b/mps-cli-py/tests/binary/test_binary_language_from_mpl.py @@ -11,8 +11,9 @@ def _build_repo(): + # set as class variable before construction so __init__ sees it and skips ParseCache + SSolutionsRepositoryBuilder.USE_CACHE = False builder = SSolutionsRepositoryBuilder() - builder.USE_CACHE = False return builder.build(str(TEST_PROJECT)) diff --git a/mps-cli-py/tests/binary/test_binary_library_top.py b/mps-cli-py/tests/binary/test_binary_library_top.py index 87bcc65..ad0072f 100644 --- a/mps-cli-py/tests/binary/test_binary_library_top.py +++ b/mps-cli-py/tests/binary/test_binary_library_top.py @@ -9,8 +9,9 @@ def _build_repo(): + # set as class variable before construction so __init__ sees it and skips ParseCache + SSolutionsRepositoryBuilder.USE_CACHE = False builder = SSolutionsRepositoryBuilder() - builder.USE_CACHE = False return builder.build(REPO_PATH) diff --git a/mps-cli-py/tests/binary/test_binary_references.py b/mps-cli-py/tests/binary/test_binary_references.py index 0576231..420a79f 100644 --- a/mps-cli-py/tests/binary/test_binary_references.py +++ b/mps-cli-py/tests/binary/test_binary_references.py @@ -12,8 +12,9 @@ def _build_repo(repo_path): + # set as class variable before construction so __init__ sees it and skips ParseCache + SSolutionsRepositoryBuilder.USE_CACHE = False builder = SSolutionsRepositoryBuilder() - builder.USE_CACHE = False return builder.build(repo_path) diff --git a/mps-cli-py/tests/binary/test_binary_repository.py b/mps-cli-py/tests/binary/test_binary_repository.py index 3cc13f4..3354fc4 100644 --- a/mps-cli-py/tests/binary/test_binary_repository.py +++ b/mps-cli-py/tests/binary/test_binary_repository.py @@ -11,8 +11,9 @@ def _build_repo(): + # set as class variable before construction so __init__ sees it and skips ParseCache + SSolutionsRepositoryBuilder.USE_CACHE = False builder = SSolutionsRepositoryBuilder() - builder.USE_CACHE = False return builder.build(REPO_PATH) diff --git a/mps-cli-py/tests/binary/test_binary_repository_completeness.py b/mps-cli-py/tests/binary/test_binary_repository_completeness.py index 678724a..6a06970 100644 --- a/mps-cli-py/tests/binary/test_binary_repository_completeness.py +++ b/mps-cli-py/tests/binary/test_binary_repository_completeness.py @@ -8,8 +8,9 @@ class TestBinaryRepositoryCompleteness(TestBase): REPO_PATH = "../mps_test_projects/mps_cli_binary_persistency_generated/" def _build_repo(self): + # set as class variable before construction so __init__ sees it and skips ParseCache + SSolutionsRepositoryBuilder.USE_CACHE = False builder = SSolutionsRepositoryBuilder() - builder.USE_CACHE = False return builder.build(self.REPO_PATH) def test_repository_builds(self): diff --git a/mps-cli-py/tests/test_base.py b/mps-cli-py/tests/test_base.py index 27abb04..e58bd52 100644 --- a/mps-cli-py/tests/test_base.py +++ b/mps-cli-py/tests/test_base.py @@ -20,9 +20,9 @@ def doSetUp(self, test_data_location): Builds the object model based on MPS models """ SLanguageBuilder.languages = {} - builder = SSolutionsRepositoryBuilder() # tests should never use the disk cache and they need fresh parses every time - builder.USE_CACHE = False + SSolutionsRepositoryBuilder.USE_CACHE = False + builder = SSolutionsRepositoryBuilder() test_data_location = "../mps_test_projects/" + test_data_location print("test data location ", test_data_location) path = os.path.abspath(test_data_location) diff --git a/mps-cli-py/tests/test_modules_and_models.py b/mps-cli-py/tests/test_modules_and_models.py index 9c40188..3fcb6f9 100644 --- a/mps-cli-py/tests/test_modules_and_models.py +++ b/mps-cli-py/tests/test_modules_and_models.py @@ -27,12 +27,16 @@ class TestModulesAndModels(TestBase): "r:ca00da79-915e-4bdb-9c30-11a341daf779", ), ( + # binary models live inside jars and are now read directly from ZIP bytes without extracting to disk. the original approach extracted JARs to a + # Previously we extracted jar to temp folder which gave each model a real on disk path but + # the new approach reads bytes in memory so path_to_model_file is not set + # and so None signals that the path check should be skipped for this case.. "mps_cli_lanuse_binary", "mps.cli.lanuse.library_top", - "mps_test_projects/mps_cli_lanuse_binary/mps_cli_lanuse_file_per_root_jar/mps.cli.lanuse.library_top", + "mps_cli_lanuse_file_per_root", "mps.cli.lanuse.library_second", "mps.cli.lanuse.library_top.authors_top", - "mps_test_projects/mps_cli_lanuse_binary/mps_cli_lanuse_file_per_root_jar/mps.cli.lanuse.library_top/models/mps.cli.lanuse.library_top.authors_top/.model", + None, "r:ec5f093b-9d83-43a1-9b41-b5952da8b1ed", ), ] @@ -70,7 +74,9 @@ def test_build_modules_and_models( library_top_authors_top_model_uuid, library_top_authors_top.uuid ) - self.assertTrue( - library_top_authors_top_path - in library_top_authors_top.path_to_model_file.as_posix() - ) + # None means the model came from a jar and has no on disk path to check... + if library_top_authors_top_path is not None: + self.assertTrue( + library_top_authors_top_path + in library_top_authors_top.path_to_model_file.as_posix() + ) diff --git a/mps-cli-py/tests/test_pack_properties.py b/mps-cli-py/tests/test_pack_properties.py new file mode 100644 index 0000000..a13e702 --- /dev/null +++ b/mps-cli-py/tests/test_pack_properties.py @@ -0,0 +1,77 @@ +# tests/binary/test_pack_properties.py +# +# Unit tests for pack_properties in FlatModelPacker. + +import unittest + +from mpscli.model.builder.utils.FlatModelPacker import pack_properties + + +def _decode_string(data, offsets, idx): + # helper that mirrors SModel._read_bytes and is used to verify round-trips + start = int(offsets[idx]) + end = int(offsets[idx + 1]) + if start == end: + return "" + chunk = data[start:end] + if hasattr(chunk, "tobytes"): + return chunk.tobytes().decode() + return chunk.decode() + + +def _get_props(result, node_idx): + # reconstruct the property dict for a given node + prop_start, prop_key_idxs, prop_val_offsets, prop_val_data, prop_keys = result + start = int(prop_start[node_idx]) + end = int(prop_start[node_idx + 1]) + result_dict = {} + for j in range(start, end): + key = prop_keys[int(prop_key_idxs[j])] + val = _decode_string(prop_val_data, prop_val_offsets, j) + result_dict[key] = val + return result_dict + + +class TestPackProperties(unittest.TestCase): + + def test_all_empty_dicts_produce_zero_properties(self): + prop_start, key_idxs, _, _, prop_keys = pack_properties([{}, {}, {}]) + self.assertEqual(len(prop_keys), 0) + self.assertEqual(len(key_idxs), 0) + for i in range(3): + self.assertEqual(int(prop_start[i]), int(prop_start[i + 1])) + + def test_empty_properties_list(self): + prop_start, _, _, _, _ = pack_properties([]) + self.assertEqual(len(prop_start), 1) + self.assertEqual(int(prop_start[0]), 0) + + def test_single_node_with_properties_round_trips(self): + properties = [{"name": "Alice", "age": "30"}] + result = pack_properties(properties) + props = _get_props(result, 0) + self.assertEqual(props["name"], "Alice") + self.assertEqual(props["age"], "30") + + def test_multiple_nodes_with_varying_property_counts(self): + properties = [ + {"name": "Mark Twain"}, + {}, + {"name": "Hemingway", "year": "1899"}, + ] + result = pack_properties(properties) + self.assertEqual(_get_props(result, 0), {"name": "Mark Twain"}) + self.assertEqual(_get_props(result, 1), {}) + self.assertEqual(_get_props(result, 2), {"name": "Hemingway", "year": "1899"}) + + def test_repeated_key_is_deduplicated_in_prop_keys(self): + # 'name' appears on all three nodes but should only appear once in prop_keys + properties = [{"name": "A"}, {"name": "B"}, {"name": "C"}] + _, _, _, _, prop_keys = pack_properties(properties) + self.assertEqual(prop_keys.count("name"), 1) + + def test_none_value_stored_as_empty_string(self): + properties = [{"description": None}] + result = pack_properties(properties) + props = _get_props(result, 0) + self.assertEqual(props["description"], "") diff --git a/mps-cli-py/tests/test_pack_references.py b/mps-cli-py/tests/test_pack_references.py new file mode 100644 index 0000000..cffbe54 --- /dev/null +++ b/mps-cli-py/tests/test_pack_references.py @@ -0,0 +1,102 @@ +# tests/binary/test_pack_references.py +# +# Unit tests for pack_references in FlatModelPacker. + +import unittest + +from mpscli.model.builder.utils.FlatModelPacker import pack_references + + +def _decode_string(data, offsets, idx): + # helper that mirrors SModel._read_bytes - used to verify round-trips + start = int(offsets[idx]) + end = int(offsets[idx + 1]) + if start == end: + return "" + chunk = data[start:end] + if hasattr(chunk, "tobytes"): + return chunk.tobytes().decode() + return chunk.decode() + + +class _FakeRef: + def __init__(self, model_uuid, node_uuid, resolve_info): + self.model_uuid = model_uuid + self.node_uuid = node_uuid + self.resolve_info = resolve_info + + +def _get_ref(result, node_idx, role): + # reconstruct one reference for a given node and role + ( + ref_start, + ref_key_idxs, + ref_model_idxs, + ref_node_uuid_offsets, + ref_node_uuid_data, + ref_resolve_offsets, + ref_resolve_data, + ref_keys, + ref_model_uuids, + ) = result + start = int(ref_start[node_idx]) + end = int(ref_start[node_idx + 1]) + key_idx = ref_keys.index(role) if role in ref_keys else -1 + for j in range(start, end): + if int(ref_key_idxs[j]) == key_idx: + model_uuid = ref_model_uuids[int(ref_model_idxs[j])] + node_uuid = _decode_string(ref_node_uuid_data, ref_node_uuid_offsets, j) + resolve = _decode_string(ref_resolve_data, ref_resolve_offsets, j) + return model_uuid, node_uuid, resolve + return None + + +class TestPackReferences(unittest.TestCase): + + def test_all_empty_dicts_produce_zero_references(self): + result = pack_references([{}, {}, {}]) + ref_start, ref_key_idxs = result[0], result[1] + self.assertEqual(len(ref_key_idxs), 0) + for i in range(3): + self.assertEqual(int(ref_start[i]), int(ref_start[i + 1])) + + def test_empty_references_list(self): + result = pack_references([]) + self.assertEqual(len(result[0]), 1) + self.assertEqual(int(result[0][0]), 0) + + def test_single_node_with_reference_round_trips(self): + ref = _FakeRef("r:model-uuid-123", "node-uuid-456", "MyType") + result = pack_references([{"extends": ref}]) + found = _get_ref(result, 0, "extends") + self.assertIsNotNone(found) + model_uuid, node_uuid, resolve = found + self.assertEqual(model_uuid, "r:model-uuid-123") + self.assertEqual(node_uuid, "node-uuid-456") + self.assertEqual(resolve, "MyType") + + def test_model_uuid_is_deduplicated_in_ref_model_uuids(self): + # same model_uuid on two different nodes should appear once only + ref_a = _FakeRef("r:shared-model", "node-1", "TypeA") + ref_b = _FakeRef("r:shared-model", "node-2", "TypeB") + result = pack_references([{"role1": ref_a}, {"role2": ref_b}]) + self.assertEqual(result[8].count("r:shared-model"), 1) + + def test_role_names_are_deduplicated_in_ref_keys(self): + ref_a = _FakeRef("r:m1", "n1", "T1") + ref_b = _FakeRef("r:m2", "n2", "T2") + result = pack_references([{"type": ref_a}, {"type": ref_b}]) + self.assertEqual(result[7].count("type"), 1) + + def test_node_with_no_references_has_empty_slice(self): + ref = _FakeRef("r:m", "n", "T") + result = pack_references([{}, {"role": ref}, {}]) + ref_start = result[0] + self.assertEqual(int(ref_start[0]), int(ref_start[1])) + self.assertEqual(int(ref_start[2]) - int(ref_start[1]), 1) + self.assertEqual(int(ref_start[2]), int(ref_start[3])) + + def test_none_model_uuid_stored_as_empty_string(self): + ref = _FakeRef(None, "node-uuid", "SomeType") + result = pack_references([{"ref": ref}]) + self.assertIn("", result[8]) diff --git a/mps-cli-py/tests/test_pack_strings.py b/mps-cli-py/tests/test_pack_strings.py new file mode 100644 index 0000000..a2ff9b4 --- /dev/null +++ b/mps-cli-py/tests/test_pack_strings.py @@ -0,0 +1,57 @@ +# tests/binary/test_pack_strings.py +# +# Unit tests for pack_strings in FlatModelPacker. + +import unittest + +from mpscli.model.builder.utils.FlatModelPacker import pack_strings + + +def _decode_string(data, offsets, idx): + # helper that mirrors SModel._read_bytes and is used to verify round-trips + start = int(offsets[idx]) + end = int(offsets[idx + 1]) + if start == end: + return "" + chunk = data[start:end] + if hasattr(chunk, "tobytes"): + return chunk.tobytes().decode() + return chunk.decode() + + +class TestPackStrings(unittest.TestCase): + + def test_empty_list_produces_single_zero_offset_and_empty_bytes(self): + offsets, data = pack_strings([]) + self.assertEqual(len(offsets), 1) + self.assertEqual(int(offsets[0]), 0) + self.assertEqual(data, b"") + + def test_single_string_round_trips(self): + offsets, data = pack_strings(["hello"]) + self.assertEqual(_decode_string(data, offsets, 0), "hello") + + def test_multiple_strings_all_round_trip(self): + strings = ["alpha", "beta", "gamma"] + offsets, data = pack_strings(strings) + for i, s in enumerate(strings): + self.assertEqual(_decode_string(data, offsets, i), s) + + def test_empty_string_in_middle_round_trips(self): + offsets, data = pack_strings(["before", "", "after"]) + self.assertEqual(_decode_string(data, offsets, 0), "before") + self.assertEqual(_decode_string(data, offsets, 1), "") + self.assertEqual(_decode_string(data, offsets, 2), "after") + + def test_all_empty_strings(self): + offsets, data = pack_strings(["", "", ""]) + self.assertEqual(data, b"") + for i in range(3): + self.assertEqual(_decode_string(data, offsets, i), "") + + def test_multibyte_utf8_character_round_trips(self): + # euro sign is 3 bytes in UTF-8 and thiss verifies that byte-length is used not char-length + strings = ["price: 5\u20ac", "normal"] + offsets, data = pack_strings(strings) + self.assertEqual(_decode_string(data, offsets, 0), "price: 5\u20ac") + self.assertEqual(_decode_string(data, offsets, 1), "normal") diff --git a/mps-cli-py/tests/test_parse_cache.py b/mps-cli-py/tests/test_parse_cache.py new file mode 100644 index 0000000..dad67c0 --- /dev/null +++ b/mps-cli-py/tests/test_parse_cache.py @@ -0,0 +1,233 @@ +# tests/binary/test_parse_cache.py +# +# Round-trip tests for ParseCache serialization. This verifies that an SModel saved to npz + pkl +# and restored via _restore_model produces a model whose nodes, properties, references +# and parent-child links are identical to the original parsed model + +import pickle +import tempfile +import unittest +from pathlib import Path + +import numpy as np + +from mpscli.model.SModel import SModel +from mpscli.model.builder.SModelBuilderBinaryPersistency import ( + SModelBuilderBinaryPersistency, +) +from mpscli.model.builder.utils.ParseCache import ( + _FileCache, + _model_meta, + _model_to_arrays, + _restore_model, +) + +MPB = ( + "../mps_test_projects/" + "mps_cli_binary_persistency_generated_low_level_access_test_data/" + "mps.cli.lanuse.library_top.binary_persistency.authors_top.mpb" +) + +ROOT_UUID = "4Yb5JA31NUu" +ROOT_CONCEPT = "mps.cli.landefs.people.structure.PersonsContainer" +ROOT_NAME_PROP = "_010_classical_authors" +CHILD_CONCEPT = "mps.cli.landefs.people.structure.Person" +MARK_TWAIN_UUID = "4Yb5JA31NUv" + + +def _save_and_restore(original_model): + # serialize original_model to a temp directory using the same format as ParseCache._save_all and then + # reload with np.load and _restore_model exactly as ParseCache._load_one does on warm runss + member_name = "test/model.mpb" + arrays = _model_to_arrays(original_model) + meta = _model_meta(original_model, member_name) + + with tempfile.TemporaryDirectory() as tmp: + npz_path = Path(tmp) / "model.npz" + pkl_path = Path(tmp) / "model_meta.pkl" + + # prefix arrays with m0_ to match the multi-model format used in production + prefixed = {"m0_" + k: v for k, v in arrays.items()} + np.savez(str(npz_path), **prefixed) + + with pkl_path.open("wb") as f: + pickle.dump({"models": [meta]}, f, protocol=pickle.HIGHEST_PROTOCOL) + + # load without mmap so Windows can probably delete the temp file after the test. mmap is basically a + # read optimization so the correctness of save or restore does not depend on it I think.. + npz = np.load(str(npz_path), allow_pickle=False) + with pkl_path.open("rb") as f: + entry = pickle.load(f) + + model_meta_loaded = entry["models"][0] + model_arrays = {k: npz["m0_" + k] for k in arrays.keys()} + # explicitly close the NpzFile so Windows releases the file handle before temp directory cleanup.. + # without mmap the arrays are already in memory so closing is safe + npz.close() + + return _restore_model(model_arrays, model_meta_loaded) + + +class TestParseCacheRoundTrip(unittest.TestCase): + + def setUp(self): + builder = SModelBuilderBinaryPersistency() + self.original = builder.build(MPB) + self.restored = _save_and_restore(self.original) + + def test_model_name_preserved(self): + self.assertEqual(self.original.name, self.restored.name) + + def test_model_uuid_preserved(self): + self.assertEqual(self.original.uuid, self.restored.uuid) + + def test_root_node_count_preserved(self): + self.assertEqual( + len(self.original.root_nodes), + len(self.restored.root_nodes), + ) + + def test_root_uuid_preserved(self): + self.assertEqual(ROOT_UUID, self.restored.root_nodes[0].uuid) + + def test_root_concept_preserved(self): + self.assertEqual(ROOT_CONCEPT, self.restored.root_nodes[0].concept.name) + + def test_root_name_property_preserved(self): + self.assertEqual( + ROOT_NAME_PROP, self.restored.root_nodes[0].get_property("name") + ) + + def test_all_child_name_properties_preserved(self): + orig_names = { + c.get_property("name") for c in self.original.root_nodes[0].children + } + rest_names = { + c.get_property("name") for c in self.restored.root_nodes[0].children + } + self.assertEqual(orig_names, rest_names) + + def test_child_count_preserved(self): + self.assertEqual( + len(self.original.root_nodes[0].children), + len(self.restored.root_nodes[0].children), + ) + + def test_all_children_have_correct_concept(self): + for child in self.restored.root_nodes[0].children: + self.assertEqual(CHILD_CONCEPT, child.concept.name) + + def test_parent_links_point_to_root(self): + root = self.restored.root_nodes[0] + for child in root.children: + self.assertIs(root, child.parent) + + def test_mark_twain_uuid_preserved(self): + twain = next( + ( + c + for c in self.restored.root_nodes[0].children + if c.get_property("name") == "Mark Twain" + ), + None, + ) + self.assertIsNotNone(twain, "Mark Twain node not found after restore") + self.assertEqual(MARK_TWAIN_UUID, twain.uuid) + + def test_role_in_parent_preserved_for_all_children(self): + orig_roles = [c.role_in_parent for c in self.original.root_nodes[0].children] + rest_roles = [c.role_in_parent for c in self.restored.root_nodes[0].children] + self.assertEqual(orig_roles, rest_roles) + + def test_total_node_count_preserved(self): + self.assertEqual( + len(self.original.get_nodes()), + len(self.restored.get_nodes()), + ) + + def test_get_node_by_uuid_finds_root(self): + found = self.restored.get_node_by_uuid(ROOT_UUID) + self.assertIs(self.restored.root_nodes[0], found) + + def test_get_node_by_uuid_finds_child(self): + child = self.restored.root_nodes[0].children[0] + found = self.restored.get_node_by_uuid(child.uuid) + self.assertIs(child, found) + + def test_get_node_by_uuid_raises_for_unknown(self): + with self.assertRaises(KeyError): + self.restored.get_node_by_uuid("completely_unknown_uuid") + + def test_repeated_root_access_returns_same_object(self): + # _node_cache must return the same SNode instance on every call + root_a = self.restored.root_nodes[0] + root_b = self.restored.get_node_by_uuid(ROOT_UUID) + self.assertIs(root_a, root_b) + + def test_empty_model_round_trips_without_error(self): + # verifies the n==0 branch in SModel._finalize() round-trips correctly.. created this inline because + # setUp builds a non-empty model from the .mpb fixture + empty = SModel("empty.model", "r:00000000-0000-0000-0000-000000000000", False) + empty._finalize() + restored = _save_and_restore(empty) + self.assertEqual(empty.name, restored.name) + self.assertEqual(empty.uuid, restored.uuid) + self.assertEqual(0, len(restored.root_nodes)) + self.assertEqual(0, len(restored.get_nodes())) + + def test_save_all_writes_npz_and_meta_files(self): + # _save_all should produce one .npz and one _meta.pkl per JAR entry + to_write = { + "fake/jar/path.jar": ( + 1234567890.0, + 99999, + {"test/model.mpb": self.original}, + ) + } + with tempfile.TemporaryDirectory() as tmp: + cache = _FileCache(Path(tmp), workers=1) + cache._save_all(to_write) + files = list(Path(tmp).iterdir()) + npz_files = [f for f in files if f.suffix == ".npz"] + pkl_files = [f for f in files if f.name.endswith("_meta.pkl")] + self.assertEqual(len(npz_files), 1) + self.assertEqual(len(pkl_files), 1) + + def test_save_all_leaves_no_temp_files(self): + # atomic write should leave no _tmp files behind + to_write = { + "fake/jar/path.jar": ( + 1234567890.0, + 99999, + {"test/model.mpb": self.original}, + ) + } + with tempfile.TemporaryDirectory() as tmp: + cache = _FileCache(Path(tmp), workers=1) + cache._save_all(to_write) + tmp_files = [f for f in Path(tmp).iterdir() if "_tmp" in f.name] + self.assertEqual(tmp_files, []) + + def test_save_all_bad_entry_does_not_stop_others(self): + # an exception on one jar entry should not prevent other entries from saving + good_jar = "fake/good.jar" + # None key will cause an error when computing md5 + bad_jar = None + to_write = { + good_jar: (1234567890.0, 99999, {"model.mpb": self.original}), + bad_jar: (0.0, 0, {"model.mpb": self.original}), + } + import warnings as _warnings + + with tempfile.TemporaryDirectory() as tmp: + cache = _FileCache(Path(tmp), workers=1) + with _warnings.catch_warnings(record=True): + cache._save_all(to_write) + npz_files = list(Path(tmp).glob("*.npz")) + self.assertEqual(len(npz_files), 1) + + def test_save_all_empty_dict_completes_without_error(self): + with tempfile.TemporaryDirectory() as tmp: + cache = _FileCache(Path(tmp), workers=1) + cache._save_all({}) + self.assertEqual(list(Path(tmp).iterdir()), [])