Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@ target/
bin/
.DS_Store
*.msix
.vs/

# Generated files for tree-sitter
grammars/**/bindings/
grammars/**/src/
grammars/**/parser.*
tree-sitter-ssh-server-config/
tree-sitter-dscexpression/
/adapters/__pycache__
/adapters/python/__pycache__
/adapters/python/pyDscAdapter/__pycache__
/adapters/python/tests/__pycache__
/adapters/python/tests/src/__pycache__
Empty file added adapters/__init__.py
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ Describe 'PowerShell adapter resource tests' {
$adapterPath = Join-Path $PSScriptRoot 'TestAdapter'
$env:PATH += [System.IO.Path]::PathSeparator + $adapterPath

$r = '{"TestCaseId": 1}' | dsc resource test -r 'Test/TestCase' -f -
$r = '{"TestCaseId": 1}' | dsc resource test -r 'Test/TestCase' -f - 2> $TestDrive/tracing.txt
$LASTEXITCODE | Should -Be 0
Comment thread
shammu1 marked this conversation as resolved.
Outdated
$resources = $r | ConvertFrom-Json
$resources.actualState.TestCaseId | Should -Be 1
Expand Down
10 changes: 10 additions & 0 deletions adapters/python/.project.data.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"Name": "python-adapter",
"Kind": "Adapter",
"CopyFiles": {
"All": [
"pyDscAdapter",
Comment thread
shammu1 marked this conversation as resolved.
Outdated
"pythonadapter.dsc.resource.json"
Comment on lines +2 to +7
]
}
}
Empty file added adapters/python/__init__.py
Empty file.
Empty file.
5 changes: 5 additions & 0 deletions adapters/python/pyDscAdapter/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import sys
from cli import main

if __name__ == "__main__":
sys.exit(main())
286 changes: 286 additions & 0 deletions adapters/python/pyDscAdapter/adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
import argparse
import sys
import json
import cProfile
import pstats
import time
import io
import os
from contextlib import contextmanager
from datetime import datetime, timezone
import inspect
Comment on lines +10 to +11
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from dsc_logging import setup_dsc_logging, operation_context
from utils import parse_json
from discovery import get_class_map_from_pyproject, import_class_from_file
Comment on lines +14 to +16

#----------------------------------------------------------------------------
# ResourceAdapter - main adapter class with registry, profiling, and logging
#----------------------------------------------------------------------------
class ResourceAdapter:
"""
Routes adapter operations to Python resource classes discovered from a
pyproject.toml file near the provided resource path.

The adapter also provides:
- profile_block for optional timing and cProfile instrumentation
- log(level, message, target, **kwargs) for structured adapter logging
- direct resource-type to class resolution from pyproject manifest data
"""

def __init__(self) -> None:
# Normalize DSC trace level to standard Python logging levels
# Supported inputs: trace, debug, info, warning, error, critical
dsc_level = (os.getenv("DSC_TRACE_LEVEL", "info") or "info").strip().lower()

self.logger = setup_dsc_logging(dsc_level)

# Enable Profiling based on DSC trace level
self.ENABLE_PROFILING = dsc_level in ("trace", "debug")

self.logger.debug(f"Trace level: '{dsc_level}', profiling: {self.ENABLE_PROFILING}")
self.logger.info("Adapter initialization complete")


def _resolve_pyproject_path(self, resource_path: str = "") -> Optional[Path]:
"""Resolve the nearest pyproject.toml containing [tool.dsc.resources] starting from resource_path."""
self.logger.debug(f"Resolving pyproject.toml for resource_path='{resource_path}'")
candidates: List[Path] = []

if resource_path:
resolved_resource_path = Path(resource_path).resolve()
candidates.append(resolved_resource_path.parent / "pyproject.toml")
candidates.extend(parent / "pyproject.toml" for parent in resolved_resource_path.parents)

seen_candidates = set()
for candidate in candidates:
candidate_key = str(candidate).casefold()
if candidate_key in seen_candidates:
continue
seen_candidates.add(candidate_key)

self.logger.debug(f"Checking pyproject candidate: '{candidate}'")
if candidate.exists() and get_class_map_from_pyproject(candidate):
self.logger.debug(f"Found pyproject.toml with DSC resources at '{candidate}'")
return candidate.resolve()

self.logger.warning(f"No pyproject.toml with [tool.dsc.resources] found for resource_path='{resource_path}'")
return None

@contextmanager
def profile_block(self, label):
"""Context manager for optional profiling of code blocks."""
if self.ENABLE_PROFILING:
start_time = time.perf_counter()
profiler = None
try:
profiler = cProfile.Profile()
profiler.enable()
except Exception:
# Another profiler may already be active; fall back to timing only
profiler = None
try:
yield
finally:
end_time = time.perf_counter()
if profiler:
try:
profiler.disable()
s = io.StringIO()
ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
ps.print_stats(10)
self.logger.info(f"[PROFILE] {label} took {end_time - start_time:.4f}s")
self.logger.debug(f"[PROFILE DETAILS] {label}:\n{s.getvalue()}")
except Exception:
# If profiling teardown fails, still log duration
self.logger.info(f"[PROFILE] {label} took {end_time - start_time:.4f}s")
else:
self.logger.info(f"[PROFILE] {label} took {end_time - start_time:.4f}s")
else:
yield

