|
| 1 | +"""Tests for symlink containment enforcement across APM subsystems. |
| 2 | +
|
| 3 | +Validates that symlinked primitive files are rejected at discovery and |
| 4 | +resolution time, preventing arbitrary local file reads. |
| 5 | +""" |
| 6 | + |
| 7 | +import json |
| 8 | +import os |
| 9 | +import tempfile |
| 10 | +import shutil |
| 11 | +import unittest |
| 12 | +from pathlib import Path |
| 13 | + |
| 14 | + |
| 15 | +def _try_symlink(link: Path, target: Path): |
| 16 | + """Create a symlink or skip the test on platforms that don't support it.""" |
| 17 | + try: |
| 18 | + link.symlink_to(target) |
| 19 | + except OSError: |
| 20 | + raise unittest.SkipTest("Symlinks not supported on this platform") |
| 21 | + |
| 22 | + |
| 23 | +class TestPromptCompilerSymlinkContainment(unittest.TestCase): |
| 24 | + """PromptCompiler._resolve_prompt_file rejects external symlinks.""" |
| 25 | + |
| 26 | + def setUp(self): |
| 27 | + self.tmpdir = tempfile.mkdtemp() |
| 28 | + self.project = Path(self.tmpdir) / "project" |
| 29 | + self.project.mkdir() |
| 30 | + self.outside = Path(self.tmpdir) / "outside" |
| 31 | + self.outside.mkdir() |
| 32 | + # Create a file outside the project |
| 33 | + self.secret = self.outside / "secret.txt" |
| 34 | + self.secret.write_text("sensitive-data", encoding="utf-8") |
| 35 | + # Create apm.yml so the project is valid |
| 36 | + (self.project / "apm.yml").write_text( |
| 37 | + "name: test\nversion: 1.0.0\n", encoding="utf-8" |
| 38 | + ) |
| 39 | + |
| 40 | + def tearDown(self): |
| 41 | + shutil.rmtree(self.tmpdir, ignore_errors=True) |
| 42 | + |
| 43 | + def test_symlinked_prompt_outside_project_rejected(self): |
| 44 | + """Symlinked .prompt.md is rejected with clear error message.""" |
| 45 | + from apm_cli.core.script_runner import PromptCompiler |
| 46 | + |
| 47 | + prompts_dir = self.project / ".apm" / "prompts" |
| 48 | + prompts_dir.mkdir(parents=True) |
| 49 | + symlink = prompts_dir / "evil.prompt.md" |
| 50 | + _try_symlink(symlink, self.secret) |
| 51 | + |
| 52 | + compiler = PromptCompiler() |
| 53 | + old_cwd = os.getcwd() |
| 54 | + try: |
| 55 | + os.chdir(self.project) |
| 56 | + with self.assertRaises(FileNotFoundError) as ctx: |
| 57 | + compiler._resolve_prompt_file(".apm/prompts/evil.prompt.md") |
| 58 | + self.assertIn("symlink", str(ctx.exception).lower()) |
| 59 | + finally: |
| 60 | + os.chdir(old_cwd) |
| 61 | + |
| 62 | + def test_normal_prompt_within_project_allowed(self): |
| 63 | + """Non-symlinked prompt files within the project are allowed.""" |
| 64 | + from apm_cli.core.script_runner import PromptCompiler |
| 65 | + |
| 66 | + prompts_dir = self.project / ".apm" / "prompts" |
| 67 | + prompts_dir.mkdir(parents=True) |
| 68 | + prompt = prompts_dir / "safe.prompt.md" |
| 69 | + prompt.write_text("# Safe prompt", encoding="utf-8") |
| 70 | + |
| 71 | + compiler = PromptCompiler() |
| 72 | + old_cwd = os.getcwd() |
| 73 | + try: |
| 74 | + os.chdir(self.project) |
| 75 | + result = compiler._resolve_prompt_file(".apm/prompts/safe.prompt.md") |
| 76 | + self.assertTrue(result.exists()) |
| 77 | + finally: |
| 78 | + os.chdir(old_cwd) |
| 79 | + |
| 80 | + |
| 81 | +class TestPrimitiveDiscoverySymlinkContainment(unittest.TestCase): |
| 82 | + """find_primitive_files rejects symlinks outside base directory.""" |
| 83 | + |
| 84 | + def setUp(self): |
| 85 | + self.tmpdir = tempfile.mkdtemp() |
| 86 | + self.project = Path(self.tmpdir) / "project" |
| 87 | + self.project.mkdir() |
| 88 | + self.outside = Path(self.tmpdir) / "outside" |
| 89 | + self.outside.mkdir() |
| 90 | + self.secret = self.outside / "leak.instructions.md" |
| 91 | + self.secret.write_text("---\napplyTo: '**'\n---\nLeaked!", encoding="utf-8") |
| 92 | + |
| 93 | + def tearDown(self): |
| 94 | + shutil.rmtree(self.tmpdir, ignore_errors=True) |
| 95 | + |
| 96 | + def test_symlinked_instruction_outside_base_rejected(self): |
| 97 | + """Symlinked .instructions.md outside base_dir is filtered out.""" |
| 98 | + from apm_cli.primitives.discovery import find_primitive_files |
| 99 | + |
| 100 | + instructions_dir = self.project / ".github" / "instructions" |
| 101 | + instructions_dir.mkdir(parents=True) |
| 102 | + symlink = instructions_dir / "evil.instructions.md" |
| 103 | + _try_symlink(symlink, self.secret) |
| 104 | + |
| 105 | + # Also add a normal file |
| 106 | + normal = instructions_dir / "safe.instructions.md" |
| 107 | + normal.write_text("---\napplyTo: '**'\n---\nSafe", encoding="utf-8") |
| 108 | + |
| 109 | + results = find_primitive_files( |
| 110 | + str(self.project), |
| 111 | + [".github/instructions/*.instructions.md"], |
| 112 | + ) |
| 113 | + names = [f.name for f in results] |
| 114 | + self.assertIn("safe.instructions.md", names) |
| 115 | + self.assertNotIn("evil.instructions.md", names) |
| 116 | + |
| 117 | + |
| 118 | +class TestBaseIntegratorSymlinkContainment(unittest.TestCase): |
| 119 | + """BaseIntegrator.find_files_by_glob rejects external symlinks.""" |
| 120 | + |
| 121 | + def setUp(self): |
| 122 | + self.tmpdir = tempfile.mkdtemp() |
| 123 | + self.pkg = Path(self.tmpdir) / "pkg" |
| 124 | + self.pkg.mkdir() |
| 125 | + self.outside = Path(self.tmpdir) / "outside" |
| 126 | + self.outside.mkdir() |
| 127 | + self.secret = self.outside / "leak.agent.md" |
| 128 | + self.secret.write_text("# Leaked agent", encoding="utf-8") |
| 129 | + |
| 130 | + def tearDown(self): |
| 131 | + shutil.rmtree(self.tmpdir, ignore_errors=True) |
| 132 | + |
| 133 | + def test_symlinked_agent_outside_package_rejected(self): |
| 134 | + """Symlinked .agent.md outside package dir is filtered out.""" |
| 135 | + from apm_cli.integration.base_integrator import BaseIntegrator |
| 136 | + |
| 137 | + agents_dir = self.pkg / ".apm" / "agents" |
| 138 | + agents_dir.mkdir(parents=True) |
| 139 | + symlink = agents_dir / "evil.agent.md" |
| 140 | + _try_symlink(symlink, self.secret) |
| 141 | + |
| 142 | + normal = agents_dir / "safe.agent.md" |
| 143 | + normal.write_text("# Safe agent", encoding="utf-8") |
| 144 | + |
| 145 | + results = BaseIntegrator.find_files_by_glob( |
| 146 | + self.pkg, "*.agent.md", subdirs=[".apm/agents"], |
| 147 | + ) |
| 148 | + names = [f.name for f in results] |
| 149 | + self.assertIn("safe.agent.md", names) |
| 150 | + self.assertNotIn("evil.agent.md", names) |
| 151 | + |
| 152 | + |
| 153 | +class TestHookIntegratorSymlinkContainment(unittest.TestCase): |
| 154 | + """HookIntegrator.find_hook_files rejects external symlinks.""" |
| 155 | + |
| 156 | + def setUp(self): |
| 157 | + self.tmpdir = tempfile.mkdtemp() |
| 158 | + self.pkg = Path(self.tmpdir) / "pkg" |
| 159 | + self.pkg.mkdir() |
| 160 | + self.outside = Path(self.tmpdir) / "outside" |
| 161 | + self.outside.mkdir() |
| 162 | + self.secret = self.outside / "evil.json" |
| 163 | + self.secret.write_text(json.dumps({"hooks": {}}), encoding="utf-8") |
| 164 | + |
| 165 | + def tearDown(self): |
| 166 | + shutil.rmtree(self.tmpdir, ignore_errors=True) |
| 167 | + |
| 168 | + def test_symlinked_hook_json_outside_package_rejected(self): |
| 169 | + """Symlinked hook JSON outside package dir is filtered out.""" |
| 170 | + from apm_cli.integration.hook_integrator import HookIntegrator |
| 171 | + |
| 172 | + hooks_dir = self.pkg / ".apm" / "hooks" |
| 173 | + hooks_dir.mkdir(parents=True) |
| 174 | + symlink = hooks_dir / "evil.json" |
| 175 | + _try_symlink(symlink, self.secret) |
| 176 | + |
| 177 | + normal = hooks_dir / "safe.json" |
| 178 | + normal.write_text(json.dumps({"hooks": {}}), encoding="utf-8") |
| 179 | + |
| 180 | + integrator = HookIntegrator() |
| 181 | + results = integrator.find_hook_files(self.pkg) |
| 182 | + names = [f.name for f in results] |
| 183 | + self.assertIn("safe.json", names) |
| 184 | + self.assertNotIn("evil.json", names) |
| 185 | + |
| 186 | + |
| 187 | +if __name__ == "__main__": |
| 188 | + unittest.main() |
0 commit comments