import argparse
import collections
import copy
import importlib
import inspect
import itertools
import json
import os
import re
import subprocess
import sys
import tempfile
from abc import ABC, abstractmethod
from base64 import b64encode, b64decode
from typing import *
from zlib import compress, decompress

# Directory holding one entry per PCI device. Overridable through the
# RDMA_TOPO_DEVDIR environment variable.
DEVDIR = os.environ.get("RDMA_TOPO_DEVDIR", "/sys/bus/pci/devices/")

BDF_RE = re.compile(r"^([0-9a-f]+?):([0-9a-f]{2}?):([0-9a-f]{2}?)\.([0-9a-f])$")
KERNEL_ACS_ISOLATED = "xx111x1"

PCI_EXT_CAP_ID_ACS = 0x000D
PCI_EXT_CAP_ID_ATS = 0x000F

PCI_VPD_LRDT = 0x80  # Large Resource Data Type flag
PCI_VPD_END_SMALL = 0x78  # Small Resource End Tag
PCI_VPD_END_LARGE = 0x79  # Large Resource End Tag
PCI_VPD_LRDT_ID = 0x82  # Identifier String
PCI_VPD_LRDT_RO = 0x90  # VPD-R (Read-Only)


class CommandError(Exception):
    pass


TOPO_NOT_SUPPORTED = CommandError("No supported topology detected")


def yesno(b: bool) -> str:
    """Render a boolean as the word "yes" or "no"."""
    return "yes" if b else "no"


