From efe4705c4ceb0c9c87db4373837a3fc03e90742d Mon Sep 17 00:00:00 2001
From: Li Baoming <1508269885@qq.com>
Date: Wed, 11 Mar 2026 18:21:18 +0800
Subject: [PATCH 1/5] feat: dashboard reads data from MongoDB
---
dashboard/app.py | 41 ++-
dashboard/common.py | 31 +-
dashboard/pages/communication.py | 12 +-
dashboard/pages/inference.py | 12 +-
dashboard/pages/operator.py | 22 +-
dashboard/utils/data_loader.py | 387 ++++++++-------------
dashboard/utils/data_sources.py | 255 ++++++++++++++
dashboard/utils/data_utils.py | 114 ++++++
db/__init__.py | 9 +-
db/importer.py | 279 +++++++++++++++
db/utils.py | 163 +++++++++
infinimetrics/hardware/hardware_adapter.py | 3 +-
pyproject.toml | 3 +
13 files changed, 1049 insertions(+), 282 deletions(-)
create mode 100644 dashboard/utils/data_sources.py
create mode 100644 dashboard/utils/data_utils.py
create mode 100644 db/importer.py
create mode 100644 db/utils.py
diff --git a/dashboard/app.py b/dashboard/app.py
index 705177e..2945147 100644
--- a/dashboard/app.py
+++ b/dashboard/app.py
@@ -13,7 +13,8 @@
sys.path.append(str(project_root))
from components.header import render_header
-from utils.data_loader import InfiniMetricsDataLoader, load_summary_file
+from utils.data_loader import InfiniMetricsDataLoader
+from common import show_data_source_info
# Page configuration
st.set_page_config(
@@ -28,6 +29,8 @@
st.session_state.data_loader = InfiniMetricsDataLoader()
if "selected_accelerators" not in st.session_state:
st.session_state.selected_accelerators = []
+if "use_mongodb" not in st.session_state:
+ st.session_state.use_mongodb = False
def main():
@@ -40,11 +43,34 @@ def main():
with st.sidebar:
st.markdown("## ⚙️ 设置")
+ # Data source selection
+ use_mongodb = st.toggle(
+ "使用 MongoDB",
+ value=st.session_state.use_mongodb,
+ help="切换到 MongoDB 数据源(需要 MongoDB 服务运行中)",
+ )
+
+ if use_mongodb != st.session_state.use_mongodb:
+ st.session_state.use_mongodb = use_mongodb
+ if use_mongodb:
+ st.session_state.data_loader = InfiniMetricsDataLoader(
+ use_mongodb=True, fallback_to_files=True
+ )
+ else:
+ st.session_state.data_loader = InfiniMetricsDataLoader()
+
+ # Show current data source
+ show_data_source_info(style="sidebar")
+
+ st.markdown("---")
+
results_dir = st.text_input(
- "测试结果目录", value="./test_output", help="包含 JSON/CSV 测试结果的目录"
+ "测试结果目录", value="../output", help="包含 JSON/CSV 测试结果的目录"
)
- if results_dir != str(st.session_state.data_loader.results_dir):
+ if not use_mongodb and results_dir != str(
+ st.session_state.data_loader.results_dir
+ ):
st.session_state.data_loader = InfiniMetricsDataLoader(results_dir)
auto_refresh = st.toggle("自动刷新", value=False)
@@ -102,8 +128,9 @@ def render_dashboard(run_id_filter: str):
">
InfiniMetrics Dashboard 用于统一展示
通信(NCCL / 集合通信)、
- 推理(Direct / Service)、
- 算子(核心算子性能)
+ 推理(直接推理 / 服务性能)、
+ 算子(核心算子性能)、
+ 硬件(内存带宽 / 缓存性能)
等 AI 加速卡性能测试结果。
测试框架输出 JSON(环境 / 配置 / 标量指标) +
@@ -230,10 +257,10 @@ def _latest(lst):
st.dataframe(df, use_container_width=True, hide_index=True)
# ========== Dispatcher summary ==========
- summaries = load_summary_file()
+ summaries = st.session_state.data_loader.load_summaries()
if not summaries:
- st.info("No dispatcher_summary file found")
+ st.info("未找到 Dispatcher 汇总记录")
return
st.markdown("### 🧾 Dispatcher 汇总记录")
diff --git a/dashboard/common.py b/dashboard/common.py
index 73b15ca..34ec248 100644
--- a/dashboard/common.py
+++ b/dashboard/common.py
@@ -21,8 +21,35 @@ def init_page(page_title: str, page_icon: str):
# Page configuration
st.set_page_config(page_title=page_title, page_icon=page_icon, layout="wide")
- # Initialize DataLoader
+ # Initialize use_mongodb setting if not exists
+ if "use_mongodb" not in st.session_state:
+ st.session_state.use_mongodb = False
+
+ # Initialize DataLoader (respect MongoDB setting)
if "data_loader" not in st.session_state:
from utils.data_loader import InfiniMetricsDataLoader
- st.session_state.data_loader = InfiniMetricsDataLoader()
+ st.session_state.data_loader = InfiniMetricsDataLoader(
+ use_mongodb=st.session_state.use_mongodb,
+ fallback_to_files=True,
+ )
+
+
+def show_data_source_info(style: str = "caption"):
+ """
+ Display current data source info (MongoDB or file system).
+
+ Args:
+ style: Display style - "caption" for pages, "sidebar" for main app sidebar
+ """
+ dl = st.session_state.data_loader
+ if dl.source_type == "mongodb":
+ if style == "sidebar":
+ st.success("🟢 数据源: MongoDB")
+ else:
+ st.caption("数据源: MongoDB")
+ else:
+ if style == "sidebar":
+ st.info(f"📁 数据源: 文件系统 ({dl.results_dir})")
+ else:
+ st.caption(f"数据源: 文件系统 ({dl.results_dir})")
diff --git a/dashboard/pages/communication.py b/dashboard/pages/communication.py
index e0a9c9f..202c40e 100644
--- a/dashboard/pages/communication.py
+++ b/dashboard/pages/communication.py
@@ -4,7 +4,7 @@
import streamlit as st
import pandas as pd
-from common import init_page
+from common import init_page, show_data_source_info
from components.header import render_header
from utils.data_loader import get_friendly_size
from utils.metrics import extract_core_metrics
@@ -17,7 +17,7 @@
create_summary_table_infer,
)
-init_page("推理测试分析 | InfiniMetrics", "🔗")
+init_page("通信测试分析 | InfiniMetrics", "🔗")
def main():
@@ -25,9 +25,11 @@ def main():
render_header()
st.markdown("## 🔗 通信性能测试分析")
+ show_data_source_info()
+
try:
# Load communication test results
- comm_runs = st.session_state.data_loader.list_test_runs("comm")
+ comm_runs = dl.list_test_runs("comm")
if not comm_runs:
st.info("未找到通信测试结果")
@@ -117,7 +119,9 @@ def main():
for name in selected_indices:
idx = run_options[name]
run_info = filtered_runs[idx]
- result = st.session_state.data_loader.load_test_result(run_info["path"])
+ # Use path for file source, run_id for MongoDB
+ identifier = run_info.get("path") or run_info.get("run_id")
+ result = dl.load_test_result(identifier)
run_info["data"] = result
selected_runs.append(run_info)
diff --git a/dashboard/pages/inference.py b/dashboard/pages/inference.py
index e9db234..74768c1 100644
--- a/dashboard/pages/inference.py
+++ b/dashboard/pages/inference.py
@@ -4,9 +4,8 @@
import streamlit as st
import pandas as pd
-from common import init_page
+from common import init_page, show_data_source_info
from components.header import render_header
-from utils.data_loader import InfiniMetricsDataLoader, get_friendly_size
from utils.visualizations import (
plot_timeseries_auto,
create_summary_table_infer,
@@ -19,8 +18,9 @@ def main():
render_header()
st.markdown("## 🚀 推理性能测试分析")
- dl = st.session_state.data_loader
- runs = dl.list_test_runs("infer")
+ show_data_source_info()
+
+ runs = st.session_state.data_loader.list_test_runs("infer")
if not runs:
st.info("未找到推理测试结果(testcase 需以 infer.* 开头)。")
@@ -91,7 +91,9 @@ def _mode_of(r):
selected_runs = []
for k in selected:
ri = filtered[options[k]]
- data = dl.load_test_result(ri["path"])
+ # Use path for file source, run_id for MongoDB
+ identifier = ri.get("path") or ri.get("run_id")
+ data = dl.load_test_result(identifier)
ri = dict(ri)
ri["data"] = data
selected_runs.append(ri)
diff --git a/dashboard/pages/operator.py b/dashboard/pages/operator.py
index a53fd38..f388248 100644
--- a/dashboard/pages/operator.py
+++ b/dashboard/pages/operator.py
@@ -4,7 +4,7 @@
import streamlit as st
import pandas as pd
-from common import init_page
+from common import init_page, show_data_source_info
from components.header import render_header
from utils.visualizations import (
create_summary_table_ops,
@@ -18,24 +18,18 @@ def main():
render_header()
st.markdown("## ⚡ 算子测试分析")
- dl = st.session_state.data_loader
+ show_data_source_info()
- runs = dl.list_test_runs() # Load all test runs first
- # Identify operator runs by checking "operators" in path or testcase starting with operator/ops
+ runs = st.session_state.data_loader.list_test_runs()
+ # Identify operator runs by testcase starting with operator/ops
ops_runs = []
for r in runs:
- p = str(r.get("path", ""))
tc = (r.get("testcase") or "").lower()
- if (
- ("/operators/" in p.replace("\\", "/"))
- or tc.startswith("operator")
- or tc.startswith("operators")
- or tc.startswith("ops")
- ):
+ if tc.startswith(("operator", "operators", "ops")):
ops_runs.append(r)
if not ops_runs:
- st.info("未找到算子测试结果(请确认 JSON 在 test_output/operators/ 下)。")
+ st.info("未找到算子测试结果(请确认 JSON 在 output/operators 目录下)。")
return
with st.sidebar:
@@ -62,7 +56,9 @@ def main():
selected_runs = []
for k in selected:
ri = filtered[options[k]]
- data = dl.load_test_result(ri["path"])
+ # Use path for file source, run_id for MongoDB
+ identifier = ri.get("path") or ri.get("run_id")
+ data = dl.load_test_result(identifier)
ri = dict(ri)
ri["data"] = data
selected_runs.append(ri)
diff --git a/dashboard/utils/data_loader.py b/dashboard/utils/data_loader.py
index f2b9c66..5d6cccf 100644
--- a/dashboard/utils/data_loader.py
+++ b/dashboard/utils/data_loader.py
@@ -1,277 +1,168 @@
#!/usr/bin/env python3
-"""Data loading utilities for InfiniMetrics dashboard."""
+"""Unified data loader for InfiniMetrics dashboard."""
-import json
-import csv
-import pandas as pd
-from pathlib import Path
-from typing import Dict, List, Any, Optional, Tuple
import logging
+from pathlib import Path
+from typing import Any, Dict, List, Optional
-logger = logging.getLogger(__name__)
-
-
-class InfiniMetricsDataLoader:
- """Load and parse InfiniMetrics test results."""
-
- def __init__(self, results_dir: str = "./test_output"):
- self.results_dir = Path(results_dir)
-
- def list_test_runs(self, test_type: str = None) -> List[Dict[str, Any]]:
- """List all test runs, filtering out summary files."""
- runs = []
-
- # Search for JSON result files
- for json_file in self.results_dir.rglob("*.json"):
- try:
- # Skip summary files and dispatcher files
- if (
- "summary" in json_file.name.lower()
- or "dispatcher" in json_file.name.lower()
- ):
- continue
-
- with open(json_file, "r", encoding="utf-8") as f:
- data = json.load(f)
-
- # Filter: must be a test result file, not a summary file
- if not self._is_test_result_file(data):
- continue
-
- # Filter by test type if specified
- testcase = data.get("testcase", "")
- if test_type and not testcase.startswith(test_type):
- continue
-
- # Extract basic info
- run_info = self._extract_run_info(data, json_file)
-
- # Extract the accelerator card type
- run_info["accelerator_types"] = extract_accelerator_types(data)
- runs.append(run_info)
-
- except Exception as e:
- logger.debug(f"Skipping file {json_file}: {e}")
-
- # Sort by time (newest first)
- runs.sort(key=lambda x: x["time"], reverse=True)
- return runs
-
- def load_test_result(self, json_path: Path) -> Dict[str, Any]:
- """Load a single test result with all data."""
- with open(json_path, "r", encoding="utf-8") as f:
- data = json.load(f)
-
- # Load associated CSV files
- for metric in data.get("metrics", []):
- csv_url = metric.get("raw_data_url")
- if csv_url and not csv_url.startswith("http"):
- # Get the correct base directory
- base_dir = self._get_csv_base_dir(data, json_path)
- csv_path = self._resolve_csv_path(csv_url, base_dir)
-
- if csv_path and csv_path.exists():
- try:
- df = pd.read_csv(csv_path)
- metric["data"] = df
- metric["data_columns"] = list(df.columns)
- metric["csv_path"] = str(csv_path)
- except Exception as e:
- logger.warning(f"Failed to load CSV {csv_path}: {e}")
- metric["data"] = None
- else:
- logger.debug(f"CSV not found: {csv_url} (base: {base_dir})")
- metric["data"] = None
-
- return data
-
- def load_csv_data(
- self, csv_url: str, json_data: Dict[str, Any], json_path: Path
- ) -> Optional[pd.DataFrame]:
- """Load CSV data file using proper path resolution."""
- try:
- if csv_url.startswith("http"):
- return None
-
- base_dir = self._get_csv_base_dir(json_data, json_path)
- csv_path = self._resolve_csv_path(csv_url, base_dir)
+import pandas as pd
- if csv_path and csv_path.exists():
- return pd.read_csv(csv_path)
- except Exception as e:
- logger.error(f"Failed to load CSV {csv_url}: {e}")
- return None
+from .data_sources import DataSource, FileDataSource, MongoDataSource
+from .data_utils import extract_accelerator_types, extract_run_info, get_friendly_size
- def _is_test_result_file(self, data: Dict[str, Any]) -> bool:
- """Check if JSON file is a test result (not a summary)."""
- # Must have these fields
- required = ["run_id", "testcase", "config"]
- if not all(key in data for key in required):
- return False
+logger = logging.getLogger(__name__)
- # Should have metrics
- if "metrics" not in data:
- return False
- return True
+class InfiniMetricsDataLoader:
+ """
+ Unified data loader supporting multiple sources.
- def _extract_run_info(
- self, data: Dict[str, Any], json_path: Path
- ) -> Dict[str, Any]:
- """Extract run info from test result data."""
- config = data.get("config", {})
- resolved = data.get("resolved", {})
+ Supports:
+ - File-based loading (default)
+ - MongoDB-based loading
+ - Automatic fallback from MongoDB to files
+ """
- # Device used: try resolved first, then config
- device_used = (
- resolved.get("device_used")
- or config.get("device_used")
- or config.get("device_involved", 1)
- )
+ def __init__(
+ self,
+ results_dir: str = "../output",
+ use_mongodb: bool = False,
+ mongo_config=None,
+ fallback_to_files: bool = True,
+ ):
+ """
+ Initialize the data loader.
- # Nodes: try resolved first, then environment
- nodes = resolved.get("nodes") or data.get("environment", {}).get(
- "cluster_scale", 1
+ Args:
+ results_dir: Directory containing test result files
+ use_mongodb: If True, use MongoDB as primary data source
+ mongo_config: Optional MongoDB configuration
+ fallback_to_files: If True, fall back to file loading if MongoDB fails
+ """
+ self.results_dir = Path(results_dir)
+ self._fallback_to_files = fallback_to_files
+ self._use_mongodb = use_mongodb
+ self._mongo_config = mongo_config
+ self._source: Optional[DataSource] = None
+
+ if use_mongodb:
+ self._init_mongodb_source()
+ else:
+ self._source = FileDataSource(results_dir)
+
+ def _init_mongodb_source(self):
+ """Initialize MongoDB data source with optional fallback."""
+ mongo_source = MongoDataSource(self._mongo_config)
+
+ if mongo_source.is_connected():
+ self._source = mongo_source
+ elif self._fallback_to_files:
+ logger.warning("MongoDB unavailable, falling back to file-based loading")
+ self._source = FileDataSource(str(self.results_dir))
+ self._use_mongodb = False
+ else:
+ raise RuntimeError("MongoDB connection failed and fallback is disabled")
+
+ @property
+ def source_type(self) -> str:
+ """Get the current data source type."""
+ return self._source.source_type if self._source else "none"
+
+ @property
+ def is_connected(self) -> bool:
+ """Check if data source is available."""
+ return self._source is not None
+
+ @property
+ def is_using_mongodb(self) -> bool:
+ """Check if currently using MongoDB."""
+ return (
+ self._use_mongodb and self._source and self._source.source_type == "mongodb"
)
- # Success: use result_code if available, fallback to success field
- result_code = data.get("result_code", 1)
- success = result_code == 0
-
- # Extract metrics count and types
- metrics = data.get("metrics", [])
- metric_types = [
- m.get("name", "").split(".")[0] for m in metrics if m.get("name")
- ]
-
- return {
- "path": json_path,
- "testcase": data.get("testcase", "unknown"),
- "run_id": data.get("run_id", "unknown"),
- "time": data.get("time", ""),
- "success": success,
- "result_code": result_code,
- "test_type": self._extract_test_type(data.get("testcase", "")),
- "operation": self._extract_operation(data.get("testcase", "")),
- "config": config,
- "resolved": resolved,
- "device_used": device_used,
- "nodes": nodes,
- "metrics_count": len(metrics),
- "metric_types": list(set(metric_types)),
- }
-
- def _get_csv_base_dir(self, json_data: Dict[str, Any], json_path: Path) -> Path:
- """Get the correct base directory for CSV files."""
- # First try: use output_dir from config
- config = json_data.get("config", {})
- output_dir = config.get("output_dir")
-
- if output_dir:
- output_path = Path(output_dir)
- if output_path.is_absolute():
- return output_path
- # Relative path: resolve relative to JSON file location
- return json_path.parent / output_dir
-
- # Second try: use JSON file's parent directory
- return json_path.parent
-
- def _resolve_csv_path(self, csv_url: str, base_dir: Path) -> Optional[Path]:
+ def switch_to_mongodb(self, mongo_config=None) -> bool:
"""
- Resolve CSV path from raw_data_url and base_dir.
+ Switch to MongoDB data source.
- Handles cases like:
- - base_dir/output/communication + "./comm/xxx.csv" but file is actually base_dir/"xxx.csv"
- - base_dir/output/infer + "./infer/xxx.csv" but file is base_dir/"xxx.csv"
+ Returns:
+ True if switch was successful
"""
- try:
- if not csv_url:
- return None
-
- # strip leading "./"
- rel = csv_url[2:] if csv_url.startswith("./") else csv_url
- rel_path = Path(rel)
-
- candidates = []
+ if mongo_config:
+ self._mongo_config = mongo_config
- # 1) base_dir / rel
- candidates.append(base_dir / rel_path)
+ mongo_source = MongoDataSource(self._mongo_config)
- # 2) base_dir / basename (most common fallback for your current layout)
- candidates.append(base_dir / rel_path.name)
-
- # 3) base_dir.parent / rel (just in case)
- candidates.append(base_dir.parent / rel_path)
-
- # 4) base_dir.parent / basename
- candidates.append(base_dir.parent / rel_path.name)
-
- for p in candidates:
- if p.exists():
- return p
+ if mongo_source.is_connected():
+ self._source = mongo_source
+ self._use_mongodb = True
+ return True
+ elif self._fallback_to_files:
+ logger.warning("Failed to switch to MongoDB, keeping current source")
+ return False
+ else:
+ raise RuntimeError("MongoDB connection failed")
- return None
- except Exception:
- return None
+ def switch_to_files(self, results_dir: str = None):
+ """Switch to file-based data source."""
+ if results_dir:
+ self.results_dir = Path(results_dir)
+ self._source = FileDataSource(str(self.results_dir))
+ self._use_mongodb = False
- def _extract_test_type(self, testcase: str) -> str:
- """Extract test type from testcase string."""
- parts = testcase.split(".")
- if len(parts) > 0:
- return parts[0] # comm, infer, operator, etc.
- return "unknown"
+ def list_test_runs(self, test_type: str = None) -> List[Dict[str, Any]]:
+ """List all test runs."""
+ if self._source is None:
+ return []
+ return self._source.list_test_runs(test_type)
- def _extract_operation(self, testcase: str) -> str:
- """Extract operation from testcase string."""
- parts = testcase.split(".")
- if len(parts) > 2:
- return parts[2] # AllReduce, Direct, Conv, etc.
- return "unknown"
+ def load_test_result(self, identifier) -> Dict[str, Any]:
+ """
+ Load a single test result with all data.
+ Args:
+ identifier: For file source, a Path to JSON file.
+ For MongoDB source, a run_id string.
+ """
+ if self._source is None:
+ return {}
+ return self._source.load_test_result(identifier)
-def load_summary_file(summary_path: str = "./summary_output") -> List[Dict[str, Any]]:
- """Load dispatcher summary files."""
- summaries = []
- summary_dir = Path(summary_path)
+ def load_summaries(self) -> List[Dict[str, Any]]:
+ """Load dispatcher summaries from the current data source."""
+ if self._source is None:
+ return []
+ return self._source.load_summaries()
- if summary_dir.exists():
- for json_file in sorted(
- summary_dir.glob("dispatcher_summary_*.json"), reverse=True
- ):
+ def load_csv_data(
+ self, csv_url: str, json_data: Dict[str, Any], json_path: Path
+ ) -> Optional[pd.DataFrame]:
+ """Load CSV data file using proper path resolution (file source only)."""
+ if isinstance(self._source, FileDataSource):
try:
- with open(json_file, "r", encoding="utf-8") as f:
- data = json.load(f)
- data["file"] = json_file.name
- data["timestamp"] = json_file.stem.replace("dispatcher_summary_", "")
- summaries.append(data)
- except Exception as e:
- logger.warning(f"Failed to load summary {json_file}: {e}")
+ if csv_url.startswith("http"):
+ return None
- return summaries
+ base_dir = self._source._get_csv_base_dir(json_data, json_path)
+ csv_path = self._source._resolve_csv_path(csv_url, base_dir)
-
-def get_friendly_size(size_bytes: int) -> str:
- """Convert bytes to human-readable size."""
- for unit in ["B", "KB", "MB", "GB", "TB"]:
- if size_bytes < 1024.0:
- return f"{size_bytes:.1f} {unit}"
- size_bytes /= 1024.0
- return f"{size_bytes:.1f} PB"
+ if csv_path and csv_path.exists():
+ return pd.read_csv(csv_path)
+ except Exception as e:
+ logger.error(f"Failed to load CSV {csv_url}: {e}")
+ return None
-def extract_accelerator_types(result_json: dict) -> list[str]:
- """
- Extract the accelerator card type from result_json
- """
- types = set()
- try:
- clusters = result_json.get("environment", {}).get("cluster", [])
- for node in clusters:
- accs = node.get("machine", {}).get("accelerators", [])
- for acc in accs:
- if "type" in acc:
- types.add(acc["type"])
- except Exception:
- pass
- return list(types)
+# Re-export from sibling modules
+from .data_sources import DataSource, FileDataSource, MongoDataSource
+from .data_utils import (
+ get_friendly_size,
+ extract_accelerator_types,
+ extract_run_info,
+)
+
+__all__ = [
+ "InfiniMetricsDataLoader",
+ "get_friendly_size",
+ "extract_accelerator_types",
+ "extract_run_info",
+]
diff --git a/dashboard/utils/data_sources.py b/dashboard/utils/data_sources.py
new file mode 100644
index 0000000..7036834
--- /dev/null
+++ b/dashboard/utils/data_sources.py
@@ -0,0 +1,255 @@
+#!/usr/bin/env python3
+"""Data source implementations for InfiniMetrics dashboard."""
+
+import json
+import logging
+import sys
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+import pandas as pd
+
+from .data_utils import extract_accelerator_types, extract_run_info
+from db.utils import get_csv_base_dir, resolve_csv_path
+
+logger = logging.getLogger(__name__)
+
+
+class DataSource(ABC):
+ """Abstract data source for test results."""
+
+ @abstractmethod
+ def list_test_runs(self, test_type: str = None) -> List[Dict[str, Any]]:
+ """List all test runs."""
+ pass
+
+ @abstractmethod
+ def load_test_result(self, identifier) -> Dict[str, Any]:
+ """Load a single test result with full data."""
+ pass
+
+ @abstractmethod
+ def load_summaries(self) -> List[Dict[str, Any]]:
+ """Load dispatcher summaries."""
+ pass
+
+ @property
+ @abstractmethod
+ def source_type(self) -> str:
+ """Return the data source type name."""
+ pass
+
+
+class FileDataSource(DataSource):
+ """File-based data source (reads from JSON/CSV files)."""
+
+ def __init__(self, results_dir: str = "../output"):
+ self.results_dir = Path(results_dir)
+
+ @property
+ def source_type(self) -> str:
+ return "file"
+
+ def list_test_runs(self, test_type: str = None) -> List[Dict[str, Any]]:
+ """List all test runs, filtering out summary files."""
+ runs = []
+
+ for json_file in self.results_dir.rglob("*.json"):
+ try:
+ if (
+ "summary" in json_file.name.lower()
+ or "dispatcher" in json_file.name.lower()
+ ):
+ continue
+
+ with open(json_file, "r", encoding="utf-8") as f:
+ data = json.load(f)
+
+ if not self._is_test_result_file(data):
+ continue
+
+ testcase = data.get("testcase", "")
+ if test_type and not testcase.startswith(test_type):
+ continue
+
+ run_info = extract_run_info(data, json_file)
+ run_info["accelerator_types"] = extract_accelerator_types(data)
+ runs.append(run_info)
+
+ except Exception as e:
+ logger.debug(f"Skipping file {json_file}: {e}")
+
+ runs.sort(key=lambda x: x["time"], reverse=True)
+ return runs
+
+ def load_test_result(self, json_path: Path) -> Dict[str, Any]:
+ """Load a single test result with all data."""
+ with open(json_path, "r", encoding="utf-8") as f:
+ data = json.load(f)
+
+ for metric in data.get("metrics", []):
+ csv_url = metric.get("raw_data_url")
+ if csv_url and not csv_url.startswith("http"):
+ base_dir = get_csv_base_dir(data, json_path)
+ csv_path = resolve_csv_path(csv_url, base_dir)
+
+ if csv_path and csv_path.exists():
+ try:
+ df = pd.read_csv(csv_path)
+ metric["data"] = df
+ metric["data_columns"] = list(df.columns)
+ metric["csv_path"] = str(csv_path)
+ except Exception as e:
+ logger.warning(f"Failed to load CSV {csv_path}: {e}")
+ metric["data"] = None
+ else:
+ logger.debug(f"CSV not found: {csv_url} (base: {base_dir})")
+ metric["data"] = None
+
+ return data
+
+ def _is_test_result_file(self, data: Dict[str, Any]) -> bool:
+ """Check if JSON file is a test result (not a summary)."""
+ required = ["run_id", "testcase", "config"]
+ return all(key in data for key in required) and "metrics" in data
+
+ def load_summaries(self) -> List[Dict[str, Any]]:
+ """Load dispatcher summary files from summary_output directory."""
+ summaries = []
+ summary_dir = self.results_dir.parent / "summary_output"
+
+ if summary_dir.exists():
+ for json_file in sorted(
+ summary_dir.glob("dispatcher_summary_*.json"), reverse=True
+ ):
+ try:
+ with open(json_file, "r", encoding="utf-8") as f:
+ data = json.load(f)
+ data["file"] = json_file.name
+ data["timestamp"] = json_file.stem.replace(
+ "dispatcher_summary_", ""
+ )
+ summaries.append(data)
+ except Exception as e:
+ logger.warning(f"Failed to load summary {json_file}: {e}")
+
+ return summaries
+
+
+class MongoDataSource(DataSource):
+ """MongoDB-based data source."""
+
+ def __init__(self, config=None):
+ self._config = config
+ self._client = None
+ self._repository = None
+ self._connected = False
+
+ def _connect(self):
+ """Lazy connection to MongoDB."""
+ if self._connected:
+ return self._connected
+
+ try:
+ project_root = Path(__file__).parent.parent.parent
+ if str(project_root) not in sys.path:
+ sys.path.insert(0, str(project_root))
+
+ from db import MongoDBClient, TestRunRepository
+
+ if self._config:
+ self._client = MongoDBClient(self._config)
+ else:
+ self._client = MongoDBClient()
+
+ if self._client.health_check():
+ from db.config import DatabaseConfig
+
+ config = self._config or DatabaseConfig.from_env()
+ self._repository = TestRunRepository(
+ self._client.get_collection(config.collection_name)
+ )
+ self._connected = True
+ logger.info("Connected to MongoDB data source")
+ else:
+ logger.warning("MongoDB health check failed")
+
+ except Exception as e:
+ logger.warning(f"Failed to connect to MongoDB: {e}")
+ self._connected = False
+
+ return self._connected
+
+ @property
+ def source_type(self) -> str:
+ return "mongodb"
+
+ def is_connected(self) -> bool:
+ """Check if MongoDB is connected."""
+ return self._connected or self._connect()
+
+ def list_test_runs(self, test_type: str = None) -> List[Dict[str, Any]]:
+ """List all test runs from MongoDB."""
+ if not self._connect():
+ logger.warning("MongoDB not connected, returning empty list")
+ return []
+
+ runs = self._repository.list_test_runs(test_type=test_type)
+ result = []
+
+ for run in runs:
+ run_info = extract_run_info(run)
+ run_info["accelerator_types"] = extract_accelerator_types(run)
+ result.append(run_info)
+
+ result.sort(key=lambda x: x["time"], reverse=True)
+ return result
+
+ def load_test_result(self, run_id: str) -> Dict[str, Any]:
+ """Load a single test result with full data from MongoDB."""
+ if not self._connect():
+ logger.warning("MongoDB not connected")
+ return {}
+
+ data = self._repository.find_by_run_id(run_id)
+ if not data:
+ return {}
+
+ for metric in data.get("metrics", []):
+ if "data" in metric and isinstance(metric["data"], list):
+ if metric["data"]:
+ metric["data"] = pd.DataFrame(metric["data"])
+ if "data_columns" not in metric:
+ metric["data_columns"] = list(metric["data"].columns)
+
+ data.pop("_id", None)
+ data.pop("_metadata", None)
+
+ return data
+
+ def load_summaries(self) -> List[Dict[str, Any]]:
+ """Load dispatcher summaries from MongoDB."""
+ if not self._connect():
+ logger.warning("MongoDB not connected, returning empty list")
+ return []
+
+ try:
+ from db import DispatcherSummaryRepository
+ from db.config import DatabaseConfig
+
+ config = self._config or DatabaseConfig.from_env()
+ summary_collection = self._client.get_collection(
+ config.summary_collection_name
+ )
+ summary_repo = DispatcherSummaryRepository(summary_collection)
+ summaries = summary_repo.list_summaries()
+
+ for s in summaries:
+ s.pop("_id", None)
+ s.pop("_metadata", None)
+
+ return summaries
+ except Exception as e:
+ logger.warning(f"Failed to load summaries from MongoDB: {e}")
+ return []
diff --git a/dashboard/utils/data_utils.py b/dashboard/utils/data_utils.py
new file mode 100644
index 0000000..33e6e50
--- /dev/null
+++ b/dashboard/utils/data_utils.py
@@ -0,0 +1,114 @@
+#!/usr/bin/env python3
+"""Data utility functions for InfiniMetrics dashboard."""
+import json
+import logging
+
+from pathlib import Path
+from typing import Any, Dict, List
+
+
+def load_summary_file(summary_path: str = "../summary_output") -> List[Dict[str, Any]]:
+ """Load dispatcher summary files."""
+
+ logger = logging.getLogger(__name__)
+ summaries = []
+ summary_dir = Path(summary_path)
+
+ if summary_dir.exists():
+ for json_file in sorted(
+ summary_dir.glob("dispatcher_summary_*.json"), reverse=True
+ ):
+ try:
+ with open(json_file, "r", encoding="utf-8") as f:
+ data = json.load(f)
+ data["file"] = json_file.name
+ data["timestamp"] = json_file.stem.replace("dispatcher_summary_", "")
+ summaries.append(data)
+ except Exception as e:
+ logger.warning(f"Failed to load summary {json_file}: {e}")
+
+ return summaries
+
+
+def get_friendly_size(size_bytes: int) -> str:
+ """Convert bytes to human-readable size."""
+ for unit in ["B", "KB", "MB", "GB", "TB"]:
+ if size_bytes < 1024.0:
+ return f"{size_bytes:.1f} {unit}"
+ size_bytes /= 1024.0
+ return f"{size_bytes:.1f} PB"
+
+
+def extract_accelerator_types(result_json: dict) -> list[str]:
+ """Extract the accelerator card type from result_json."""
+ types = set()
+ try:
+ clusters = result_json.get("environment", {}).get("cluster", [])
+ for node in clusters:
+ accs = node.get("machine", {}).get("accelerators", [])
+ for acc in accs:
+ if "type" in acc:
+ types.add(acc["type"])
+ except Exception:
+ pass
+ return list(types)
+
+
+def extract_test_type(testcase: str) -> str:
+ """Extract test type from testcase string (e.g., 'comm.nccl.allreduce' -> 'comm')."""
+ parts = testcase.split(".")
+ return parts[0] if parts else "unknown"
+
+
+def extract_operation(testcase: str) -> str:
+ """Extract operation from testcase string (e.g., 'comm.nccl.allreduce' -> 'allreduce')."""
+ parts = testcase.split(".")
+ return parts[2] if len(parts) > 2 else "unknown"
+
+
+def extract_run_info(data: Dict[str, Any], path: Path = None) -> Dict[str, Any]:
+ """
+ Extract run info from test result data.
+
+ Args:
+ data: Test result JSON data
+ path: Optional file path (for file-based sources)
+
+ Returns:
+ Dictionary with extracted run information
+ """
+ config = data.get("config", {})
+ resolved = data.get("resolved", {})
+
+ device_used = (
+ resolved.get("device_used")
+ or config.get("device_used")
+ or config.get("device_involved", 1)
+ )
+
+ nodes = resolved.get("nodes") or data.get("environment", {}).get("cluster_scale", 1)
+
+ result_code = data.get("result_code", 1)
+ success = result_code == 0
+
+ metrics = data.get("metrics", [])
+ metric_types = [m.get("name", "").split(".")[0] for m in metrics if m.get("name")]
+
+ testcase = data.get("testcase", "")
+
+ return {
+ "path": path,
+ "testcase": testcase,
+ "run_id": data.get("run_id", "unknown"),
+ "time": data.get("time", ""),
+ "success": success,
+ "result_code": result_code,
+ "test_type": extract_test_type(testcase),
+ "operation": extract_operation(testcase),
+ "config": config,
+ "resolved": resolved,
+ "device_used": device_used,
+ "nodes": nodes,
+ "metrics_count": len(metrics),
+ "metric_types": list(set(metric_types)),
+ }
diff --git a/db/__init__.py b/db/__init__.py
index b5e4703..973a712 100644
--- a/db/__init__.py
+++ b/db/__init__.py
@@ -6,6 +6,7 @@
- MongoDB connection management
- Test result repository
- Dispatcher summary repository
+- Data import from JSON/CSV files
Usage:
from db import MongoDBClient, DatabaseConfig, TestRunRepository
@@ -18,12 +19,15 @@
test_runs = TestRunRepository(client.get_collection("test_runs"))
summaries = DispatcherSummaryRepository(client.get_collection("dispatcher_summaries"))
- # Query test runs
- runs = test_runs.find_all(limit=10)
+ # Import data
+ from db import DataImporter
+ importer = DataImporter(test_runs)
+ importer.import_directory(Path("./output"))
"""
from .client import MongoDBClient, MongoDBConnectionError
from .config import DatabaseConfig
+from .importer import DataImporter
from .repository import DispatcherSummaryRepository, TestRunRepository
__all__ = [
@@ -32,4 +36,5 @@
"MongoDBConnectionError",
"TestRunRepository",
"DispatcherSummaryRepository",
+ "DataImporter",
]
diff --git a/db/importer.py b/db/importer.py
new file mode 100644
index 0000000..12b6186
--- /dev/null
+++ b/db/importer.py
@@ -0,0 +1,279 @@
+#!/usr/bin/env python3
+"""Data importer for loading JSON/CSV test results into MongoDB."""
+
+import json
+import logging
+from pathlib import Path
+from typing import Any, Dict, Optional
+
+from .repository import TestRunRepository
+from .utils import (
+ get_csv_base_dir,
+ is_dispatcher_summary,
+ is_dispatcher_summary_file,
+ is_valid_test_result,
+ load_csv_data,
+ resolve_csv_path,
+ resolve_result_file_path,
+ should_skip_file,
+)
+
+logger = logging.getLogger(__name__)
+
+
class DataImporter:
    """
    Import JSON/CSV test results to MongoDB.

    Supports hierarchical structure:
    - Dispatcher summary files (summary_output/dispatcher_summary_*.json)
    - Individual test result files (output/*_results.json)

    CSV files referenced by a result's metrics (``raw_data_url``) are
    loaded and embedded into the document before insertion, so the
    database record is self-contained.
    """

    # Return status constants used by import_test_result / import_directory
    STATUS_IMPORTED = "imported"
    STATUS_SKIPPED = "skipped"
    STATUS_FAILED = "failed"

    def __init__(self, repository: TestRunRepository, base_dir: Optional[Path] = None):
        # base_dir anchors relative result-file references from dispatcher
        # summaries; defaults to the current working directory.
        self._repository = repository
        self._base_dir = Path(base_dir) if base_dir else Path.cwd()

    def import_dispatcher_summary(
        self, summary_path: Path, overwrite: bool = False
    ) -> Dict[str, Any]:
        """Import a dispatcher summary file and all referenced test results.

        Args:
            summary_path: Path to a dispatcher summary JSON file.
            overwrite: Re-import results whose run_id already exists.

        Returns:
            Dict with "imported"/"skipped"/"failed" lists and the
            originating "summary_file" path.  Errors are logged, never
            raised (best-effort batch import).
        """
        summary: Dict[str, Any] = {
            "imported": [],
            "skipped": [],
            "failed": [],
            "summary_file": str(summary_path),
        }

        try:
            with open(summary_path, "r", encoding="utf-8") as f:
                summary_data = json.load(f)

            # Guard: file name matched but content is not a summary document.
            if not is_dispatcher_summary(summary_data):
                logger.debug(f"Not a dispatcher summary: {summary_path}")
                return summary

            logger.info(
                f"Processing dispatcher summary: {summary_path} "
                f"({summary_data.get('total_tests', 0)} tests)"
            )

            for result_info in summary_data.get("results", []):
                result_file = result_info.get("result_file")
                if not result_file:
                    continue

                # Stored paths may use aliased directory names; resolve
                # against both the summary location and the base dir.
                result_path = resolve_result_file_path(
                    result_file, summary_path, self._base_dir
                )

                if not result_path or not result_path.exists():
                    logger.warning(f"Result file not found: {result_file}")
                    summary["failed"].append(result_file)
                    continue

                imported_run_id, status = self.import_test_result(
                    result_path,
                    dispatcher_info={
                        "summary_file": str(summary_path),
                        "summary_timestamp": summary_data.get("timestamp"),
                        "total_tests": summary_data.get("total_tests"),
                    },
                    overwrite=overwrite,
                )

                if status == self.STATUS_IMPORTED:
                    summary["imported"].append(imported_run_id)
                elif status == self.STATUS_SKIPPED:
                    # run_id may be None when the file was unreadable;
                    # fall back to the id recorded in the summary entry.
                    summary["skipped"].append(
                        imported_run_id or result_info.get("run_id")
                    )
                else:
                    summary["failed"].append(str(result_path))

        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON in {summary_path}: {e}")
        except Exception as e:
            logger.error(f"Failed to process summary {summary_path}: {e}")

        logger.info(
            f"Summary processed: {len(summary['imported'])} imported, "
            f"{len(summary['skipped'])} skipped, {len(summary['failed'])} failed"
        )
        return summary

    def import_test_result(
        self,
        result_path: Path,
        dispatcher_info: Optional[Dict[str, Any]] = None,
        overwrite: bool = False,
    ) -> tuple[Optional[str], str]:
        """
        Import a single test result file to MongoDB.

        Args:
            result_path: Path to a test result JSON file.
            dispatcher_info: Provenance metadata recorded when the import
                was triggered by a dispatcher summary.
            overwrite: Upsert instead of skipping an existing run_id.

        Returns:
            Tuple of (run_id, status) where status is one of:
            - "imported": Successfully imported
            - "skipped": Already exists or not a test result
            - "failed": Error during import
            run_id is None for skipped non-test files and failures.
        """
        try:
            with open(result_path, "r", encoding="utf-8") as f:
                data = json.load(f)

            if not is_valid_test_result(data):
                logger.debug(f"Skipping non-test file: {result_path}")
                return (None, self.STATUS_SKIPPED)

            run_id = data.get("run_id")
            if not run_id:
                logger.warning(f"No run_id in {result_path}")
                return (None, self.STATUS_SKIPPED)

            if self._repository.exists(run_id) and not overwrite:
                logger.debug(f"Skipping existing run_id: {run_id}")
                return (run_id, self.STATUS_SKIPPED)

            # Make the document self-contained before insertion.
            self._embed_csv_data(data, result_path)

            data.setdefault("_metadata", {})
            data["_metadata"]["source_file"] = str(result_path)
            data["_metadata"]["version"] = "1.0"

            if dispatcher_info:
                data["_metadata"]["dispatcher"] = dispatcher_info

            # Upsert only when explicitly overwriting an existing record.
            if overwrite and self._repository.exists(run_id):
                inserted_id = self._repository.upsert(data)
            else:
                inserted_id = self._repository.insert(data)

            return (run_id, self.STATUS_IMPORTED if inserted_id else self.STATUS_FAILED)

        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON in {result_path}: {e}")
            return (None, self.STATUS_FAILED)
        except Exception as e:
            logger.error(f"Failed to import {result_path}: {e}")
            return (None, self.STATUS_FAILED)

    def import_directory(
        self,
        directory: Path,
        recursive: bool = True,
        overwrite: bool = False,
        include_summaries: bool = True,
    ) -> Dict[str, Any]:
        """Import all JSON files from a directory.

        Args:
            directory: Root directory to scan for *.json files.
            recursive: Also descend into subdirectories.
            overwrite: Re-import results whose run_id already exists.
            include_summaries: Also process dispatcher summary files
                (each pulling in its referenced results).

        Returns:
            Dict with "imported"/"skipped"/"failed" lists.
        """
        summary: Dict[str, Any] = {"imported": [], "skipped": [], "failed": []}

        directory = Path(directory)
        if not directory.exists():
            logger.error(f"Directory not found: {directory}")
            return summary

        pattern = "**/*.json" if recursive else "*.json"
        json_files = list(directory.glob(pattern))

        logger.info(f"Found {len(json_files)} JSON files in {directory}")

        for json_file in json_files:
            if is_dispatcher_summary_file(json_file):
                if include_summaries:
                    result = self.import_dispatcher_summary(json_file, overwrite)
                    summary["imported"].extend(result["imported"])
                    summary["skipped"].extend(result["skipped"])
                    summary["failed"].extend(result["failed"])
                # Summary files are never imported as plain results.
                continue

            if should_skip_file(json_file):
                continue

            run_id, status = self.import_test_result(json_file, overwrite=overwrite)
            if status == self.STATUS_IMPORTED:
                summary["imported"].append(run_id)
            elif status == self.STATUS_SKIPPED:
                # Only record skips that correspond to a real run.
                if run_id:
                    summary["skipped"].append(run_id)
            else:
                summary["failed"].append(str(json_file))

        logger.info(
            f"Import completed: {len(summary['imported'])} imported, "
            f"{len(summary['skipped'])} skipped, {len(summary['failed'])} failed"
        )
        return summary

    def import_all(
        self,
        output_dir: Optional[Path] = None,
        summary_dir: Optional[Path] = None,
        overwrite: bool = False,
    ) -> Dict[str, Any]:
        """Import from both output and summary directories.

        Summary-referenced results are imported first; the subsequent
        directory scan then deduplicates against already-imported run_ids.

        Returns:
            Combined dict with "imported"/"skipped"/"failed" lists.
        """
        combined: Dict[str, Any] = {"imported": [], "skipped": [], "failed": []}

        if summary_dir:
            summary_dir = Path(summary_dir)
            if summary_dir.exists():
                logger.info(f"Importing from summary directory: {summary_dir}")
                for summary_file in sorted(
                    summary_dir.glob("dispatcher_summary_*.json")
                ):
                    result = self.import_dispatcher_summary(summary_file, overwrite)
                    combined["imported"].extend(result["imported"])
                    combined["skipped"].extend(result["skipped"])
                    combined["failed"].extend(result["failed"])

        if output_dir:
            output_dir = Path(output_dir)
            if output_dir.exists():
                logger.info(f"Importing from output directory: {output_dir}")
                result = self.import_directory(
                    output_dir,
                    recursive=True,
                    overwrite=overwrite,
                    include_summaries=False,
                )
                # Avoid double-counting runs already pulled in via summaries.
                for run_id in result["imported"]:
                    if run_id not in combined["imported"]:
                        combined["imported"].append(run_id)
                combined["skipped"].extend(result["skipped"])
                combined["failed"].extend(result["failed"])

        logger.info(
            f"Total import: {len(combined['imported'])} imported, "
            f"{len(combined['skipped'])} skipped, {len(combined['failed'])} failed"
        )
        return combined

    def import_json_file(
        self, json_path: Path, overwrite: bool = False
    ) -> Optional[str]:
        """Import a single JSON result file (legacy method).

        Returns:
            The imported run_id, or None when skipped or failed.
        """
        run_id, status = self.import_test_result(json_path, overwrite=overwrite)
        return run_id if status == self.STATUS_IMPORTED else None

    def _embed_csv_data(self, data: Dict[str, Any], json_path: Path) -> None:
        """Load CSV files and embed data into metrics.

        Mutates *data* in place: each metric with a local (non-http)
        ``raw_data_url`` gains a "data" list of row dicts and a
        "data_columns" list.  Missing or unreadable CSVs are logged and
        left un-embedded (best-effort).
        """
        base_dir = get_csv_base_dir(data, json_path)

        for metric in data.get("metrics", []):
            csv_url = metric.get("raw_data_url")
            if csv_url and not csv_url.startswith("http"):
                csv_path = resolve_csv_path(csv_url, base_dir)
                if csv_path and csv_path.exists():
                    try:
                        csv_data = load_csv_data(csv_path)
                        metric["data"] = csv_data
                        metric["data_columns"] = (
                            list(csv_data[0].keys()) if csv_data else []
                        )
                        logger.debug(f"Embedded CSV data from {csv_path}")
                    except Exception as e:
                        logger.warning(f"Failed to load CSV {csv_path}: {e}")
diff --git a/db/utils.py b/db/utils.py
new file mode 100644
index 0000000..f950f7f
--- /dev/null
+++ b/db/utils.py
@@ -0,0 +1,163 @@
+#!/usr/bin/env python3
+"""Utility functions for file handling and data processing."""
+
+import csv
+import logging
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+logger = logging.getLogger(__name__)
+
+
+# ==================== CSV Utilities ====================
+
+
def load_csv_data(csv_path: Path) -> List[Dict[str, Any]]:
    """Read a CSV file and return its rows as type-converted dicts."""
    with open(csv_path, "r", encoding="utf-8") as handle:
        return [convert_csv_row(record) for record in csv.DictReader(handle)]
+
+
def convert_csv_row(row: Dict[str, str]) -> Dict[str, Any]:
    """Best-effort numeric conversion of a CSV row's string values.

    Values containing a '.' are parsed as float, all others as int;
    anything unparseable (including None) is kept unchanged.
    """
    converted: Dict[str, Any] = {}
    for key, raw in row.items():
        try:
            converted[key] = float(raw) if "." in raw else int(raw)
        except (ValueError, TypeError):
            converted[key] = raw
    return converted
+
+
def resolve_csv_path(csv_url: str, base_dir: Path) -> Optional[Path]:
    """Resolve a CSV reference against *base_dir* with fallbacks.

    Tries the relative path and its bare filename under base_dir, then
    under base_dir's parent.  Returns the first existing candidate, or
    None for an empty URL / no match.
    """
    if not csv_url:
        return None

    relative = Path(csv_url[2:] if csv_url.startswith("./") else csv_url)

    for directory in (base_dir, base_dir.parent):
        for candidate in (directory / relative, directory / relative.name):
            if candidate.exists():
                return candidate
    return None
+
+
def get_csv_base_dir(data: Dict[str, Any], json_path: Path) -> Path:
    """Directory against which a result's CSV references are resolved.

    Uses config.output_dir when present (absolute paths as-is, relative
    ones joined to the JSON file's directory); otherwise the JSON file's
    own directory.
    """
    output_dir = data.get("config", {}).get("output_dir")
    if not output_dir:
        return json_path.parent
    candidate = Path(output_dir)
    return candidate if candidate.is_absolute() else json_path.parent / output_dir
+
+
+# ==================== File Type Detection ====================
+
+
def is_valid_test_result(data: Dict[str, Any]) -> bool:
    """True when *data* carries the run_id, testcase, config and metrics keys."""
    return all(key in data for key in ("run_id", "testcase", "config", "metrics"))
+
+
def is_dispatcher_summary(data: Dict[str, Any]) -> bool:
    """True when *data* looks like a dispatcher summary document."""
    return all(key in data for key in ("results", "total_tests"))
+
+
def is_dispatcher_summary_file(path: Path) -> bool:
    """Check if file is a dispatcher summary based on its name.

    A name containing "dispatcher_summary" necessarily contains both
    "summary" and "dispatcher", so the previous exact-substring clause
    was redundant; the combined containment test covers both spellings
    (e.g. "dispatcher_summary_x.json", "dispatcher_run_summary.json").
    """
    name_lower = path.name.lower()
    return "summary" in name_lower and "dispatcher" in name_lower
+
+
def should_skip_file(path: Path) -> bool:
    """Check if file should be skipped during directory import.

    Skips generic summary files that are not dispatcher summaries
    (dispatcher summaries are handled by their own import path).
    The if/return-True/return-False ladder is folded into a single
    boolean expression.
    """
    name_lower = path.name.lower()
    return "summary" in name_lower and not is_dispatcher_summary_file(path)
+
+
+# ==================== Path Resolution ====================
+
+
+# Directory name aliases for flexible path resolution
+_DIR_ALIASES = {
+ "comm": "communication",
+ "communication": "comm",
+ "infer": "inference",
+ "inference": "infer",
+ "hw": "hardware",
+ "hardware": "hw",
+ "op": "operators",
+ "operators": "op",
+}
+
+
+def _get_path_variants(result_path: Path) -> List[Path]:
+ """Generate path variants with directory name aliases."""
+ variants = [result_path]
+ parts = list(result_path.parts)
+
+ for i, part in enumerate(parts):
+ if part in _DIR_ALIASES:
+ new_parts = parts.copy()
+ new_parts[i] = _DIR_ALIASES[part]
+ variants.append(Path(*new_parts))
+
+ # Also try test_output <-> output mapping
+ result_str = str(result_path)
+ if result_str.startswith("test_output/"):
+ variants.append(Path(result_str.replace("test_output/", "output/", 1)))
+ elif result_str.startswith("output/"):
+ variants.append(Path(result_str.replace("output/", "test_output/", 1)))
+
+ return variants
+
+
def resolve_result_file_path(
    result_file: str, summary_path: Path, base_dir: Path
) -> Optional[Path]:
    """Resolve result file path from dispatcher summary reference.

    Tries, in order: an absolute path as-is (returned without an
    existence check, preserving prior behavior); each directory-alias
    variant under base_dir and the summary file's parent directories;
    finally a recursive search by bare filename under base_dir.

    Args:
        result_file: Path string stored in the dispatcher summary.
        summary_path: Path of the summary file referencing the result.
        base_dir: Project base directory for relative resolution.

    Returns:
        The first existing candidate path, or None if nothing matched.
    """
    result_path = Path(result_file)

    if result_path.is_absolute():
        return result_path

    # Generate path variants with directory name aliases
    for variant in _get_path_variants(result_path):
        candidates = [
            base_dir / variant,
            summary_path.parent.parent / variant,
            summary_path.parent / variant,
        ]
        for candidate in candidates:
            if candidate.exists():
                return candidate

    # Last resort: first match by filename anywhere under base_dir.
    # next() on the rglob generator replaces the loop-that-returns-first
    # idiom; NOTE: rglob can be slow on large trees, only reached when
    # all structured candidates failed.
    return next(base_dir.rglob(result_path.name), None)
diff --git a/infinimetrics/hardware/hardware_adapter.py b/infinimetrics/hardware/hardware_adapter.py
index 2de5b4a..426c60a 100644
--- a/infinimetrics/hardware/hardware_adapter.py
+++ b/infinimetrics/hardware/hardware_adapter.py
@@ -72,7 +72,8 @@ def process(self, test_input: Any) -> Dict[str, Any]:
logger.info(f"HardwareTestAdapter: Processing {testcase}")
- self.output_dir = Path(config.get("output_dir", "./output"))
+ # Put CSV files in hardware/ subdirectory to match JSON location
+ self.output_dir = Path(config.get("output_dir", "./output")) / "hardware"
self.output_dir.mkdir(parents=True, exist_ok=True)
device = config.get("device", "cuda").lower()
diff --git a/pyproject.toml b/pyproject.toml
index e4b3ab2..50c6741 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -31,6 +31,9 @@ exclude = '''
)/
'''
+[tool.setuptools.packages.find]
+include = ["infinimetrics*", "dashboard*", "db*"]
+
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
From 6e73e319431a70bf397a2b144062c0c319450612 Mon Sep 17 00:00:00 2001
From: Li Baoming <1508269885@qq.com>
Date: Wed, 11 Mar 2026 18:25:10 +0800
Subject: [PATCH 2/5] fix: fix indentation
---
dashboard/common.py | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/dashboard/common.py b/dashboard/common.py
index 34ec248..0a59f07 100644
--- a/dashboard/common.py
+++ b/dashboard/common.py
@@ -50,6 +50,6 @@ def show_data_source_info(style: str = "caption"):
st.caption("数据源: MongoDB")
else:
if style == "sidebar":
- st.info(f"📁 数据源: 文件系统 ({dl.results_dir})")
- else:
- st.caption(f"数据源: 文件系统 ({dl.results_dir})")
+ st.info(f"📁 数据源: 文件系统 ({dl.results_dir})")
+ else:
+ st.caption(f"数据源: 文件系统 ({dl.results_dir})")
From 95e90c47b729afd5f97756a31a6815001d27a40b Mon Sep 17 00:00:00 2001
From: Li Baoming <1508269885@qq.com>
Date: Wed, 11 Mar 2026 18:44:02 +0800
Subject: [PATCH 3/5] fix: remove duplicated code && bug fix
---
dashboard/pages/communication.py | 4 ++--
dashboard/pages/inference.py | 2 +-
dashboard/pages/operator.py | 2 +-
dashboard/utils/data_sources.py | 21 ++-------------------
4 files changed, 6 insertions(+), 23 deletions(-)
diff --git a/dashboard/pages/communication.py b/dashboard/pages/communication.py
index 202c40e..27d9508 100644
--- a/dashboard/pages/communication.py
+++ b/dashboard/pages/communication.py
@@ -29,7 +29,7 @@ def main():
try:
# Load communication test results
- comm_runs = dl.list_test_runs("comm")
+ comm_runs = st.session_state.data_loader.list_test_runs("comm")
if not comm_runs:
st.info("未找到通信测试结果")
@@ -121,7 +121,7 @@ def main():
run_info = filtered_runs[idx]
# Use path for file source, run_id for MongoDB
identifier = run_info.get("path") or run_info.get("run_id")
- result = dl.load_test_result(identifier)
+ result = st.session_state.data_loader.load_test_result(identifier)
run_info["data"] = result
selected_runs.append(run_info)
diff --git a/dashboard/pages/inference.py b/dashboard/pages/inference.py
index 74768c1..14e0741 100644
--- a/dashboard/pages/inference.py
+++ b/dashboard/pages/inference.py
@@ -93,7 +93,7 @@ def _mode_of(r):
ri = filtered[options[k]]
# Use path for file source, run_id for MongoDB
identifier = ri.get("path") or ri.get("run_id")
- data = dl.load_test_result(identifier)
+ data = st.session_state.data_loader.load_test_result(identifier)
ri = dict(ri)
ri["data"] = data
selected_runs.append(ri)
diff --git a/dashboard/pages/operator.py b/dashboard/pages/operator.py
index f388248..5fdfd78 100644
--- a/dashboard/pages/operator.py
+++ b/dashboard/pages/operator.py
@@ -58,7 +58,7 @@ def main():
ri = filtered[options[k]]
# Use path for file source, run_id for MongoDB
identifier = ri.get("path") or ri.get("run_id")
- data = dl.load_test_result(identifier)
+ data = st.session_state.data_loader.load_test_result(identifier)
ri = dict(ri)
ri["data"] = data
selected_runs.append(ri)
diff --git a/dashboard/utils/data_sources.py b/dashboard/utils/data_sources.py
index 7036834..03607af 100644
--- a/dashboard/utils/data_sources.py
+++ b/dashboard/utils/data_sources.py
@@ -10,7 +10,7 @@
import pandas as pd
-from .data_utils import extract_accelerator_types, extract_run_info
+from .data_utils import extract_accelerator_types, extract_run_info, load_summary_file
from db.utils import get_csv_base_dir, resolve_csv_path
logger = logging.getLogger(__name__)
@@ -116,25 +116,8 @@ def _is_test_result_file(self, data: Dict[str, Any]) -> bool:
def load_summaries(self) -> List[Dict[str, Any]]:
"""Load dispatcher summary files from summary_output directory."""
- summaries = []
summary_dir = self.results_dir.parent / "summary_output"
-
- if summary_dir.exists():
- for json_file in sorted(
- summary_dir.glob("dispatcher_summary_*.json"), reverse=True
- ):
- try:
- with open(json_file, "r", encoding="utf-8") as f:
- data = json.load(f)
- data["file"] = json_file.name
- data["timestamp"] = json_file.stem.replace(
- "dispatcher_summary_", ""
- )
- summaries.append(data)
- except Exception as e:
- logger.warning(f"Failed to load summary {json_file}: {e}")
-
- return summaries
+ return load_summary_file(str(summary_dir))
class MongoDataSource(DataSource):
From 77a742331e5386776c99e016cd7ed0dee99bf241 Mon Sep 17 00:00:00 2001
From: Li Baoming <1508269885@qq.com>
Date: Fri, 13 Mar 2026 18:30:07 +0800
Subject: [PATCH 4/5] fix: remove dead code && separate MongoDB logic from
 file-reading logic
---
dashboard/utils/data_loader.py | 71 +++++++-------
dashboard/utils/data_sources.py | 131 ++------------------------
dashboard/utils/mongo_data_source.py | 132 +++++++++++++++++++++++++++
db/__init__.py | 30 +++---
4 files changed, 197 insertions(+), 167 deletions(-)
create mode 100644 dashboard/utils/mongo_data_source.py
diff --git a/dashboard/utils/data_loader.py b/dashboard/utils/data_loader.py
index 5d6cccf..6eb0a40 100644
--- a/dashboard/utils/data_loader.py
+++ b/dashboard/utils/data_loader.py
@@ -5,9 +5,7 @@
from pathlib import Path
from typing import Any, Dict, List, Optional
-import pandas as pd
-
-from .data_sources import DataSource, FileDataSource, MongoDataSource
+from .data_sources import DataSource, FileDataSource
from .data_utils import extract_accelerator_types, extract_run_info, get_friendly_size
logger = logging.getLogger(__name__)
@@ -50,18 +48,42 @@ def __init__(
else:
self._source = FileDataSource(results_dir)
- def _init_mongodb_source(self):
- """Initialize MongoDB data source with optional fallback."""
- mongo_source = MongoDataSource(self._mongo_config)
+ def _try_connect_mongo(self) -> Optional["MongoDataSource"]:
+ """
+ Try to connect to MongoDB.
- if mongo_source.is_connected():
+ Returns:
+ MongoDataSource if connected, None otherwise
+ """
+ try:
+ from .mongo_data_source import MongoDataSource
+
+ mongo_source = MongoDataSource(self._mongo_config)
+ if mongo_source.is_connected():
+ return mongo_source
+ except ImportError as e:
+ logger.warning(f"MongoDB dependencies not installed ({e})")
+ return None
+
+ def _apply_mongo_or_fallback(self, mongo_source: Optional["MongoDataSource"]):
+ """Apply MongoDB source or fallback to files based on configuration."""
+ if mongo_source:
self._source = mongo_source
+ self._use_mongodb = True
elif self._fallback_to_files:
- logger.warning("MongoDB unavailable, falling back to file-based loading")
+ logger.warning("MongoDB unavailable, using file-based loading")
self._source = FileDataSource(str(self.results_dir))
self._use_mongodb = False
else:
- raise RuntimeError("MongoDB connection failed and fallback is disabled")
+ raise RuntimeError(
+ "MongoDB connection failed and fallback is disabled. "
+ "Install pymongo to use MongoDB."
+ )
+
+ def _init_mongodb_source(self):
+ """Initialize MongoDB data source with optional fallback."""
+ mongo_source = self._try_connect_mongo()
+ self._apply_mongo_or_fallback(mongo_source)
@property
def source_type(self) -> str:
@@ -90,17 +112,14 @@ def switch_to_mongodb(self, mongo_config=None) -> bool:
if mongo_config:
self._mongo_config = mongo_config
- mongo_source = MongoDataSource(self._mongo_config)
-
- if mongo_source.is_connected():
+ mongo_source = self._try_connect_mongo()
+ if mongo_source:
self._source = mongo_source
self._use_mongodb = True
return True
- elif self._fallback_to_files:
+ else:
logger.warning("Failed to switch to MongoDB, keeping current source")
return False
- else:
- raise RuntimeError("MongoDB connection failed")
def switch_to_files(self, results_dir: str = None):
"""Switch to file-based data source."""
@@ -133,27 +152,9 @@ def load_summaries(self) -> List[Dict[str, Any]]:
return []
return self._source.load_summaries()
- def load_csv_data(
- self, csv_url: str, json_data: Dict[str, Any], json_path: Path
- ) -> Optional[pd.DataFrame]:
- """Load CSV data file using proper path resolution (file source only)."""
- if isinstance(self._source, FileDataSource):
- try:
- if csv_url.startswith("http"):
- return None
-
- base_dir = self._source._get_csv_base_dir(json_data, json_path)
- csv_path = self._source._resolve_csv_path(csv_url, base_dir)
-
- if csv_path and csv_path.exists():
- return pd.read_csv(csv_path)
- except Exception as e:
- logger.error(f"Failed to load CSV {csv_url}: {e}")
- return None
-
# Re-export from sibling modules
-from .data_sources import DataSource, FileDataSource, MongoDataSource
+from .data_sources import DataSource, FileDataSource
from .data_utils import (
get_friendly_size,
extract_accelerator_types,
@@ -162,6 +163,8 @@ def load_csv_data(
__all__ = [
"InfiniMetricsDataLoader",
+ "DataSource",
+ "FileDataSource",
"get_friendly_size",
"extract_accelerator_types",
"extract_run_info",
diff --git a/dashboard/utils/data_sources.py b/dashboard/utils/data_sources.py
index 03607af..f3abf21 100644
--- a/dashboard/utils/data_sources.py
+++ b/dashboard/utils/data_sources.py
@@ -11,7 +11,14 @@
import pandas as pd
from .data_utils import extract_accelerator_types, extract_run_info, load_summary_file
-from db.utils import get_csv_base_dir, resolve_csv_path
+
+# Add project root to path for db module access (works regardless of cwd)
+_project_root = Path(__file__).parent.parent.parent
+if str(_project_root) not in sys.path:
+ sys.path.insert(0, str(_project_root))
+
+# Direct import to avoid triggering db/__init__.py (which imports pymongo)
+import db.utils as _db_utils
logger = logging.getLogger(__name__)
@@ -91,8 +98,8 @@ def load_test_result(self, json_path: Path) -> Dict[str, Any]:
for metric in data.get("metrics", []):
csv_url = metric.get("raw_data_url")
if csv_url and not csv_url.startswith("http"):
- base_dir = get_csv_base_dir(data, json_path)
- csv_path = resolve_csv_path(csv_url, base_dir)
+ base_dir = _db_utils.get_csv_base_dir(data, json_path)
+ csv_path = _db_utils.resolve_csv_path(csv_url, base_dir)
if csv_path and csv_path.exists():
try:
@@ -118,121 +125,3 @@ def load_summaries(self) -> List[Dict[str, Any]]:
"""Load dispatcher summary files from summary_output directory."""
summary_dir = self.results_dir.parent / "summary_output"
return load_summary_file(str(summary_dir))
-
-
-class MongoDataSource(DataSource):
- """MongoDB-based data source."""
-
- def __init__(self, config=None):
- self._config = config
- self._client = None
- self._repository = None
- self._connected = False
-
- def _connect(self):
- """Lazy connection to MongoDB."""
- if self._connected:
- return self._connected
-
- try:
- project_root = Path(__file__).parent.parent.parent
- if str(project_root) not in sys.path:
- sys.path.insert(0, str(project_root))
-
- from db import MongoDBClient, TestRunRepository
-
- if self._config:
- self._client = MongoDBClient(self._config)
- else:
- self._client = MongoDBClient()
-
- if self._client.health_check():
- from db.config import DatabaseConfig
-
- config = self._config or DatabaseConfig.from_env()
- self._repository = TestRunRepository(
- self._client.get_collection(config.collection_name)
- )
- self._connected = True
- logger.info("Connected to MongoDB data source")
- else:
- logger.warning("MongoDB health check failed")
-
- except Exception as e:
- logger.warning(f"Failed to connect to MongoDB: {e}")
- self._connected = False
-
- return self._connected
-
- @property
- def source_type(self) -> str:
- return "mongodb"
-
- def is_connected(self) -> bool:
- """Check if MongoDB is connected."""
- return self._connected or self._connect()
-
- def list_test_runs(self, test_type: str = None) -> List[Dict[str, Any]]:
- """List all test runs from MongoDB."""
- if not self._connect():
- logger.warning("MongoDB not connected, returning empty list")
- return []
-
- runs = self._repository.list_test_runs(test_type=test_type)
- result = []
-
- for run in runs:
- run_info = extract_run_info(run)
- run_info["accelerator_types"] = extract_accelerator_types(run)
- result.append(run_info)
-
- result.sort(key=lambda x: x["time"], reverse=True)
- return result
-
- def load_test_result(self, run_id: str) -> Dict[str, Any]:
- """Load a single test result with full data from MongoDB."""
- if not self._connect():
- logger.warning("MongoDB not connected")
- return {}
-
- data = self._repository.find_by_run_id(run_id)
- if not data:
- return {}
-
- for metric in data.get("metrics", []):
- if "data" in metric and isinstance(metric["data"], list):
- if metric["data"]:
- metric["data"] = pd.DataFrame(metric["data"])
- if "data_columns" not in metric:
- metric["data_columns"] = list(metric["data"].columns)
-
- data.pop("_id", None)
- data.pop("_metadata", None)
-
- return data
-
- def load_summaries(self) -> List[Dict[str, Any]]:
- """Load dispatcher summaries from MongoDB."""
- if not self._connect():
- logger.warning("MongoDB not connected, returning empty list")
- return []
-
- try:
- from db import DispatcherSummaryRepository
- from db.config import DatabaseConfig
-
- config = self._config or DatabaseConfig.from_env()
- summary_collection = self._client.get_collection(
- config.summary_collection_name
- )
- summary_repo = DispatcherSummaryRepository(summary_collection)
- summaries = summary_repo.list_summaries()
-
- for s in summaries:
- s.pop("_id", None)
- s.pop("_metadata", None)
-
- return summaries
- except Exception as e:
- logger.warning(f"Failed to load summaries from MongoDB: {e}")
- return []
diff --git a/dashboard/utils/mongo_data_source.py b/dashboard/utils/mongo_data_source.py
new file mode 100644
index 0000000..85daa28
--- /dev/null
+++ b/dashboard/utils/mongo_data_source.py
@@ -0,0 +1,132 @@
+#!/usr/bin/env python3
+"""MongoDB data source for InfiniMetrics dashboard."""
+
+import logging
+import sys
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+import pandas as pd
+
+from .data_sources import DataSource
+from .data_utils import extract_accelerator_types, extract_run_info
+
+logger = logging.getLogger(__name__)
+
+
class MongoDataSource(DataSource):
    """MongoDB-based data source.

    Mirrors the file-based source's interface but reads test runs and
    dispatcher summaries from MongoDB collections.  The connection is
    established lazily on first use, and failures degrade to empty
    results with a warning rather than raising.
    """

    def __init__(self, config=None):
        # config: optional database configuration; when omitted the
        # connection falls back to environment-based settings
        # (DatabaseConfig.from_env) at connect time.
        self._config = config
        self._client = None       # MongoDBClient, set on successful connect
        self._repository = None   # TestRunRepository, set on successful connect
        self._connected = False

    def _connect(self):
        """Lazy connection to MongoDB.

        Idempotent: returns immediately once connected.  Any exception
        (missing pymongo, unreachable server, bad config) is logged and
        reported as a failed connection rather than raised.

        Returns:
            bool: True when a healthy client and repository are available.
        """
        if self._connected:
            return self._connected

        try:
            # Make the project root importable so `db` resolves regardless
            # of the dashboard's working directory.
            project_root = Path(__file__).parent.parent.parent
            if str(project_root) not in sys.path:
                sys.path.insert(0, str(project_root))

            from db import MongoDBClient, TestRunRepository

            if self._config:
                self._client = MongoDBClient(self._config)
            else:
                self._client = MongoDBClient()

            if self._client.health_check():
                from db.config import DatabaseConfig

                config = self._config or DatabaseConfig.from_env()
                self._repository = TestRunRepository(
                    self._client.get_collection(config.collection_name)
                )
                self._connected = True
                logger.info("Connected to MongoDB data source")
            else:
                logger.warning("MongoDB health check failed")

        except Exception as e:
            logger.warning(f"Failed to connect to MongoDB: {e}")
            self._connected = False

        return self._connected

    @property
    def source_type(self) -> str:
        # Identifier the dashboard uses to label the active data source.
        return "mongodb"

    def is_connected(self) -> bool:
        """Check if MongoDB is connected (attempts a lazy connect if not)."""
        return self._connected or self._connect()

    def list_test_runs(self, test_type: Optional[str] = None) -> List[Dict[str, Any]]:
        """List all test runs from MongoDB.

        Args:
            test_type: Optional filter (e.g. "comm"); None lists all runs.

        Returns:
            Run-info dicts (see extract_run_info) with an added
            "accelerator_types" key, sorted newest-first by "time".
            Empty list when MongoDB is unavailable.
        """
        if not self._connect():
            logger.warning("MongoDB not connected, returning empty list")
            return []

        runs = self._repository.list_test_runs(test_type=test_type)
        result = []

        for run in runs:
            run_info = extract_run_info(run)
            run_info["accelerator_types"] = extract_accelerator_types(run)
            result.append(run_info)

        result.sort(key=lambda x: x["time"], reverse=True)
        return result

    def load_test_result(self, run_id: str) -> Dict[str, Any]:
        """Load a single test result with full data from MongoDB.

        Embedded CSV rows (non-empty lists under metric["data"]) are
        converted to pandas DataFrames so consumers see the same shape
        as with the file-based source.

        Returns:
            The result document without Mongo-internal fields, or {}
            when the run is missing or MongoDB is unavailable.
        """
        if not self._connect():
            logger.warning("MongoDB not connected")
            return {}

        data = self._repository.find_by_run_id(run_id)
        if not data:
            return {}

        for metric in data.get("metrics", []):
            if "data" in metric and isinstance(metric["data"], list):
                if metric["data"]:
                    metric["data"] = pd.DataFrame(metric["data"])
                    if "data_columns" not in metric:
                        metric["data_columns"] = list(metric["data"].columns)

        # Strip Mongo-internal fields before handing data to the UI.
        data.pop("_id", None)
        data.pop("_metadata", None)

        return data

    def load_summaries(self) -> List[Dict[str, Any]]:
        """Load dispatcher summaries from MongoDB.

        Returns:
            Summary documents without Mongo-internal fields; empty list
            on connection or query failure (best-effort, errors logged).
        """
        if not self._connect():
            logger.warning("MongoDB not connected, returning empty list")
            return []

        try:
            from db import DispatcherSummaryRepository
            from db.config import DatabaseConfig

            config = self._config or DatabaseConfig.from_env()
            summary_collection = self._client.get_collection(
                config.summary_collection_name
            )
            summary_repo = DispatcherSummaryRepository(summary_collection)
            summaries = summary_repo.list_summaries()

            for s in summaries:
                s.pop("_id", None)
                s.pop("_metadata", None)

            return summaries
        except Exception as e:
            logger.warning(f"Failed to load summaries from MongoDB: {e}")
            return []
diff --git a/db/__init__.py b/db/__init__.py
index 973a712..8a05b13 100644
--- a/db/__init__.py
+++ b/db/__init__.py
@@ -25,16 +25,22 @@
importer.import_directory(Path("./output"))
"""
-from .client import MongoDBClient, MongoDBConnectionError
-from .config import DatabaseConfig
-from .importer import DataImporter
-from .repository import DispatcherSummaryRepository, TestRunRepository
+# Conditionally import MongoDB-dependent modules
+# This allows db.utils to be imported without pymongo
+try:
+ from .client import MongoDBClient, MongoDBConnectionError
+ from .config import DatabaseConfig
+ from .importer import DataImporter
+ from .repository import DispatcherSummaryRepository, TestRunRepository
-__all__ = [
- "DatabaseConfig",
- "MongoDBClient",
- "MongoDBConnectionError",
- "TestRunRepository",
- "DispatcherSummaryRepository",
- "DataImporter",
-]
+ __all__ = [
+ "DatabaseConfig",
+ "MongoDBClient",
+ "MongoDBConnectionError",
+ "TestRunRepository",
+ "DispatcherSummaryRepository",
+ "DataImporter",
+ ]
+except ImportError:
+ # pymongo not installed - MongoDB features unavailable
+ __all__ = []
From 2170103bbe60169aaff2d6885bf1e8fd93801118 Mon Sep 17 00:00:00 2001
From: Li Baoming <1508269885@qq.com>
Date: Fri, 13 Mar 2026 18:42:50 +0800
Subject: [PATCH 5/5] fix: resolve flake8 error
---
.pre-commit-config.yaml | 10 +++++++---
dashboard/utils/data_loader.py | 11 ++++++++---
2 files changed, 15 insertions(+), 6 deletions(-)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 73e3029..b43c03d 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -9,6 +9,10 @@ repos:
rev: 6.1.0
hooks:
- id: flake8
- # Use .flake8 for configuration (separate file to avoid TOML parsing issues)
- # --exit-zero: show warnings but don't fail pre-commit
- args: ['--config', '.flake8', '--exit-zero']
+ # Match GitHub CI: check for syntax errors and undefined names
+      # E9: Runtime/syntax errors (SyntaxError, IndentationError, IOError)
+      # F63: Invalid comparisons/assertions (e.g. assert on a tuple)
+      # F7: Statement-level syntax problems (e.g. break outside a loop)
+      # F82: Undefined names
+ args: ['--select=E9,F63,F7,F82', '--show-source', '--statistics']
+
diff --git a/dashboard/utils/data_loader.py b/dashboard/utils/data_loader.py
index 6eb0a40..b605920 100644
--- a/dashboard/utils/data_loader.py
+++ b/dashboard/utils/data_loader.py
@@ -1,9 +1,14 @@
#!/usr/bin/env python3
"""Unified data loader for InfiniMetrics dashboard."""
+from __future__ import annotations
+
import logging
from pathlib import Path
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from .mongo_data_source import MongoDataSource
from .data_sources import DataSource, FileDataSource
from .data_utils import extract_accelerator_types, extract_run_info, get_friendly_size
@@ -48,7 +53,7 @@ def __init__(
else:
self._source = FileDataSource(results_dir)
- def _try_connect_mongo(self) -> Optional["MongoDataSource"]:
+ def _try_connect_mongo(self) -> Optional[MongoDataSource]:
"""
Try to connect to MongoDB.
@@ -65,7 +70,7 @@ def _try_connect_mongo(self) -> Optional["MongoDataSource"]:
logger.warning(f"MongoDB dependencies not installed ({e})")
return None
- def _apply_mongo_or_fallback(self, mongo_source: Optional["MongoDataSource"]):
+ def _apply_mongo_or_fallback(self, mongo_source: Optional[MongoDataSource]):
"""Apply MongoDB source or fallback to files based on configuration."""
if mongo_source:
self._source = mongo_source