diff --git a/README.md b/README.md
index 370005ee..9b3b25ae 100644
--- a/README.md
+++ b/README.md
@@ -116,7 +116,7 @@ options:
Comma-separated built-in names and/or plugin config
JSON paths (e.g. --plugin-
configs=NodeStatus,/path/c.json). Built-ins:
- NodeStatus, AllPlugins (default: None)
+ AllPlugins, NodeStatus (default: None)
--system-config STRING
Path to system config json (default: None)
--connection-config STRING
diff --git a/docs/PLUGIN_DOC.md b/docs/PLUGIN_DOC.md
index 3b6c3a3f..6e9f8a99 100644
--- a/docs/PLUGIN_DOC.md
+++ b/docs/PLUGIN_DOC.md
@@ -4,7 +4,7 @@
| Plugin | Collection | Analyzer Args | Collection Args | DataModel | Collector | Analyzer |
| --- | --- | --- | --- | --- | --- | --- |
-| AmdSmiPlugin | bad-pages
firmware --json
list --json
metric -g all
partition --json
process --json
ras --cper --folder={folder}
ras --afid --cper-file {cper_file}
static -g all --json
static -g {gpu_id} --json
topology
version --json
xgmi -l
xgmi -m | **Analyzer Args:**
- `check_static_data`: bool — If True, run static data checks (e.g. driver version, partition mode).
- `expected_gpu_processes`: Optional[int] — Expected number of GPU processes.
- `expected_max_power`: Optional[int] — Expected maximum power value (e.g. watts).
- `expected_driver_version`: Optional[str] — Expected AMD driver version string.
- `expected_memory_partition_mode`: Optional[str] — Expected memory partition mode (e.g. sp3, dp).
- `expected_compute_partition_mode`: Optional[str] — Expected compute partition mode.
- `expected_firmware_versions`: Optional[dict[str, str]] — Expected firmware versions keyed by amd-smi fw_id (e.g. PLDM_BUNDLE).
- `l0_to_recovery_count_error_threshold`: Optional[int] — L0-to-recovery count above which an error is raised.
- `l0_to_recovery_count_warning_threshold`: Optional[int] — L0-to-recovery count above which a warning is raised.
- `vendorid_ep`: Optional[str] — Expected endpoint vendor ID (e.g. for PCIe).
- `vendorid_ep_vf`: Optional[str] — Expected endpoint VF vendor ID.
- `devid_ep`: Optional[str] — Expected endpoint device ID.
- `devid_ep_vf`: Optional[str] — Expected endpoint VF device ID.
- `sku_name`: Optional[str] — Expected SKU name string for GPU.
- `expected_xgmi_speed`: Optional[list[float]] — Expected xGMI speed value(s) (e.g. link rate).
- `analysis_range_start`: Optional[datetime.datetime] — Start of time range for time-windowed analysis.
- `analysis_range_end`: Optional[datetime.datetime] — End of time range for time-windowed analysis. | **Collection Args:**
- `analysis_firmware_ids`: Optional[list[str]] — amd-smi fw_id values to record in analysis_ref.firmware_versions
- `cper_file_path`: Optional[str] — Path to CPER folder or file for RAS AFID collection (ras --afid --cper-file). | [AmdSmiDataModel](#AmdSmiDataModel-Model) | [AmdSmiCollector](#Collector-Class-AmdSmiCollector) | [AmdSmiAnalyzer](#Data-Analyzer-Class-AmdSmiAnalyzer) |
+| AmdSmiPlugin | bad-pages
firmware --json
list --json
metric -g all
partition --json
process --json
ras --cper --folder={folder}
ras --afid --cper-file {cper_file}
static -g all --json
static -g {gpu_id} --json
topology
version --json
xgmi -l
xgmi -m | **Analyzer Args:**
- `check_static_data`: bool — If True, run static data checks (e.g. driver version, partition mode).
- `expected_gpu_processes`: Optional[int] — Expected number of GPU processes.
- `expected_max_power`: Optional[int] — Expected maximum power value (e.g. watts).
- `expected_power_management`: Optional[str] — Expected amd-smi metric power_management value per GPU (e.g. DISABLED for active/full power, ENABLED for power-manage...
- `expected_driver_version`: Optional[str] — Expected AMD driver version string.
- `expected_memory_partition_mode`: Optional[str] — Expected memory partition mode (e.g. sp3, dp).
- `expected_compute_partition_mode`: Optional[str] — Expected compute partition mode.
- `expected_firmware_versions`: Optional[dict[str, str]] — Expected firmware versions keyed by amd-smi fw_id (e.g. PLDM_BUNDLE).
- `l0_to_recovery_count_error_threshold`: Optional[int] — L0-to-recovery count above which an error is raised.
- `l0_to_recovery_count_warning_threshold`: Optional[int] — L0-to-recovery count above which a warning is raised.
- `vendorid_ep`: Optional[str] — Expected endpoint vendor ID (e.g. for PCIe).
- `vendorid_ep_vf`: Optional[str] — Expected endpoint VF vendor ID.
- `devid_ep`: Optional[str] — Expected endpoint device ID.
- `devid_ep_vf`: Optional[str] — Expected endpoint VF device ID.
- `sku_name`: Optional[str] — Expected SKU name string for GPU.
- `expected_xgmi_speed`: Optional[list[float]] — Expected xGMI speed value(s) (e.g. link rate).
- `analysis_range_start`: Optional[datetime.datetime] — Start of time range for time-windowed analysis.
- `analysis_range_end`: Optional[datetime.datetime] — End of time range for time-windowed analysis. | **Collection Args:**
- `analysis_firmware_ids`: Optional[list[str]] — amd-smi fw_id values to record in analysis_ref.firmware_versions
- `cper_file_path`: Optional[str] — Path to CPER folder or file for RAS AFID collection (ras --afid --cper-file). | [AmdSmiDataModel](#AmdSmiDataModel-Model) | [AmdSmiCollector](#Collector-Class-AmdSmiCollector) | [AmdSmiAnalyzer](#Data-Analyzer-Class-AmdSmiAnalyzer) |
| BiosPlugin | sh -c 'cat /sys/devices/virtual/dmi/id/bios_version'
wmic bios get SMBIOSBIOSVersion /Value | **Analyzer Args:**
- `exp_bios_version`: list[str] — Expected BIOS version(s) to match against collected value (str or list).
- `regex_match`: bool — If True, match exp_bios_version as regex; otherwise exact match. | - | [BiosDataModel](#BiosDataModel-Model) | [BiosCollector](#Collector-Class-BiosCollector) | [BiosAnalyzer](#Data-Analyzer-Class-BiosAnalyzer) |
| CmdlinePlugin | cat /proc/cmdline | **Analyzer Args:**
- `required_cmdline`: Union[str, List] — Command-line parameters that must be present (e.g. 'pci=bfsort').
- `banned_cmdline`: Union[str, List] — Command-line parameters that must not be present.
- `os_overrides`: Dict[str, nodescraper.plugins.inband.cmdline.cmdlineconfig.OverrideConfig] — Per-OS overrides for required_cmdline and banned_cmdline (keyed by OS identifier).
- `platform_overrides`: Dict[str, nodescraper.plugins.inband.cmdline.cmdlineconfig.OverrideConfig] — Per-platform overrides for required_cmdline and banned_cmdline (keyed by platform). | - | [CmdlineDataModel](#CmdlineDataModel-Model) | [CmdlineCollector](#Collector-Class-CmdlineCollector) | [CmdlineAnalyzer](#Data-Analyzer-Class-CmdlineAnalyzer) |
| DeviceEnumerationPlugin | powershell -Command "(Get-WmiObject -Class Win32_Processor | Measure-Object).Count"
lspci -d {vendorid_ep}: | grep -iE 'VGA|Display|3D|Processing accelerators|Co-processor|Accelerator' | grep -vi 'Virtual Function' | wc -l
powershell -Command "(wmic path win32_VideoController get name | findstr AMD | Measure-Object).Count"
lscpu
lshw
lspci -d {vendorid_ep}: | grep -i 'Virtual Function' | wc -l
powershell -Command "(Get-VMHostPartitionableGpu | Measure-Object).Count" | **Analyzer Args:**
- `cpu_count`: Optional[list[int]] — Expected CPU count(s); pass as int or list of ints. Analysis passes if actual is in list.
- `gpu_count`: Optional[list[int]] — Expected GPU count(s); pass as int or list of ints. Analysis passes if actual is in list.
- `vf_count`: Optional[list[int]] — Expected virtual function count(s); pass as int or list of ints. Analysis passes if actual is in list. | - | [DeviceEnumerationDataModel](#DeviceEnumerationDataModel-Model) | [DeviceEnumerationCollector](#Collector-Class-DeviceEnumerationCollector) | [DeviceEnumerationAnalyzer](#Data-Analyzer-Class-DeviceEnumerationAnalyzer) |
@@ -1755,6 +1755,7 @@ Check sysctl matches expected sysctl details
- **check_static_data**: `bool` — If True, run static data checks (e.g. driver version, partition mode).
- **expected_gpu_processes**: `Optional[int]` — Expected number of GPU processes.
- **expected_max_power**: `Optional[int]` — Expected maximum power value (e.g. watts).
+- **expected_power_management**: `Optional[str]` — Expected amd-smi metric power_management value per GPU (e.g. DISABLED for active/full power, ENABLED for power-managed idle).
- **expected_driver_version**: `Optional[str]` — Expected AMD driver version string.
- **expected_memory_partition_mode**: `Optional[str]` — Expected memory partition mode (e.g. sp3, dp).
- **expected_compute_partition_mode**: `Optional[str]` — Expected compute partition mode.
diff --git a/nodescraper/cli/cli.py b/nodescraper/cli/cli.py
index 9890ef39..129ef136 100644
--- a/nodescraper/cli/cli.py
+++ b/nodescraper/cli/cli.py
@@ -64,7 +64,7 @@
from nodescraper.connection.redfish.redfish_params import RedfishConnectionParams
from nodescraper.constants import DEFAULT_LOGGER
from nodescraper.enums import ExecutionStatus, SystemInteractionLevel, SystemLocation
-from nodescraper.models import PluginConfig, SystemInfo
+from nodescraper.models import SystemInfo
from nodescraper.pluginexecutor import PluginExecutor
from nodescraper.pluginregistry import PluginRegistry
@@ -74,17 +74,16 @@ def _parse_plugin_configs_csv(value: str) -> list[str]:
return [p.strip() for p in value.split(",") if p.strip()]
-def _config_registry_with_all_plugins(plugin_reg: PluginRegistry) -> ConfigRegistry:
- """Synthetic ``AllPlugins`` config used for CLI help and :func:`build_global_argument_parser`."""
- config_reg = ConfigRegistry()
- config_reg.configs["AllPlugins"] = PluginConfig(
- name="AllPlugins",
- desc="Run all registered plugins with default arguments",
- global_args={},
- plugins={name: {} for name in plugin_reg.plugins},
- result_collators={},
- )
- return config_reg
+def _default_config_registry(_plugin_reg: PluginRegistry) -> ConfigRegistry:
+ """Build the config registry from bundled JSON and plugin-config entry points.
+
+ Args:
+ _plugin_reg (PluginRegistry): Unused; retained for call-site compatibility.
+
+ Returns:
+ ConfigRegistry: Registry containing bundled and entry-point plugin configs.
+ """
+ return ConfigRegistry()
def _add_cli_root_globals(
@@ -203,7 +202,7 @@ def _add_cli_root_globals(
def build_global_argument_parser(*, add_help: bool = True) -> argparse.ArgumentParser:
"""Globals only (no subcommands), for host CLIs."""
plugin_reg = PluginRegistry()
- config_reg = _config_registry_with_all_plugins(plugin_reg)
+ config_reg = _default_config_registry(plugin_reg)
parser = argparse.ArgumentParser(
description="node scraper CLI (global options only)",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
@@ -406,7 +405,7 @@ def get_cli_top_level_subcommands() -> tuple[str, ...]:
Tuple of ``subcmd`` subparser names; call ``cache_clear()`` if registries change in-process.
"""
plugin_reg = PluginRegistry()
- config_reg = _config_registry_with_all_plugins(plugin_reg)
+ config_reg = _default_config_registry(plugin_reg)
parser, _plugin_subparser_map = build_parser(plugin_reg, config_reg)
return _top_level_subcommand_names(parser)
@@ -474,7 +473,7 @@ def main(
arg_input = sys.argv[1:]
plugin_reg = PluginRegistry()
- config_reg = _config_registry_with_all_plugins(plugin_reg)
+ config_reg = _default_config_registry(plugin_reg)
parser, plugin_subparser_map = build_parser(plugin_reg, config_reg)
try:
diff --git a/nodescraper/cli/helper.py b/nodescraper/cli/helper.py
index 620c8f38..1156e369 100644
--- a/nodescraper/cli/helper.py
+++ b/nodescraper/cli/helper.py
@@ -211,8 +211,8 @@ def parse_describe(
if not parsed_args.name:
out: list[str] = []
if parsed_args.type == "config":
- out.append("Available built-in configs:")
- for name in config_reg.configs:
+ out.append("Available configs:")
+ for name in sorted(config_reg.configs):
out.append(f" {name}")
elif parsed_args.type == "plugin":
out.append("Available plugins:")
diff --git a/nodescraper/configregistry.py b/nodescraper/configregistry.py
index 2fa3f96d..f6e9f37a 100644
--- a/nodescraper/configregistry.py
+++ b/nodescraper/configregistry.py
@@ -23,30 +23,55 @@
# SOFTWARE.
#
###############################################################################
+from __future__ import annotations
+
+import importlib.metadata
+import inspect
import json
import os
from pathlib import Path
-from typing import Optional
+from typing import Any, Optional
from pydantic import ValidationError
from nodescraper.models import PluginConfig
+PLUGIN_CONFIG_ENTRY_POINT_GROUP = "nodescraper.plugin_configs"
+
+
+class PluginConfigEntryPointError(RuntimeError):
+ """Raised when a ``nodescraper.plugin_configs`` entry point cannot be loaded."""
+
class ConfigRegistry:
"""Class to load json plugin configs into models"""
INTERNAL_SEARCH_PATH = os.path.join(os.path.dirname(__file__), "configs")
- def __init__(self, config_path: Optional[str] = None) -> None:
- self.configs = {}
+ def __init__(
+ self,
+ config_path: Optional[str] = None,
+ load_entry_point_configs: bool = True,
+ ) -> None:
+ """Initialize the config registry.
+
+ Args:
+ config_path (Optional[str], optional): Path in which to search for JSON config files.
+ Defaults to None.
+ load_entry_point_configs (bool, optional): Whether to load ``nodescraper.plugin_configs``
+ entry points. Defaults to True.
+ """
+ self.configs: dict[str, PluginConfig] = {}
self.load_configs(config_path)
+ if load_entry_point_configs:
+ self.configs.update(self.load_plugin_configs_from_entry_points())
def load_configs(self, config_path: Optional[str] = None):
- """load plugin config json files into pydantic models
+ """Load plugin config JSON files into pydantic models.
Args:
- config_path (Optional[str], optional): Path in which to search for config files. Defaults to None.
+ config_path (Optional[str], optional): Path in which to search for config files.
+ Defaults to None.
"""
if not config_path:
config_path = self.INTERNAL_SEARCH_PATH
@@ -64,3 +89,95 @@ def load_configs(self, config_path: Optional[str] = None):
self.configs[config_file.name] = config_model
except (ValidationError, json.JSONDecodeError):
pass
+
+ @staticmethod
+ def _entry_points_for_group(group: str):
+ """Return setuptools entry points for the given group name.
+
+ Args:
+ group (str): Entry point group to query.
+
+ Returns:
+ Iterable: Entry points registered under ``group``.
+ """
+ try:
+ return importlib.metadata.entry_points(group=group) # type: ignore[call-arg]
+ except TypeError:
+ all_eps = importlib.metadata.entry_points() # type: ignore[assignment]
+ return all_eps.get(group, []) # type: ignore[assignment, attr-defined, arg-type]
+
+ @staticmethod
+ def _resolve_entry_point_config(loaded: Any) -> PluginConfig | dict[str, Any] | None:
+ """Resolve a loaded entry point object into a plugin config.
+
+ Args:
+ loaded (Any): Object returned by an entry point ``load()`` call.
+
+ Returns:
+ Optional[PluginConfig | dict[str, Any]]: Plugin config, or None if ``loaded`` is unsupported.
+ """
+ if isinstance(loaded, PluginConfig):
+ return loaded
+ if isinstance(loaded, dict):
+ return loaded
+ if inspect.isclass(loaded) and hasattr(loaded, "plugin_config"):
+ config_data = loaded.plugin_config()
+ elif callable(loaded):
+ config_data = loaded()
+ else:
+ return None
+
+ if isinstance(config_data, (PluginConfig, dict)):
+ return config_data
+ return None
+
+ @classmethod
+ def load_plugin_configs_from_entry_points(cls) -> dict[str, PluginConfig]:
+ """Load plugin configs registered under ``nodescraper.plugin_configs`` entry points.
+
+ Returns:
+ dict[str, PluginConfig]: Map of config name to loaded :class:`~nodescraper.models.PluginConfig`.
+
+ Raises:
+ PluginConfigEntryPointError: If an entry point target is missing, invalid, or unsupported.
+ """
+ configs: dict[str, PluginConfig] = {}
+
+ for entry_point in cls._entry_points_for_group(PLUGIN_CONFIG_ENTRY_POINT_GROUP):
+ entry_point_name = getattr(entry_point, "name", None)
+ entry_point_label = entry_point_name or ""
+ try:
+ loaded = entry_point.load() # type: ignore[attr-defined]
+ config_data = cls._resolve_entry_point_config(loaded)
+ if config_data is None:
+ raise PluginConfigEntryPointError(
+ f"Failed to load plugin config entry point {entry_point_label!r}: "
+ f"unsupported target {loaded!r}"
+ )
+
+ config_model = (
+ config_data
+ if isinstance(config_data, PluginConfig)
+ else PluginConfig(**config_data)
+ )
+ config_key = entry_point_name or config_model.name
+ if config_key:
+ configs[config_key] = config_model
+ except ModuleNotFoundError as exc:
+ raise PluginConfigEntryPointError(
+ f"Failed to load plugin config entry point {entry_point_label!r}: "
+ f"module not found ({exc}). Check the entry point target in pyproject.toml."
+ ) from exc
+ except ValidationError as exc:
+ raise PluginConfigEntryPointError(
+ f"Failed to load plugin config entry point {entry_point_label!r}: "
+ "invalid plugin config"
+ ) from exc
+ except PluginConfigEntryPointError:
+ raise
+ except Exception as exc:
+ raise PluginConfigEntryPointError(
+ f"Failed to load plugin config entry point {entry_point_label!r}: {exc}"
+ ) from exc
+
+ return configs
diff --git a/nodescraper/configs/node_status.json b/nodescraper/configs/node_status.json
deleted file mode 100644
index e68ce555..00000000
--- a/nodescraper/configs/node_status.json
+++ /dev/null
@@ -1,19 +0,0 @@
-{
- "name": "NodeStatus",
- "desc": "Check configuration and status of the node",
- "global_args": {},
- "plugins": {
- "BiosPlugin": {},
- "CmdlinePlugin": {},
- "DimmPlugin": {},
- "DkmsPlugin": {},
- "DmesgPlugin": {},
- "KernelPlugin": {},
- "MemoryPlugin": {},
- "OsPlugin": {},
- "RocmPlugin": {},
- "StoragePlugin": {},
- "UptimePlugin": {}
- },
- "result_collators": {}
-}
diff --git a/nodescraper/models/pluginconfig.py b/nodescraper/models/pluginconfig.py
index 8af60682..a060530f 100644
--- a/nodescraper/models/pluginconfig.py
+++ b/nodescraper/models/pluginconfig.py
@@ -23,7 +23,9 @@
# SOFTWARE.
#
###############################################################################
-from typing import Optional
+from __future__ import annotations
+
+from typing import Any, Optional
from pydantic import BaseModel, Field
@@ -36,3 +38,31 @@ class PluginConfig(BaseModel):
result_collators: dict[str, dict] = Field(default_factory=dict)
name: Optional[str] = None
desc: Optional[str] = None
+
+ @classmethod
+ def coerce(cls, config: PluginConfig | dict[str, Any]) -> PluginConfig:
+ """Return a ``PluginConfig`` instance from a model or mapping."""
+ if isinstance(config, cls):
+ return config
+ return cls.model_validate(config)
+
+ @classmethod
+ def merge(cls, *configs: PluginConfig | dict[str, Any]) -> PluginConfig:
+ """Merge recipe plugin configs.
+
+ Plugin entries from later configs overwrite earlier ones with the same name.
+ ``name``, ``desc``, ``global_args``, and ``result_collators`` come from the first
+ config.
+ """
+ normalized = [cls.coerce(config) for config in configs]
+ merged_plugins: dict[str, dict[str, Any]] = {}
+ for config in normalized:
+ merged_plugins.update(config.plugins)
+ first = normalized[0] if normalized else cls()
+ return cls(
+ name=first.name,
+ desc=first.desc,
+ global_args=first.global_args,
+ plugins=merged_plugins,
+ result_collators=first.result_collators,
+ )
diff --git a/nodescraper/pluginrecipe/__init__.py b/nodescraper/pluginrecipe/__init__.py
new file mode 100644
index 00000000..b37e91fd
--- /dev/null
+++ b/nodescraper/pluginrecipe/__init__.py
@@ -0,0 +1,51 @@
+###############################################################################
+#
+# MIT License
+#
+# Copyright (c) 2025 Advanced Micro Devices, Inc.
+#
+###############################################################################
+from nodescraper.models import PluginConfig
+
+from .all_plugins import AllPlugins
+from .discovery import (
+ load_plugin_class,
+ plugin_has_analyzer,
+ plugin_has_collector,
+ plugin_names_matching,
+ plugins_with_analyzer,
+ plugins_with_collector,
+ registered_plugin_names,
+)
+from .node_status import NodeStatus
+from .pluginrecipe import (
+ ANALYZE_ONLY,
+ COLLECT_AND_ANALYZE,
+ COLLECT_ONLY,
+ AnalyzerOnlyPluginRecipe,
+ CollectorOnlyPluginRecipe,
+ PluginRecipe,
+ PluginRunFlags,
+ merge_plugin_configs,
+)
+
+__all__ = [
+ "ANALYZE_ONLY",
+ "COLLECT_AND_ANALYZE",
+ "COLLECT_ONLY",
+ "AllPlugins",
+ "AnalyzerOnlyPluginRecipe",
+ "CollectorOnlyPluginRecipe",
+ "NodeStatus",
+ "PluginConfig",
+ "PluginRecipe",
+ "PluginRunFlags",
+ "load_plugin_class",
+ "merge_plugin_configs",
+ "plugin_has_analyzer",
+ "plugin_has_collector",
+ "plugin_names_matching",
+ "plugins_with_analyzer",
+ "plugins_with_collector",
+ "registered_plugin_names",
+]
diff --git a/nodescraper/pluginrecipe/all_plugins.py b/nodescraper/pluginrecipe/all_plugins.py
new file mode 100644
index 00000000..491ced5d
--- /dev/null
+++ b/nodescraper/pluginrecipe/all_plugins.py
@@ -0,0 +1,24 @@
+###############################################################################
+#
+# MIT License
+#
+# Copyright (c) 2025 Advanced Micro Devices, Inc.
+#
+###############################################################################
+from __future__ import annotations
+
+from .discovery import registered_plugin_names
+from .pluginrecipe import PluginRecipe
+
+
+class AllPlugins(PluginRecipe):
+ """Run all registered plugins with default arguments."""
+
+ @classmethod
+ def plugin_names(cls) -> tuple[str, ...]:
+ """Return every plugin registered at runtime.
+
+ Returns:
+ tuple[str, ...]: Sorted names of all plugins in the plugin registry.
+ """
+ return registered_plugin_names()
diff --git a/nodescraper/pluginrecipe/discovery.py b/nodescraper/pluginrecipe/discovery.py
new file mode 100644
index 00000000..aebeea3c
--- /dev/null
+++ b/nodescraper/pluginrecipe/discovery.py
@@ -0,0 +1,98 @@
+###############################################################################
+#
+# MIT License
+#
+# Copyright (c) 2025 Advanced Micro Devices, Inc.
+#
+###############################################################################
+from __future__ import annotations
+
+from typing import Iterable
+
+
+def registered_plugin_names() -> tuple[str, ...]:
+ """Return all plugin names known to :class:`~nodescraper.pluginregistry.PluginRegistry`.
+
+ Returns:
+ tuple[str, ...]: Sorted registered plugin names.
+ """
+ from nodescraper.pluginregistry import PluginRegistry
+
+ return tuple(sorted(PluginRegistry().plugins))
+
+
+def plugin_names_matching(names: Iterable[str]) -> tuple[str, ...]:
+ """Return plugin names from ``names`` that are registered at runtime.
+
+ Args:
+ names (Iterable[str]): Candidate plugin names to filter.
+
+ Returns:
+ tuple[str, ...]: Sorted subset of ``names`` present in the plugin registry.
+ """
+ available = set(registered_plugin_names())
+ return tuple(sorted(name for name in names if name in available))
+
+
+def load_plugin_class(plugin_name: str) -> type | None:
+ """Return a registered plugin class by name.
+
+ Args:
+ plugin_name (str): Registered plugin name.
+
+ Returns:
+ type | None: Plugin class, or ``None`` if the name is not registered.
+ """
+ from nodescraper.pluginregistry import PluginRegistry
+
+ return PluginRegistry().plugins.get(plugin_name)
+
+
+def plugin_has_collector(plugin_name: str) -> bool:
+ """Return whether the plugin exposes a collector task.
+
+ Args:
+ plugin_name (str): Registered plugin name.
+
+ Returns:
+ bool: ``True`` when the plugin class defines ``COLLECTOR``.
+ """
+ plugin_class = load_plugin_class(plugin_name)
+ return plugin_class is not None and getattr(plugin_class, "COLLECTOR", None) is not None
+
+
+def plugin_has_analyzer(plugin_name: str) -> bool:
+ """Return whether the plugin exposes an analyzer task.
+
+ Args:
+ plugin_name (str): Registered plugin name.
+
+ Returns:
+ bool: ``True`` when the plugin class defines ``ANALYZER``.
+ """
+ plugin_class = load_plugin_class(plugin_name)
+ return plugin_class is not None and getattr(plugin_class, "ANALYZER", None) is not None
+
+
+def plugins_with_collector(plugin_names: Iterable[str]) -> tuple[str, ...]:
+ """Filter plugin names to those that define a collector.
+
+ Args:
+ plugin_names (Iterable[str]): Candidate plugin names.
+
+ Returns:
+ tuple[str, ...]: Sorted plugin names with a ``COLLECTOR`` implementation.
+ """
+ return tuple(sorted(name for name in plugin_names if plugin_has_collector(name)))
+
+
+def plugins_with_analyzer(plugin_names: Iterable[str]) -> tuple[str, ...]:
+ """Filter plugin names to those that define an analyzer.
+
+ Args:
+ plugin_names (Iterable[str]): Candidate plugin names.
+
+ Returns:
+ tuple[str, ...]: Sorted plugin names with an ``ANALYZER`` implementation.
+ """
+ return tuple(sorted(name for name in plugin_names if plugin_has_analyzer(name)))
diff --git a/nodescraper/pluginrecipe/node_status.py b/nodescraper/pluginrecipe/node_status.py
new file mode 100644
index 00000000..42be8d01
--- /dev/null
+++ b/nodescraper/pluginrecipe/node_status.py
@@ -0,0 +1,38 @@
+###############################################################################
+#
+# MIT License
+#
+# Copyright (c) 2025 Advanced Micro Devices, Inc.
+#
+###############################################################################
+from __future__ import annotations
+
+from .discovery import plugin_names_matching
+from .pluginrecipe import PluginRecipe
+
+_NODE_STATUS_PLUGINS = (
+ "BiosPlugin",
+ "CmdlinePlugin",
+ "DimmPlugin",
+ "DkmsPlugin",
+ "DmesgPlugin",
+ "KernelPlugin",
+ "MemoryPlugin",
+ "OsPlugin",
+ "RocmPlugin",
+ "StoragePlugin",
+ "UptimePlugin",
+)
+
+
+class NodeStatus(PluginRecipe):
+ """Check configuration and status of the node."""
+
+ @classmethod
+ def plugin_names(cls) -> tuple[str, ...]:
+ """Return the NodeStatus plugin set resolved at runtime.
+
+ Returns:
+ tuple[str, ...]: Sorted node-status plugin names registered in the plugin registry.
+ """
+ return plugin_names_matching(_NODE_STATUS_PLUGINS)
diff --git a/nodescraper/pluginrecipe/pluginrecipe.py b/nodescraper/pluginrecipe/pluginrecipe.py
new file mode 100644
index 00000000..e9b5a3ba
--- /dev/null
+++ b/nodescraper/pluginrecipe/pluginrecipe.py
@@ -0,0 +1,177 @@
+###############################################################################
+#
+# MIT License
+#
+# Copyright (c) 2025 Advanced Micro Devices, Inc.
+#
+###############################################################################
+from __future__ import annotations
+
+import abc
+from dataclasses import dataclass
+from typing import Any, Iterable
+
+from nodescraper.models import PluginConfig
+
+
+@dataclass(frozen=True)
+class PluginRunFlags:
+ """Collection/analysis toggles passed to nodescraper ``DataPlugin.run``."""
+
+ collection: bool = True
+ analysis: bool = True
+
+ def as_config(self) -> dict[str, bool]:
+ """Return nodescraper per-plugin config fields.
+
+ Returns:
+ dict[str, bool]: ``collection`` and ``analysis`` entries for a plugin config.
+ """
+ return {"collection": self.collection, "analysis": self.analysis}
+
+
+COLLECT_ONLY = PluginRunFlags(collection=True, analysis=False)
+ANALYZE_ONLY = PluginRunFlags(collection=False, analysis=True)
+COLLECT_AND_ANALYZE = PluginRunFlags(collection=True, analysis=True)
+
+
+class PluginRecipe(abc.ABC):
+ """Parent class for node-scraper plugin configuration recipes."""
+
+ DEFAULT_FLAGS: PluginRunFlags = COLLECT_AND_ANALYZE
+
+ @classmethod
+ @abc.abstractmethod
+ def plugin_names(cls) -> tuple[str, ...]:
+ """Return plugin names for this recipe.
+
+ Returns:
+ tuple[str, ...]: Registered plugin names included in this recipe.
+ """
+
+ @classmethod
+ def name(cls) -> str:
+ """Return the config identifier for this recipe.
+
+ Returns:
+ str: Recipe class name used as the config key.
+ """
+ return cls.__name__
+
+ @classmethod
+ def description(cls) -> str:
+ """Return the human-readable recipe description.
+
+ Returns:
+ str: Class docstring text, or an empty string if none is set.
+ """
+ if cls.__doc__:
+ return cls.__doc__.strip()
+ return ""
+
+ @classmethod
+ def flags_for_plugin(cls, plugin_name: str) -> PluginRunFlags:
+ """Return collection/analysis flags for one plugin.
+
+ Args:
+ plugin_name (str): Registered plugin name.
+
+ Returns:
+ PluginRunFlags: Flags to embed in the plugin config entry.
+ """
+ return cls.DEFAULT_FLAGS
+
+ @classmethod
+ def extra_plugin_args(cls, plugin_name: str) -> dict[str, Any]:
+ """Return additional per-plugin config fields beyond collection/analysis.
+
+ Args:
+ plugin_name (str): Registered plugin name.
+
+ Returns:
+ dict[str, Any]: Extra plugin config kwargs merged into the entry dict.
+ """
+ del plugin_name
+ return {}
+
+ @classmethod
+ def plugin_entry(cls, plugin_name: str) -> dict[str, Any]:
+ """Build the nodescraper plugin config entry for one plugin.
+
+ Args:
+ plugin_name (str): Registered plugin name.
+
+ Returns:
+ dict[str, Any]: Per-plugin config passed to node-scraper.
+ """
+ entry: dict[str, Any] = dict(cls.flags_for_plugin(plugin_name).as_config())
+ entry.update(cls.extra_plugin_args(plugin_name))
+ return entry
+
+ @classmethod
+ def plugin_config(cls) -> PluginConfig:
+ """Build a node-scraper plugin config at runtime.
+
+ Returns:
+ PluginConfig: Plugin config with ``name``, ``desc``, ``global_args``,
+ ``plugins``, and ``result_collators`` fields.
+ """
+ return PluginConfig(
+ name=cls.name(),
+ desc=cls.description(),
+ plugins={
+ plugin_name: cls.plugin_entry(plugin_name) for plugin_name in cls.plugin_names()
+ },
+ )
+
+
+class CollectorOnlyPluginRecipe(PluginRecipe):
+ """Recipe base for collector-only plugin configs."""
+
+ DEFAULT_FLAGS = COLLECT_ONLY
+
+ @classmethod
+ def filter_plugin_names(cls, names: Iterable[str]) -> tuple[str, ...]:
+ """Keep only plugins that expose a collector task.
+
+ Args:
+ names (Iterable[str]): Candidate plugin names.
+
+ Returns:
+ tuple[str, ...]: Sorted names with a ``COLLECTOR`` implementation.
+ """
+ from .discovery import plugins_with_collector
+
+ return plugins_with_collector(names)
+
+
+class AnalyzerOnlyPluginRecipe(PluginRecipe):
+ """Recipe base for analyzer-only plugin configs."""
+
+ DEFAULT_FLAGS = ANALYZE_ONLY
+
+ @classmethod
+ def filter_plugin_names(cls, names: Iterable[str]) -> tuple[str, ...]:
+ """Keep only plugins that expose an analyzer task.
+
+ Args:
+ names (Iterable[str]): Candidate plugin names.
+
+ Returns:
+ tuple[str, ...]: Sorted names with an ``ANALYZER`` implementation.
+ """
+ from .discovery import plugins_with_analyzer
+
+ return plugins_with_analyzer(names)
+
+
+def merge_plugin_configs(*configs: PluginConfig | dict[str, Any]) -> PluginConfig:
+ """Merge plugin configs, combining their ``plugins`` entries.
+
+ Args:
+ *configs: Plugin configs to merge.
+
+ Returns:
+ PluginConfig: Combined config with merged ``plugins`` entries.
+ """
+ return PluginConfig.merge(*configs)
diff --git a/nodescraper/plugins/inband/amdsmi/amdsmi_analyzer.py b/nodescraper/plugins/inband/amdsmi/amdsmi_analyzer.py
index 9a9cea71..da427753 100644
--- a/nodescraper/plugins/inband/amdsmi/amdsmi_analyzer.py
+++ b/nodescraper/plugins/inband/amdsmi/amdsmi_analyzer.py
@@ -25,7 +25,7 @@
###############################################################################
import io
from collections import defaultdict
-from typing import Any, Optional, Union
+from typing import Any, Mapping, Optional, Union
from nodescraper.enums import EventCategory, EventPriority
from nodescraper.interfaces import DataAnalyzer
@@ -45,11 +45,129 @@
from .cper import CperAnalysisTaskMixin
+def _gpu_mismatch_description(
+ check_name: str,
+ expected: object,
+ actual_by_gpu: Mapping[int, object],
+ *,
+ unit: str = "",
+) -> tuple[str, str]:
+ """Return (event description, console details) for per-GPU expected vs actual."""
+ gpu_ids = sorted(actual_by_gpu.keys())
+ exp = f"{expected}{unit}" if unit else str(expected)
+ per_gpu_short = ", ".join(f"GPU {gpu}={actual_by_gpu[gpu]}{unit}" for gpu in gpu_ids)
+ description = f"{check_name} mismatch (expected {exp}): {per_gpu_short}"
+ details = "; ".join(
+ f"GPU {gpu}: expected {exp}, actual {actual_by_gpu[gpu]}{unit}" for gpu in gpu_ids
+ )
+ return description, details
+
+
+def _gpu_unavailable_description(
+ check_name: str,
+ gpu_ids: list[int],
+ *,
+ expected: Optional[object] = None,
+) -> tuple[str, str]:
+ """Return (event description, console details) when a metric is missing per GPU."""
+ gpus = sorted(gpu_ids)
+ gpu_text = ", ".join(f"GPU {g}" for g in gpus)
+ if expected is not None:
+ description = f"{check_name} not available on {gpu_text} (expected {expected})"
+ else:
+ description = f"{check_name} not available on {gpu_text}"
+ return description, description
+
+
+def _static_mismatch_description(payload: dict[str, Any]) -> tuple[str, str]:
+ """Build description/details from ``check_static_data`` per-GPU payload."""
+ per_gpu = payload.get("per_gpu") or []
+ if not per_gpu:
+ return "amd-smi static data mismatch", "amd-smi static data mismatch"
+ parts: list[str] = []
+ for entry in per_gpu:
+ gpu = entry["gpu"]
+ for mm in entry.get("mismatches") or []:
+ parts.append(
+ f"GPU {gpu} {mm['field']}: expected {mm['expected']!s}, actual {mm['actual']!s}"
+ )
+ details = "; ".join(parts)
+ summary = payload.get("summary") or {}
+ gpus_affected = summary.get("gpus_affected", len(per_gpu))
+ description = f"amd-smi static data mismatch on {gpus_affected} GPU(s): {details}"
+ return description, details
+
+
class AmdSmiAnalyzer(CperAnalysisTaskMixin, DataAnalyzer[AmdSmiDataModel, None]):
"""Check AMD SMI Application data for PCIe, ECC errors, and CPER data."""
DATA_MODEL = AmdSmiDataModel
+ def check_expected_power_management(
+ self,
+ amdsmi_metric_data: list[AmdSmiMetric],
+ expected_power_management: str,
+ ) -> None:
+ """Check amd-smi metric ``power_management`` matches the expected value per GPU.
+
+ Args:
+ amdsmi_metric_data: Per-GPU metric data from ``amd-smi metric``.
+ expected_power_management: Expected value (e.g. ``DISABLED``).
+ """
+ expected = expected_power_management.strip().upper()
+ mismatches: dict[int, str] = {}
+ missing: list[int] = []
+
+ for metric in amdsmi_metric_data:
+ gpu = metric.gpu
+ actual_raw = metric.power.power_management
+ if actual_raw is None:
+ missing.append(gpu)
+ continue
+ actual_str = str(actual_raw).strip()
+ if actual_str.upper() in {"N/A", "NA", ""}:
+ missing.append(gpu)
+ continue
+ if actual_str.upper() != expected:
+ mismatches[gpu] = actual_str
+
+ if missing:
+ desc, details = _gpu_unavailable_description(
+ "GPU power_management",
+ missing,
+ expected=expected_power_management,
+ )
+ self._log_event(
+ category=EventCategory.PLATFORM,
+ description=desc,
+ priority=EventPriority.WARNING,
+ data={
+ "gpus": missing,
+ "expected_power_management": expected_power_management,
+ "details": details,
+ },
+ console_log=True,
+ )
+
+ if mismatches:
+ desc, details = _gpu_mismatch_description(
+ "GPU power_management",
+ expected_power_management,
+ mismatches,
+ )
+ self._log_event(
+ category=EventCategory.PLATFORM,
+ description=desc,
+ priority=EventPriority.ERROR,
+ data={
+ "gpus": list(mismatches.keys()),
+ "power_management_values": mismatches,
+ "expected_power_management": expected_power_management,
+ "details": details,
+ },
+ console_log=True,
+ )
+
def check_expected_max_power(
self,
amdsmi_static_data: list[AmdSmiStatic],
@@ -63,15 +181,25 @@ def check_expected_max_power(
"""
incorrect_max_power_gpus: dict[int, Union[int, str, float]] = {}
for gpu in amdsmi_static_data:
- if gpu.limit is None or gpu.limit.max_power is None:
+ limit = gpu.limit
+ max_power_vu = limit.resolved_max_power() if limit is not None else None
+ if max_power_vu is None or max_power_vu.value is None:
self._log_event(
category=EventCategory.PLATFORM,
- description=f"GPU: {gpu.gpu} has no max power limit set",
+ description=f"GPU {gpu.gpu}: max power limit not set (expected {expected_max_power} W)",
priority=EventPriority.WARNING,
- data={"gpu": gpu.gpu},
+ data={
+ "gpu": gpu.gpu,
+ "expected_max_power": expected_max_power,
+ "details": (
+ f"GPU {gpu.gpu}: max power limit not set "
+ f"(expected {expected_max_power} W)"
+ ),
+ },
+ console_log=True,
)
continue
- max_power_value = gpu.limit.max_power.value
+ max_power_value = max_power_vu.value
try:
max_power_float = float(max_power_value)
except ValueError:
@@ -88,15 +216,23 @@ def check_expected_max_power(
if max_power_float != expected_max_power:
incorrect_max_power_gpus[gpu.gpu] = max_power_float
if incorrect_max_power_gpus:
+ desc, details = _gpu_mismatch_description(
+ "GPU max power",
+ expected_max_power,
+ incorrect_max_power_gpus,
+ unit=" W",
+ )
self._log_event(
category=EventCategory.PLATFORM,
- description="Max power mismatch",
+ description=desc,
priority=EventPriority.ERROR,
data={
"gpus": list(incorrect_max_power_gpus.keys()),
"max_power_values": incorrect_max_power_gpus,
"expected_max_power": expected_max_power,
+ "details": details,
},
+ console_log=True,
)
def check_expected_driver_version(
@@ -122,15 +258,23 @@ def check_expected_driver_version(
bad_driver_gpus.append(gpu.gpu)
if bad_driver_gpus:
+ bad_versions = {g: versions_by_gpu[g] for g in bad_driver_gpus}
+ desc, details = _gpu_mismatch_description(
+ "GPU driver version",
+ expected_driver_version,
+ bad_versions,
+ )
self._log_event(
category=EventCategory.PLATFORM,
- description="Driver Version Mismatch",
+ description=desc,
priority=EventPriority.ERROR,
data={
"gpus": bad_driver_gpus,
- "driver_version": {g: versions_by_gpu[g] for g in bad_driver_gpus},
+ "driver_version": bad_versions,
"expected_driver_version": expected_driver_version,
+ "details": details,
},
+ console_log=True,
)
def check_amdsmi_metric_pcie(
@@ -283,11 +427,17 @@ def check_amdsmi_metric_ecc_totals(self, amdsmi_metric_data: list[AmdSmiMetric])
for priority, count, desc in ecc_checks:
if count is not None and count > 0:
+ details = f"GPU {gpu}: {desc} count={count}"
self._log_event(
category=EventCategory.RAS,
- description="GPU ECC error count detected",
+ description=details,
priority=priority,
- data={"gpu": gpu, "error_count": count, "error_type": desc},
+ data={
+ "gpu": gpu,
+ "error_count": count,
+ "error_type": desc,
+ "details": details,
+ },
console_log=True,
)
@@ -381,12 +531,19 @@ def expected_gpu_processes(
gpu_exceeds_num_processes[process.gpu] = process_count
if gpu_exceeds_num_processes:
+ desc, details = _gpu_mismatch_description(
+ "GPU process count",
+ max_num_processes,
+ gpu_exceeds_num_processes,
+ )
self._log_event(
category=EventCategory.PLATFORM,
- description="Number of processes exceeds max processes",
+ description=desc,
priority=EventPriority.ERROR,
data={
"gpu_exceeds_num_processes": gpu_exceeds_num_processes,
+ "expected_max_processes": max_num_processes,
+ "details": details,
},
console_log=True,
)
@@ -494,11 +651,14 @@ def check_static_data(
if mismatches:
payload = self._format_static_mismatch_payload(mismatches)
+ desc, details = _static_mismatch_description(payload)
+ payload["details"] = details
self._log_event(
category=EventCategory.PLATFORM,
- description="amd-smi static data mismatch",
+ description=desc,
priority=EventPriority.ERROR,
data=payload,
+ console_log=True,
)
def _format_static_mismatch_payload(
@@ -573,15 +733,27 @@ def check_firmware_versions(
)
if mismatches or missing:
+ detail_parts: list[str] = []
+ for item in mismatches:
+ detail_parts.append(
+ f"GPU {item['gpu']} {item['fw_id']}: "
+ f"expected {item['expected']!s}, actual {item['actual']!s}"
+ )
+ for item in missing:
+ detail_parts.append(f"GPU {item['gpu']} {item['fw_id']}: missing")
+ details = "; ".join(detail_parts)
+ description = f"GPU firmware version mismatch: {details}"
self._log_event(
category=EventCategory.FW,
- description="Firmware version mismatch",
+ description=description,
priority=EventPriority.ERROR,
data={
"expected_firmware_versions": expected_firmware_versions,
"mismatches": mismatches,
"missing": missing,
+ "details": details,
},
+ console_log=True,
)
def check_expected_memory_partition_mode(
@@ -630,15 +802,41 @@ def check_expected_memory_partition_mode(
)
if bad_memory_partition_mode_gpus:
+ mismatch_by_gpu: dict[int, str] = {}
+ for entry in bad_memory_partition_mode_gpus:
+ gpu_id = int(entry["gpu_id"]) # type: ignore[arg-type]
+ if "memory_partition_mode" in entry:
+ mismatch_by_gpu[gpu_id] = (
+ f"memory={entry['memory_partition_mode']!s} "
+ f"(expected {expected_memory_partition_mode!s})"
+ )
+ if "compute_partition_mode" in entry:
+ compute_detail = (
+ f"compute={entry['compute_partition_mode']!s} "
+ f"(expected {expected_compute_partition_mode!s})"
+ )
+ mismatch_by_gpu[gpu_id] = (
+ f"{mismatch_by_gpu[gpu_id]}; {compute_detail}"
+ if gpu_id in mismatch_by_gpu
+ else compute_detail
+ )
+ desc, details = _gpu_mismatch_description(
+ "GPU partition mode",
+ f"memory={expected_memory_partition_mode!s}, "
+ f"compute={expected_compute_partition_mode!s}",
+ mismatch_by_gpu,
+ )
self._log_event(
category=EventCategory.PLATFORM,
- description="Partition Mode Mismatch",
+ description=desc,
priority=EventPriority.ERROR,
data={
"actual_partition_data": bad_memory_partition_mode_gpus,
"expected_memory_partition_mode": expected_memory_partition_mode,
"expected_compute_partition_mode": expected_compute_partition_mode,
+ "details": details,
},
+ console_log=True,
)
def check_expected_xgmi_link_speed(
@@ -674,45 +872,65 @@ def check_expected_xgmi_link_speed(
link_metric = xgmi_data.link_metrics
try:
if link_metric.bit_rate is None or link_metric.bit_rate.value is None:
+ details = (
+ f"GPU {xgmi_data.gpu}: XGMI link speed not available "
+ f"(expected {expected_xgmi_speed} GT/s)"
+ )
self._log_event(
category=EventCategory.IO,
- description="XGMI link speed is not available",
+ description=details,
priority=EventPriority.ERROR,
data={
"gpu": xgmi_data.gpu,
+ "expected_xgmi_speed": expected_xgmi_speed,
"xgmi_bit_rate": (
link_metric.bit_rate.unit if link_metric.bit_rate else "N/A"
),
+ "details": details,
},
+ console_log=True,
)
continue
xgmi_float = float(link_metric.bit_rate.value)
except (ValueError, TypeError):
+ details = (
+ f"GPU {xgmi_data.gpu}: XGMI link speed is not a valid number "
+ f"(expected {expected_xgmi_speed} GT/s)"
+ )
self._log_event(
category=EventCategory.IO,
- description="XGMI link speed is not a valid number",
+ description=details,
priority=EventPriority.ERROR,
data={
"gpu": xgmi_data.gpu,
+ "expected_xgmi_speed": expected_xgmi_speed,
"xgmi_bit_rate": (
link_metric.bit_rate.value if link_metric.bit_rate else "N/A"
),
+ "details": details,
},
+ console_log=True,
)
continue
expected_floats = [float(e) for e in expected_xgmi_speed]
if xgmi_float not in expected_floats:
+ details = (
+ f"GPU {xgmi_data.gpu}: XGMI link speed {xgmi_float} GT/s "
+ f"not in expected {expected_xgmi_speed} GT/s"
+ )
self._log_event(
category=EventCategory.IO,
- description="XGMI link speed is not as expected",
+ description=details,
priority=EventPriority.ERROR,
data={
"expected_xgmi_speed": expected_xgmi_speed,
"xgmi_bit_rate": xgmi_float,
"gpu": xgmi_data.gpu,
+ "details": details,
},
+ console_log=True,
)
def analyze_data(
@@ -733,6 +951,8 @@ def analyze_data(
args = AmdSmiAnalyzerArgs()
if data.metric is not None and len(data.metric) > 0:
+ if args.expected_power_management:
+ self.check_expected_power_management(data.metric, args.expected_power_management)
if args.l0_to_recovery_count_error_threshold is not None:
self.check_amdsmi_metric_pcie(
data.metric,
diff --git a/nodescraper/plugins/inband/amdsmi/amdsmi_collector.py b/nodescraper/plugins/inband/amdsmi/amdsmi_collector.py
index d4f22c46..ef60995a 100644
--- a/nodescraper/plugins/inband/amdsmi/amdsmi_collector.py
+++ b/nodescraper/plugins/inband/amdsmi/amdsmi_collector.py
@@ -60,6 +60,7 @@
StaticClockData,
StaticDriver,
StaticFrequencyLevels,
+ StaticLimit,
StaticNuma,
StaticPolicy,
StaticRas,
@@ -71,6 +72,8 @@
ValueUnit,
XgmiLinks,
XgmiMetrics,
+ normalize_amdsmi_metric_dict,
+ normalize_static_limit_dict,
)
from nodescraper.plugins.inband.amdsmi.collector_args import AmdSmiCollectorArgs
from nodescraper.utils import get_exception_traceback
@@ -408,7 +411,10 @@ def get_metric(self) -> Optional[List[AmdSmiMetric]]:
if isinstance(ret, dict) and "gpu_data" in ret:
ret = ret["gpu_data"]
data = ret if isinstance(ret, list) else [ret]
- built = self._build_amdsmi_sub_data(AmdSmiMetric, data)
+ normalized = [
+ normalize_amdsmi_metric_dict(item) if isinstance(item, dict) else item for item in data
+ ]
+ built = self._build_amdsmi_sub_data(AmdSmiMetric, normalized)
return built if isinstance(built, list) else ([built] if built else [])
def get_xgmi_data(
@@ -519,11 +525,17 @@ def _get_amdsmi_version(self) -> Optional[AmdSmiVersion]:
return None
try:
+ amdgpu_ver = version_data.get("amdgpu_version") or version_data.get("amdgpu")
+ hsmp_ver = version_data.get("amd_hsmp_driver_version") or version_data.get(
+ "amd_hsmp_version"
+ )
return AmdSmiVersion(
tool="amdsmi",
version=version_data.get("amdsmi_library_version", ""),
amdsmi_library_version=version_data.get("amdsmi_library_version", ""),
rocm_version=version_data.get("rocm_version", ""),
+ amdgpu_version=str(amdgpu_ver) if amdgpu_ver not in (None, "") else None,
+ amd_hsmp_driver_version=str(hsmp_ver) if hsmp_ver not in (None, "") else None,
)
except ValidationError as err:
self._log_event(
@@ -902,12 +914,18 @@ def get_static(self) -> Optional[list[AmdSmiStatic]]:
)
# Driver
+ os_kernel = driver.get("os_kernel_version") if driver else None
driver_model = StaticDriver(
name=self._normalize(
- driver.get("driver_name") if driver else None, default="unknown"
+ (driver.get("driver_name") or driver.get("name")) if driver else None,
+ default="unknown",
),
version=self._normalize(
- driver.get("driver_version") if driver else None, default="unknown"
+ (driver.get("driver_version") or driver.get("version")) if driver else None,
+ default="unknown",
+ ),
+ os_kernel_version=(
+ None if os_kernel in (None, "", "N/A") else str(os_kernel).strip()
),
)
@@ -964,6 +982,8 @@ def get_static(self) -> Optional[list[AmdSmiStatic]]:
# Cache info
cache_info_model = self._parse_cache_info(cache)
+ limit_model = self._parse_limit(item.get("limit"))
+
# Clock
clock_dict_model = self._parse_clock_dict(clock)
@@ -974,7 +994,7 @@ def get_static(self) -> Optional[list[AmdSmiStatic]]:
asic=asic_model,
bus=bus_model,
vbios=vbios_model,
- limit=None,
+ limit=limit_model,
driver=driver_model,
board=board_model,
ras=ras_model,
@@ -999,6 +1019,35 @@ def get_static(self) -> Optional[list[AmdSmiStatic]]:
return out
+ def _parse_limit(self, data: Optional[object]) -> Optional[StaticLimit]:
+ """Parse static power/limit block (legacy flat or ROCm 7+ ``ppt0``/``ppt1``)."""
+ if data is None or not isinstance(data, dict) or not data:
+ return None
+ try:
+ return StaticLimit.model_validate(normalize_static_limit_dict(data))
+ except ValidationError as err:
+ self._log_event(
+ category=EventCategory.APPLICATION,
+ description="Failed to build StaticLimit; skipping limit data",
+ data={"errors": err.errors(include_url=False)},
+ priority=EventPriority.WARNING,
+ )
+ return None
+
+ def _parse_current_level(self, data: dict) -> Optional[int]:
+ """Extract current DPM level index from static clock JSON."""
+ cur_raw = data.get("current")
+ if cur_raw is None:
+ cur_raw = data.get("current level")
+ if isinstance(cur_raw, (int, float)):
+ return int(cur_raw)
+ if isinstance(cur_raw, str) and cur_raw.strip() and cur_raw.upper() != "N/A":
+ try:
+ return int(cur_raw.strip())
+ except ValueError:
+ return None
+ return None
+
def _parse_soc_pstate(self, data: dict) -> Optional[StaticSocPstate]:
"""Parse SOC P-state data
@@ -1241,6 +1290,22 @@ def _parse_clock(self, data: dict) -> Optional[StaticClockData]:
if not isinstance(data, dict):
return None
+ current = self._parse_current_level(data)
+ freq_levels_raw = data.get("frequency_levels")
+ if isinstance(freq_levels_raw, dict) and freq_levels_raw:
+ try:
+ levels = StaticFrequencyLevels.model_validate(freq_levels_raw)
+ return StaticClockData.model_validate(
+ {"frequency_levels": levels, "current level": current}
+ )
+ except ValidationError as err:
+ self._log_event(
+ category=EventCategory.APPLICATION,
+ description="Failed to parse static clock frequency_levels",
+ data={"errors": err.errors(include_url=False)},
+ priority=EventPriority.WARNING,
+ )
+
freqs_raw = data.get("frequency")
if not isinstance(freqs_raw, list) or not freqs_raw:
return None
@@ -1272,18 +1337,6 @@ def _fmt(n: Optional[int]) -> Optional[str]:
level1: Optional[str] = _fmt(freqs_mhz[1]) if len(freqs_mhz) > 1 else None
level2: Optional[str] = _fmt(freqs_mhz[2]) if len(freqs_mhz) > 2 else None
- cur_raw = data.get("current")
- current: Optional[int]
- if isinstance(cur_raw, (int, float)):
- current = int(cur_raw)
- elif isinstance(cur_raw, str) and cur_raw.strip() and cur_raw.upper() != "N/A":
- try:
- current = int(cur_raw.strip())
- except Exception:
- current = None
- else:
- current = None
-
try:
levels = StaticFrequencyLevels.model_validate(
{"Level 0": level0, "Level 1": level1, "Level 2": level2}
@@ -1310,9 +1363,19 @@ def _parse_clock_dict(self, data: dict) -> Optional[dict[str, Union[StaticClockD
clock_dict: dict[str, Union[StaticClockData, None]] = {}
- clock_data = self._parse_clock(data)
- if clock_data:
- clock_dict["clk"] = clock_data
+ for key, val in data.items():
+ if val is None or (isinstance(val, str) and val.strip().upper() in {"N/A", "NA", ""}):
+ clock_dict[str(key)] = None
+ continue
+ if isinstance(val, dict):
+ parsed = self._parse_clock(val)
+ if parsed is not None:
+ clock_dict[str(key)] = parsed
+
+ if not clock_dict and (data.get("frequency") or data.get("frequency_levels")):
+ parsed = self._parse_clock(data)
+ if parsed is not None:
+ clock_dict["clk"] = parsed
return clock_dict if clock_dict else None
diff --git a/nodescraper/plugins/inband/amdsmi/amdsmidata.py b/nodescraper/plugins/inband/amdsmi/amdsmidata.py
index 940047ba..fd603028 100644
--- a/nodescraper/plugins/inband/amdsmi/amdsmidata.py
+++ b/nodescraper/plugins/inband/amdsmi/amdsmidata.py
@@ -43,6 +43,43 @@
_NUM_UNIT_RE = re.compile(r"^\s*([-+]?\d+(?:\.\d+)?)(?:\s*([A-Za-z%/][A-Za-z0-9%/._-]*))?\s*$")
+def _value_unit_is_na(x: Any) -> bool:
+ return x is None or (isinstance(x, str) and x.strip().upper() in {"N/A", "NA", ""})
+
+
+def _coerce_value_unit_raw(v: Any) -> Any:
+ """Normalize raw amd-smi input into ``ValueUnit``-compatible data for validation."""
+ if _value_unit_is_na(v):
+ return None
+
+ if isinstance(v, dict):
+ val = v.get("value")
+ unit = v.get("unit", "")
+ if _value_unit_is_na(val):
+ return None
+ if isinstance(val, str):
+ m = _NUM_UNIT_RE.match(val.strip())
+ if m and not unit:
+ num, u = m.groups()
+ unit = u or unit or ""
+ val = float(num) if "." in num else int(num)
+ return {"value": val, "unit": unit}
+
+ if isinstance(v, (int, float)):
+ return {"value": v, "unit": ""}
+
+ if isinstance(v, str):
+ s = v.strip()
+ m = _NUM_UNIT_RE.match(s)
+ if m:
+ num, unit = m.groups()
+ val = float(num) if "." in num else int(num)
+ return {"value": val, "unit": unit or ""}
+ return {"value": s, "unit": ""}
+
+ return v
+
+
def na_to_none(values: Union[int, str]):
if values == "N/A":
return None
@@ -124,40 +161,7 @@ class ValueUnit(BaseModel):
@model_validator(mode="before")
@classmethod
def _coerce(cls, v):
- # treat N/A as None
- def na(x) -> bool:
- return x is None or (isinstance(x, str) and x.strip().upper() in {"N/A", "NA", ""})
-
- if na(v):
- return None
-
- if isinstance(v, dict):
- val = v.get("value")
- unit = v.get("unit", "")
- if na(val):
- return None
- if isinstance(val, str):
- m = _NUM_UNIT_RE.match(val.strip())
- if m and not unit:
- num, u = m.groups()
- unit = u or unit or ""
- val = float(num) if "." in num else int(num)
- return {"value": val, "unit": unit}
-
- # numbers
- if isinstance(v, (int, float)):
- return {"value": v, "unit": ""}
-
- if isinstance(v, str):
- s = v.strip()
- m = _NUM_UNIT_RE.match(s)
- if m:
- num, unit = m.groups()
- val = float(num) if "." in num else int(num)
- return {"value": val, "unit": unit or ""}
- return {"value": s, "unit": ""}
-
- return v
+ return _coerce_value_unit_raw(v)
@field_validator("unit")
@classmethod
@@ -165,6 +169,20 @@ def _clean_unit(cls, u):
return "" if u is None else str(u).strip()
+def coerce_value_unit_input(v: Any) -> Any:
+ """Normalize raw amd-smi values into ``ValueUnit``-compatible input.
+
+ Use as a ``mode='before'`` field validator on ``ValueUnit`` / ``Optional[ValueUnit]`` fields.
+ Accepts ``ValueUnit``, dict, number, or strings such as ``"138 MHz"`` / ``"N/A"``.
+ Returns ``None`` when the value is not present or is N/A.
+ """
+ if v is None:
+ return None
+ if isinstance(v, ValueUnit):
+ return v
+ return _coerce_value_unit_raw(v)
+
+
# Process
class ProcessMemoryUsage(BaseModel):
gtt_mem: Optional[ValueUnit]
@@ -313,7 +331,31 @@ class StaticVbios(BaseModel):
version: str
+class StaticPowerLimit(AmdSmiBaseModel):
+ """Per-PPT power limits (ROCm 7+ static --limit JSON)."""
+
+ model_config = ConfigDict(
+ populate_by_name=True,
+ extra="ignore",
+ )
+
+ max_power_limit: Optional[ValueUnit] = None
+ min_power_limit: Optional[ValueUnit] = None
+ socket_power_limit: Optional[ValueUnit] = None
+ na_validator = field_validator(
+ "max_power_limit", "min_power_limit", "socket_power_limit", mode="before"
+ )(na_to_none)
+ _ppt_vu = field_validator(
+ "max_power_limit", "min_power_limit", "socket_power_limit", mode="before"
+ )(coerce_value_unit_input)
+
+
class StaticLimit(AmdSmiBaseModel):
+ model_config = ConfigDict(
+ populate_by_name=True,
+ extra="ignore",
+ )
+
max_power: Optional[ValueUnit] = None
min_power: Optional[ValueUnit] = None
socket_power: Optional[ValueUnit] = None
@@ -323,6 +365,10 @@ class StaticLimit(AmdSmiBaseModel):
shutdown_edge_temperature: Optional[ValueUnit] = None
shutdown_hotspot_temperature: Optional[ValueUnit] = None
shutdown_vram_temperature: Optional[ValueUnit] = None
+ ppt0: Optional[StaticPowerLimit] = None
+ ppt1: Optional[StaticPowerLimit] = None
+ ptl_state: Optional[str] = None
+ ptl_format: Optional[str] = None
na_validator = field_validator(
"max_power",
"min_power",
@@ -333,13 +379,72 @@ class StaticLimit(AmdSmiBaseModel):
"shutdown_edge_temperature",
"shutdown_hotspot_temperature",
"shutdown_vram_temperature",
+ "ppt0",
+ "ppt1",
+ "ptl_state",
+ "ptl_format",
mode="before",
)(na_to_none)
+ _limit_value_unit = field_validator(
+ "max_power",
+ "min_power",
+ "socket_power",
+ "slowdown_edge_temperature",
+ "slowdown_hotspot_temperature",
+ "slowdown_vram_temperature",
+ "shutdown_edge_temperature",
+ "shutdown_hotspot_temperature",
+ "shutdown_vram_temperature",
+ mode="before",
+ )(coerce_value_unit_input)
+
+ def resolved_max_power(self) -> Optional[ValueUnit]:
+ """Return max power cap from legacy flat fields or ``ppt0`` (ROCm 7+)."""
+ if self.max_power is not None:
+ return self.max_power
+ if self.ppt0 is not None and self.ppt0.max_power_limit is not None:
+ return self.ppt0.max_power_limit
+ return None
+
+ @model_validator(mode="before")
+ @classmethod
+ def _normalize_limit_input(cls, data: Any) -> Any:
+ if isinstance(data, dict):
+ return normalize_static_limit_dict(data)
+ return data
+
+
+def _normalize_static_ppt_block(val: object) -> object:
+ """Drop N/A-only ``ppt0``/``ppt1`` blocks; pass through already-parsed models."""
+ if val is None:
+ return None
+ if isinstance(val, str) and val.strip().upper() in {"N/A", "NA", ""}:
+ return None
+ if not isinstance(val, dict):
+ return val
+ out: dict[str, Any] = {}
+ for key, raw in val.items():
+ if isinstance(raw, str) and raw.strip().upper() in {"N/A", "NA", ""}:
+ continue
+ if raw is not None:
+ out[key] = raw
+ return out or None
+
+
+def normalize_static_limit_dict(data: dict[str, Any]) -> dict[str, Any]:
+ """Normalize static ``limit`` JSON for ``StaticLimit`` (ROCm 7.13 + legacy)."""
+ out = dict(data)
+ for ppt_key in ("ppt0", "ppt1"):
+ out[ppt_key] = _normalize_static_ppt_block(out.get(ppt_key))
+ return out
class StaticDriver(BaseModel):
- name: str
- version: str
+ model_config = ConfigDict(populate_by_name=True)
+
+ name: str = Field(validation_alias=AliasChoices("name", "driver_name"))
+ version: str = Field(validation_alias=AliasChoices("version", "driver_version"))
+ os_kernel_version: Optional[str] = None
class StaticBoard(BaseModel):
@@ -418,14 +523,21 @@ class StaticCacheInfoItem(AmdSmiBaseModel):
na_validator = field_validator("cache_size", mode="before")(na_to_none)
-class StaticFrequencyLevels(BaseModel):
+class StaticFrequencyLevels(AmdSmiBaseModel):
+ """Static clock frequency levels; each level is normalized to ``ValueUnit``."""
+
model_config = ConfigDict(
populate_by_name=True,
+ extra="forbid",
)
- Level_0: str = Field(..., alias="Level 0")
- Level_1: Optional[str] = Field(default=None, alias="Level 1")
- Level_2: Optional[str] = Field(default=None, alias="Level 2")
+ Level_0: ValueUnit = Field(..., alias="Level 0")
+ Level_1: Optional[ValueUnit] = Field(default=None, alias="Level 1")
+ Level_2: Optional[ValueUnit] = Field(default=None, alias="Level 2")
+
+ _level_value_unit = field_validator("Level_0", "Level_1", "Level_2", mode="before")(
+ coerce_value_unit_input
+ )
class StaticClockData(BaseModel):
@@ -444,13 +556,13 @@ class AmdSmiStatic(BaseModel):
gpu: int
asic: StaticAsic
bus: StaticBus
- vbios: Optional[StaticVbios]
- limit: Optional[StaticLimit]
+ vbios: Optional[StaticVbios] = None
+ limit: Optional[StaticLimit] = None
driver: StaticDriver
board: StaticBoard
ras: StaticRas
- soc_pstate: Optional[StaticSocPstate]
- xgmi_plpd: Optional[StaticXgmiPlpd]
+ soc_pstate: Optional[StaticSocPstate] = None
+ xgmi_plpd: Optional[StaticXgmiPlpd] = None
process_isolation: str
numa: StaticNuma
vram: StaticVram
@@ -514,6 +626,7 @@ class MetricPower(BaseModel):
gfx_voltage: Optional[ValueUnit]
soc_voltage: Optional[ValueUnit]
mem_voltage: Optional[ValueUnit]
+ ubb_power: Optional[ValueUnit] = None
throttle_status: Optional[str]
power_management: Optional[str]
na_validator = field_validator(
@@ -521,23 +634,75 @@ class MetricPower(BaseModel):
"gfx_voltage",
"soc_voltage",
"mem_voltage",
+ "ubb_power",
"throttle_status",
"power_management",
mode="before",
)(na_to_none)
+ _ubb_power_vu = field_validator("ubb_power", mode="before")(coerce_value_unit_input)
class MetricClockData(BaseModel):
- clk: Optional[ValueUnit]
- min_clk: Optional[ValueUnit]
- max_clk: Optional[ValueUnit]
- clk_locked: Optional[Union[int, str, dict]]
- deep_sleep: Optional[Union[int, str, dict]]
+ model_config = ConfigDict(extra="ignore")
+
+ clk: Optional[ValueUnit] = None
+ min_clk: Optional[ValueUnit] = None
+ max_clk: Optional[ValueUnit] = None
+ clk_locked: Optional[Union[int, str, dict]] = None
+ deep_sleep: Optional[Union[int, str, dict]] = None
na_validator = field_validator(
"clk", "min_clk", "max_clk", "clk_locked", "deep_sleep", mode="before"
)(na_to_none)
+_METRIC_CLOCK_LEAF_KEYS = frozenset({"clk", "min_clk", "max_clk", "clk_locked", "deep_sleep"})
+
+
+def _looks_like_metric_clock_leaf(value: object) -> bool:
+ """True when *value* matches legacy amd-smi per-clock metric JSON."""
+ return isinstance(value, dict) and bool(_METRIC_CLOCK_LEAF_KEYS & value.keys())
+
+
+def _normalize_metric_clock_map(clock: Any) -> Any:
+ """Coerce standard clock leaves to ``MetricClockData``; keep nested maps as dicts."""
+ if not isinstance(clock, dict):
+ return clock
+ normalized: dict[str, Any] = {}
+ for key, val in clock.items():
+ if val is None or (isinstance(val, str) and _value_unit_is_na(val)):
+ normalized[key] = None
+ elif isinstance(val, MetricClockData):
+ normalized[key] = val
+ elif _looks_like_metric_clock_leaf(val):
+ normalized[key] = MetricClockData.model_validate(val)
+ else:
+ normalized[key] = val
+ return normalized
+
+
+def normalize_amdsmi_metric_dict(item: dict[str, Any]) -> dict[str, Any]:
+ """Normalize raw ``amd-smi metric`` JSON before ``AmdSmiMetric`` validation (ROCm 7.13 + legacy)."""
+ out = dict(item)
+ if "gpu_board" in out and "gpuboard" not in out:
+ out["gpuboard"] = out.pop("gpu_board")
+ if "base_board" in out and "baseboard" not in out:
+ out["baseboard"] = out.pop("base_board")
+ pcie = out.get("pcie")
+ if isinstance(pcie, dict):
+ pcie_out = dict(pcie)
+ count = pcie_out.get("lc_perf_other_end_recovery_count")
+ legacy = pcie_out.get("lc_perf_other_end_recovery")
+ if legacy is not None:
+ pcie_out["lc_perf_other_end_recovery_count"] = legacy
+ elif count is not None:
+ pcie_out["lc_perf_other_end_recovery"] = count
+ out["pcie"] = pcie_out
+ clock = out.get("clock")
+ if isinstance(clock, dict):
+ out["clock"] = _normalize_metric_clock_map(clock)
+ return out
+
+
class MetricTemperature(BaseModel):
edge: Optional[ValueUnit]
hotspot: Optional[ValueUnit]
@@ -546,18 +711,38 @@ class MetricTemperature(BaseModel):
class MetricPcie(BaseModel):
- width: Optional[int]
- speed: Optional[ValueUnit]
- bandwidth: Optional[ValueUnit]
- replay_count: Optional[int]
- l0_to_recovery_count: Optional[int]
- replay_roll_over_count: Optional[int]
- nak_sent_count: Optional[int]
- nak_received_count: Optional[int]
- current_bandwidth_sent: Optional[int]
- current_bandwidth_received: Optional[int]
- max_packet_size: Optional[int]
- lc_perf_other_end_recovery: Optional[int]
+ width: Optional[int] = None
+ speed: Optional[ValueUnit] = None
+ bandwidth: Optional[ValueUnit] = None
+ replay_count: Optional[int] = None
+ l0_to_recovery_count: Optional[int] = None
+ replay_roll_over_count: Optional[int] = None
+ nak_sent_count: Optional[int] = None
+ nak_received_count: Optional[int] = None
+ current_bandwidth_sent: Optional[int] = None
+ current_bandwidth_received: Optional[int] = None
+ max_packet_size: Optional[int] = None
+ lc_perf_other_end_recovery_count: Optional[int] = None
+
+ @model_validator(mode="before")
+ @classmethod
+ def _normalize_recovery_count_key(cls, data: Any) -> Any:
+ if not isinstance(data, dict):
+ return data
+ out = dict(data)
+ count = out.get("lc_perf_other_end_recovery_count")
+ legacy = out.get("lc_perf_other_end_recovery")
+ if legacy is not None:
+ out["lc_perf_other_end_recovery_count"] = legacy
+ elif count is not None:
+ out["lc_perf_other_end_recovery"] = count
+ return out
+
+ @property
+ def lc_perf_other_end_recovery(self) -> Optional[int]:
+ """Legacy amd-smi JSON / attribute name (alias for ``lc_perf_other_end_recovery_count``)."""
+ return self.lc_perf_other_end_recovery_count
+
na_validator = field_validator(
"width",
"speed",
@@ -570,7 +755,7 @@ class MetricPcie(BaseModel):
"current_bandwidth_sent",
"current_bandwidth_received",
"max_packet_size",
- "lc_perf_other_end_recovery",
+ "lc_perf_other_end_recovery_count",
mode="before",
)(na_to_none)
@@ -774,10 +959,12 @@ class EccData(BaseModel):
class AmdSmiMetric(BaseModel):
+ model_config = ConfigDict(extra="ignore", populate_by_name=True)
+
gpu: int
- usage: MetricUsage
+ usage: Union[MetricUsage, str]
power: MetricPower
- clock: dict[str, MetricClockData]
+ clock: dict[str, Union[MetricClockData, dict[str, Any]]]
temperature: MetricTemperature
pcie: MetricPcie
ecc: MetricEccTotals
@@ -789,8 +976,22 @@ class AmdSmiMetric(BaseModel):
energy: Optional[MetricEnergy]
mem_usage: MetricMemUsage
throttle: MetricThrottle
+ gpuboard: Optional[Union[dict[str, Any], str]] = Field(
+ default=None,
+ validation_alias=AliasChoices("gpuboard", "gpu_board"),
+ )
+ baseboard: Optional[Union[dict[str, Any], str]] = Field(
+ default=None,
+ validation_alias=AliasChoices("baseboard", "base_board"),
+ )
na_validator = field_validator("xgmi_err", "perf_level", mode="before")(na_to_none)
+ _board_dict_na = field_validator("gpuboard", "baseboard", mode="before")(na_to_none_dict)
+
+ @field_validator("clock", mode="plain")
+ @classmethod
+ def _normalize_clock(cls, clock: Any) -> Any:
+ return _normalize_metric_clock_map(clock)
@field_validator("ecc_blocks", mode="before")
@classmethod
@@ -1050,10 +1251,13 @@ def ref_max_power_w(self) -> Optional[int]:
"""First available max power limit (W) from static data, lowest GPU index first."""
for gpu in self._sorted_static_gpus():
lim = gpu.limit
- if lim is None or lim.max_power is None or lim.max_power.value is None:
+ if lim is None:
+ continue
+ max_power = lim.resolved_max_power()
+ if max_power is None or max_power.value is None:
continue
try:
- return int(float(lim.max_power.value))
+ return int(float(max_power.value))
except (TypeError, ValueError):
continue
return None
diff --git a/nodescraper/plugins/inband/amdsmi/analyzer_args.py b/nodescraper/plugins/inband/amdsmi/analyzer_args.py
index 3a5d2cfb..cd08bd49 100644
--- a/nodescraper/plugins/inband/amdsmi/analyzer_args.py
+++ b/nodescraper/plugins/inband/amdsmi/analyzer_args.py
@@ -43,6 +43,13 @@ class AmdSmiAnalyzerArgs(AnalyzerArgs):
expected_max_power: Optional[int] = Field(
default=None, description="Expected maximum power value (e.g. watts)."
)
+ expected_power_management: Optional[str] = Field(
+ default=None,
+ description=(
+ "Expected amd-smi metric power_management value per GPU "
+ "(e.g. DISABLED for active/full power, ENABLED for power-managed idle)."
+ ),
+ )
expected_driver_version: Optional[str] = Field(
default=None, description="Expected AMD driver version string."
)
diff --git a/pyproject.toml b/pyproject.toml
index 018c1514..c5495ffc 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -2,7 +2,7 @@
name = "amd-node-scraper"
dynamic = ["version"]
description = "A framework for automated error detection and data collection"
-authors = []
+authors = [{ name = "AMD" }]
readme = "README.md"
requires-python = ">=3.9"
@@ -38,7 +38,7 @@ documentation = "https://github.com/amd/node-scraper"
repository = "https://github.com/amd/node-scraper"
[build-system]
-requires = ["setuptools==70.3.0", "setuptools-scm==8.1.0"]
+requires = ["setuptools==78.1.1", "setuptools-scm==8.1.0"]
build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]
@@ -50,6 +50,10 @@ include-package-data = true
[project.scripts]
node-scraper = "nodescraper.cli:cli_entry"
+[project.entry-points."nodescraper.plugin_configs"]
+NodeStatus = "nodescraper.pluginrecipe.node_status:NodeStatus"
+AllPlugins = "nodescraper.pluginrecipe.all_plugins:AllPlugins"
+
[tool.black]
line-length = 100
target_version = ['py39']
diff --git a/test/functional/fixtures/amdsmi_plugin_config.json b/test/functional/fixtures/amdsmi_plugin_config.json
new file mode 100644
index 00000000..73de0eb3
--- /dev/null
+++ b/test/functional/fixtures/amdsmi_plugin_config.json
@@ -0,0 +1,14 @@
+{
+ "global_args": {},
+ "plugins": {
+ "AmdSmiPlugin": {
+ "analysis_args": {
+ "expected_power_management": "DISABLED",
+ "expected_xgmi_speed": [32.0]
+ }
+ }
+ },
+ "result_collators": {},
+ "name": "AmdSmiPlugin config",
+ "desc": "Functional test config for AmdSmiPlugin ROCm 7.13 checks"
+}
diff --git a/test/functional/test_plugin_configs.py b/test/functional/test_plugin_configs.py
index 90c68218..6ee1a004 100644
--- a/test/functional/test_plugin_configs.py
+++ b/test/functional/test_plugin_configs.py
@@ -42,6 +42,7 @@ def fixtures_dir():
def plugin_config_files(fixtures_dir):
"""Return dict mapping plugin names to their config file paths."""
return {
+ "AmdSmiPlugin": fixtures_dir / "amdsmi_plugin_config.json",
"BiosPlugin": fixtures_dir / "bios_plugin_config.json",
"CmdlinePlugin": fixtures_dir / "cmdline_plugin_config.json",
"DimmPlugin": fixtures_dir / "dimm_plugin_config.json",
diff --git a/test/unit/cli/test_plugin_configs_cli.py b/test/unit/cli/test_plugin_configs_cli.py
index 5c4af37b..90e1171a 100644
--- a/test/unit/cli/test_plugin_configs_cli.py
+++ b/test/unit/cli/test_plugin_configs_cli.py
@@ -10,20 +10,16 @@
from nodescraper.cli.cli import build_parser
from nodescraper.configregistry import ConfigRegistry
-from nodescraper.models import PluginConfig
+from nodescraper.pluginrecipe.all_plugins import AllPlugins
+from nodescraper.pluginrecipe.node_status import NodeStatus
from nodescraper.pluginregistry import PluginRegistry
def _parser():
plugin_reg = PluginRegistry()
- config_reg = ConfigRegistry()
- config_reg.configs["AllPlugins"] = PluginConfig(
- name="AllPlugins",
- desc="Run all registered plugins with default arguments",
- global_args={},
- plugins={name: {} for name in plugin_reg.plugins},
- result_collators={},
- )
+ config_reg = ConfigRegistry(load_entry_point_configs=False)
+ for recipe in (NodeStatus, AllPlugins):
+ config_reg.configs[recipe.name()] = recipe.plugin_config()
return build_parser(plugin_reg, config_reg)[0]
diff --git a/test/unit/framework/test_cli_helper.py b/test/unit/framework/test_cli_helper.py
index 5b88bf7e..3e09c108 100644
--- a/test/unit/framework/test_cli_helper.py
+++ b/test/unit/framework/test_cli_helper.py
@@ -127,7 +127,10 @@ def test_get_plugin_configs():
def test_config_builder(plugin_registry):
config = build_config(
- config_reg=ConfigRegistry(config_path=os.path.join(os.path.dirname(__file__), "fixtures")),
+ config_reg=ConfigRegistry(
+ config_path=os.path.join(os.path.dirname(__file__), "fixtures"),
+ load_entry_point_configs=False,
+ ),
plugin_reg=plugin_registry,
logger=logging.getLogger(),
plugins=["TestPluginA"],
diff --git a/test/unit/framework/test_config_registry.py b/test/unit/framework/test_config_registry.py
index 9fb343e3..5ba78fd1 100644
--- a/test/unit/framework/test_config_registry.py
+++ b/test/unit/framework/test_config_registry.py
@@ -32,7 +32,8 @@
def test_config_registry():
config_registry = ConfigRegistry(
- config_path=os.path.join(os.path.dirname(__file__), "fixtures")
+ config_path=os.path.join(os.path.dirname(__file__), "fixtures"),
+ load_entry_point_configs=False,
)
assert config_registry.configs == {
diff --git a/test/unit/framework/test_config_registry_entry_points.py b/test/unit/framework/test_config_registry_entry_points.py
new file mode 100644
index 00000000..e1a8774f
--- /dev/null
+++ b/test/unit/framework/test_config_registry_entry_points.py
@@ -0,0 +1,163 @@
+###############################################################################
+#
+# MIT License
+#
+# Copyright (C) 2026 Advanced Micro Devices, Inc.
+#
+###############################################################################
+
+from __future__ import annotations
+
+from types import SimpleNamespace
+from unittest import mock
+
+import pytest
+from pydantic import ValidationError
+
+from nodescraper.configregistry import (
+ PLUGIN_CONFIG_ENTRY_POINT_GROUP,
+ ConfigRegistry,
+ PluginConfigEntryPointError,
+)
+from nodescraper.models import PluginConfig
+from nodescraper.pluginrecipe.pluginrecipe import PluginRecipe
+
+
+class _ExamplePluginRecipe(PluginRecipe):
+ @classmethod
+ def plugin_names(cls) -> tuple[str, ...]:
+ return ("ExamplePlugin",)
+
+ @classmethod
+ def plugin_config(cls) -> PluginConfig:
+ return PluginConfig(
+ name="ExamplePluginRecipe",
+ desc="Example entry-point recipe",
+ plugins={"ExamplePlugin": {}},
+ )
+
+
+def _example_plugin_recipe_factory() -> dict:
+ return {
+ "name": "CallableRecipe",
+ "desc": "Callable entry-point recipe",
+ "global_args": {},
+ "plugins": {"CallablePlugin": {}},
+ "result_collators": {},
+ }
+
+
+def test_load_plugin_configs_from_entry_point_class() -> None:
+ entry_point = SimpleNamespace(
+ name="ExamplePluginRecipe",
+ load=lambda: _ExamplePluginRecipe,
+ )
+ with mock.patch(
+ "nodescraper.configregistry.ConfigRegistry._entry_points_for_group",
+ return_value=[entry_point],
+ ):
+ configs = ConfigRegistry.load_plugin_configs_from_entry_points()
+
+ assert set(configs) == {"ExamplePluginRecipe"}
+ assert configs["ExamplePluginRecipe"] == PluginConfig(
+ name="ExamplePluginRecipe",
+ desc="Example entry-point recipe",
+ global_args={},
+ plugins={"ExamplePlugin": {}},
+ result_collators={},
+ )
+
+
+def test_load_plugin_configs_from_entry_point_uses_entry_point_name_only() -> None:
+ entry_point = SimpleNamespace(
+ name="NodeStatusPluginConfig",
+ load=lambda: _ExamplePluginRecipe,
+ )
+ with mock.patch(
+ "nodescraper.configregistry.ConfigRegistry._entry_points_for_group",
+ return_value=[entry_point],
+ ):
+ configs = ConfigRegistry.load_plugin_configs_from_entry_points()
+
+ assert set(configs) == {"NodeStatusPluginConfig"}
+ assert configs["NodeStatusPluginConfig"].name == "ExamplePluginRecipe"
+
+
+def test_load_plugin_configs_from_entry_point_callable() -> None:
+ entry_point = SimpleNamespace(
+ name="CallableRecipe",
+ load=lambda: _example_plugin_recipe_factory,
+ )
+ with mock.patch(
+ "nodescraper.configregistry.ConfigRegistry._entry_points_for_group",
+ return_value=[entry_point],
+ ):
+ configs = ConfigRegistry.load_plugin_configs_from_entry_points()
+
+ assert configs["CallableRecipe"].plugins == {"CallablePlugin": {}}
+
+
+def test_config_registry_merges_entry_point_configs() -> None:
+ entry_point = SimpleNamespace(
+ name="ExamplePluginRecipe",
+ load=lambda: _ExamplePluginRecipe,
+ )
+ with mock.patch(
+ "nodescraper.configregistry.ConfigRegistry._entry_points_for_group",
+ return_value=[entry_point],
+ ):
+ config_registry = ConfigRegistry(
+ config_path="/nonexistent",
+ load_entry_point_configs=True,
+ )
+
+ assert "ExamplePluginRecipe" in config_registry.configs
+ assert config_registry.configs["ExamplePluginRecipe"].plugins == {"ExamplePlugin": {}}
+
+
+def test_plugin_config_entry_point_group_constant() -> None:
+ assert PLUGIN_CONFIG_ENTRY_POINT_GROUP == "nodescraper.plugin_configs"
+
+
+def test_load_plugin_configs_raises_on_module_not_found() -> None:
+ def _raise_module_not_found() -> None:
+ raise ModuleNotFoundError("No module named 'missing.pluginrecipe'")
+
+ entry_point = SimpleNamespace(
+ name="BrokenRecipe",
+ load=_raise_module_not_found,
+ )
+ with mock.patch(
+ "nodescraper.configregistry.ConfigRegistry._entry_points_for_group",
+ return_value=[entry_point],
+ ):
+ with pytest.raises(PluginConfigEntryPointError, match="module not found"):
+ ConfigRegistry.load_plugin_configs_from_entry_points()
+
+
+def test_load_plugin_configs_raises_on_unsupported_target() -> None:
+ entry_point = SimpleNamespace(
+ name="BrokenRecipe",
+ load=lambda: "not-a-config",
+ )
+ with mock.patch(
+ "nodescraper.configregistry.ConfigRegistry._entry_points_for_group",
+ return_value=[entry_point],
+ ):
+ with pytest.raises(PluginConfigEntryPointError, match="unsupported target"):
+ ConfigRegistry.load_plugin_configs_from_entry_points()
+
+
+def test_load_plugin_configs_raises_on_invalid_config() -> None:
+ entry_point = SimpleNamespace(
+ name="BrokenRecipe",
+ load=lambda: {"plugins": "not-a-dict"},
+ )
+ with mock.patch(
+ "nodescraper.configregistry.ConfigRegistry._entry_points_for_group",
+ return_value=[entry_point],
+ ):
+ with pytest.raises(PluginConfigEntryPointError, match="invalid plugin config") as exc_info:
+ ConfigRegistry.load_plugin_configs_from_entry_points()
+
+ assert isinstance(exc_info.value.__cause__, ValidationError)
diff --git a/test/unit/framework/test_pluginrecipe.py b/test/unit/framework/test_pluginrecipe.py
new file mode 100644
index 00000000..396fbaf1
--- /dev/null
+++ b/test/unit/framework/test_pluginrecipe.py
@@ -0,0 +1,130 @@
+###############################################################################
+#
+# MIT License
+#
+# Copyright (C) 2026 Advanced Micro Devices, Inc.
+#
+###############################################################################
+
+from __future__ import annotations
+
+from unittest.mock import patch
+
+from nodescraper.models import PluginConfig
+from nodescraper.pluginrecipe.all_plugins import AllPlugins
+from nodescraper.pluginrecipe.node_status import NodeStatus
+from nodescraper.pluginrecipe.pluginrecipe import (
+ ANALYZE_ONLY,
+ COLLECT_AND_ANALYZE,
+ COLLECT_ONLY,
+ AnalyzerOnlyPluginRecipe,
+ CollectorOnlyPluginRecipe,
+ merge_plugin_configs,
+)
+from nodescraper.pluginregistry import PluginRegistry
+
+
+class _CollectorOnlyPlugin:
+ COLLECTOR = object()
+
+
+class _AnalyzerOnlyPlugin:
+ ANALYZER = object()
+
+
+class _BothTasksPlugin:
+ COLLECTOR = object()
+ ANALYZER = object()
+
+
+class _CollectOnlyRecipe(CollectorOnlyPluginRecipe):
+ @classmethod
+ def plugin_names(cls) -> tuple[str, ...]:
+ return ("DmesgPlugin",)
+
+
+class _AnalyzeOnlyRecipe(AnalyzerOnlyPluginRecipe):
+ @classmethod
+ def plugin_names(cls) -> tuple[str, ...]:
+ return ("DmesgPlugin",)
+
+
+def test_node_status_recipe_matches_registered_plugins() -> None:
+ available = set(PluginRegistry().plugins)
+ expected = {
+ "BiosPlugin",
+ "CmdlinePlugin",
+ "DimmPlugin",
+ "DkmsPlugin",
+ "DmesgPlugin",
+ "KernelPlugin",
+ "MemoryPlugin",
+ "OsPlugin",
+ "RocmPlugin",
+ "StoragePlugin",
+ "UptimePlugin",
+ }
+ assert set(NodeStatus.plugin_names()) == expected & available
+
+
+def test_all_plugins_recipe_matches_registry() -> None:
+ plugin_reg = PluginRegistry()
+ assert set(AllPlugins.plugin_names()) == set(plugin_reg.plugins)
+
+
+def test_node_status_plugin_config_shape() -> None:
+ config = NodeStatus.plugin_config()
+ assert config.name == "NodeStatus"
+ assert config.desc == "Check configuration and status of the node."
+ assert isinstance(config.plugins, dict)
+ assert config.plugins["DmesgPlugin"] == COLLECT_AND_ANALYZE.as_config()
+
+
+def test_all_plugins_plugin_config_shape() -> None:
+ config = AllPlugins.plugin_config()
+ assert config.name == "AllPlugins"
+ assert config.desc == "Run all registered plugins with default arguments."
+ assert len(config.plugins) == len(PluginRegistry().plugins)
+
+
+def test_collector_only_recipe_sets_analysis_false() -> None:
+ config = _CollectOnlyRecipe.plugin_config()
+ assert config.plugins["DmesgPlugin"] == COLLECT_ONLY.as_config()
+
+
+def test_analyzer_only_recipe_sets_collection_false() -> None:
+ config = _AnalyzeOnlyRecipe.plugin_config()
+ assert config.plugins["DmesgPlugin"] == ANALYZE_ONLY.as_config()
+
+
+@patch("nodescraper.pluginrecipe.discovery.load_plugin_class")
+def test_filter_plugin_names_by_task_type(mock_load_plugin_class) -> None:
+ mock_load_plugin_class.side_effect = lambda name: {
+ "CollectorPlugin": _CollectorOnlyPlugin,
+ "AnalyzerPlugin": _AnalyzerOnlyPlugin,
+ "BothPlugin": _BothTasksPlugin,
+ }[name]
+
+ class _Recipe(CollectorOnlyPluginRecipe):
+ @classmethod
+ def plugin_names(cls) -> tuple[str, ...]:
+ return cls.filter_plugin_names(("CollectorPlugin", "AnalyzerPlugin", "BothPlugin"))
+
+ assert _Recipe.plugin_names() == ("BothPlugin", "CollectorPlugin")
+
+
+def test_merge_plugin_configs_preserves_plugin_flags() -> None:
+ merged = merge_plugin_configs(
+ PluginConfig(
+ name="A",
+ desc="a",
+ plugins={"FooPlugin": COLLECT_ONLY.as_config()},
+ ),
+ PluginConfig(
+ name="B",
+ desc="b",
+ plugins={"BarPlugin": ANALYZE_ONLY.as_config()},
+ ),
+ )
+ assert merged.plugins["FooPlugin"] == COLLECT_ONLY.as_config()
+ assert merged.plugins["BarPlugin"] == ANALYZE_ONLY.as_config()
diff --git a/test/unit/plugin/test_amdsmi_analyzer.py b/test/unit/plugin/test_amdsmi_analyzer.py
index f3966c97..560a9ba4 100644
--- a/test/unit/plugin/test_amdsmi_analyzer.py
+++ b/test/unit/plugin/test_amdsmi_analyzer.py
@@ -29,7 +29,11 @@
import pytest
from nodescraper.enums import EventPriority
-from nodescraper.plugins.inband.amdsmi.amdsmi_analyzer import AmdSmiAnalyzer
+from nodescraper.plugins.inband.amdsmi.amdsmi_analyzer import (
+ AmdSmiAnalyzer,
+ _gpu_mismatch_description,
+ _gpu_unavailable_description,
+)
from nodescraper.plugins.inband.amdsmi.amdsmidata import (
AmdSmiDataModel,
AmdSmiMetric,
@@ -267,7 +271,7 @@ def test_check_expected_max_power_mismatch(mock_analyzer):
assert len(analyzer.result.events) == 1
assert analyzer.result.events[0].category == "PLATFORM"
assert analyzer.result.events[0].priority == EventPriority.ERROR
- assert "Max power mismatch" in analyzer.result.events[0].description
+ assert "GPU max power mismatch" in analyzer.result.events[0].description
def test_check_expected_max_power_missing(mock_analyzer):
@@ -283,7 +287,31 @@ def test_check_expected_max_power_missing(mock_analyzer):
assert len(analyzer.result.events) == 1
assert analyzer.result.events[0].priority == EventPriority.WARNING
- assert "has no max power limit set" in analyzer.result.events[0].description
+ assert "max power limit not set" in analyzer.result.events[0].description
+
+
+def test_check_expected_max_power_ppt0(mock_analyzer):
+ """Test check_expected_max_power uses ppt0.max_power_limit on ROCm 7+ static JSON."""
+ from nodescraper.plugins.inband.amdsmi.amdsmidata import StaticPowerLimit
+
+ analyzer = mock_analyzer
+ gpu = create_static_gpu(0, max_power=550.0)
+ gpu.limit = StaticLimit(
+ ppt0=StaticPowerLimit(
+ max_power_limit=ValueUnit(value=750, unit="W"),
+ min_power_limit=ValueUnit(value=0, unit="W"),
+ socket_power_limit=ValueUnit(value=600, unit="W"),
+ )
+ )
+ gpu.limit.max_power = None
+
+ analyzer.check_expected_max_power([gpu], 750)
+ assert len(analyzer.result.events) == 0
+
+ analyzer.result = analyzer._init_result()
+ analyzer.check_expected_max_power([gpu], 550)
+ assert len(analyzer.result.events) == 1
+ assert "GPU max power mismatch" in analyzer.result.events[0].description
def test_check_expected_driver_version_success(mock_analyzer):
@@ -314,7 +342,7 @@ def test_check_expected_driver_version_mismatch(mock_analyzer):
assert len(analyzer.result.events) == 1
assert analyzer.result.events[0].category == "PLATFORM"
assert analyzer.result.events[0].priority == EventPriority.ERROR
- assert "Driver Version Mismatch" in analyzer.result.events[0].description
+ assert "GPU driver version mismatch" in analyzer.result.events[0].description
def test_expected_gpu_processes_success(mock_analyzer):
@@ -378,7 +406,10 @@ def test_expected_gpu_processes_exceeds(mock_analyzer):
assert len(analyzer.result.events) == 1
assert analyzer.result.events[0].priority == EventPriority.ERROR
- assert "Number of processes exceeds max processes" in analyzer.result.events[0].description
+ assert (
+ "GPU process count mismatch (expected 5): GPU 0=10" in analyzer.result.events[0].description
+ )
+ assert analyzer.result.events[0].data["details"] == "GPU 0: expected 5, actual 10"
def test_expected_gpu_processes_no_data(mock_analyzer):
@@ -650,7 +681,7 @@ def test_check_expected_xgmi_link_speed_mismatch(mock_analyzer):
assert len(analyzer.result.events) == 1
assert analyzer.result.events[0].category == "IO"
assert analyzer.result.events[0].priority == EventPriority.ERROR
- assert "XGMI link speed is not as expected" in analyzer.result.events[0].description
+ assert "not in expected" in analyzer.result.events[0].description
def test_check_expected_xgmi_link_speed_multiple_valid_speeds(mock_analyzer):
@@ -717,7 +748,7 @@ def test_check_expected_xgmi_link_speed_missing_bit_rate(mock_analyzer):
assert len(analyzer.result.events) == 1
assert analyzer.result.events[0].priority == EventPriority.ERROR
- assert "XGMI link speed is not available" in analyzer.result.events[0].description
+ assert "XGMI link speed not available" in analyzer.result.events[0].description
def test_analyze_data_full_workflow(mock_analyzer):
@@ -816,7 +847,7 @@ def test_analyze_data_no_static_data(mock_analyzer):
"current_bandwidth_sent",
"current_bandwidth_received",
"max_packet_size",
- "lc_perf_other_end_recovery",
+ "lc_perf_other_end_recovery_count",
]
_ECC_TOTALS_KEYS = [
"total_correctable_count",
@@ -850,6 +881,7 @@ def _minimal_amdsmi_metric(
pcie: Optional[MetricPcie] = None,
ecc: Optional[MetricEccTotals] = None,
ecc_blocks: Optional[dict] = None,
+ power_management: Optional[str] = None,
) -> AmdSmiMetric:
"""Build minimal AmdSmiMetric for PCIe/ECC tests with all required fields present."""
pcie_dict = pcie.model_dump() if pcie is not None else {k: None for k in _PCIE_KEYS}
@@ -873,7 +905,7 @@ def _minimal_amdsmi_metric(
"soc_voltage": None,
"mem_voltage": None,
"throttle_status": None,
- "power_management": None,
+ "power_management": power_management,
},
"clock": {},
"temperature": {"edge": None, "hotspot": None, "mem": None},
@@ -901,6 +933,85 @@ def _minimal_amdsmi_metric(
)
+def test_gpu_mismatch_description_helpers():
+ """Per-GPU mismatch helpers include GPU id, expected, and actual in output."""
+ desc, details = _gpu_mismatch_description(
+ "GPU power_management", "DISABLED", {0: "ENABLED", 3: "ENABLED"}
+ )
+ assert "GPU 0=ENABLED" in desc
+ assert "GPU 3=ENABLED" in desc
+ assert "expected DISABLED" in desc
+ assert "GPU 0: expected DISABLED, actual ENABLED" in details
+ assert "GPU 3: expected DISABLED, actual ENABLED" in details
+
+ desc_missing, _ = _gpu_unavailable_description(
+ "GPU power_management", [1, 2], expected="DISABLED"
+ )
+ assert "GPU 1" in desc_missing
+ assert "GPU 2" in desc_missing
+ assert "expected DISABLED" in desc_missing
+
+
+def test_check_expected_power_management_success(mock_analyzer):
+ """All GPUs with DISABLED power_management pass."""
+ analyzer = mock_analyzer
+ metrics = [
+ _minimal_amdsmi_metric(0, power_management="DISABLED"),
+ _minimal_amdsmi_metric(1, power_management="disabled"),
+ ]
+ analyzer.check_expected_power_management(metrics, "DISABLED")
+ assert len(analyzer.result.events) == 0
+
+
+def test_check_expected_power_management_mismatch(mock_analyzer):
+ """ENABLED power_management fails when DISABLED is expected."""
+ analyzer = mock_analyzer
+ metrics = [
+ _minimal_amdsmi_metric(0, power_management="DISABLED"),
+ _minimal_amdsmi_metric(1, power_management="ENABLED"),
+ ]
+ analyzer.check_expected_power_management(metrics, "DISABLED")
+ assert len(analyzer.result.events) == 1
+ assert analyzer.result.events[0].priority == EventPriority.ERROR
+ assert "power_management mismatch" in analyzer.result.events[0].description
+ assert "GPU 1=ENABLED" in analyzer.result.events[0].description
+ assert analyzer.result.events[0].data["details"] == ("GPU 1: expected DISABLED, actual ENABLED")
+
+
+def test_check_expected_power_management_missing(mock_analyzer):
+ """Missing power_management logs warning."""
+ analyzer = mock_analyzer
+ metrics = [_minimal_amdsmi_metric(0, power_management=None)]
+ analyzer.check_expected_power_management(metrics, "DISABLED")
+ assert len(analyzer.result.events) == 1
+ assert analyzer.result.events[0].priority == EventPriority.WARNING
+ assert "not available" in analyzer.result.events[0].description
+
+
+def test_analyze_data_expected_power_management(mock_analyzer):
+ """analyze_data accepts expected_power_management from plugin config."""
+ analyzer = mock_analyzer
+ data = AmdSmiDataModel(
+ metric=[
+ _minimal_amdsmi_metric(0, power_management="DISABLED"),
+ _minimal_amdsmi_metric(1, power_management="DISABLED"),
+ ]
+ )
+ args = AmdSmiAnalyzerArgs(expected_power_management="DISABLED")
+ result = analyzer.analyze_data(data, args)
+ assert not any("power_management mismatch" in e.description for e in result.events)
+
+
+def test_amdsmi_analyzer_args_rejects_unknown_fields():
+ """Plugin config must only use declared AmdSmiAnalyzerArgs fields."""
+ from pydantic import ValidationError
+
+ with pytest.raises(ValidationError):
+ AmdSmiAnalyzerArgs.model_validate(
+ {"expected_power_management": "DISABLED", "not_a_field": 1}
+ )
+
+
def test_check_amdsmi_metric_pcie_width_fail(mock_analyzer):
"""PCIe width not x16 generates error."""
analyzer = mock_analyzer
@@ -1014,9 +1125,16 @@ def test_check_amdsmi_metric_ecc_totals(mock_analyzer):
]
analyzer.check_amdsmi_metric_ecc_totals(metrics)
assert len(analyzer.result.events) == 5
- # Analyzer uses generic description "GPU ECC error count detected"; type is in data["error_type"]
+ expected_descriptions = {
+ "GPU 0: Total correctable ECC errors count=1",
+ "GPU 1: Total uncorrectable ECC errors count=1",
+ "GPU 2: Total deferred ECC errors count=1",
+ "GPU 3: Cache correctable ECC errors count=1",
+ "GPU 4: Cache uncorrectable ECC errors count=1",
+ }
+ assert {e.description for e in analyzer.result.events} == expected_descriptions
for e in analyzer.result.events:
- assert e.description == "GPU ECC error count detected"
+ assert e.description == e.data["details"]
assert "error_type" in e.data and "error_count" in e.data
error_types = [e.data["error_type"] for e in analyzer.result.events]
assert "Total correctable ECC errors" in error_types
diff --git a/test/unit/plugin/test_amdsmi_data.py b/test/unit/plugin/test_amdsmi_data.py
new file mode 100644
index 00000000..9e28fbb9
--- /dev/null
+++ b/test/unit/plugin/test_amdsmi_data.py
@@ -0,0 +1,514 @@
+###############################################################################
+#
+# MIT License
+#
+# Copyright (c) 2026 Advanced Micro Devices, Inc.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+###############################################################################
+"""Unit tests for amd-smi pydantic models (ROCm 7.13 / legacy JSON shapes)."""
+
+from typing import Any, Optional
+
+import pytest
+
+from nodescraper.plugins.inband.amdsmi.amdsmidata import (
+ AmdSmiDataModel,
+ AmdSmiMetric,
+ MetricClockData,
+ MetricPcie,
+ MetricPower,
+ StaticClockData,
+ StaticFrequencyLevels,
+ StaticLimit,
+ ValueUnit,
+)
+
+DUMMY_STATIC_ASIC: dict[str, Any] = {
+ "market_name": "AMD Instinct MI350X",
+ "vendor_id": "0x1002",
+ "vendor_name": "Advanced Micro Devices Inc. [AMD/ATI]",
+ "subvendor_id": "0x1002",
+ "device_id": "0x75a0",
+ "subsystem_id": "0x75a0",
+ "rev_id": "0x00",
+ "asic_serial": "0xDUMMYASIC00000001",
+ "oam_id": 6,
+ "num_compute_units": 256,
+ "target_graphics_version": "gfx950",
+}
+
+DUMMY_STATIC_BUS: dict[str, Any] = {
+ "bdf": "0000:05:00.0",
+ "max_pcie_width": 16,
+ "max_pcie_speed": {"value": 32, "unit": "GT/s"},
+ "pcie_interface_version": "Gen 5",
+ "slot_type": "OAM",
+}
+
+DUMMY_STATIC_BOARD: dict[str, str] = {
+ "model_number": "DUMMY-MI350X-OAM",
+ "product_serial": "DUMMY-SN-000001",
+ "fru_id": "DUMMY-FRU",
+ "product_name": "AMD Instinct MI350X OAM",
+ "manufacturer_name": "AMD",
+}
+
+DUMMY_STATIC_DRIVER: dict[str, Optional[str]] = {
+ "driver_name": "amdgpu",
+ "driver_version": "6.12.12",
+ "os_kernel_version": "6.8.0-dummy",
+}
+
+DUMMY_STATIC_RAS: dict[str, Any] = {
+ "eeprom_version": "N/A",
+ "parity_schema": "DISABLED",
+ "single_bit_schema": "ENABLED",
+ "double_bit_schema": "ENABLED",
+ "poison_schema": "ENABLED",
+ "ecc_block_state": {},
+}
+
+DUMMY_STATIC_VRAM: dict[str, Any] = {
+ "type": "HBM3",
+ "vendor": "Micron",
+ "size": {"value": 192, "unit": "GB"},
+ "bit_width": {"value": 4096, "unit": "bit"},
+}
+
+DUMMY_LIMIT_LEGACY: dict[str, Any] = {
+ "max_power": {"value": 550, "unit": "W"},
+ "min_power": {"value": 0, "unit": "W"},
+ "socket_power": {"value": 0, "unit": "W"},
+}
+
+DUMMY_LIMIT_PPT0: dict[str, Any] = {
+ "ppt0": {
+ "max_power_limit": {"value": 750, "unit": "W"},
+ "min_power_limit": {"value": 0, "unit": "W"},
+ "socket_power_limit": {"value": 600, "unit": "W"},
+ },
+ "ppt1": {
+ "max_power_limit": "N/A",
+ "min_power_limit": "N/A",
+ "socket_power_limit": "N/A",
+ },
+}
+
+DUMMY_CLOCK_LEGACY_FREQUENCY: dict[str, Any] = {
+ "frequency": [500_000_000, 900_000_000, 1_300_000_000],
+ "current": 0,
+}
+
+DUMMY_CLOCK_FREQUENCY_LEVELS: dict[str, Any] = {
+ "GFX_0": {
+ "frequency_levels": {
+ "Level 0": {"value": 500, "unit": "MHz"},
+ "Level 1": "900 MHz",
+ "Level 2": "1300 MHz",
+ },
+ "current level": 0,
+ }
+}
+
+DUMMY_METRIC_USAGE: dict[str, Any] = {
+ "gfx_activity": {"value": 0, "unit": "%"},
+ "umc_activity": {"value": 0, "unit": "%"},
+ "mm_activity": "N/A",
+ "vcn_activity": ["N/A", "N/A", "N/A", "N/A"],
+ "jpeg_activity": ["N/A"] * 12,
+ "gfx_busy_inst": None,
+ "jpeg_busy": None,
+ "vcn_busy": None,
+}
+
+DUMMY_METRIC_POWER: dict[str, Any] = {
+ "socket_power": {"value": 140, "unit": "W"},
+ "gfx_voltage": "N/A",
+ "soc_voltage": "N/A",
+ "mem_voltage": "N/A",
+ "ubb_power": {"value": 1200, "unit": "W"},
+ "throttle_status": "UNTHROTTLED",
+ "power_management": "DISABLED",
+}
+
+DUMMY_METRIC_PCIE: dict[str, Any] = {
+ "width": 16,
+ "speed": {"value": 32, "unit": "GT/s"},
+ "bandwidth": {"value": 512, "unit": "Gb/s"},
+ "replay_count": 0,
+ "l0_to_recovery_count": 0,
+ "replay_roll_over_count": 0,
+ "nak_sent_count": 0,
+ "nak_received_count": 0,
+ "current_bandwidth_sent": 0,
+ "current_bandwidth_received": 0,
+ "max_packet_size": 512,
+ "lc_perf_other_end_recovery_count": 1,
+}
+
+DUMMY_METRIC_TEMPERATURE: dict[str, Any] = {
+ "edge": "N/A",
+ "hotspot": {"value": 37, "unit": "C"},
+ "mem": {"value": 42, "unit": "C"},
+}
+
+DUMMY_METRIC_GPUBOARD: dict[str, Any] = {
+ "temperature": {
+ "NODE_RETIMER_X": 43,
+ "NODE_OAM_X_IBC": 53,
+ }
+}
+
+DUMMY_METRIC_BASEBOARD: dict[str, Any] = {
+ "temperature": {
+ "UBB_FRONT": 55,
+ "UBB_FPGA": 78,
+ }
+}
+
+_PCIE_KEYS = list(DUMMY_METRIC_PCIE.keys())
+
+
+def _deep_update(base: dict[str, Any], overrides: dict[str, Any]) -> dict[str, Any]:
+ """Shallow merge with one level of nested dict merge."""
+ out = base.copy()
+ for key, val in overrides.items():
+ if isinstance(val, dict) and isinstance(out.get(key), dict):
+ nested = out[key].copy()
+ nested.update(val)
+ out[key] = nested
+ else:
+ out[key] = val
+ return out
+
+
+def dummy_static_gpu_dict(
+ *,
+ gpu: int = 0,
+ limit: Optional[dict[str, Any]] = None,
+ clock: Optional[dict[str, Any]] = None,
+ **overrides: Any,
+) -> dict[str, Any]:
+ """Build a minimal static GPU dict for ``AmdSmiStatic.model_validate``."""
+ base: dict[str, Any] = {
+ "gpu": gpu,
+ "asic": dict(DUMMY_STATIC_ASIC),
+ "bus": dict(DUMMY_STATIC_BUS),
+ "driver": dict(DUMMY_STATIC_DRIVER),
+ "board": dict(DUMMY_STATIC_BOARD),
+ "ras": dict(DUMMY_STATIC_RAS),
+ "process_isolation": "",
+ "numa": {"node": 0, "affinity": 0},
+ "vram": dict(DUMMY_STATIC_VRAM),
+ "cache_info": [],
+ "limit": limit,
+ "clock": clock,
+ }
+ return _deep_update(base, overrides)
+
+
+def dummy_metric_dict(
+ *,
+ gpu: int = 0,
+ usage: Any = None,
+ power: Optional[dict[str, Any]] = None,
+ pcie: Optional[dict[str, Any]] = None,
+ **overrides: Any,
+) -> dict[str, Any]:
+ """Build a minimal metric GPU dict for ``AmdSmiMetric.model_validate``."""
+ base: dict[str, Any] = {
+ "gpu": gpu,
+ "usage": DUMMY_METRIC_USAGE if usage is None else usage,
+ "power": dict(DUMMY_METRIC_POWER) if power is None else power,
+ "clock": {
+ "GFX_0": {
+ "clk": {"value": 132, "unit": "MHz"},
+ "min_clk": {"value": 500, "unit": "MHz"},
+ "max_clk": {"value": 2100, "unit": "MHz"},
+ }
+ },
+ "temperature": dict(DUMMY_METRIC_TEMPERATURE),
+ "pcie": dict(DUMMY_METRIC_PCIE) if pcie is None else pcie,
+ "ecc": {
+ "total_correctable_count": 0,
+ "total_uncorrectable_count": 0,
+ "total_deferred_count": 0,
+ "cache_correctable_count": 0,
+ "cache_uncorrectable_count": 0,
+ },
+ "ecc_blocks": {},
+ "fan": {"speed": "N/A", "max": "N/A", "rpm": "N/A", "usage": "N/A"},
+ "voltage_curve": None,
+ "perf_level": "auto",
+ "xgmi_err": "N/A",
+ "energy": None,
+ "mem_usage": {
+ "total_vram": {"value": 192, "unit": "GB"},
+ "used_vram": {"value": 0, "unit": "GB"},
+ "free_vram": {"value": 192, "unit": "GB"},
+ "total_visible_vram": None,
+ "used_visible_vram": None,
+ "free_visible_vram": None,
+ "total_gtt": None,
+ "used_gtt": None,
+ "free_gtt": None,
+ },
+ "throttle": {},
+ "gpuboard": dict(DUMMY_METRIC_GPUBOARD),
+ "baseboard": dict(DUMMY_METRIC_BASEBOARD),
+ }
+ return _deep_update(base, overrides)
+
+
+def dummy_amdsmi_data_dict(
+ *,
+ static: Optional[list[dict[str, Any]]] = None,
+ metric: Optional[list[dict[str, Any]]] = None,
+ **overrides: Any,
+) -> dict[str, Any]:
+ """Build top-level ``AmdSmiDataModel`` input with dummy static + metric lists."""
+ base: dict[str, Any] = {
+ "static": (
+ static
+ if static is not None
+ else [
+ dummy_static_gpu_dict(
+ limit=dict(DUMMY_LIMIT_PPT0), clock=dict(DUMMY_CLOCK_FREQUENCY_LEVELS)
+ ),
+ ]
+ ),
+ "metric": metric if metric is not None else [dummy_metric_dict(gpu=0)],
+ }
+ return _deep_update(base, overrides)
+
+
+def _pcie_dict(**overrides: Any) -> MetricPcie:
+ """Full PCIe model from dummy defaults plus overrides."""
+ base = dict(DUMMY_METRIC_PCIE)
+ base.update(overrides)
+ for key, val in list(base.items()):
+ if isinstance(val, ValueUnit):
+ base[key] = {"value": val.value, "unit": val.unit}
+ return MetricPcie.model_validate(base)
+
+
+@pytest.mark.parametrize(
+ "level_input,expected_value,expected_unit",
+ [
+ ("138 MHz", 138, "MHz"),
+ ({"value": 500, "unit": "MHz"}, 500, "MHz"),
+ (900, 900, ""),
+ (ValueUnit(value=1300, unit="MHz"), 1300, "MHz"),
+ ],
+)
+def test_static_frequency_levels_coerce_to_value_unit(level_input, expected_value, expected_unit):
+ """Static frequency levels accept strings, dicts, numbers, and ValueUnit."""
+ levels = StaticFrequencyLevels.model_validate(
+ {"Level 0": level_input, "Level 1": None, "Level 2": None}
+ )
+ assert isinstance(levels.Level_0, ValueUnit)
+ assert levels.Level_0.value == expected_value
+ assert levels.Level_0.unit == expected_unit
+ assert levels.Level_1 is None
+ assert levels.Level_2 is None
+
+
+def test_static_frequency_levels_optional_levels():
+ """Level 1/2 coerce from amd-smi string and structured JSON forms."""
+ levels = StaticFrequencyLevels.model_validate(
+ DUMMY_CLOCK_FREQUENCY_LEVELS["GFX_0"]["frequency_levels"]
+ )
+ assert levels.Level_0.value == 500
+ assert levels.Level_1 is not None and levels.Level_1.value == 900
+ assert levels.Level_2 is not None and levels.Level_2.value == 1300
+
+
+def test_static_limit_legacy_max_power():
+ """Legacy flat max_power field still resolves."""
+ limit = StaticLimit.model_validate(DUMMY_LIMIT_LEGACY)
+ resolved = limit.resolved_max_power()
+ assert resolved is not None
+ assert resolved.value == 550
+
+
+def test_static_limit_ppt0_max_power():
+ """ROCm 7+ ppt0.max_power_limit resolves when flat max_power is absent."""
+ limit = StaticLimit.model_validate(DUMMY_LIMIT_PPT0)
+ assert limit.max_power is None
+ resolved = limit.resolved_max_power()
+ assert resolved is not None
+ assert resolved.value == 750
+
+
+def test_ref_max_power_w_prefers_ppt0_when_legacy_missing():
+ """Reference snapshot uses ppt0 on newer static JSON."""
+ data = AmdSmiDataModel.model_validate(
+ dummy_amdsmi_data_dict(
+ static=[dummy_static_gpu_dict(limit=dict(DUMMY_LIMIT_PPT0))],
+ )
+ )
+ assert data.ref_max_power_w == 750
+
+
+def test_ref_max_power_w_legacy_limit():
+ """Reference snapshot uses legacy max_power when present."""
+ data = AmdSmiDataModel.model_validate(
+ dummy_amdsmi_data_dict(
+ static=[dummy_static_gpu_dict(limit=dict(DUMMY_LIMIT_LEGACY))],
+ )
+ )
+ assert data.ref_max_power_w == 550
+
+
+def test_static_clock_frequency_levels_json():
+ """Static clock accepts ROCm 7+ frequency_levels object."""
+ clock = StaticClockData.model_validate(
+ {
+ "frequency_levels": DUMMY_CLOCK_FREQUENCY_LEVELS["GFX_0"]["frequency_levels"],
+ "current level": 0,
+ }
+ )
+ assert clock.frequency_levels.Level_0.value == 500
+ assert clock.frequency_levels.Level_1 is not None
+ assert clock.frequency_levels.Level_1.value == 900
+
+
+def test_amdsmi_data_model_dummy_metric_round_trip():
+ """Full dummy metric payload validates and preserves key ROCm 7.13 fields."""
+ metric = AmdSmiMetric.model_validate(dummy_metric_dict())
+ assert metric.gpu == 0
+ assert metric.power.power_management == "DISABLED"
+ assert metric.power.ubb_power is not None
+ assert metric.power.ubb_power.value == 1200
+ assert metric.pcie.width == 16
+ assert metric.pcie.lc_perf_other_end_recovery_count == 1
+ assert metric.gpuboard is not None
+ assert metric.baseboard is not None
+
+
+def test_metric_pcie_lc_perf_alias():
+ """Metric PCIe accepts legacy lc_perf_other_end_recovery JSON key."""
+ pcie = _pcie_dict(lc_perf_other_end_recovery=2)
+ assert pcie.lc_perf_other_end_recovery_count == 2
+ assert pcie.lc_perf_other_end_recovery == 2
+
+
+def test_metric_pcie_lc_perf_count_key():
+ """Metric PCIe accepts renamed lc_perf_other_end_recovery_count key."""
+ pcie = _pcie_dict(lc_perf_other_end_recovery_count=3)
+ assert pcie.lc_perf_other_end_recovery_count == 3
+ assert pcie.lc_perf_other_end_recovery == 3
+
+
+def test_metric_legacy_mi300_shape():
+ """Legacy metric JSON without MI355-only clock/board fields still validates."""
+ pcie_legacy = {
+ k: v for k, v in DUMMY_METRIC_PCIE.items() if k != "lc_perf_other_end_recovery_count"
+ }
+ pcie_legacy["lc_perf_other_end_recovery"] = 1
+ metric = AmdSmiMetric.model_validate(
+ dummy_metric_dict(
+ pcie=pcie_legacy,
+ gpuboard=None,
+ baseboard=None,
+ )
+ )
+ assert metric.gpuboard is None
+ assert metric.baseboard is None
+ assert isinstance(metric.clock["GFX_0"], MetricClockData)
+ assert metric.clock["GFX_0"].clk is not None
+ assert metric.pcie.lc_perf_other_end_recovery_count == 1
+ assert metric.pcie.lc_perf_other_end_recovery == 1
+
+
+def test_metric_power_ubb_coercion():
+ """Metric power accepts ubb_power as string or ValueUnit."""
+ power = MetricPower.model_validate(
+ {
+ **DUMMY_METRIC_POWER,
+ "ubb_power": "1200 W",
+ }
+ )
+ assert power.ubb_power is not None
+ assert power.ubb_power.value == 1200
+ assert power.ubb_power.unit == "W"
+
+
+def test_metric_usage_string_na():
+ """Whole usage block may be N/A on some platforms."""
+ metric = AmdSmiMetric.model_validate(dummy_metric_dict(usage="N/A"))
+ assert metric.usage == "N/A"
+
+
+def test_metric_gpuboard_baseboard_optional():
+ """Store gpuboard/baseboard sections from ROCm 7.1+ metric output."""
+ metric = AmdSmiMetric.model_validate(
+ dummy_metric_dict(
+ gpuboard=DUMMY_METRIC_GPUBOARD,
+ baseboard=DUMMY_METRIC_BASEBOARD,
+ )
+ )
+ assert metric.gpuboard == DUMMY_METRIC_GPUBOARD
+ assert metric.baseboard == DUMMY_METRIC_BASEBOARD
+
+
+def test_metric_gpuboard_gpu_board_alias():
+ """amd-smi may emit gpu_board / base_board instead of gpuboard / baseboard."""
+ metric = AmdSmiMetric.model_validate(
+ dummy_metric_dict(
+ **{
+ "gpu_board": DUMMY_METRIC_GPUBOARD,
+ "base_board": DUMMY_METRIC_BASEBOARD,
+ }
+ )
+ )
+ assert metric.gpuboard == DUMMY_METRIC_GPUBOARD
+ assert metric.baseboard == DUMMY_METRIC_BASEBOARD
+
+
+def test_metric_clock_per_aid_na_maps():
+ """MI355 metric clock may expose per-AID/MID maps instead of MetricClockData leaves."""
+ metric = AmdSmiMetric.model_validate(
+ dummy_metric_dict(
+ clock={
+ "GFX_0": {
+ "clk": {"value": 132, "unit": "MHz"},
+ "min_clk": {"value": 500, "unit": "MHz"},
+ "max_clk": {"value": 2100, "unit": "MHz"},
+ "clk_locked": 0,
+ "deep_sleep": 0,
+ },
+ "uclk_aid": {"AID_0": "N/A", "AID_1": "N/A"},
+ "socclks_mid": {"MID_0": "N/A", "MID_1": "N/A"},
+ },
+ pcie={
+ **DUMMY_METRIC_PCIE,
+ "current_bandwidth_sent": "N/A",
+ "current_bandwidth_received": "N/A",
+ "max_packet_size": "N/A",
+ "lc_perf_other_end_recovery_count": 0,
+ },
+ )
+ )
+ assert isinstance(metric.clock["uclk_aid"], dict)
+ assert metric.clock["uclk_aid"]["AID_0"] == "N/A"
+ assert isinstance(metric.clock["GFX_0"], MetricClockData)
+ assert metric.pcie.lc_perf_other_end_recovery_count == 0