def log(self, level: str, message: str, target: str = None, **kwargs) -> None:
"""Structured logging method for adapter code."""
lvl = level.lower()
method = kwargs.get("method", "?")
core_msg = f"{target} - {method} - {message}" if target else f"{method} - {message}"

if lvl == "trace": # and hasattr(self.logger, "trace"):
self.logger.debug(f"[TRACE] {core_msg}")
return

log_fn = getattr(self.logger, lvl, self.logger.info)
log_fn(core_msg)


def _load_manifest(self, resource_path: str = "") -> Dict[str, str]:
"""
Resolve the nearest pyproject.toml for the supplied resource path and
return the [tool.dsc.resources] class mapping.
"""
if not resource_path:
self.logger.debug("_load_manifest called with empty resource_path; returning empty class map")
return {}

self.logger.debug(f"Loading manifest class map for resource_path='{resource_path}'")

pyproject_path = self._resolve_pyproject_path(resource_path)
if not pyproject_path:
self.logger.warning(f"No pyproject.toml found for '{resource_path}'; class map will be empty")
class_map = get_class_map_from_pyproject(pyproject_path) if pyproject_path else {}
self.logger.debug(f"Class map loaded: {class_map}")
return class_map


def _resolve_resource_class(self, resource_type: str, resource_path: str = "") -> type:
"""Resolve the resource class for a given resource type and path using the manifest mapping."""
self.logger.debug(f"Resolving class for resource_type='{resource_type}', resource_path='{resource_path}'")
if not resource_type.strip():
raise ValueError("resource-type must be provided")

class_map = self._load_manifest(resource_path)
class_name = class_map.get(resource_type)
if not class_name:
lowered = {k.lower(): v for k, v in class_map.items()}
class_name = lowered.get(resource_type.lower())
if class_name:
self.logger.debug(f"Exact lookup missed; using case-insensitive match for '{resource_type}'")

if not class_name:
supported = sorted(set(class_map.keys()))
self.logger.error(f"No class mapping found for '{resource_type}'. Supported: {supported}")
raise ValueError(f"Unsupported resource-type '{resource_type}'. Supported: {supported}")

self.logger.debug(f"Class '{class_name}' found for '{resource_type}'; importing class")
return import_class_from_file(resource_path, resource_type, class_name)


def _instantiate_resource(self, cls: type, json_input: str, operation: Optional[str]) -> Any:
"""Instantiate a resource class from JSON input."""
# Resource classes may expect operation-aware validation
if hasattr(cls, "from_json"):
return cls.from_json(json_input, operation=operation)
# Fallback: direct init from dict if needed
data = json.loads(json_input or "{}")
return cls(**data)

# -----------------
# Operation routing
# -----------------

def run_operation(self, operation: str, json_input: str, resource_type: str, resource_path: str = "") -> Tuple[int, Dict[str, Any]]:
"""
Execute a single adapter operation for one resource instance.

Returns a tuple of (exit_code, result_dict). Most operations return a
JSON-serializable result dictionary for the caller to print. The set
and test operations are exceptions: they write their state and diff
payloads directly to stdout and return a marker dictionary indicating
that stdout has already been emitted.
"""
op = (operation or "").strip().lower()
self.logger.info(f"Operation '{op}' requested for resource_type='{resource_type}'")

with operation_context(op, resource_type):
if op == "list":
self.logger.debug("List operation: returning empty resource list")
return 0, {"resources": []}
Comment on lines +187 to +188
if op == "validate":
self.logger.debug(f"Validate operation: returning valid=True for '{resource_type}'")
return 0, {"valid": True}
Comment on lines +189 to +191

# Resolve resource class
try:
self.logger.debug(f"Resolving resource_type='{resource_type}', resource_path='{resource_path}'")
resolved_type = (resource_type or "").strip() or os.getenv("DSC_RESOURCE_TYPE", "").strip()
if resolved_type != resource_type:
self.logger.debug(f"resource_type resolved from env to '{resolved_type}'")
cls = self._resolve_resource_class(resolved_type, resource_path)
Comment on lines +196 to +199
self.logger.debug(f"Resolved class '{cls.__name__}' for '{resolved_type}'")
Comment on lines +196 to +200
except Exception as e:
self.log("error", str(e), "Adapter", operation=op)
return 2, {"error": str(e)}

try:
if op == "get":
self.logger.info(f"Executing GET on '{resolved_type}'")
with self.profile_block("DSC Get Operation"):
instance = self._instantiate_resource(cls, json_input, operation="get")
data = instance.get()
self.logger.debug(f"GET returned: {data}")

try:
resource_name = json.loads(json_input or "{}").get("name", "") or resource_type
except Exception:
resource_name = resource_type or ""

full = {
"metadata": {"Microsoft.DSC": {"operation": "Get"}},
"name": resource_name,
"type": "Microsoft.DSC.Adapters/Python",
"result": [
{
"name": resource_name,
"type": resource_type,
"result": {
"actualState": data
}
}
]
}
Comment on lines +214 to +231
return (0, full)

