-
Notifications
You must be signed in to change notification settings - Fork 114
Expand file tree
/
Copy pathlockfile.py
More file actions
454 lines (397 loc) · 17.4 KB
/
lockfile.py
File metadata and controls
454 lines (397 loc) · 17.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
"""Lock file support for APM dependency resolution.
Provides deterministic, reproducible installs by capturing exact resolved versions.
"""
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import yaml
from ..models.apm_package import DependencyReference
logger = logging.getLogger(__name__)
@dataclass
class LockedDependency:
"""A resolved dependency with exact commit/version information."""
repo_url: str
host: Optional[str] = None
registry_prefix: Optional[str] = None # Registry path prefix, e.g. "artifactory/github"
resolved_commit: Optional[str] = None
resolved_ref: Optional[str] = None
version: Optional[str] = None
virtual_path: Optional[str] = None
is_virtual: bool = False
depth: int = 1
resolved_by: Optional[str] = None
package_type: Optional[str] = None
deployed_files: List[str] = field(default_factory=list)
source: Optional[str] = None # "local" for local deps, None/absent for remote
local_path: Optional[str] = None # Original local path (relative to project root)
content_hash: Optional[str] = None # SHA-256 of package file tree
is_dev: bool = False # True for devDependencies
discovered_via: Optional[str] = None # Marketplace name (provenance)
marketplace_plugin_name: Optional[str] = None # Plugin name in marketplace
source_url: Optional[str] = None # URL the index was fetched from
source_digest: Optional[str] = None # sha256 digest of the fetched index
def get_unique_key(self) -> str:
"""Returns unique key for this dependency."""
if self.source == "local" and self.local_path:
return self.local_path
if self.is_virtual and self.virtual_path:
return f"{self.repo_url}/{self.virtual_path}"
return self.repo_url
def to_dict(self) -> Dict[str, Any]:
"""Serialize to dict for YAML output."""
result: Dict[str, Any] = {"repo_url": self.repo_url}
if self.host:
result["host"] = self.host
if self.registry_prefix:
result["registry_prefix"] = self.registry_prefix
if self.resolved_commit:
result["resolved_commit"] = self.resolved_commit
if self.resolved_ref:
result["resolved_ref"] = self.resolved_ref
if self.version:
result["version"] = self.version
if self.virtual_path:
result["virtual_path"] = self.virtual_path
if self.is_virtual:
result["is_virtual"] = self.is_virtual
if self.depth != 1:
result["depth"] = self.depth
if self.resolved_by:
result["resolved_by"] = self.resolved_by
if self.package_type:
result["package_type"] = self.package_type
if self.deployed_files:
result["deployed_files"] = sorted(self.deployed_files)
if self.source:
result["source"] = self.source
if self.local_path:
result["local_path"] = self.local_path
if self.content_hash:
result["content_hash"] = self.content_hash
if self.is_dev:
result["is_dev"] = True
if self.discovered_via:
result["discovered_via"] = self.discovered_via
if self.marketplace_plugin_name:
result["marketplace_plugin_name"] = self.marketplace_plugin_name
if self.source_url is not None:
result["source_url"] = self.source_url
if self.source_digest is not None:
result["source_digest"] = self.source_digest
return result
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "LockedDependency":
"""Deserialize from dict.
Handles backwards compatibility:
- Old ``deployed_skills`` lists are migrated to ``deployed_files``
paths under ``.github/skills/`` and ``.claude/skills/``.
"""
deployed_files = list(data.get("deployed_files", []))
# Migrate legacy deployed_skills -> deployed_files
old_skills = data.get("deployed_skills", [])
if old_skills and not deployed_files:
for skill_name in old_skills:
deployed_files.append(f".github/skills/{skill_name}/")
deployed_files.append(f".claude/skills/{skill_name}/")
return cls(
repo_url=data["repo_url"],
host=data.get("host"),
registry_prefix=data.get("registry_prefix"),
resolved_commit=data.get("resolved_commit"),
resolved_ref=data.get("resolved_ref"),
version=data.get("version"),
virtual_path=data.get("virtual_path"),
is_virtual=data.get("is_virtual", False),
depth=data.get("depth", 1),
resolved_by=data.get("resolved_by"),
package_type=data.get("package_type"),
deployed_files=deployed_files,
source=data.get("source"),
local_path=data.get("local_path"),
content_hash=data.get("content_hash"),
is_dev=data.get("is_dev", False),
discovered_via=data.get("discovered_via"),
marketplace_plugin_name=data.get("marketplace_plugin_name"),
source_url=data.get("source_url"),
source_digest=data.get("source_digest"),
)
@classmethod
def from_dependency_ref(
cls,
dep_ref: DependencyReference,
resolved_commit: Optional[str],
depth: int,
resolved_by: Optional[str],
is_dev: bool = False,
registry_config=None,
) -> "LockedDependency":
"""Create from a DependencyReference with resolution info.
Args:
dep_ref: The resolved dependency reference.
resolved_commit: Exact commit SHA that was installed, or ``None``.
depth: Dependency tree depth.
resolved_by: Parent repo URL, or ``None`` for direct dependencies.
is_dev: Whether this is a dev-only dependency.
registry_config: Optional :class:`~apm_cli.deps.registry_proxy.RegistryConfig`
used for this download. When provided, ``host`` is set to the
pure FQDN (e.g. ``"art.example.com"``) and ``registry_prefix``
is set to the URL path prefix (e.g. ``"artifactory/github"``),
ensuring correct auth routing on subsequent installs.
"""
if registry_config is not None:
host = registry_config.host
registry_prefix = registry_config.prefix
else:
host = dep_ref.host
registry_prefix = None
return cls(
repo_url=dep_ref.repo_url,
host=host,
registry_prefix=registry_prefix,
resolved_commit=resolved_commit,
resolved_ref=dep_ref.reference,
virtual_path=dep_ref.virtual_path,
is_virtual=dep_ref.is_virtual,
depth=depth,
resolved_by=resolved_by,
source="local" if dep_ref.is_local else None,
local_path=dep_ref.local_path if dep_ref.is_local else None,
is_dev=is_dev,
)
@dataclass
class LockFile:
"""APM lock file for reproducible dependency resolution."""
lockfile_version: str = "1"
generated_at: str = field(
default_factory=lambda: datetime.now(timezone.utc).isoformat()
)
apm_version: Optional[str] = None
dependencies: Dict[str, LockedDependency] = field(default_factory=dict)
mcp_servers: List[str] = field(default_factory=list)
mcp_configs: Dict[str, dict] = field(default_factory=dict)
local_deployed_files: List[str] = field(default_factory=list)
def add_dependency(self, dep: LockedDependency) -> None:
"""Add a dependency to the lock file."""
self.dependencies[dep.get_unique_key()] = dep
def get_dependency(self, key: str) -> Optional[LockedDependency]:
"""Get a dependency by its unique key."""
return self.dependencies.get(key)
def has_dependency(self, key: str) -> bool:
"""Check if a dependency exists."""
return key in self.dependencies
def get_all_dependencies(self) -> List[LockedDependency]:
"""Get all dependencies sorted by depth then repo_url."""
return sorted(
self.dependencies.values(), key=lambda d: (d.depth, d.repo_url)
)
def to_yaml(self) -> str:
"""Serialize to YAML string."""
data: Dict[str, Any] = {
"lockfile_version": self.lockfile_version,
"generated_at": self.generated_at,
}
if self.apm_version:
data["apm_version"] = self.apm_version
data["dependencies"] = [dep.to_dict() for dep in self.get_all_dependencies()]
if self.mcp_servers:
data["mcp_servers"] = sorted(self.mcp_servers)
if self.mcp_configs:
data["mcp_configs"] = dict(sorted(self.mcp_configs.items()))
if self.local_deployed_files:
data["local_deployed_files"] = sorted(self.local_deployed_files)
from ..utils.yaml_io import yaml_to_str
return yaml_to_str(data)
@classmethod
def from_yaml(cls, yaml_str: str) -> "LockFile":
"""Deserialize from YAML string."""
data = yaml.safe_load(yaml_str)
if not data:
return cls()
if not isinstance(data, dict):
return cls()
lock = cls(
lockfile_version=data.get("lockfile_version", "1"),
generated_at=data.get("generated_at", ""),
apm_version=data.get("apm_version"),
)
for dep_data in data.get("dependencies", []):
lock.add_dependency(LockedDependency.from_dict(dep_data))
lock.mcp_servers = list(data.get("mcp_servers", []))
lock.mcp_configs = dict(data.get("mcp_configs") or {})
lock.local_deployed_files = list(data.get("local_deployed_files", []))
return lock
def write(self, path: Path) -> None:
"""Write lock file to disk."""
path.write_text(self.to_yaml(), encoding="utf-8")
@classmethod
def read(cls, path: Path) -> Optional["LockFile"]:
"""Read lock file from disk. Returns None if not exists or corrupt."""
if not path.exists():
return None
try:
return cls.from_yaml(path.read_text(encoding="utf-8"))
except (yaml.YAMLError, ValueError, KeyError):
return None
@classmethod
def load_or_create(cls, path: Path) -> "LockFile":
"""Load existing lock file or create a new one."""
return cls.read(path) or cls()
@classmethod
def from_installed_packages(
cls,
installed_packages,
dependency_graph,
) -> "LockFile":
"""Create a lock file from installed packages.
Args:
installed_packages: List of
:class:`~apm_cli.deps.installed_package.InstalledPackage`
objects **or** legacy tuples of the form
``(dep_ref, resolved_commit, depth, resolved_by[, is_dev])``.
The 5th tuple element is optional for backward compatibility.
dependency_graph: The resolved DependencyGraph for additional metadata.
"""
from .installed_package import InstalledPackage
# Get APM version
try:
from importlib.metadata import version
apm_version = version("apm-cli")
except Exception:
apm_version = "unknown"
lock = cls(apm_version=apm_version)
for entry in installed_packages:
if isinstance(entry, InstalledPackage):
dep_ref = entry.dep_ref
resolved_commit = entry.resolved_commit
depth = entry.depth
resolved_by = entry.resolved_by
is_dev = entry.is_dev
registry_config = getattr(entry, "registry_config", None)
elif len(entry) >= 5:
dep_ref, resolved_commit, depth, resolved_by, is_dev = entry[:5]
registry_config = None
else:
dep_ref, resolved_commit, depth, resolved_by = entry[:4]
is_dev = False
registry_config = None
locked_dep = LockedDependency.from_dependency_ref(
dep_ref=dep_ref,
resolved_commit=resolved_commit,
depth=depth,
resolved_by=resolved_by,
is_dev=is_dev,
registry_config=registry_config,
)
lock.add_dependency(locked_dep)
return lock
def get_installed_paths(self, apm_modules_dir: Path) -> List[str]:
"""Get relative installed paths for all dependencies in this lockfile.
Computes expected installed paths for all dependencies, including
transitive ones. Used by:
- Primitive discovery to find all dependency primitives
- Orphan detection to avoid false positives for transitive deps
Args:
apm_modules_dir: Path to the apm_modules directory.
Returns:
List[str]: POSIX-style relative installed paths (e.g., ['owner/repo']),
ordered by depth then repo_url (no duplicates).
"""
seen: set = set()
paths: List[str] = []
for dep in self.get_all_dependencies():
dep_ref = DependencyReference(
repo_url=dep.repo_url,
host=dep.host,
virtual_path=dep.virtual_path,
is_virtual=dep.is_virtual,
is_local=(dep.source == "local"),
local_path=dep.local_path,
)
install_path = dep_ref.get_install_path(apm_modules_dir)
try:
rel_path = install_path.relative_to(apm_modules_dir).as_posix()
except ValueError:
rel_path = Path(install_path).as_posix()
if rel_path not in seen:
seen.add(rel_path)
paths.append(rel_path)
return paths
def save(self, path: Path) -> None:
"""Save lock file to disk (alias for write)."""
self.write(path)
def is_semantically_equivalent(self, other: "LockFile") -> bool:
"""Return True if *other* has the same deps, MCP servers, and configs.
Ignores ``generated_at`` and ``apm_version`` so that a no-change
install does not dirty the lockfile.
"""
if self.lockfile_version != other.lockfile_version:
return False
if set(self.dependencies.keys()) != set(other.dependencies.keys()):
return False
for key, dep in self.dependencies.items():
other_dep = other.dependencies[key]
if dep.to_dict() != other_dep.to_dict():
return False
if sorted(self.mcp_servers) != sorted(other.mcp_servers):
return False
if self.mcp_configs != other.mcp_configs:
return False
if sorted(self.local_deployed_files) != sorted(other.local_deployed_files):
return False
return True
@classmethod
def installed_paths_for_project(cls, project_root: Path) -> List[str]:
"""Load apm.lock.yaml from project_root and return installed paths.
Returns an empty list if the lockfile is missing, corrupt, or
unreadable.
Args:
project_root: Path to project root containing apm.lock.yaml.
Returns:
List[str]: Relative installed paths (e.g., ['owner/repo']),
ordered by depth then repo_url (no duplicates).
"""
try:
lockfile_path = get_lockfile_path(project_root)
if not lockfile_path.exists():
# Fallback to legacy lockfile for pre-migration reads
legacy_path = project_root / LEGACY_LOCKFILE_NAME
if legacy_path.exists():
lockfile_path = legacy_path
lockfile = cls.read(lockfile_path)
if not lockfile:
return []
return lockfile.get_installed_paths(project_root / "apm_modules")
except (FileNotFoundError, yaml.YAMLError, ValueError, KeyError):
return []
# Current lockfile filename (with .yaml extension for IDE syntax highlighting)
LOCKFILE_NAME = "apm.lock.yaml"
# Legacy lockfile filename used in older APM versions
LEGACY_LOCKFILE_NAME = "apm.lock"
def get_lockfile_path(project_root: Path) -> Path:
"""Get the path to the lock file for a project."""
return project_root / LOCKFILE_NAME
def migrate_lockfile_if_needed(project_root: Path) -> bool:
"""Migrate legacy apm.lock to apm.lock.yaml if needed.
Renames ``apm.lock`` to ``apm.lock.yaml`` when the new file does not yet
exist. This is a one-time, transparent migration for users upgrading from
older APM versions.
Args:
project_root: Path to the project root directory.
Returns:
True if a migration was performed, False otherwise.
"""
new_path = get_lockfile_path(project_root)
legacy_path = project_root / LEGACY_LOCKFILE_NAME
if not new_path.exists() and legacy_path.exists():
try:
legacy_path.rename(new_path)
except OSError:
logger.debug("Could not rename %s to %s", legacy_path, new_path, exc_info=True)
return False
return True
return False
def get_lockfile_installed_paths(project_root: Path) -> List[str]:
"""Deprecated: use LockFile.installed_paths_for_project() instead."""
return LockFile.installed_paths_for_project(project_root)