class SysfsDevice(object):
    """Snapshot of the sysfs attributes of a single PCI device.

    The values are collected once, either from the device directory below
    DEVDIR (constructor) or from a previously serialised dictionary
    (from_dict), and stored in ``self.data``. The properties are read-only
    views of that dictionary; optional attributes read as None when absent.
    """

    # Attributes that must be present; the others may be None.
    REQUIRED_KEYS = ["realpath", "config", "modalias"]
    # Binary attributes that to_dict() stores zlib-compressed + base64-encoded.
    ENCODED_KEYS = ["config", "vpd"]

    @property
    def realpath(self) -> str:
        return self.data["realpath"]

    @property
    def config(self) -> bytes:
        return self.data["config"]

    @property
    def iommu_group(self) -> Optional[int]:
        return self.data.get("iommu_group", None)

    @property
    def modalias(self) -> str:
        return self.data["modalias"]

    @property
    def numa_node(self) -> Optional[int]:
        return self.data.get("numa_node", None)

    @property
    def vpd(self) -> Optional[bytes]:
        return self.data.get("vpd", None)

    @property
    def subsystems(self) -> Optional[Dict[str, List[str]]]:
        return self.data.get("subsystems", None)

    @property
    def id(self) -> str:
        # The last path component of the resolved device directory.
        return os.path.basename(self.data["realpath"])

    def __init__(self, id: str):
        """Read the attributes of device ``id`` from its directory in DEVDIR.

        Raises:
            CommandError: a required attribute is missing or unreadable, or
                any attribute cannot be read because of missing permissions.
        """
        devdir = os.path.join(DEVDIR, id)

        def read(*parts: str) -> bytes:
            with open(os.path.join(devdir, *parts), "rb") as F:
                return F.read()

        def string(b: bytes) -> str:
            return b.decode("ascii").strip()

        def subsystems() -> Dict[str, List[str]]:
            res: Dict[str, List[str]] = collections.defaultdict(list)
            for fn in os.listdir(devdir):
                if fn in {"drm", "infiniband", "net", "nvme"}:
                    res[fn].extend(os.listdir(os.path.join(devdir, fn)))
            return dict(res)

        def iommu_group() -> int:
            # The group number is the name of the symlink target.
            return int(
                os.path.basename(os.readlink(os.path.join(devdir, "iommu_group")))
            )

        readers = {
            "realpath": lambda: os.path.realpath(devdir),
            "config": lambda: read("config"),
            "iommu_group": iommu_group,
            "modalias": lambda: string(read("modalias")),
            "numa_node": lambda: int(string(read("numa_node"))),
            "vpd": lambda: read("vpd"),
            "subsystems": subsystems,
        }

        self.data: Dict[str, Any] = {}
        for k, reader in readers.items():
            required = k in SysfsDevice.REQUIRED_KEYS
            try:
                self.data[k] = reader()
            except FileNotFoundError as e:
                if required:
                    raise CommandError(
                        f"Missing required sysfs path: {e.filename}"
                    ) from e
                self.data[k] = None
            except PermissionError as e:
                raise CommandError(
                    f"Cannot read sysfs path: {e.filename}. Are you root?"
                ) from e
            except OSError as e:
                # Any other I/O failure (for example an attribute the kernel
                # refuses to read) must not abort the scan of every device
                # when the attribute is optional.
                if required:
                    raise CommandError(
                        f"Cannot read sysfs path: {e.filename}: {e.strerror}"
                    ) from e
                self.data[k] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "SysfsDevice":
        """Rebuild a device from a dictionary produced by to_dict().

        Raises:
            ValueError: a required key is missing/None, or an encoded value
                cannot be base64-decoded and decompressed.
        """
        obj = object.__new__(cls)

        obj.data = copy.deepcopy(data)
        for k in SysfsDevice.REQUIRED_KEYS:
            if k not in obj.data or obj.data[k] is None:
                raise ValueError(f"Missing required key '{k}'")

        for k in SysfsDevice.ENCODED_KEYS:
            if k in obj.data and obj.data[k] is not None:
                try:
                    obj.data[k] = decompress(b64decode(obj.data[k]))
                except Exception as e:
                    raise ValueError(
                        f"Invalid encoded value for key '{k}': {e}"
                    ) from e

        return obj

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serialisable copy of the collected attributes.

        Binary attributes are compressed and base64-encoded; binary
        attributes that are None are left out entirely.
        """
        res = copy.deepcopy(self.data)
        for k in SysfsDevice.ENCODED_KEYS:
            if k not in res:
                continue
            if res[k] is not None:
                res[k] = b64encode(compress(res[k])).decode("ascii")
            else:
                del res[k]
        return res


def parse_vpd(vpd: Optional[bytes]) -> Tuple[Optional[str], Optional[str]]:
    """Parse the V3 keyword and the identifier string out of raw VPD bytes.

    Returns:
        ``(v3, name)``: the value of the "V3" keyword of the read-only
        section and the stripped identifier string. Either is None when it is
        absent or not ASCII. A field that cannot be decoded does not prevent
        the other one from being returned.
    """
    if vpd is None:
        return None, None

    def ascii_or_none(raw: bytes) -> Optional[str]:
        try:
            return raw.decode("ascii")
        except UnicodeDecodeError:
            return None

    def items(data: bytes) -> Iterator[Tuple[int, bytes]]:
        """Yield (tag, payload) for each resource until an end tag or a
        truncated resource is met."""
        pos = 0
        end = len(data)
        while pos < end:
            tag = data[pos]
            if tag in (PCI_VPD_END_SMALL, PCI_VPD_END_LARGE):
                return

            if tag & PCI_VPD_LRDT:
                # Large resource: tag byte + 16-bit little-endian length.
                if end - pos < 3:
                    return
                header = 3
                length = int.from_bytes(data[pos + 1 : pos + 3], "little")
            else:
                # Small resource: length lives in the low three tag bits.
                header = 1
                length = tag & 0x07

            if length > end - pos - header:
                return

            start = pos + header
            yield (tag, data[start : start + length])
            pos = start + length

    def keywords(data: bytes) -> Iterator[Tuple[str, bytes]]:
        """Yield (keyword, value) pairs; each entry is a 2-byte keyword,
        a 1-byte length and the value."""
        pos = 0
        end = len(data)
        # Three bytes are a complete entry when the value is empty.
        while end - pos >= 3:
            length = data[pos + 2]
            if length > end - pos - 3:
                return
            key = data[pos : pos + 2].decode("ascii", errors="replace")
            yield (key, data[pos + 3 : pos + 3 + length])
            pos += 3 + length

    name = None
    v3 = None
    for tag, item in items(vpd):
        if tag == PCI_VPD_LRDT_ID:
            decoded = ascii_or_none(item)
            if decoded is not None:
                name = decoded.strip()
        elif tag == PCI_VPD_LRDT_RO:
            for keyword, value in keywords(item):
                if keyword == "V3":
                    decoded = ascii_or_none(value)
                    if decoded is not None:
                        v3 = decoded

    return (v3, name)


def parse_ext_cap(config: bytes, cap_id: int) -> Optional[bytes]:
    """Parse an extended capability from the PCI configuration space.

    Walks the capability list that starts at offset 0x100 and returns the
    bytes following the 4-byte header of the first capability whose ID equals
    ``cap_id``. The returned slice ends at the next capability in the list,
    or at the end of ``config`` when the next pointer is zero, does not move
    forward, or points outside the buffer. Returns None when the capability
    is not found.
    """
    size = len(config)
    if size < 0x104:
        return None

    offset = 0x100
    # "<=" so that a header occupying the last four bytes is still examined.
    while offset and offset <= size - 4:
        header = int.from_bytes(config[offset : offset + 4], "little")
        next_offset = (header >> 20) & 0xFFC
        if next_offset == 0 or next_offset <= offset + 4 or next_offset > size:
            next_offset = size
        if (header & 0xFFFF) == cap_id:
            return config[offset + 4 : next_offset]
        if next_offset == size:
            break
        offset = next_offset

    return None


def parse_acs_ctrl(config: bytes) -> Optional[int]:
    """Parse the ACS control register from the PCI configuration space.

    The register is the 16-bit little-endian word at bytes 2..3 of the
    capability body, i.e. offset 6 from the capability header. Returns None
    when there is no ACS capability or its body is shorter than four bytes.
    """
    raw = parse_ext_cap(config, PCI_EXT_CAP_ID_ACS)

    if raw is None or len(raw) < 4:
        return None

    return int.from_bytes(raw[2:4], "little")


def has_ats_cap(config: bytes) -> bool:
    """True if the device exposes an ATS capability"""
    return parse_ext_cap(config, PCI_EXT_CAP_ID_ATS) is not None
pci_device_types = { PCI_VDEVICE("NVIDIA", 0x22B1): "grace_rp", # NVIDIA Grace PCI Root Port Bridge PCI_VDEVICE("NVIDIA", 0x22B2): "grace_rp", # NVIDIA Grace PCI Root Port Bridge @@ -71,11 +281,23 @@ pci_device_types = { PCI_VDEVICE("MELLANOX", 0x2100): "cx_dma", # ConnectX-8 DMA Controller PCI_VDEVICE("MELLANOX", 0x197B): "bf3_switch", # USP/DSP of a BF3 switch PCI_VDEVICE("MELLANOX", 0x197C): "cx_switch", # USP/DSP of a CX switch + PCI_VDEVICE("MELLANOX", 0x1979): "cx_switch", # USP/DSP of a CX switch PCI_DEVICE_CLASS(0x010802): "nvme", PCI_NVGPU(): "nvgpu", + PCI_DEVICE_CLASS(0x060400): "bridge", # Generic PCI-PCI bridge / Root Port } +dump_ignored = [ + PCI_DEVICE_CLASS(0x060000), # Generic system peripheral + PCI_DEVICE_CLASS(0x060100), # ISA bridge + PCI_DEVICE_CLASS(0x080700), # Non-Essential Instrumentation + PCI_DEVICE_CLASS(0x088000), # System peripheral + PCI_DEVICE_CLASS(0x110100), # Performance counters + PCI_DEVICE_CLASS(0x130000), # Non-Essential Instrumentation +] + + class PCIBDF( collections.namedtuple("PCIBDF", ["segment", "bus", "device", "function"]) ): @@ -100,42 +322,39 @@ def to_pcibdf(s: str) -> Optional[PCIBDF]: class PCIDevice(object): device_type = "" - vpd_v3: str = None + vpd_v3: Optional[str] = None + vpd_name: Optional[str] = None parent: PCIDevice = None - lspci_data: str = None - def __init__(self, sysdir: str, bdf: PCIBDF): - self.sysdir = sysdir + def __init__(self, bdf: PCIBDF, sysfs_device: SysfsDevice): self.bdf = bdf - try: - self.iommu_group = int( - os.path.split(sysfs_read_link(sysdir, "iommu_group"))[-1] - ) - except FileNotFoundError: - self.iommu_group = None + self.sysfs_device = sysfs_device - try: - self.numa_node = int(sysfs_read_str(sysdir, "numa_node")) - except FileNotFoundError: - self.numa_node = None + self.iommu_group = self.sysfs_device.iommu_group + self.numa_node = self.sysfs_device.numa_node + self.modalias = self.sysfs_device.modalias + + parent = 
def finish_loading(self):
    """Parse the VPD and capability data relevant to this device's type.

    - ConnectX NIC / DMA functions get ``vpd_v3`` and ``vpd_name`` from VPD.
    - ``has_acs`` is always assigned: it is True only for switch ports and
      root ports (device types ending in ``_rp``) whose configuration space
      yields an ACS control register, and False for every other device, so
      that reading the attribute never raises AttributeError.
    - ConnectX NIC functions get ``has_ats`` from the ATS capability.
    """
    dtype = self.device_type

    if dtype in ("cx_nic", "cx_dma"):
        self.vpd_v3, self.vpd_name = parse_vpd(self.sysfs_device.vpd)

    # The config space is only inspected for bridge-like devices.
    is_bridge_like = "switch" in dtype or dtype.endswith("_rp")
    self.has_acs = is_bridge_like and self.get_acs_ctrl() is not None

    if dtype == "cx_nic":
        self.has_ats = has_ats_cap(self.sysfs_device.config)
class NVCX_Complex(ABC):
    """Interface shared by every kind of NIC/GPU complex.

    Concrete complexes supply the five members below; the class itself
    cannot be instantiated.
    """

    @property
    @abstractmethod
    def primary_nic(self) -> "PCIDevice":
        """Primary ConnectX PF for this complex."""

    @abstractmethod
    def compute_acs(self, virt: Optional[bool]) -> "Dict[PCIDevice, str]":
        """Computes the ACS values for this complex.

        Used to implement commands which check and/or set ACS values.
        """

    @abstractmethod
    def to_dict(self) -> Dict[str, Any]:
        """Returns a JSON-serializable dictionary which represents this complex.

        Used to implement topology dump command with `-j / --json` flag.

        Output format should be maintained for backwards compatibility.
        """

    @abstractmethod
    def check(self, virt: Optional[bool]) -> bool:
        """Runs additional checks on this complex.

        Returns True if all checks pass, False otherwise.

        Used to implement the `check` command.
        """

    @abstractmethod
    def __str__(self) -> str:
        """Returns a string representation of this complex.

        Used to implement the `topo` command.
        """
def to_dict(self) -> Dict[str, Any]:
    """Return the JSON-serialisable description of this DMA complex.

    Always present: ``rdma_nic_pf_bdf``, ``rdma_dma_bdf``, ``gpu_bdf`` and
    ``subsystems`` (BDF -> subsystem name -> device names, only for devices
    that report at least one subsystem). Optional: ``rdma_nic_vpd_name``,
    ``numa_node`` and ``nvme_bdf``.

    ``nvme_bdf`` is the NVMe device with the lowest BDF, so the output does
    not depend on set iteration order when several NVMe devices are present.
    """
    res = {
        "rdma_nic_pf_bdf": str(self.cx_pf.bdf),
        "rdma_dma_bdf": str(self.cx_dma.bdf),
        "gpu_bdf": str(self.nvgpu.bdf),
        "subsystems": {},
    }
    devname = self.cx_pf.vpd_name
    if devname:
        res["rdma_nic_vpd_name"] = devname
    if self.cx_pf.numa_node is not None:
        res["numa_node"] = self.cx_pf.numa_node
    if self.nvmes:
        res["nvme_bdf"] = str(min(self.nvmes, key=lambda x: x.bdf).bdf)

    for pdev in sorted(
        itertools.chain(self.cx_pfs, [self.nvgpu, self.cx_dma], self.nvmes),
        key=lambda x: x.bdf,
    ):
        subsys = pdev.get_subsystems()
        if subsys:
            res["subsystems"][str(pdev.bdf)] = {
                name: list(devs) for name, devs in subsys.items()
            }
    return res
class NVCX_Inline_Complex(NVCX_Complex):
    """A CX NIC PF and a GPU that sit below the same shared switch, together
    with the root port above it and the two downstream ports of the shared
    switch that lead to the NIC and to the GPU."""

    def __init__(
        self,
        root_port: PCIDevice,
        shared_usp: PCIDevice,
        cx_pf: PCIDevice,
        nvgpu: PCIDevice,
    ):
        """Locate the DSPs of ``shared_usp`` leading to the NIC and the GPU.

        Raises:
            ValueError: more than one DSP leads to a NIC (or to a GPU), or
                one of the two DSPs cannot be found.
        """
        self.root_port = root_port
        self.cx_pf = cx_pf
        self.nvgpu = nvgpu
        self.cx_pf_dsp = None
        self.nvgpu_dsp = None

        for dsp in shared_usp.children:
            # Only the first NIC or GPU found below a DSP classifies it.
            for pdev in dsp.iterdownstream():
                if pdev.device_type == "cx_nic":
                    if self.cx_pf_dsp is not None:
                        raise ValueError(
                            "Multiple CX NIC DSPs under the same shared switch not supported"
                        )
                    self.cx_pf_dsp = dsp
                    break
                if pdev.device_type == "nvgpu":
                    if self.nvgpu_dsp is not None:
                        raise ValueError(
                            "Multiple GPU DSPs under the same shared switch not supported"
                        )
                    self.nvgpu_dsp = dsp
                    break

        if not self.cx_pf_dsp:
            raise ValueError("CX NIC DSP not found in the topology")
        if not self.nvgpu_dsp:
            raise ValueError("GPU DSP not found in the topology")

    @property
    def primary_nic(self) -> PCIDevice:
        return self.cx_pf

    def compute_acs(self, virt: Optional[bool]) -> Dict[PCIDevice, str]:
        """Return the ACS mask for both DSPs and the root port.

        Raises:
            CommandError: one of the three devices lacks ACS, or ``virt`` is
                None.
        """
        if not self.cx_pf_dsp.has_acs:
            raise CommandError(f"CX NIC DSP {self.cx_pf_dsp.bdf} lacks ACS")
        if not self.nvgpu_dsp.has_acs:
            raise CommandError(f"GPU DSP {self.nvgpu_dsp.bdf} lacks ACS")
        if not self.root_port.has_acs:
            raise CommandError(f"Root port {self.root_port.bdf} lacks ACS")
        if virt is None:
            raise CommandError("Unexpected: Could not determine virt mode")

        if virt:
            return {
                # The DSPs of the NIC which is non DD in the shared switch should
                # have the following enabled:
                #   bit-6 : ACS Direct Translated P2P
                #   bit-4 : ACS Upstream Forwarding
                #   bit-3 : ACS P2P Completion Redirect
                #   bit-2 : ACS P2P Request Redirect
                #   bit-0 : ACS Source Validation
                self.cx_pf_dsp: "1x111x1",
                # The DSPs of the GPU in the shared switch and the RP of the NIC/GPU
                # should have the following enabled, matching the kernel default:
                #   bit-4 : ACS Upstream Forwarding
                #   bit-3 : ACS P2P Completion Redirect
                #   bit-2 : ACS P2P Request Redirect
                #   bit-0 : ACS Source Validation
                self.nvgpu_dsp: KERNEL_ACS_ISOLATED,
                self.root_port: KERNEL_ACS_ISOLATED,
            }

        return {
            # The DSPs of both the NIC and GPU in the shared switch and
            # RPs of the NIC/GPU should have the following disabled:
            #   bit-4 : ACS Upstream Forwarding
            #   bit-3 : ACS P2P Completion Redirect
            #   bit-2 : ACS P2P Request Redirect
            #   bit-0 : ACS Source Validation
            self.cx_pf_dsp: "xx000x0",
            self.nvgpu_dsp: "xx000x0",
            self.root_port: "xx000x0",
        }

    def to_dict(self) -> Dict[str, Any]:
        """Return the JSON-serialisable description of this complex.

        Always present: ``rdma_nic_pf_bdf``, ``gpu_bdf`` and ``subsystems``.
        Optional: ``rdma_nic_vpd_name``, ``numa_node`` and ``rdma_nic_ats``
        (only emitted when the NIC has ATS).
        """
        res = {
            "rdma_nic_pf_bdf": str(self.cx_pf.bdf),
            "gpu_bdf": str(self.nvgpu.bdf),
            "subsystems": {},
        }
        devname = self.cx_pf.vpd_name
        if devname:
            res["rdma_nic_vpd_name"] = devname
        if self.cx_pf.numa_node is not None:
            res["numa_node"] = self.cx_pf.numa_node
        if self.cx_pf.has_ats:
            res["rdma_nic_ats"] = self.cx_pf.has_ats

        for pdev in sorted([self.cx_pf, self.nvgpu], key=lambda x: x.bdf):
            subsys = pdev.get_subsystems()
            if subsys:
                res["subsystems"][str(pdev.bdf)] = {
                    name: list(devs) for name, devs in subsys.items()
                }
        return res

    def __check_ats(self, virt: bool) -> bool:
        """The NIC must expose ATS exactly when virtualization is used."""
        status = "available" if self.cx_pf.has_ats else "not available"
        msg = f"ATS capability for {self.cx_pf.device_type} {self.cx_pf.bdf} is {status}"

        if self.cx_pf.has_ats != virt:
            check_fail(msg)
            return False

        check_ok(msg)
        return True

    def __check_iommu_group(self, virt: bool) -> bool:
        """With virtualization the NIC and GPU must be in different, existing
        iommu_groups; without it they must share one or both have none."""
        cxpf = f"{self.cx_pf.device_type} {self.cx_pf.bdf}"
        nvgpu = f"{self.nvgpu.device_type} {self.nvgpu.bdf}"
        prefix = f"Kernel iommu_group for {cxpf} and {nvgpu}"

        equal = f"equal {self.cx_pf.iommu_group} == {self.nvgpu.iommu_group}"
        not_equal = f"not equal {self.cx_pf.iommu_group} != {self.nvgpu.iommu_group}"

        if virt:
            if self.cx_pf.iommu_group is None:
                check_fail(f"Kernel iommu_group is missing for {cxpf}")
                return False

            if self.nvgpu.iommu_group is None:
                check_fail(f"Kernel iommu_group is missing for {nvgpu}")
                return False

            if self.cx_pf.iommu_group == self.nvgpu.iommu_group:
                check_fail(f"{prefix} are {equal}")
                return False

            check_ok(f"{prefix} are {not_equal}")
            return True

        if self.cx_pf.iommu_group is None and self.nvgpu.iommu_group is None:
            check_ok(f"{prefix} are not set")
            return True

        if self.cx_pf.iommu_group != self.nvgpu.iommu_group:
            check_fail(f"{prefix} are {not_equal}")
            return False

        check_ok(f"{prefix} are {equal}")
        return True

    def check(self, virt: Optional[bool]) -> bool:
        """Run the ATS and iommu_group checks; both are always executed so
        that each one reports its own OK/FAIL line.

        Raises:
            CommandError: ``virt`` is None (same error as compute_acs).
        """
        # An explicit error instead of an assert: asserts vanish under -O.
        if virt is None:
            raise CommandError("Unexpected: Could not determine virt mode")
        res_ats = self.__check_ats(virt)
        res_iommu_group = self.__check_iommu_group(virt)
        return res_ats and res_iommu_group

    def __str__(self):
        res = f"RDMA NIC={self.cx_pf.bdf}, GPU={self.nvgpu.bdf}\n"
        devname = self.cx_pf.vpd_name
        if devname:
            res += f"\t{devname}\n"
        if self.cx_pf.numa_node is not None:
            res += f"\tNUMA Node: {self.cx_pf.numa_node}\n"

        res += f"\tNIC ATS: {yesno(self.cx_pf.has_ats)}\n"

        subsystems: Dict[str, Set[str]] = collections.defaultdict(set)
        for pdev in [self.cx_pf, self.nvgpu]:
            for k, v in pdev.get_subsystems().items():
                subsystems[k].update(v)
        res += print_list("RDMA device", subsystems["infiniband"])
        res += print_list("Net device", subsystems["net"])
        res += print_list("DRM device", subsystems["drm"])
        res += print_list("NVMe device", subsystems["nvme"])

        # Drop the trailing newline.
        return res[:-1]
def __load_devices(self, sysfs_devices: List[SysfsDevice]):
    """Wrap every sysfs entry whose id parses as a PCI BDF in a PCIDevice.

    Returns a dictionary keyed by the parsed BDF; entries whose id is not a
    BDF are ignored.
    """
    by_bdf: Dict[PCIBDF, PCIDevice] = {}
    for entry in sysfs_devices:
        parsed = to_pcibdf(entry.id)
        if parsed:
            # Each BDF is expected to appear only once in the input.
            assert parsed not in by_bdf
            by_bdf[parsed] = PCIDevice(parsed, entry)
    return by_bdf
def __auto_detect_virt(self) -> bool:
    """Derive the virtualization mode from the ATS state of the primary NICs.

    Returns the ``has_ats`` value shared by the primary NIC of every complex.

    Raises:
        CommandError: the primary NICs do not all agree.
    """
    reference = self.nvcxs[0].primary_nic.has_ats
    for nvcx in self.nvcxs:
        if nvcx.primary_nic.has_ats != reference:
            raise CommandError(
                "Could not auto-detect virtualization: CX NICs have different ATS settings"
            )
    return reference
@property
def supported(self) -> bool:
    """True if the system has a topology that is supported by the rdma_topo tool.

    That is: a CX DMA or a GPU+NIC layout was detected and at least one
    complex was built from it.
    """
    layout_detected = self.has_cx_dma or self.has_gpu_and_nic
    return layout_detected and len(self.nvcxs) > 0
- for pdev in nvcx.cx_dma_dsp.iterupstream_path(): - if not pdev.parent: - assert pdev.has_acs - acs[pdev] = "xx111x0" - - # For all other CX bridges set kernel's default ACS enable - # Enable these bits: + acs.update(nvcx.compute_acs(self.virt)) + + # Enable, using kernel default, or disable ACS on all other CX + # bridges and Grace RP based on the virt parameter or if the topology + # has CX DMA functions. + # + # To enable (matches kernel default): # bit-4 : ACS Upstream Forwarding # bit-3 : ACS P2P Completion Redirect # bit-2 : ACS P2P Request Redirect # bit-0 : ACS Source Validation - # Which match the kernel default for pdev in self.devices.values(): if ( pdev not in acs and ("switch" in pdev.device_type or "grace_rp" in pdev.device_type) and pdev.has_acs ): - acs[pdev] = KERNEL_ACS_ISOLATED + acs[pdev] = ( + KERNEL_ACS_ISOLATED if self.has_cx_dma or self.virt else "xx000x0" + ) return acs +def add_sysfs_dump_argument(parser): + parser.add_argument( + "-F", + "--sysfs-dump-file", + action="store", + default=None, + dest="sysfs_dump", + help="Use a file produced by the rdma_topo dump command as input", + ) + + +def add_virt_argument(parser: argparse.ArgumentParser) -> None: + parser.add_argument( + "--virt", + action=argparse.BooleanOptionalAction, + default=None, + dest="virt", + help="Whether virtualization will be used on this system. 
def print_list(title: str, items: list[str]):
    """Format ``items`` as one tab-indented, newline-terminated line.

    The items are sorted and comma separated; ``title`` gets a plural "s"
    when there is more than one item. An empty collection gives "".
    Despite the name nothing is printed: the text is returned.
    """
    if items:
        label = title + "s" if len(items) > 1 else title
        joined = ", ".join(sorted(items))
        return "\t" + label + ": " + joined + "\n"
    return ""
) - - devname = nvcx.cx_pf.parse_vpd_name() - if devname: - print(f"\t{devname}") - - if nvcx.cx_pf.numa_node is not None: - print(f"\tNUMA Node: {nvcx.cx_pf.numa_node}") + print(nvcx) - if len(nvcx.cx_pfs): - print_list("NIC PCI device", [str(I.bdf) for I in nvcx.cx_pfs]) - - subsystems = nvcx.get_subsystems() - print_list("RDMA device", subsystems["infiniband"]) - print_list("Net device", subsystems["net"]) - print_list("DRM device", subsystems["drm"]) - print_list("NVMe device", subsystems["nvme"]) cmd_topology.__aliases__ = ("topo",) # ------------------------------------------------------------------- @@ -561,6 +1120,7 @@ def args_write_grub_acs(parser): default="/etc/default/grub.d/config-acs.cfg", help="Grub dropin file to use for the kernel command line", ) + add_virt_argument(parser) def cmd_write_grub_acs(args): @@ -571,11 +1131,14 @@ def cmd_write_grub_acs(args): If the system does not have any need of ACS flags the dropin file will be removed. This command is intended for Debian style systems with a /etc/default/grub.d and update-grub command.""" - topo = PCITopo() - if not topo.has_cx_dma: + topo = PCITopo(None, args.virt) + if not topo.supported: if args.dry_run: - raise CommandError("No ConnectX DMA Direct functions detected") + raise TOPO_NOT_SUPPORTED if os.path.exists(args.output): + print( + f"W: Found ACS drop-in file {args.output} but the system does not have a supported topology. Deleting file." + ) os.unlink(args.output) return @@ -622,6 +1185,7 @@ def args_setpci_acs(parser): dest="dry_run", help="Output the setpci commands to stdout and make no changes", ) + add_virt_argument(parser) def cmd_setpci_acs(args): @@ -634,11 +1198,17 @@ def cmd_setpci_acs(args): NOTE: In this configuration unprivileged userspace can trigger platform RAS failures, use with caution! 
""" - topo = PCITopo() + topo = PCITopo(None, args.virt) + if not topo.supported: + raise TOPO_NOT_SUPPORTED acs = topo.compute_acs() cmds: List[List[str]] = [] for pdev, acs in sorted(acs.items(), key=lambda x: x[0].bdf): - cur_acs = pdev.read_config("ECAP_ACS+0x6.w") + cur_acs = pdev.get_acs_ctrl() + if cur_acs is None: + raise CommandError( + f"Could not read ACS control register for {pdev.device_type} {pdev.bdf}" + ) new_acs = combine_acs(cur_acs, acs) if new_acs == cur_acs: continue @@ -655,7 +1225,8 @@ def cmd_setpci_acs(args): # ------------------------------------------------------------------- def args_check(parser): - pass + add_sysfs_dump_argument(parser) + add_virt_argument(parser) def check_ok(msg: str): @@ -664,20 +1235,29 @@ def check_ok(msg: str): def check_fail(msg: str): print(f"FAIL\t{msg}") - sys.exit(100) def cmd_check(args): """Check that the running kernel and PCI environment are setup correctly for GPU Direct with ConnectX DMA Direct PCI functions.""" - topo = PCITopo() - if not topo.has_cx_dma: - raise CommandError("No ConnectX DMA Direct functions detected") - check_ok("All ConnectX DMA functions have correct PCI topology") - + topo = PCITopo(args.sysfs_dump, args.virt) + if not topo.supported: + raise TOPO_NOT_SUPPORTED + if topo.has_cx_dma: + check_ok("All ConnectX DMA functions have correct PCI topology") + elif topo.has_gpu_and_nic: + check_ok("All NIC/GPU complexes have correct PCI topology") + + fatal = False acs = topo.compute_acs() for pdev, acs in sorted(acs.items(), key=lambda x: x[0].bdf): - cur_acs = pdev.read_config("ECAP_ACS+0x6.w") + cur_acs = pdev.get_acs_ctrl() + if cur_acs is None: + check_fail( + f"Could not read ACS control register for {pdev.device_type} {pdev.bdf}" + ) + fatal = True + continue new_acs = combine_acs(cur_acs, acs) if new_acs == cur_acs: check_ok( @@ -687,20 +1267,29 @@ def cmd_check(args): check_fail( f"ACS for {pdev.device_type} {pdev.bdf} has incorrect values {cur_acs:07b} != {acs}, (0x{cur_acs:x} != 
0x{new_acs:x})" ) + fatal = True - # Correct iommu_groups are required to avoid NVreg_GrdmaPciTopoCheckOverride for nvcx in topo.nvcxs: - if ( - nvcx.cx_dma.iommu_group == nvcx.nvgpu.iommu_group - and nvcx.cx_dma.iommu_group is not None - ): - check_ok( - f"Kernel iommu_group for DMA {nvcx.cx_dma.bdf} and GPU {nvcx.nvgpu.bdf} are both {nvcx.cx_dma.iommu_group}" - ) - else: - check_fail( - f"Kernel iommu_group for DMA {nvcx.cx_dma.bdf} and GPU {nvcx.nvgpu.bdf} are not equal {nvcx.cx_dma.iommu_group} != {nvcx.nvgpu.iommu_group}" - ) + if not nvcx.check(topo.virt): + fatal = True + + if fatal: + sys.exit(100) + +# ------------------------------------------------------------------- +def args_dump(parser): + pass + + +def cmd_dump(args) -> None: + """Dump the PCI topology to a file that can be used as input""" + sd_json: List[Dict[str, Any]] = [] + for fn in sorted(os.listdir(DEVDIR)): + sd = SysfsDevice(fn) + if any(d.match(sd.modalias) for d in dump_ignored): + continue + sd_json.append(sd.to_dict()) + json.dump(sd_json, sys.stdout, indent=4) # -------------------------------------------------------------------