Comment on lines +212 to +233
elif op == "set":
self.logger.info(f"Executing SET on '{resolved_type}'")
with self.profile_block("DSC Set Operation"):
instance = self._instantiate_resource(cls, json_input, operation="set")
state, diffs = instance.set()
self.logger.debug(f"SET completed. diffs={diffs}")

sys.stdout.write(json.dumps(state, ensure_ascii=False) + "\n")
sys.stdout.write(json.dumps(diffs, ensure_ascii=False) + "\n")

# Signal to caller that we've already printed the required stdout
return (0, {"_stdout_emitted": True})

elif op == "test":
self.logger.info(f"Executing TEST on '{resolved_type}'")
with self.profile_block("DSC Test Operation"):
instance = self._instantiate_resource(cls, json_input, operation="test")
actual_state, diffs = instance.test()
self.logger.debug(f"TEST completed. in_desired_state={len(diffs) == 0}, diffs={diffs}")

sys.stdout.write(json.dumps(actual_state if isinstance(actual_state, dict) else {}, ensure_ascii=False) + "\n")
sys.stdout.write(json.dumps(diffs if isinstance(diffs, list) else [], ensure_ascii=False) + "\n")

# Signal stdout already emitted so main() doesn't print a wrapper
return (0, {"_stdout_emitted": True})

elif op == "export":
self.logger.info(f"Executing EXPORT on '{resolved_type}'")
# If your resource supports filtered export with provided input, pass instance; else pass None for full export
with self.profile_block("DSC Export Operation"):
# Determine if filters are provided; otherwise export all (None)
as_obj = parse_json(json_input)
has_filters = any(k in as_obj for k in ("name", "version", "source", "dependencies"))
self.logger.debug(f"Export has_filters={has_filters}")
instance = self._instantiate_resource(cls, json_input, operation="export") if has_filters else None
data = cls.export(instance)
Comment on lines +264 to +269
self.logger.debug("Export completed")
# If export returns None (prints only), still return an empty dict for adapter contract
return (0, data if isinstance(data, dict) else {})
Comment on lines +260 to +272

else:
msg = f"Unsupported operation '{operation}'. Expected one of: list, get, set, test, export, validate"
self.log("error", msg, "Adapter")
return 2, {"error": msg}

except SystemExit as se:
# Resource may call sys.exit(1) on error paths (e.g., export). Normalize.
code = int(getattr(se, "code", 1) or 1)
self.logger.error(f"Operation '{op}' on '{resolved_type}' terminated with sys.exit({code})")
return code, {"error": f"Resource terminated with exit {code}"}
Comment thread
shammu1 marked this conversation as resolved.
Outdated
except Exception as err:
self.logger.error(f"Operation '{op}' on '{resolved_type}' failed: {err}", exc_info=True)
return 1, {"error": str(err)}
64 changes: 64 additions & 0 deletions adapters/python/pyDscAdapter/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import sys
import json
import argparse
from typing import Optional
from adapter import ResourceAdapter
from dsc_logging import setup_dsc_logging
Comment on lines +5 to +6
Comment on lines +5 to +6

# --------------------
# CLI / entrypoint API
# --------------------
def _build_parser() -> argparse.ArgumentParser:
"""Construct the argument parser for the DSC adapter CLI."""
parser = argparse.ArgumentParser(
prog="dsctest",
description="DSC v3 Python adapter CLI compatible with manifest."
)
sub = parser.add_subparsers(dest="command", required=True)

adapter = sub.add_parser("adapter", help="Adapter operations")
adapter.add_argument("--operation", required=True, choices=["list", "get", "set", "test", "export", "validate"],
help="Adapter operation to execute.")
adapter.add_argument("--input", default="{}", help="JSON string with resource configuration (single input).")
adapter.add_argument("--resource", dest="ResourceType", default="", help="Resource type selector (e.g., Microsoft.Linux.Apt/Package).")
adapter.add_argument("--resource-path", dest="ResourcePath", default="", help="Optional resource module file path.")
return parser


def main(argv: Optional[list] = None) -> int:
"""Main entry point for the DSC adapter CLI."""
parser = _build_parser()
args = parser.parse_args(argv)

if args.command != "adapter":
print(json.dumps({"error": "Unsupported command"}))
return 2

adapter = ResourceAdapter()


# 1. Start with --input as the authoritative source
input_str = args.input

# 2. If stdin has data, it overrides --input (DSC convention)
stdin_data = sys.stdin.read().strip() if not sys.stdin.isatty() else ""
if stdin_data:
input_str = stdin_data

# 3. Call operation handler
exit_code, result = adapter.run_operation(
args.operation,
input_str,
args.ResourceType,
getattr(args, "ResourcePath", "")
)

# If set branch (or similar) already wrote to stdout, skip emitting a wrapper
if isinstance(result, dict) and result.get("_stdout_emitted"):
return exit_code

# 4. Capture EXACT output passed to DSC
out_json = json.dumps(result, ensure_ascii=False)

print(out_json)
return exit_code
Loading
Loading