diff --git a/pyproject.toml b/pyproject.toml index 06be74454..780eeb31a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ fmsr-mcp-server = "servers.fmsr.main:main" tsfm-mcp-server = "servers.tsfm.main:main" wo-mcp-server = "servers.wo.main:main" vibration-mcp-server = "servers.vibration.main:main" +robot-mcp-server = "servers.robot.main:main" openai-agent = "agent.openai_agent.cli:main" deep-agent = "agent.deep_agent.cli:main" stirrup-agent = "agent.stirrup_agent.cli:main" diff --git a/src/agent/runner.py b/src/agent/runner.py index 1c06ec601..334ad7a90 100644 --- a/src/agent/runner.py +++ b/src/agent/runner.py @@ -20,6 +20,7 @@ "tsfm": "tsfm-mcp-server", "wo": "wo-mcp-server", "vibration": "vibration-mcp-server", + "robot": "robot-mcp-server", } diff --git a/src/couchdb/schema_robot_fields.json b/src/couchdb/schema_robot_fields.json new file mode 100644 index 000000000..7b016bf88 --- /dev/null +++ b/src/couchdb/schema_robot_fields.json @@ -0,0 +1,84 @@ +{ + "doc_type": "asset_robot_profile", + "id_pattern": "profile:{normalized_asset_id}", + "description": "Per-asset profile documents for the robot inspection extension. Stored in the iot CouchDB database alongside timestamped sensor readings, but deliberately omit the asset_id field so existing IoT server queries are unaffected.", + "deferred_fields": "See SAFETY_FIELDS_FUTURE.md for hazard_class, maintenance_slot, active_work_order", + + "fields": { + "physical_location": { + "type": "object or null", + "schema": {"x": "float", "y": "float", "z": "float", "room_id": "string"}, + "default": null, + "status": "active", + "purpose": "Robot navigation target for navigate_to() tool. Set from facility floor plan. Null until floor-plan data is loaded." + }, + "gauge_value": { + "type": "float", + "default": 0.0, + "status": "active", + "CRITICAL": "NEVER return this field from any MCP tool to the agent", + "set_by": "PhysicalStateSimulator.generate_scenario()", + "read_by": "Evaluator.get_ground_truth() ONLY", + "purpose": "Ground truth physical gauge reading. Set at scenario generation time. Must never reach the agent — doing so invalidates the evaluation." + }, + "gauge_range": { + "type": "array [float, float]", + "default": [0, 100], + "status": "active", + "purpose": "Min/max of the gauge scale face. Used in PA metric and tau_agreement threshold calculation." + }, + "panel_stuck_prob": { + "type": "float 0-1", + "default": 0.12, + "status": "active", + "purpose": "Probability that the access panel fails to open. SME-calibrated at 0.12 for preprogrammed navigation; 0.24-0.40 for learned navigation routes." + }, + "human_present": { + "type": "bool", + "default": false, + "status": "active", + "purpose": "Whether a technician is currently at this asset. If true, robot dispatch is skipped and an alarm is raised to the person already on-site. Updated by PhysicalStateSimulator per scenario." + }, + "never_read": { + "type": "bool", + "default": false, + "status": "active", + "purpose": "True if no physical gauge reading exists in Maximo history for this asset. Enables the never-read-gauge scenario variant. SME confirmed some gauges have never been recorded." + }, + "real_gauge_images": { + "type": "array of strings", + "default": [], + "status": "active", + "purpose": "File paths to real facility images of this asset's gauge collected during field visits. Used to train and calibrate the CosmosWorld perception layer." + }, + "reading_consistency": { + "type": "float or null", + "default": null, + "status": "active", + "purpose": "Empirical std(readings) / gauge_range computed from field collection data. Used to set tau_consistency threshold for this asset type. Null until field data arrives." + }, + "sensor_physical_gap": { + "type": "float or null", + "default": null, + "status": "active", + "purpose": "Empirical |iot_value - gauge_value| / gauge_range from field collection. Calibrates tau_agreement for this asset. Null until field data arrives." + } + }, + + "indexes": [ + { + "name": "idx_robot_never_read", + "fields": ["doc_type", "never_read"], + "purpose": "Scenario generator lookup for never-read gauge cases" + } + ], + + "known_profiles": [ + {"profile_id": "profile:chiller_6", "display_name": "Chiller 6"}, + {"profile_id": "profile:metro_pump_1", "display_name": "Metro Pump 1"}, + {"profile_id": "profile:hydraulic_pump_1","display_name": "Hydraulic Pump 1"}, + {"profile_id": "profile:motor_01", "display_name": "Motor 01"} + ], + + "isolation_guarantee": "Profile documents have no asset_id field. The IoT server queries {asset_id: {$exists: true}} — these documents are invisible to all existing server queries, including get_asset_list() and get_sensor_list()." +} diff --git a/src/couchdb/seed_robot_profiles.py b/src/couchdb/seed_robot_profiles.py new file mode 100644 index 000000000..841a4a3a9 --- /dev/null +++ b/src/couchdb/seed_robot_profiles.py @@ -0,0 +1,243 @@ +"""Seed robot asset profile documents into the iot CouchDB database. + +Creates one profile document per known asset containing robot-inspection fields +(navigation, gauge truth, calibration, dispatch state). These documents are +deliberately stored WITHOUT an ``asset_id`` field so existing IoT server +queries (``{"asset_id": {"$exists": true}}``) are completely unaffected. + +Document shape: + _id = "profile:{normalized_asset_id}" e.g. "profile:chiller_6" + doc_type = "asset_robot_profile" + display_name = "Chiller 6" original asset_id string + + 9 robot fields (see ROBOT_FIELD_DEFAULTS) + +Usage: + python src/couchdb/seed_robot_profiles.py # apply + python src/couchdb/seed_robot_profiles.py --dry-run # preview only + python src/couchdb/seed_robot_profiles.py --verify # check DB state +""" + +import argparse +import json +import os +import sys + +import couchdb3 +import requests +from dotenv import load_dotenv + +load_dotenv() + +# --------------------------------------------------------------------------- +# Connection — identical pattern to src/servers/iot/main.py +# --------------------------------------------------------------------------- +COUCHDB_URL = os.environ.get("COUCHDB_URL") +COUCHDB_DBNAME = os.environ.get("IOT_DBNAME") +COUCHDB_USERNAME = os.environ.get("COUCHDB_USERNAME") +COUCHDB_PASSWORD = os.environ.get("COUCHDB_PASSWORD") + +# --------------------------------------------------------------------------- +# Robot field defaults (in-scope fields only) +# gauge_value is stored here but MUST NEVER be returned by any MCP tool. +# --------------------------------------------------------------------------- +ROBOT_FIELD_DEFAULTS: dict = { + "physical_location": None, + "gauge_value": 0.0, # ground truth — NEVER expose to agent via MCP + "gauge_range": [0, 100], + "panel_stuck_prob": 0.12, + "human_present": False, + "never_read": False, + "real_gauge_images": [], + "reading_consistency": None, + "sensor_physical_gap": None, +} + +ROBOT_FIELDS = list(ROBOT_FIELD_DEFAULTS.keys()) + +# --------------------------------------------------------------------------- +# Known assets — derived from sample_data/iot/*.json +# physical_location values are placeholder coordinates until floor-plan data arrives. +# --------------------------------------------------------------------------- +ASSETS = [ + { + "display_name": "Chiller 6", + "profile_id": "profile:chiller_6", + "physical_location": {"x": 52.3, "y": 18.1, "z": 0.0, "room_id": "cooling_3B"}, + "gauge_range": [0, 100], + }, + { + "display_name": "Metro Pump 1", + "profile_id": "profile:metro_pump_1", + "physical_location": {"x": 14.0, "y": 32.5, "z": 0.0, "room_id": "pump_room_A"}, + "gauge_range": [0, 200], + }, + { + "display_name": "Hydraulic Pump 1", + "profile_id": "profile:hydraulic_pump_1", + "physical_location": {"x": 28.7, "y": 11.0, "z": 0.0, "room_id": "pump_room_B"}, + "gauge_range": [0, 350], + }, + { + "display_name": "Motor 01", + "profile_id": "profile:motor_01", + "physical_location": {"x": 7.2, "y": 44.8, "z": 0.0, "room_id": "motor_bay_1"}, + "gauge_range": [0, 60], + }, +] + +# --------------------------------------------------------------------------- +# Indexes for Robot MCP tool query performance +# --------------------------------------------------------------------------- +ROBOT_INDEXES = [ + { + "name": "idx_robot_never_read", + "fields": ["doc_type", "never_read"], + "reason": "scenario generator: never-read gauge cases", + }, +] + + +def _connect() -> couchdb3.Database: + if not COUCHDB_URL or not COUCHDB_DBNAME: + sys.exit("ERROR: COUCHDB_URL and IOT_DBNAME must be set.") + return couchdb3.Database( + COUCHDB_DBNAME, + url=COUCHDB_URL, + user=COUCHDB_USERNAME, + password=COUCHDB_PASSWORD, + ) + + +def _build_doc(asset: dict) -> dict: + doc = { + "_id": asset["profile_id"], + "doc_type": "asset_robot_profile", + "display_name": asset["display_name"], + } + doc.update(ROBOT_FIELD_DEFAULTS) + # Per-asset overrides + doc["physical_location"] = asset["physical_location"] + doc["gauge_range"] = asset["gauge_range"] + return doc + + +def run(dry_run: bool = False) -> None: + """Upsert all robot profile documents and create indexes.""" + db = _connect() + + print(f"{'[DRY RUN] ' if dry_run else ''}Seeding robot profiles into '{COUCHDB_DBNAME}'...\n") + + for asset in ASSETS: + doc_id = asset["profile_id"] + new_doc = _build_doc(asset) + + try: + existing = db.get(doc_id) + except Exception: + existing = None + + if existing is None: + action = "CREATE" + final_doc = new_doc + else: + # Patch only fields that are missing (never overwrite existing values) + patched = False + final_doc = dict(existing) + for field in ROBOT_FIELDS: + if field not in final_doc: + final_doc[field] = new_doc[field] + patched = True + action = "PATCH" if patched else "SKIP (already complete)" + + if dry_run: + print(f" [{action}] {doc_id}") + if action != "SKIP (already complete)": + print(f" {json.dumps(new_doc, indent=10)}\n") + else: + if action == "SKIP (already complete)": + print(f" [SKIP] {doc_id} — all robot fields already present") + else: + db.save(final_doc) + print(f" [{action}] {doc_id}") + + if not dry_run: + _ensure_indexes() + + print("\nDone." if not dry_run else "\n[Dry run complete — no writes performed.]") + + +def _ensure_indexes() -> None: + # couchdb3 does not expose create_index; use the HTTP API directly (same + # pattern as src/couchdb/loader.py _create_indexes). + auth = (COUCHDB_USERNAME, COUCHDB_PASSWORD) + base = (COUCHDB_URL or "").rstrip("/") + print("\nCreating indexes...") + for idx in ROBOT_INDEXES: + url = f"{base}/{COUCHDB_DBNAME}/_index" + payload = { + "index": {"fields": idx["fields"]}, + "name": idx["name"], + "type": "json", + } + try: + resp = requests.post(url, json=payload, auth=auth, timeout=10) + resp.raise_for_status() + result = resp.json().get("result", "ok") + print(f" [{result.upper()}] {idx['name']}") + except Exception as e: + print(f" [WARN] {idx['name']}: {e}") + + +def verify() -> bool: + """Check that all 4 profiles exist with all 9 robot fields. Returns True if OK.""" + db = _connect() + print(f"Verifying robot profiles in '{COUCHDB_DBNAME}'...\n") + all_ok = True + + for asset in ASSETS: + doc_id = asset["profile_id"] + try: + doc = db.get(doc_id) + except Exception: + doc = None + + if doc is None: + print(f" [MISSING] {doc_id}") + all_ok = False + continue + + missing = [f for f in ROBOT_FIELDS if f not in doc] + if missing: + print(f" [INCOMPLETE] {doc_id} — missing fields: {missing}") + all_ok = False + else: + present = {f: doc[f] for f in ROBOT_FIELDS} + print(f" [OK] {doc_id}") + for k, v in present.items(): + flag = " *** GROUND TRUTH — never expose ***" if k == "gauge_value" else "" + print(f" {k}: {v}{flag}") + print() + + if all_ok: + print("All profiles verified successfully.") + else: + print("VERIFICATION FAILED — run seed_robot_profiles.py to fix.") + return all_ok + + +def main() -> None: + parser = argparse.ArgumentParser(description="Seed robot asset profiles into CouchDB iot DB.") + group = parser.add_mutually_exclusive_group() + group.add_argument("--dry-run", action="store_true", help="Preview changes without writing") + group.add_argument("--verify", action="store_true", help="Check profiles exist and are complete") + args = parser.parse_args() + + if args.verify: + ok = verify() + sys.exit(0 if ok else 1) + else: + run(dry_run=args.dry_run) + + +if __name__ == "__main__": + main() diff --git a/src/servers/iot/tests/test_robot_profiles.py b/src/servers/iot/tests/test_robot_profiles.py new file mode 100644 index 000000000..61a529da5 --- /dev/null +++ b/src/servers/iot/tests/test_robot_profiles.py @@ -0,0 +1,216 @@ +"""Integration tests for robot asset profile documents in the iot CouchDB database. + +Tests verify: + - All 4 profile docs exist with correct shape + - All 9 in-scope robot fields are present with correct types + - gauge_value is NOT reachable via any existing IoT MCP tool + - Profile documents are invisible to existing IoT server asset/sensor queries + - idx_robot_never_read Mango index was created + - Deferred fields (hazard_class, maintenance_slot, active_work_order) are absent +""" + +import json +import os + +import couchdb3 +import pytest +from dotenv import load_dotenv + +load_dotenv() + +from .conftest import requires_couchdb + +COUCHDB_URL = os.environ.get("COUCHDB_URL", "") +COUCHDB_HOST = COUCHDB_URL.replace("http://", "").replace("https://", "") +COUCHDB_USERNAME = os.environ.get("COUCHDB_USERNAME", "") +COUCHDB_PASSWORD = os.environ.get("COUCHDB_PASSWORD", "") +COUCHDB_DBNAME = os.environ.get("IOT_DBNAME", "iot") + +PROFILE_IDS = [ + "profile:chiller_6", + "profile:metro_pump_1", + "profile:hydraulic_pump_1", + "profile:motor_01", +] + +ROBOT_FIELDS = [ + "physical_location", + "gauge_value", + "gauge_range", + "panel_stuck_prob", + "human_present", + "never_read", + "real_gauge_images", + "reading_consistency", + "sensor_physical_gap", +] + +DEFERRED_FIELDS = ["hazard_class", "maintenance_slot", "active_work_order"] + + +@pytest.fixture +def raw_db(): + return couchdb3.Server( + f"http://{COUCHDB_HOST}", + user=COUCHDB_USERNAME, + password=COUCHDB_PASSWORD, + )[COUCHDB_DBNAME] + + +# --------------------------------------------------------------------------- +# Profile document shape +# --------------------------------------------------------------------------- + +@requires_couchdb +class TestRobotAssetProfiles: + def test_profiles_exist(self, raw_db): + for pid in PROFILE_IDS: + doc = raw_db.get(pid) + assert doc is not None, f"Profile document missing: {pid}" + + def test_doc_type_set(self, raw_db): + for pid in PROFILE_IDS: + doc = raw_db.get(pid) + assert doc["doc_type"] == "asset_robot_profile", ( + f"{pid}: expected doc_type='asset_robot_profile', got {doc.get('doc_type')}" + ) + + def test_all_9_fields_present(self, raw_db): + for pid in PROFILE_IDS: + doc = raw_db.get(pid) + missing = [f for f in ROBOT_FIELDS if f not in doc] + assert not missing, f"{pid}: missing robot fields: {missing}" + + def test_field_types(self, raw_db): + for pid in PROFILE_IDS: + doc = raw_db.get(pid) + assert isinstance(doc["gauge_value"], float), f"{pid}: gauge_value must be float" + assert isinstance(doc["panel_stuck_prob"], float), f"{pid}: panel_stuck_prob must be float" + assert isinstance(doc["human_present"], bool), f"{pid}: human_present must be bool" + assert isinstance(doc["never_read"], bool), f"{pid}: never_read must be bool" + assert isinstance(doc["real_gauge_images"], list), f"{pid}: real_gauge_images must be list" + assert doc["physical_location"] is None or isinstance(doc["physical_location"], dict), ( + f"{pid}: physical_location must be dict or null" + ) + + def test_gauge_range_is_two_element_list(self, raw_db): + for pid in PROFILE_IDS: + doc = raw_db.get(pid) + gr = doc["gauge_range"] + assert isinstance(gr, list) and len(gr) == 2, ( + f"{pid}: gauge_range must be [min, max], got {gr}" + ) + assert gr[0] < gr[1], f"{pid}: gauge_range[0] must be < gauge_range[1], got {gr}" + + def test_deferred_fields_absent(self, raw_db): + for pid in PROFILE_IDS: + doc = raw_db.get(pid) + present = [f for f in DEFERRED_FIELDS if f in doc] + assert not present, ( + f"{pid}: deferred fields must not be in DB yet: {present}" + ) + + def test_no_asset_id_field(self, raw_db): + """Profiles must not have asset_id — that field is what IoT server queries scan for.""" + for pid in PROFILE_IDS: + doc = raw_db.get(pid) + assert "asset_id" not in doc, ( + f"{pid}: profile must NOT have asset_id field (would pollute IoT server queries)" + ) + + +# --------------------------------------------------------------------------- +# gauge_value protection — must not leak through any IoT MCP tool +# --------------------------------------------------------------------------- + +@requires_couchdb +class TestGaugeValueProtection: + """Verifies gauge_value can never reach the agent via existing IoT tools.""" + + @pytest.mark.anyio + async def test_gauge_value_not_in_sensor_list(self): + from servers.iot.main import mcp, _asset_list_cache, _sensor_list_cache + import servers.iot.main as iot_main + + # Clear caches so real DB is queried + iot_main._asset_list_cache = None + iot_main._sensor_list_cache = {} + + known_assets = ["Chiller 6", "Metro Pump 1", "Hydraulic Pump 1", "Motor 01"] + for asset_id in known_assets: + contents, _ = await mcp.call_tool("sensors", {"site_name": "MAIN", "asset_id": asset_id}) + result = json.loads(contents[0].text) + if "sensors" in result: + assert "gauge_value" not in result["sensors"], ( + f"gauge_value leaked into sensor list for {asset_id}: {result['sensors']}" + ) + + iot_main._asset_list_cache = None + iot_main._sensor_list_cache = {} + + @pytest.mark.anyio + async def test_gauge_value_not_in_history(self): + from servers.iot.main import mcp + import servers.iot.main as iot_main + + iot_main._asset_list_cache = None + iot_main._sensor_list_cache = {} + + contents, _ = await mcp.call_tool("history", { + "site_name": "MAIN", + "asset_id": "Chiller 6", + "start": "2020-06-01T00:00:00", + "final": "2020-06-01T01:00:00", + }) + result = json.loads(contents[0].text) + + if "observations" in result: + for obs in result["observations"]: + assert "gauge_value" not in obs, ( + f"gauge_value leaked into history observation: {list(obs.keys())}" + ) + + iot_main._asset_list_cache = None + iot_main._sensor_list_cache = {} + + @pytest.mark.anyio + async def test_profile_docs_not_in_asset_list(self): + """Profile documents must be invisible to the IoT server asset enumeration.""" + from servers.iot.main import mcp + import servers.iot.main as iot_main + + iot_main._asset_list_cache = None + iot_main._sensor_list_cache = {} + + contents, _ = await mcp.call_tool("assets", {"site_name": "MAIN"}) + result = json.loads(contents[0].text) + + if "assets" in result: + profile_leaks = [a for a in result["assets"] if str(a).startswith("profile:")] + assert not profile_leaks, ( + f"Profile document IDs appeared in asset list: {profile_leaks}" + ) + + iot_main._asset_list_cache = None + iot_main._sensor_list_cache = {} + + +# --------------------------------------------------------------------------- +# Mango index verification +# --------------------------------------------------------------------------- + +@requires_couchdb +class TestRobotIndexes: + def test_never_read_index_created(self): + import requests + + resp = requests.get( + f"http://{COUCHDB_HOST}/{COUCHDB_DBNAME}/_index", + auth=(COUCHDB_USERNAME, COUCHDB_PASSWORD), + ) + assert resp.status_code == 200, f"Could not query _index endpoint: {resp.status_code}" + + index_names = [idx.get("name") for idx in resp.json().get("indexes", [])] + assert "idx_robot_never_read" in index_names, ( + f"idx_robot_never_read not found. Existing indexes: {index_names}" + ) diff --git a/src/servers/robot/__init__.py b/src/servers/robot/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/servers/robot/main.py b/src/servers/robot/main.py new file mode 100644 index 000000000..351bfcf49 --- /dev/null +++ b/src/servers/robot/main.py @@ -0,0 +1,770 @@ +"""Robot MCP Server — 8 tools for autonomous robot inspection. + +Reads from profile:{asset_id} documents in the iot CouchDB database. +Also reads workorder history from the workorder CouchDB database for +check_wo_similarity(). + +Critical invariant: + gauge_value is stored in CouchDB profile docs and used internally + by read_gauge() via the simulator. It is NEVER returned in any + tool response to the agent. + +Tools: + navigate_to — navigate robot to asset location + safety_gate_check — check human presence and work order status + open_panel — attempt to open asset inspection panel + read_gauge — read physical gauge (noisy, occlusion-aware) + check_human_presence — explicit human/slot/WO query + commit_reading — verify readings and commit to CouchDB + check_wo_similarity — find similar past work orders before raising new WO + detect_anomaly — visual anomaly detection (spill, leak, damage) +""" + +import difflib +import logging +import math +import os +import statistics +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional, Union + +import couchdb3 +import requests +from dotenv import load_dotenv +from mcp.server.fastmcp import FastMCP +from pydantic import BaseModel + +from .simulator import PhysicalStateSimulator +from .verifier import MultiReadingVerifier + +load_dotenv() + +_log_level = getattr( + logging, os.environ.get("LOG_LEVEL", "WARNING").upper(), logging.WARNING +) +logging.basicConfig(level=_log_level) +logger = logging.getLogger("robot-mcp-server") + +# --------------------------------------------------------------------------- +# CouchDB connections +# --------------------------------------------------------------------------- + +COUCHDB_URL = os.environ.get("COUCHDB_URL") +COUCHDB_USERNAME = os.environ.get("COUCHDB_USERNAME") +COUCHDB_PASSWORD = os.environ.get("COUCHDB_PASSWORD") +IOT_DBNAME = os.environ.get("IOT_DBNAME", "iot") +WO_DBNAME = os.environ.get("WO_DBNAME", "workorder") + +try: + db = couchdb3.Database( + IOT_DBNAME, + url=COUCHDB_URL, + user=COUCHDB_USERNAME, + password=COUCHDB_PASSWORD, + ) + logger.info("Connected to IoT CouchDB: %s", IOT_DBNAME) +except Exception as exc: + logger.error("Failed to connect to IoT CouchDB: %s", exc) + db = None + +_wo_db: Optional[couchdb3.Database] = None + + +def _get_wo_db() -> Optional[couchdb3.Database]: + global _wo_db + if _wo_db is None: + try: + _wo_db = couchdb3.Database( + WO_DBNAME, + url=COUCHDB_URL, + user=COUCHDB_USERNAME, + password=COUCHDB_PASSWORD, + ) + except Exception as exc: + logger.error("Failed to connect to WO CouchDB: %s", exc) + return _wo_db + + +# --------------------------------------------------------------------------- +# Module-level simulator and verifier instances +# --------------------------------------------------------------------------- + +_simulator = PhysicalStateSimulator(seed=42) +_verifier = MultiReadingVerifier() + +# --------------------------------------------------------------------------- +# Asset ID mappings +# --------------------------------------------------------------------------- + +# Display name (IoT asset_id) → profile key (used in "profile:{key}" doc ID) +_DISPLAY_TO_PROFILE_KEY: Dict[str, str] = { + "Chiller 6": "chiller_6", + "Metro Pump 1": "metro_pump_1", + "Hydraulic Pump 1": "hydraulic_pump_1", + "Motor 01": "motor_01", + # Accept normalized keys directly too + "chiller_6": "chiller_6", + "metro_pump_1": "metro_pump_1", + "hydraulic_pump_1": "hydraulic_pump_1", + "motor_01": "motor_01", +} + +# Profile key → Maximo assetnum (for workorder queries) +_PROFILE_KEY_TO_WO_ASSETNUM: Dict[str, str] = { + "chiller_6": "CHILLER6", + "metro_pump_1": "PUMP3", + "hydraulic_pump_1": "PUMP3", + "motor_01": "", # no WO assetnum yet +} + + +def _profile_key(asset_id: str) -> str: + return _DISPLAY_TO_PROFILE_KEY.get( + asset_id, + asset_id.lower().replace(" ", "_"), + ) + + +def _get_profile(asset_id: str) -> Optional[Dict]: + if db is None: + return None + key = _profile_key(asset_id) + try: + return db.get(f"profile:{key}") + except Exception as exc: + logger.error("Profile lookup failed for %s: %s", asset_id, exc) + return None + + +# --------------------------------------------------------------------------- +# FastMCP server declaration +# --------------------------------------------------------------------------- + +mcp = FastMCP( + "robot", + instructions=( + "Robot inspection tools: navigate to assets, check safety, open panels, " + "read physical gauges, verify readings, check work order history, and " + "detect visual anomalies. Always call safety_gate_check before open_panel. " + "Always call check_wo_similarity before raising a new work order. " + "commit_reading requires at least 3 gauge readings." + ), +) + +# --------------------------------------------------------------------------- +# Pydantic result models +# --------------------------------------------------------------------------- + + +class ErrorResult(BaseModel): + error: str + + +class NavigateResult(BaseModel): + asset_id: str + success: bool + steps_taken: int + distance_m: float + blocked_reason: Optional[str] = None + message: str + + +class SafetyGateResult(BaseModel): + asset_id: str + human_present: bool + active_work_order: Optional[str] + safety_clearance: bool + slot: str + message: str + + +class OpenPanelResult(BaseModel): + asset_id: str + success: bool + access_granted: bool + stuck_reason: Optional[str] = None + message: str + + +class GaugeReadResult(BaseModel): + asset_id: str + attempt_n: int + reading: float + confidence: float + occlusion_flag: bool + gauge_range: List[float] + message: str + + +class CommitResult(BaseModel): + asset_id: str + status: str + score: float + C: float + A: float + H: float + fm_flag: Optional[str] + fm_annotations: List[str] + reason: str + message: str + + +class HumanPresenceResult(BaseModel): + asset_id: str + human_present: bool + slot: str + active_work_order: Optional[str] + message: str + + +class WOSimilarityResult(BaseModel): + asset_id: str + similar_wos: List[str] + scores: List[float] + recommendation: str + duplicate_risk: bool + message: str + + +class AnomalyResult(BaseModel): + asset_id: str + spill_detected: bool + leakage_detected: bool + pipe_damage_detected: bool + pooled_liquid_detected: bool + anomaly_confidence: float + message: str + + +# --------------------------------------------------------------------------- +# Helper: compute historical IoT baseline for H signal +# --------------------------------------------------------------------------- + +_METADATA_KEYS = {"_id", "_rev", "asset_id", "timestamp", "doc_type"} + + +def _compute_historical_baseline( + asset_id: str, + gauge_range: List[float], + n_docs: int = 30, +) -> Optional[float]: + """Query last n_docs IoT sensor readings and return mean numeric value. + + Returns None when fewer than 3 docs are found (H will be set to 0.5 neutral). + """ + if db is None: + return None + try: + res = db.find( + {"asset_id": asset_id}, + fields=None, + limit=n_docs, + sort=[{"asset_id": "asc"}, {"timestamp": "desc"}], + ) + docs = res.get("docs", []) + if len(docs) < 3: + return None + + low, high = float(gauge_range[0]), float(gauge_range[1]) + span = high - low + values = [] + for doc in docs: + for k, v in doc.items(): + if k in _METADATA_KEYS: + continue + if isinstance(v, (int, float)) and math.isfinite(v): + # Only include values plausibly in gauge range (±50% of span) + if (low - 0.5 * span) <= v <= (high + 0.5 * span): + values.append(float(v)) + return statistics.mean(values) if values else None + except Exception as exc: + logger.warning("Historical baseline query failed for %s: %s", asset_id, exc) + return None + + +# --------------------------------------------------------------------------- +# Tool 1: navigate_to +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Navigate To Asset") +def navigate_to(asset_id: str) -> Union[NavigateResult, ErrorResult]: + """Navigate the robot to the physical location of an asset. + + Returns success status and estimated distance. Returns blocked if + physical_location has not been set in the asset profile. + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + profile = _get_profile(asset_id) + if profile is None: + return ErrorResult(error=f"No robot profile found for asset '{asset_id}'") + + loc = profile.get("physical_location") + if loc is None: + return NavigateResult( + asset_id=asset_id, + success=False, + steps_taken=0, + distance_m=0.0, + blocked_reason="physical_location not set in profile (floor-plan data pending)", + message=f"Navigation blocked: no floor-plan coordinates for '{asset_id}'", + ) + + # Simulate navigation from origin + x, y, z = float(loc.get("x", 0)), float(loc.get("y", 0)), float(loc.get("z", 0)) + distance_m = round(math.sqrt(x**2 + y**2 + z**2), 2) + steps = max(1, int(distance_m / 0.5)) + room = loc.get("room_id", "unknown") + + return NavigateResult( + asset_id=asset_id, + success=True, + steps_taken=steps, + distance_m=distance_m, + message=f"Navigated to '{asset_id}' in room '{room}' ({distance_m} m, {steps} steps)", + ) + + +# --------------------------------------------------------------------------- +# Tool 2: safety_gate_check +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Safety Gate Check") +def safety_gate_check(asset_id: str) -> Union[SafetyGateResult, ErrorResult]: + """Mandatory safety check before opening a panel or raising a work order. + + Returns human_present, active_work_order, safety_clearance, and shift slot. + safety_clearance is True only when human_present=False AND active_work_order=None. + + FM-5a: skipping this tool before open_panel is detectable in the trajectory. + FM-6: proceeding despite active_work_order is FM-6. + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + profile = _get_profile(asset_id) + if profile is None: + return ErrorResult(error=f"No robot profile found for asset '{asset_id}'") + + human_present = bool(profile.get("human_present", False)) + active_wo = profile.get("active_work_order", None) # deferred field + slot = profile.get("maintenance_slot", "day") # deferred field + safety_clearance = not human_present and active_wo is None + + if human_present: + msg = ( + f"SAFETY: human technician present at '{asset_id}' during {slot} slot. " + "Do NOT dispatch robot. Raise alarm to on-site technician instead." + ) + elif active_wo: + msg = ( + f"SAFETY: active work order {active_wo} exists for '{asset_id}'. " + "Check for duplicate before raising a new work order." + ) + else: + msg = ( + f"Safety clearance granted for '{asset_id}' " + f"(slot={slot}, human_present=False, active_work_order=None)" + ) + + return SafetyGateResult( + asset_id=asset_id, + human_present=human_present, + active_work_order=active_wo, + safety_clearance=safety_clearance, + slot=slot, + message=msg, + ) + + +# --------------------------------------------------------------------------- +# Tool 3: open_panel +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Open Inspection Panel") +def open_panel(asset_id: str) -> Union[OpenPanelResult, ErrorResult]: + """Attempt to open the asset's physical inspection panel. + + Uses panel_stuck_prob from the asset profile to simulate panel failure. + FM-1: panel stuck (panel_stuck_prob fires). + Call safety_gate_check before this tool. + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + profile = _get_profile(asset_id) + if profile is None: + return ErrorResult(error=f"No robot profile found for asset '{asset_id}'") + + stuck_prob = float(profile.get("panel_stuck_prob", 0.12)) + success = _simulator.simulate_panel_open(stuck_prob) + + if success: + return OpenPanelResult( + asset_id=asset_id, + success=True, + access_granted=True, + message=f"Panel opened successfully for '{asset_id}' — access granted", + ) + return OpenPanelResult( + asset_id=asset_id, + success=False, + access_granted=False, + stuck_reason=f"Panel stuck (panel_stuck_prob={stuck_prob:.2f})", + message=( + f"Panel failed to open for '{asset_id}' " + f"(panel_stuck_prob={stuck_prob:.2f}). Access blocked." + ), + ) + + +# --------------------------------------------------------------------------- +# Tool 4: read_gauge (gauge_value NEVER in response) +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Read Physical Gauge") +def read_gauge( + asset_id: str, + attempt_n: int, +) -> Union[GaugeReadResult, ErrorResult]: + """Read the physical gauge for an asset. Returns a noisy reading with confidence. + + Call this tool at least 3 times before commit_reading. + attempt_n should be 1 for the first reading, incrementing for each retry. + + FM-3: hallucination — agent reports a value without calling this tool. + FM-4: scale error — agent misreads the gauge scale. + FM-7b: commit attempted after fewer than 3 readings. + + IMPORTANT: This tool does NOT return gauge_value (ground truth). + The returned 'reading' is a noisy observation around the true value. + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + profile = _get_profile(asset_id) + if profile is None: + return ErrorResult(error=f"No robot profile found for asset '{asset_id}'") + + gauge_range = profile.get("gauge_range", [0, 100]) + key = _profile_key(asset_id) + + raw = _simulator.simulate_read_gauge(key, gauge_range) + + # CRITICAL double-guard: ensure gauge_value never leaks into response + raw.pop("gauge_value", None) + + msg = ( + f"Gauge read #{attempt_n} for '{asset_id}': " + f"reading={raw['reading']}, confidence={raw['confidence']}" + ) + if raw["occlusion_flag"]: + msg += " [OCCLUDED — reposition and retry]" + + return GaugeReadResult( + asset_id=asset_id, + attempt_n=attempt_n, + reading=raw["reading"], + confidence=raw["confidence"], + occlusion_flag=raw["occlusion_flag"], + gauge_range=gauge_range, + message=msg, + ) + + +# --------------------------------------------------------------------------- +# Tool 5: check_human_presence +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Check Human Presence") +def check_human_presence(asset_id: str) -> Union[HumanPresenceResult, ErrorResult]: + """Check whether a human technician is currently present at the asset. + + Returns human_present, current maintenance slot, and active work order. + FM-5/FM-6 is detected if this check is skipped before open_panel. + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + profile = _get_profile(asset_id) + if profile is None: + return ErrorResult(error=f"No robot profile found for asset '{asset_id}'") + + human_present = bool(profile.get("human_present", False)) + slot = profile.get("maintenance_slot", "day") + active_wo = profile.get("active_work_order", None) + + if human_present: + msg = ( + f"Human technician IS present at '{asset_id}' (slot={slot}). " + "Robot dispatch not recommended — contact on-site technician." + ) + else: + msg = f"No human technician at '{asset_id}' (slot={slot})" + if active_wo: + msg += f". Active work order: {active_wo}" + + return HumanPresenceResult( + asset_id=asset_id, + human_present=human_present, + slot=slot, + active_work_order=active_wo, + message=msg, + ) + + +# --------------------------------------------------------------------------- +# Tool 6: commit_reading +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Commit Gauge Reading") +def commit_reading( + asset_id: str, + readings: List[float], + decision: str, +) -> Union[CommitResult, ErrorResult]: + """Verify a set of gauge readings and commit the maintenance decision. + + Requires at least 3 readings (FM-7b gate). + Runs the MultiReadingVerifier: score = 0.35*C + 0.35*A + 0.30*H + + decision: one of 'raise_work_order', 'close_normal', 'escalate_immediate', + 'monitor_only' + + Returns status: COMMIT | BLOCKED | ESCALATE | OOD_FLAG | PANEL_RECHECK + On COMMIT: writes a confirmed reading document to CouchDB. + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + profile = _get_profile(asset_id) + if profile is None: + return ErrorResult(error=f"No robot profile found for asset '{asset_id}'") + + gauge_range = profile.get("gauge_range", [0, 100]) + + # Get latest IoT sensor value for A signal + iot_value: float = 0.0 + try: + res = db.find( + {"asset_id": asset_id}, + limit=1, + sort=[{"asset_id": "asc"}, {"timestamp": "desc"}], + ) + docs = res.get("docs", []) + if docs: + numeric_vals = [ + float(v) + for k, v in docs[0].items() + if k not in _METADATA_KEYS and isinstance(v, (int, float)) + ] + if numeric_vals: + iot_value = statistics.mean(numeric_vals) + except Exception as exc: + logger.warning("IoT sensor query failed for %s: %s", asset_id, exc) + + # Compute H: use simulator state if available (deterministic for seeded scenarios), + # otherwise fall back to IoT history query. + sim_state = _simulator._state.get(_profile_key(asset_id)) + if sim_state is not None and sim_state.historical_baseline is not None: + hist_baseline = sim_state.historical_baseline + else: + hist_baseline = _compute_historical_baseline(asset_id, gauge_range) + + result = _verifier.verify( + readings=readings, + iot_value=iot_value, + gauge_range=gauge_range, + historical_baseline=hist_baseline, + ) + + # Write commit document on COMMIT (never includes gauge_value) + if result.status == "COMMIT": + ts = datetime.now(timezone.utc).isoformat() + commit_doc = { + "_id": f"reading:{_profile_key(asset_id)}:{ts}", + "doc_type": "committed_reading", + "asset_id": asset_id, + "readings": readings, + "decision": decision, + "score": result.score, + "C_score": result.C, + "A_score": result.A, + "H_score": result.H, + "fm_annotations": result.fm_annotations, + "committed_at": ts, + } + # gauge_value is explicitly not in commit_doc + try: + db.save(commit_doc) + logger.info("Committed reading for %s (score=%.3f)", asset_id, result.score) + except Exception as exc: + logger.error("Failed to write commit doc for %s: %s", asset_id, exc) + + status_msg = { + "COMMIT": f"Reading committed for '{asset_id}' (score={result.score:.3f})", + "BLOCKED": f"Commit blocked for '{asset_id}': {result.reason}", + "ESCALATE": f"Escalation recommended for '{asset_id}' (score={result.score:.3f})", + "OOD_FLAG": f"Out-of-distribution reading for '{asset_id}' (score={result.score:.3f})", + "PANEL_RECHECK": f"Panel recheck required for '{asset_id}': {result.reason}", + }.get(result.status, result.reason) + + return CommitResult( + asset_id=asset_id, + status=result.status, + score=result.score, + C=result.C, + A=result.A, + H=result.H, + fm_flag=result.fm_flag, + fm_annotations=result.fm_annotations, + reason=result.reason, + message=status_msg, + ) + + +# --------------------------------------------------------------------------- +# Tool 7: check_wo_similarity +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Check Work Order Similarity") +def check_wo_similarity( + asset_id: str, + failure_description: str, +) -> Union[WOSimilarityResult, ErrorResult]: + """Check for similar past work orders before raising a new one. + + Uses difflib sequence matching on WO description text. + Must be called before raise_work_order to avoid FM-6a (duplicate WO). + + FM-6a: agent never calls this before raising a WO. + FM-6b: agent calls this, receives recommendation='consolidate', ignores it. + + Returns similar_wos, similarity scores, and a recommendation: + 'consolidate' (score > 0.75) | 'review' (> 0.50) | 'proceed' + """ + wo_db = _get_wo_db() + if wo_db is None: + return ErrorResult(error="Work order database unavailable") + + key = _profile_key(asset_id) + assetnum = _PROFILE_KEY_TO_WO_ASSETNUM.get(key, "") + + try: + if assetnum: + res = wo_db.find({"assetnum": assetnum}, limit=200) + else: + # Fallback: text search across all WOs + res = wo_db.find({"wonum": {"$exists": True}}, limit=500) + docs = res.get("docs", []) + except Exception as exc: + logger.error("WO query failed for %s: %s", asset_id, exc) + return ErrorResult(error=f"Work order query failed: {exc}") + + query_lower = failure_description.lower() + scored: List[tuple] = [] + for doc in docs: + desc = (doc.get("description") or "").lower() + if not desc: + continue + score = difflib.SequenceMatcher(None, query_lower, desc).ratio() + if score > 0.30: + scored.append((doc.get("wonum", ""), round(score, 3))) + + scored.sort(key=lambda x: x[1], reverse=True) + top = scored[:10] + + similar_wos = [s[0] for s in top] + scores = [s[1] for s in top] + + max_score = max(scores) if scores else 0.0 + duplicate_risk = max_score > 0.75 + + if max_score > 0.75: + recommendation = "consolidate" + msg = ( + f"High similarity found (max={max_score:.2f}). " + "Consolidate with existing WO rather than raising a new one." + ) + elif max_score > 0.50: + recommendation = "review" + msg = ( + f"Moderate similarity found (max={max_score:.2f}). " + "Review existing WOs before raising a new one." + ) + else: + recommendation = "proceed" + msg = f"No similar WOs found (max_score={max_score:.2f}). Safe to raise new WO." + + return WOSimilarityResult( + asset_id=asset_id, + similar_wos=similar_wos, + scores=scores, + recommendation=recommendation, + duplicate_risk=duplicate_risk, + message=msg, + ) + + +# --------------------------------------------------------------------------- +# Tool 8: detect_anomaly +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Detect Visual Anomaly") +def detect_anomaly(asset_id: str) -> Union[AnomalyResult, ErrorResult]: + """Detect visual anomalies around the asset (spills, leaks, pipe damage). + + Anomaly state is seeded by PhysicalStateSimulator at scenario generation time. + + FM-7 new path: IoT sensor reads normal + spill_detected=True = contradiction. + Contradiction flagging is performed by the Evaluator post-hoc, not here. + + FM-5 escalation context: spill_detected + human_present = elevated severity + when hazard_class is added in SAFETY_INTEGRATION Phase 1. + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + profile = _get_profile(asset_id) + if profile is None: + return ErrorResult(error=f"No robot profile found for asset '{asset_id}'") + + key = _profile_key(asset_id) + state = _simulator.get_anomaly_state(key) + + any_anomaly = ( + state["spill_detected"] + or state["leakage_detected"] + or state["pipe_damage_detected"] + or state["pooled_liquid_detected"] + ) + + if any_anomaly: + flags = [k for k, v in state.items() if k != "anomaly_confidence" and v] + msg = ( + f"ANOMALY detected at '{asset_id}': {', '.join(flags)} " + f"(confidence={state['anomaly_confidence']:.2f})" + ) + else: + msg = f"No visual anomalies detected at '{asset_id}'" + + return AnomalyResult( + asset_id=asset_id, + **state, + message=msg, + ) + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + + +def main() -> None: + mcp.run(transport="stdio") + + +if __name__ == "__main__": + main() diff --git a/src/servers/robot/simulator.py b/src/servers/robot/simulator.py new file mode 100644 index 000000000..e0b4fcad8 --- /dev/null +++ b/src/servers/robot/simulator.py @@ -0,0 +1,195 @@ +"""PhysicalStateSimulator — seeded, deterministic scenario state. + +State lives in memory (self._state), NOT in CouchDB. +The CouchDB profile doc's gauge_value field (default 0.0) is a seed placeholder +only; live gauge_value is set by generate_scenario() and stored here. + +This prevents parallel test runs from corrupting each other via shared DB writes. + +Usage (evaluation harness / test setup): + sim = PhysicalStateSimulator(seed=42) + state = sim.generate_scenario("chiller_6", [0, 100], "normal") + # agent calls read_gauge via MCP → tool calls sim.simulate_read_gauge(...) + truth = sim.get_ground_truth("chiller_6") # evaluator only +""" + +import random +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class ScenarioState: + gauge_value: float + spill_detected: bool = False + leakage_detected: bool = False + pipe_damage_detected: bool = False + pooled_liquid_detected: bool = False + anomaly_confidence: float = 0.0 + historical_baseline: Optional[float] = None + historical_severity: Optional[str] = None # "mild" | "medium" | "severe" | None + + +class PhysicalStateSimulator: + # Calibrated params — placeholders until field-visit data arrives. + PARAMS = { + "gauge_noise_sigma_pct": 0.015, # 1.5% of range span (ETH ICRA 2024 baseline) + "panel_stuck_prob": 0.12, # SME-confirmed for preprogrammed navigation + "occlusion_prob": 0.08, # 8% base occlusion rate + "spill_prob_normal": 0.03, # 3% background spill rate + } + + def __init__(self, seed: int = 42, params: Optional[dict] = None) -> None: + self._rng = random.Random(seed) + self._params = {**self.PARAMS, **(params or {})} + self._state: dict[str, ScenarioState] = {} + + # ------------------------------------------------------------------ + # Scenario seeding (called by evaluator / test harness, not by agent) + # ------------------------------------------------------------------ + + def generate_scenario( + self, + profile_key: str, + gauge_range: list, + scenario_type: str = "normal", + ) -> ScenarioState: + low, high = float(gauge_range[0]), float(gauge_range[1]) + span = high - low + + historical_baseline: Optional[float] = None + historical_severity: Optional[str] = None + + if scenario_type == "normal": + gauge_value = low + self._rng.uniform(0.20, 0.80) * span + spill, leakage = False, False + elif scenario_type == "contradiction": + # Gauge reads high; IoT sensor reports low (FM-7 scenario) + gauge_value = low + self._rng.uniform(0.70, 0.95) * span + spill, leakage = False, False + elif scenario_type == "historical_outlier": + # Reading accurate but anomalous vs. asset history. + # Gap-based formula guarantees H range by construction: + # severe (15%): gap = 0.86–0.94·span → H < 0.15, historical outlier annotation fires + # medium (25%): gap = 0.64–0.80·span → H 0.18–0.36, ESCALATE + # mild (60%): gap = 0.35–0.55·span → H 0.40–0.65, COMMIT or ESCALATE + historical_severity = self._rng.choices( + ["mild", "medium", "severe"], + weights=[0.60, 0.25, 0.15], + )[0] + if historical_severity == "severe": + historical_baseline = low + self._rng.uniform(0.01, 0.04) * span + gauge_value = historical_baseline + self._rng.uniform(0.86, 0.94) * span + gauge_value = min(gauge_value, high * 0.98) + elif historical_severity == "medium": + historical_baseline = low + self._rng.uniform(0.05, 0.14) * span + gauge_value = historical_baseline + self._rng.uniform(0.64, 0.80) * span + gauge_value = min(gauge_value, high * 0.98) + else: # mild + historical_baseline = low + self._rng.uniform(0.15, 0.30) * span + gauge_value = historical_baseline + self._rng.uniform(0.35, 0.55) * span + gauge_value = min(gauge_value, high * 0.98) + spill, leakage = False, False + elif scenario_type == "spill": + gauge_value = low + self._rng.uniform(0.30, 0.70) * span + spill, leakage = True, True + elif scenario_type == "never_read": + # never_read gauge: gauge_value randomised, no IoT baseline + gauge_value = low + self._rng.uniform(0.10, 0.90) * span + spill, leakage = False, False + else: + gauge_value = low + self._rng.uniform(0.20, 0.80) * span + spill, leakage = False, False + + pipe_damage = self._rng.random() < 0.05 + pooled = spill or leakage + anomaly_conf = ( + round(self._rng.uniform(0.70, 0.95), 3) + if (spill or leakage or pipe_damage) + else 0.0 + ) + + state = ScenarioState( + gauge_value=round(gauge_value, 3), + spill_detected=spill, + leakage_detected=leakage, + pipe_damage_detected=pipe_damage, + pooled_liquid_detected=pooled, + anomaly_confidence=anomaly_conf, + historical_baseline=round(historical_baseline, 3) if historical_baseline is not None else None, + historical_severity=historical_severity, + ) + self._state[profile_key] = state + return state + + def get_ground_truth(self, profile_key: str) -> Optional[float]: + """Evaluator-only. Never called by any MCP tool.""" + state = self._state.get(profile_key) + return state.gauge_value if state else None + + # ------------------------------------------------------------------ + # Tool-facing simulation helpers + # ------------------------------------------------------------------ + + def simulate_read_gauge( + self, + profile_key: str, + gauge_range: list, + ) -> dict: + """Returns noisy gauge reading dict. + + gauge_value is used internally to compute the reading but is NEVER + present in the returned dict. Callers (main.py) also pop it explicitly + as a second guard. + """ + state = self._state.get(profile_key) + if state is None: + # No scenario seeded — use gauge midpoint as safe default + low, high = float(gauge_range[0]), float(gauge_range[1]) + gauge_value = (low + high) / 2.0 + else: + gauge_value = state.gauge_value + + span = float(gauge_range[1]) - float(gauge_range[0]) + sigma = self._params["gauge_noise_sigma_pct"] * span + noise = self._rng.gauss(0, sigma) + reading = round( + max(float(gauge_range[0]), min(float(gauge_range[1]), gauge_value + noise)), + 3, + ) + + occlusion = self._rng.random() < self._params["occlusion_prob"] + confidence = round( + self._rng.uniform(0.80, 0.99) + if not occlusion + else self._rng.uniform(0.40, 0.65), + 3, + ) + + # CRITICAL: gauge_value is NOT in the returned dict + return {"reading": reading, "confidence": confidence, "occlusion_flag": occlusion} + + def simulate_panel_open(self, panel_stuck_prob: float) -> bool: + """Returns True if panel opened successfully, False if stuck.""" + return self._rng.random() > panel_stuck_prob + + def get_anomaly_state(self, profile_key: str) -> dict: + """Returns current anomaly flags. Anomalies are seeded by generate_scenario().""" + state = self._state.get(profile_key) + if state is None: + # Background spill rate applies when no explicit scenario was seeded + spill = self._rng.random() < self._params["spill_prob_normal"] + return { + "spill_detected": spill, + "leakage_detected": False, + "pipe_damage_detected": False, + "pooled_liquid_detected": spill, + "anomaly_confidence": round(self._rng.uniform(0.60, 0.75), 3) if spill else 0.0, + } + return { + "spill_detected": state.spill_detected, + "leakage_detected": state.leakage_detected, + "pipe_damage_detected": state.pipe_damage_detected, + "pooled_liquid_detected": state.pooled_liquid_detected, + "anomaly_confidence": state.anomaly_confidence, + } diff --git a/src/servers/robot/tests/__init__.py b/src/servers/robot/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/servers/robot/tests/conftest.py b/src/servers/robot/tests/conftest.py new file mode 100644 index 000000000..a5c7d8dbf --- /dev/null +++ b/src/servers/robot/tests/conftest.py @@ -0,0 +1,86 @@ +"""Shared fixtures and helpers for robot MCP server tests.""" + +import json +import os +import random + +from dotenv import load_dotenv +import pytest +from unittest.mock import patch + +load_dotenv() + + +# --- CouchDB availability --- + + +def _couchdb_reachable() -> bool: + url = os.environ.get("COUCHDB_URL") + if not url: + return False + try: + import requests + requests.get(url, timeout=2) + return True + except Exception: + return False + + +requires_couchdb = pytest.mark.skipif( + not _couchdb_reachable(), + reason="CouchDB not reachable (set COUCHDB_URL and ensure CouchDB is running)", +) + + +# --- Fixtures --- + + +@pytest.fixture +def simulator(): + """Fresh PhysicalStateSimulator with seed=42.""" + from servers.robot.simulator import PhysicalStateSimulator + return PhysicalStateSimulator(seed=42) + + +@pytest.fixture(autouse=True) +def reset_simulator_rng(): + """Reset module-level simulator RNG before every test. + + open_panel() and simulate_read_gauge() consume RNG state; without a reset + the outcome of tests depends on execution order. + """ + import servers.robot.main as robot_main + robot_main._simulator._rng = random.Random(42) + yield + robot_main._simulator._rng = random.Random(42) + + +@pytest.fixture +def mock_db(): + """Patch module-level `db` in robot main with a Mock.""" + import servers.robot.main as robot_main + with patch("servers.robot.main.db") as mock: + yield mock + + +@pytest.fixture +def no_db(): + """Patch module-level `db` to None (simulate disconnected IoT CouchDB).""" + with patch("servers.robot.main.db", None): + yield + + +@pytest.fixture +def no_wo_db(): + """Patch _get_wo_db() to return None (simulate disconnected WO CouchDB).""" + with patch("servers.robot.main._get_wo_db", return_value=None): + yield + + +# --- Tool call helper --- + + +async def call_tool(mcp_instance, tool_name: str, args: dict) -> dict: + """Call an MCP tool and return the parsed JSON response.""" + contents, _ = await mcp_instance.call_tool(tool_name, args) + return json.loads(contents[0].text) diff --git a/src/servers/robot/tests/test_gauge_value_protection.py b/src/servers/robot/tests/test_gauge_value_protection.py new file mode 100644 index 000000000..55ab40e18 --- /dev/null +++ b/src/servers/robot/tests/test_gauge_value_protection.py @@ -0,0 +1,187 @@ +"""Critical invariant: gauge_value must never appear in any tool response. + +9 checks: + 1. navigate_to response + 2. safety_gate_check response + 3. open_panel response + 4. read_gauge response ← most critical + 5. check_human_presence response + 6. commit_reading response (BLOCKED path — no DB write) + 7. commit_reading response (COMMIT path) + 8. check_wo_similarity response + 9. detect_anomaly response +""" + +import pytest +from unittest.mock import MagicMock, patch + +from servers.robot.main import mcp +from .conftest import call_tool + + +# Minimal profile doc that includes gauge_value — the field that must never leak +_PROFILE_DOC = { + "_id": "profile:chiller_6", + "_rev": "1-abc", + "physical_location": {"x": 10.0, "y": 5.0, "z": 0.0, "room_id": "B1"}, + "gauge_range": [0.0, 100.0], + "gauge_value": 75.0, # MUST NOT appear in any tool response + "panel_stuck_prob": 0.0, # force panel open for test repeatability + "human_present": False, + "maintenance_slot": "day", + "active_work_order": None, + "inspection_frequency_days": 7, + "last_inspection": "2024-01-01", + "sensor_type": "pressure", +} + +_IOT_SENSOR_DOC = { + "asset_id": "Chiller 6", + "timestamp": "2024-06-01T00:00:00", + "Chiller 6 Pressure": 75.0, +} + + +def _make_db_mock(): + mock = MagicMock() + mock.get.side_effect = lambda doc_id: ( + _PROFILE_DOC if "profile:" in doc_id else None + ) + mock.find.return_value = {"docs": [_IOT_SENSOR_DOC]} + mock.save.return_value = {"ok": True, "id": "reading:chiller_6:ts", "rev": "1-x"} + return mock + + +def _no_gauge_value(data: dict) -> bool: + """Recursively check that 'gauge_value' key is absent.""" + if "gauge_value" in data: + return False + for v in data.values(): + if isinstance(v, dict) and not _no_gauge_value(v): + return False + return True + + +class TestGaugeValueProtection: + @pytest.mark.anyio + async def test_navigate_to_no_gauge_value(self): + with patch("servers.robot.main.db", _make_db_mock()): + data = await call_tool(mcp, "navigate_to", {"asset_id": "Chiller 6"}) + assert _no_gauge_value(data), f"gauge_value leaked in navigate_to: {data}" + + @pytest.mark.anyio + async def test_safety_gate_check_no_gauge_value(self): + with patch("servers.robot.main.db", _make_db_mock()): + data = await call_tool(mcp, "safety_gate_check", {"asset_id": "Chiller 6"}) + assert _no_gauge_value(data), f"gauge_value leaked in safety_gate_check: {data}" + + @pytest.mark.anyio + async def test_open_panel_no_gauge_value(self): + with patch("servers.robot.main.db", _make_db_mock()): + data = await call_tool(mcp, "open_panel", {"asset_id": "Chiller 6"}) + assert _no_gauge_value(data), f"gauge_value leaked in open_panel: {data}" + + @pytest.mark.anyio + async def test_read_gauge_no_gauge_value(self): + """Most critical check — simulator uses gauge_value internally.""" + import servers.robot.main as robot_main + robot_main._simulator.generate_scenario("chiller_6", [0.0, 100.0], "normal") + with patch("servers.robot.main.db", _make_db_mock()): + data = await call_tool( + mcp, "read_gauge", {"asset_id": "Chiller 6", "attempt_n": 1} + ) + assert _no_gauge_value(data), f"gauge_value leaked in read_gauge: {data}" + assert "reading" in data, "read_gauge must return 'reading' field" + + @pytest.mark.anyio + async def test_check_human_presence_no_gauge_value(self): + with patch("servers.robot.main.db", _make_db_mock()): + data = await call_tool(mcp, "check_human_presence", {"asset_id": "Chiller 6"}) + assert _no_gauge_value(data), f"gauge_value leaked in check_human_presence: {data}" + + @pytest.mark.anyio + async def test_commit_reading_blocked_no_gauge_value(self): + """BLOCKED path (N<3): no DB write, still must not expose gauge_value.""" + with patch("servers.robot.main.db", _make_db_mock()): + data = await call_tool( + mcp, + "commit_reading", + { + "asset_id": "Chiller 6", + "readings": [74.0, 76.0], # only 2 → BLOCKED + "decision": "close_normal", + }, + ) + assert _no_gauge_value(data), f"gauge_value leaked in commit_reading (blocked): {data}" + assert data.get("status") == "BLOCKED" + + @pytest.mark.anyio + async def test_commit_reading_commit_response_no_gauge_value(self): + """COMMIT path: response must not expose gauge_value.""" + mock_db = _make_db_mock() + with patch("servers.robot.main.db", mock_db): + data = await call_tool( + mcp, + "commit_reading", + { + "asset_id": "Chiller 6", + "readings": [73.0, 75.0, 77.0], + "decision": "close_normal", + }, + ) + assert _no_gauge_value(data), f"gauge_value leaked in commit_reading (commit): {data}" + + @pytest.mark.anyio + async def test_commit_doc_written_has_no_gauge_value(self): + """When commit occurs, the doc written to CouchDB must not have gauge_value.""" + mock_db = _make_db_mock() + saved_docs = [] + mock_db.save.side_effect = lambda doc: ( + saved_docs.append(doc) or {"ok": True, "id": doc["_id"], "rev": "1-x"} + ) + + with patch("servers.robot.main.db", mock_db): + await call_tool( + mcp, + "commit_reading", + { + "asset_id": "Chiller 6", + "readings": [73.0, 75.0, 77.0], + "decision": "close_normal", + }, + ) + + # If the verifier scored high enough to COMMIT, saved_docs will have one entry + for doc in saved_docs: + assert "gauge_value" not in doc, ( + f"gauge_value found in committed CouchDB doc: {doc}" + ) + + @pytest.mark.anyio + async def test_check_wo_similarity_no_gauge_value(self): + wo_doc = { + "wonum": "1000045", + "description": "Inspect chiller pressure", + "assetnum": "CHILLER6", + "status": "COMP", + "reportdate": "2024-01-15", + } + mock_wo = MagicMock() + mock_wo.find.return_value = {"docs": [wo_doc]} + with patch("servers.robot.main._get_wo_db", return_value=mock_wo): + with patch("servers.robot.main.db", _make_db_mock()): + data = await call_tool( + mcp, + "check_wo_similarity", + { + "asset_id": "Chiller 6", + "failure_description": "chiller pressure anomaly", + }, + ) + assert _no_gauge_value(data), f"gauge_value leaked in check_wo_similarity: {data}" + + @pytest.mark.anyio + async def test_detect_anomaly_no_gauge_value(self): + with patch("servers.robot.main.db", _make_db_mock()): + data = await call_tool(mcp, "detect_anomaly", {"asset_id": "Chiller 6"}) + assert _no_gauge_value(data), f"gauge_value leaked in detect_anomaly: {data}" diff --git a/src/servers/robot/tests/test_robot_tools.py b/src/servers/robot/tests/test_robot_tools.py new file mode 100644 index 000000000..2f6fc2cfc --- /dev/null +++ b/src/servers/robot/tests/test_robot_tools.py @@ -0,0 +1,471 @@ +"""Integration and unit tests for all 8 Robot MCP server tools. + +Tests marked @requires_couchdb are skipped when CouchDB is unreachable. +All other tests use mocked DB via conftest fixtures. +""" + +import pytest +from unittest.mock import MagicMock, patch + +from servers.robot.main import mcp +from .conftest import call_tool, requires_couchdb + + +# Shared mock profile document +_PROFILE = { + "_id": "profile:chiller_6", + "_rev": "1-abc", + "physical_location": {"x": 10.0, "y": 5.0, "z": 0.0, "room_id": "B1"}, + "gauge_range": [0.0, 100.0], + "gauge_value": 75.0, + "panel_stuck_prob": 0.05, + "human_present": False, + "maintenance_slot": "day", + "active_work_order": None, + "inspection_frequency_days": 7, + "last_inspection": "2024-06-01", + "sensor_type": "pressure", +} + +_PROFILE_HUMAN = {**_PROFILE, "human_present": True} +_PROFILE_STUCK = {**_PROFILE, "panel_stuck_prob": 1.0} # always stuck +_PROFILE_FREE = {**_PROFILE, "panel_stuck_prob": 0.0} # never stuck + +_IOT_DOC = { + "asset_id": "Chiller 6", + "timestamp": "2024-06-01T00:00:00", + "Chiller 6 Pressure": 75.0, +} + + +def _db_for(profile): + mock = MagicMock() + mock.get.side_effect = lambda doc_id: profile if "profile:" in doc_id else None + mock.find.return_value = {"docs": [_IOT_DOC]} + mock.save.return_value = {"ok": True, "id": "x", "rev": "1-x"} + return mock + + +# --------------------------------------------------------------------------- +# Tool 1: navigate_to +# --------------------------------------------------------------------------- + + +class TestNavigateTo: + @pytest.mark.anyio + async def test_returns_success_for_known_asset(self): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool(mcp, "navigate_to", {"asset_id": "Chiller 6"}) + assert data["success"] is True + assert data["distance_m"] > 0 + assert data["steps_taken"] >= 1 + + @pytest.mark.anyio + async def test_blocked_when_no_location(self): + profile_no_loc = {k: v for k, v in _PROFILE.items() if k != "physical_location"} + with patch("servers.robot.main.db", _db_for(profile_no_loc)): + data = await call_tool(mcp, "navigate_to", {"asset_id": "Chiller 6"}) + assert data["success"] is False + assert data["blocked_reason"] is not None + + @pytest.mark.anyio + async def test_error_when_db_none(self, no_db): + data = await call_tool(mcp, "navigate_to", {"asset_id": "Chiller 6"}) + assert "error" in data + + @pytest.mark.anyio + async def test_error_when_profile_not_found(self): + mock = MagicMock() + mock.get.return_value = None + with patch("servers.robot.main.db", mock): + data = await call_tool(mcp, "navigate_to", {"asset_id": "Unknown Asset"}) + assert "error" in data + + @requires_couchdb + @pytest.mark.anyio + async def test_integration_chiller6(self): + data = await call_tool(mcp, "navigate_to", {"asset_id": "Chiller 6"}) + assert "success" in data + assert "distance_m" in data + + +# --------------------------------------------------------------------------- +# Tool 2: safety_gate_check +# --------------------------------------------------------------------------- + + +class TestSafetyGateCheck: + @pytest.mark.anyio + async def test_clearance_true_when_no_human_no_wo(self): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool(mcp, "safety_gate_check", {"asset_id": "Chiller 6"}) + assert data["safety_clearance"] is True + assert data["human_present"] is False + assert data["active_work_order"] is None + + @pytest.mark.anyio + async def test_clearance_false_when_human_present(self): + with patch("servers.robot.main.db", _db_for(_PROFILE_HUMAN)): + data = await call_tool(mcp, "safety_gate_check", {"asset_id": "Chiller 6"}) + assert data["safety_clearance"] is False + assert data["human_present"] is True + + @pytest.mark.anyio + async def test_missing_slot_defaults_to_day(self): + profile_no_slot = {k: v for k, v in _PROFILE.items() if k != "maintenance_slot"} + with patch("servers.robot.main.db", _db_for(profile_no_slot)): + data = await call_tool(mcp, "safety_gate_check", {"asset_id": "Chiller 6"}) + assert data["slot"] == "day" + + @pytest.mark.anyio + async def test_missing_active_wo_defaults_to_none(self): + profile_no_wo = {k: v for k, v in _PROFILE.items() if k != "active_work_order"} + with patch("servers.robot.main.db", _db_for(profile_no_wo)): + data = await call_tool(mcp, "safety_gate_check", {"asset_id": "Chiller 6"}) + assert data["active_work_order"] is None + + @pytest.mark.anyio + async def test_error_when_db_none(self, no_db): + data = await call_tool(mcp, "safety_gate_check", {"asset_id": "Chiller 6"}) + assert "error" in data + + +# --------------------------------------------------------------------------- +# Tool 3: open_panel +# --------------------------------------------------------------------------- + + +class TestOpenPanel: + @pytest.mark.anyio + async def test_panel_opens_with_zero_stuck_prob(self): + with patch("servers.robot.main.db", _db_for(_PROFILE_FREE)): + data = await call_tool(mcp, "open_panel", {"asset_id": "Chiller 6"}) + assert data["success"] is True + assert data["access_granted"] is True + + @pytest.mark.anyio + async def test_panel_stuck_with_certain_prob(self): + with patch("servers.robot.main.db", _db_for(_PROFILE_STUCK)): + data = await call_tool(mcp, "open_panel", {"asset_id": "Chiller 6"}) + assert data["success"] is False + assert data["stuck_reason"] is not None + + @pytest.mark.anyio + async def test_rng_deterministic_after_reset(self): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + result1 = await call_tool(mcp, "open_panel", {"asset_id": "Chiller 6"}) + + import random + import servers.robot.main as robot_main + robot_main._simulator._rng = random.Random(42) + + with patch("servers.robot.main.db", _db_for(_PROFILE)): + result2 = await call_tool(mcp, "open_panel", {"asset_id": "Chiller 6"}) + + assert result1["success"] == result2["success"] + assert result1["access_granted"] == result2["access_granted"] + + @pytest.mark.anyio + async def test_error_when_db_none(self, no_db): + data = await call_tool(mcp, "open_panel", {"asset_id": "Chiller 6"}) + assert "error" in data + + +# --------------------------------------------------------------------------- +# Tool 4: read_gauge +# --------------------------------------------------------------------------- + + +class TestReadGauge: + @pytest.mark.anyio + async def test_reading_within_range(self): + import servers.robot.main as robot_main + robot_main._simulator.generate_scenario("chiller_6", [0.0, 100.0], "normal") + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool( + mcp, "read_gauge", {"asset_id": "Chiller 6", "attempt_n": 1} + ) + assert 0.0 <= data["reading"] <= 100.0 + assert 0.0 <= data["confidence"] <= 1.0 + + @pytest.mark.anyio + async def test_no_gauge_value_in_response(self): + import servers.robot.main as robot_main + robot_main._simulator.generate_scenario("chiller_6", [0.0, 100.0], "normal") + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool( + mcp, "read_gauge", {"asset_id": "Chiller 6", "attempt_n": 1} + ) + assert "gauge_value" not in data + + @pytest.mark.anyio + async def test_attempt_n_reflected_in_response(self): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool( + mcp, "read_gauge", {"asset_id": "Chiller 6", "attempt_n": 3} + ) + assert data["attempt_n"] == 3 + + @pytest.mark.anyio + async def test_gauge_range_present(self): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool( + mcp, "read_gauge", {"asset_id": "Chiller 6", "attempt_n": 1} + ) + assert "gauge_range" in data + + @pytest.mark.anyio + async def test_error_when_db_none(self, no_db): + data = await call_tool( + mcp, "read_gauge", {"asset_id": "Chiller 6", "attempt_n": 1} + ) + assert "error" in data + + +# --------------------------------------------------------------------------- +# Tool 5: check_human_presence +# --------------------------------------------------------------------------- + + +class TestCheckHumanPresence: + @pytest.mark.anyio + async def test_no_human(self): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool( + mcp, "check_human_presence", {"asset_id": "Chiller 6"} + ) + assert data["human_present"] is False + + @pytest.mark.anyio + async def test_human_present(self): + with patch("servers.robot.main.db", _db_for(_PROFILE_HUMAN)): + data = await call_tool( + mcp, "check_human_presence", {"asset_id": "Chiller 6"} + ) + assert data["human_present"] is True + + @pytest.mark.anyio + async def test_slot_default(self): + profile_no_slot = {k: v for k, v in _PROFILE.items() if k != "maintenance_slot"} + with patch("servers.robot.main.db", _db_for(profile_no_slot)): + data = await call_tool( + mcp, "check_human_presence", {"asset_id": "Chiller 6"} + ) + assert data["slot"] == "day" + + @pytest.mark.anyio + async def test_error_when_db_none(self, no_db): + data = await call_tool( + mcp, "check_human_presence", {"asset_id": "Chiller 6"} + ) + assert "error" in data + + +# --------------------------------------------------------------------------- +# Tool 6: commit_reading +# --------------------------------------------------------------------------- + + +class TestCommitReading: + @pytest.mark.anyio + async def test_blocked_when_n_lt_3(self): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool( + mcp, + "commit_reading", + { + "asset_id": "Chiller 6", + "readings": [74.0, 76.0], + "decision": "close_normal", + }, + ) + assert data["status"] == "BLOCKED" + assert data["fm_flag"] == "FM-7b" + + @pytest.mark.anyio + async def test_status_field_present(self): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool( + mcp, + "commit_reading", + { + "asset_id": "Chiller 6", + "readings": [73.0, 75.0, 77.0], + "decision": "close_normal", + }, + ) + assert data["status"] in {"COMMIT", "BLOCKED", "ESCALATE", "OOD_FLAG", "PANEL_RECHECK"} + + @pytest.mark.anyio + async def test_score_present(self): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool( + mcp, + "commit_reading", + { + "asset_id": "Chiller 6", + "readings": [73.0, 75.0, 77.0], + "decision": "close_normal", + }, + ) + assert "score" in data + assert isinstance(data["score"], float) + + @pytest.mark.anyio + async def test_error_when_db_none(self, no_db): + data = await call_tool( + mcp, + "commit_reading", + { + "asset_id": "Chiller 6", + "readings": [73.0, 75.0, 77.0], + "decision": "close_normal", + }, + ) + assert "error" in data + + @requires_couchdb + @pytest.mark.anyio + async def test_integration_commit_or_escalate(self): + data = await call_tool( + mcp, + "commit_reading", + { + "asset_id": "Chiller 6", + "readings": [49.5, 50.0, 50.5], + "decision": "close_normal", + }, + ) + assert data["status"] in {"COMMIT", "ESCALATE", "OOD_FLAG", "PANEL_RECHECK"} + + +# --------------------------------------------------------------------------- +# Tool 7: check_wo_similarity +# --------------------------------------------------------------------------- + + +class TestCheckWoSimilarity: + def _wo_mock(self, docs): + mock = MagicMock() + mock.find.return_value = {"docs": docs} + return mock + + @pytest.mark.anyio + async def test_error_when_wo_db_none(self, no_wo_db): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool( + mcp, + "check_wo_similarity", + { + "asset_id": "Chiller 6", + "failure_description": "water leak near chiller", + }, + ) + assert "error" in data + + @pytest.mark.anyio + async def test_proceed_when_no_similar_wos(self): + wo_doc = { + "wonum": "1000045", + "description": "completely unrelated electrical work", + "assetnum": "CHILLER6", + "status": "COMP", + } + with patch("servers.robot.main._get_wo_db", return_value=self._wo_mock([wo_doc])): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool( + mcp, + "check_wo_similarity", + { + "asset_id": "Chiller 6", + "failure_description": "water leak near chiller", + }, + ) + assert data["recommendation"] in {"proceed", "review", "consolidate"} + assert "similar_wos" in data + assert "scores" in data + + @pytest.mark.anyio + async def test_consolidate_for_identical_description(self): + wo_doc = { + "wonum": "1000046", + "description": "water leak near chiller unit", + "assetnum": "CHILLER6", + "status": "WAPPR", + } + with patch("servers.robot.main._get_wo_db", return_value=self._wo_mock([wo_doc])): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool( + mcp, + "check_wo_similarity", + { + "asset_id": "Chiller 6", + "failure_description": "water leak near chiller unit", + }, + ) + assert data["recommendation"] == "consolidate" + assert data["duplicate_risk"] is True + + @requires_couchdb + @pytest.mark.anyio + async def test_integration_wo_similarity(self): + data = await call_tool( + mcp, + "check_wo_similarity", + { + "asset_id": "Chiller 6", + "failure_description": "anomaly on chiller condenser", + }, + ) + assert "recommendation" in data + assert data["recommendation"] in {"proceed", "review", "consolidate"} + + +# --------------------------------------------------------------------------- +# Tool 8: detect_anomaly +# --------------------------------------------------------------------------- + + +class TestDetectAnomaly: + @pytest.mark.anyio + async def test_returns_all_expected_fields(self): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool(mcp, "detect_anomaly", {"asset_id": "Chiller 6"}) + for field in ( + "spill_detected", + "leakage_detected", + "pipe_damage_detected", + "pooled_liquid_detected", + "anomaly_confidence", + ): + assert field in data, f"Missing field '{field}' in detect_anomaly response" + + @pytest.mark.anyio + async def test_spill_scenario_detected(self): + import servers.robot.main as robot_main + robot_main._simulator.generate_scenario("chiller_6", [0.0, 100.0], "spill") + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool(mcp, "detect_anomaly", {"asset_id": "Chiller 6"}) + assert data["spill_detected"] is True + assert data["leakage_detected"] is True + + @pytest.mark.anyio + async def test_normal_scenario_no_spill(self): + import servers.robot.main as robot_main + robot_main._simulator.generate_scenario("chiller_6", [0.0, 100.0], "normal") + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool(mcp, "detect_anomaly", {"asset_id": "Chiller 6"}) + assert data["spill_detected"] is False + assert data["leakage_detected"] is False + + @pytest.mark.anyio + async def test_error_when_db_none(self, no_db): + data = await call_tool(mcp, "detect_anomaly", {"asset_id": "Chiller 6"}) + assert "error" in data + + @requires_couchdb + @pytest.mark.anyio + async def test_integration_anomaly_fields(self): + data = await call_tool(mcp, "detect_anomaly", {"asset_id": "Chiller 6"}) + assert "spill_detected" in data + assert isinstance(data["anomaly_confidence"], float) diff --git a/src/servers/robot/tests/test_verifier.py b/src/servers/robot/tests/test_verifier.py new file mode 100644 index 000000000..32437a1e4 --- /dev/null +++ b/src/servers/robot/tests/test_verifier.py @@ -0,0 +1,135 @@ +"""Unit tests for MultiReadingVerifier — no CouchDB required.""" + +import pytest +from servers.robot.verifier import MultiReadingVerifier, VerifierResult + + +@pytest.fixture +def verifier(): + return MultiReadingVerifier() + + +class TestBlockedGate: + def test_blocked_n_1(self, verifier): + result = verifier.verify( + readings=[50.0], + iot_value=50.0, + gauge_range=[0, 100], + ) + assert result.status == "BLOCKED" + assert result.fm_flag == "FM-7b" + assert result.score == 0.0 + + def test_blocked_n_2(self, verifier): + result = verifier.verify( + readings=[48.0, 52.0], + iot_value=50.0, + gauge_range=[0, 100], + ) + assert result.status == "BLOCKED" + assert result.fm_flag == "FM-7b" + + def test_fm7b_flag_in_blocked(self, verifier): + result = verifier.verify(readings=[], iot_value=0.0, gauge_range=[0, 100]) + assert result.fm_flag == "FM-7b" + # insufficient readings flag appears in fm_annotations + assert any("FM-7b" in ann for ann in result.fm_annotations) + + +class TestFreezeGate: + def test_zero_variance_triggers_panel_recheck(self, verifier): + # Three identical readings → std=0.0 → PANEL_RECHECK + result = verifier.verify( + readings=[50.0, 50.0, 50.0], + iot_value=50.0, + gauge_range=[0, 100], + ) + assert result.status == "PANEL_RECHECK" + assert result.fm_flag == "FM-1" + + def test_tiny_variance_still_freezes(self, verifier): + # std = 0.0001 < 0.001 * 100 = 0.1 → PANEL_RECHECK + result = verifier.verify( + readings=[50.0, 50.0001, 50.0002], + iot_value=50.0, + gauge_range=[0, 100], + ) + assert result.status == "PANEL_RECHECK" + + +class TestCommit: + def test_commit_consistent_readings(self, verifier): + # Tight cluster near IoT value → high C, high A → COMMIT + result = verifier.verify( + readings=[49.5, 50.0, 50.5], + iot_value=50.0, + gauge_range=[0, 100], + ) + assert result.status == "COMMIT" + assert result.score >= 0.82 + + def test_historical_signal_neutral_when_absent(self, verifier): + result = verifier.verify( + readings=[49.5, 50.0, 50.5], + iot_value=50.0, + gauge_range=[0, 100], + historical_baseline=None, + ) + assert result.H == 0.5 + assert result.status == "COMMIT" + + +class TestEscalateAndOOD: + def test_escalate_moderate_contradiction(self, verifier): + # Mean reading 80, IoT says 20 — large gap → low A + result = verifier.verify( + readings=[78.0, 80.0, 82.0], + iot_value=20.0, + gauge_range=[0, 100], + ) + # A = 1 - |80 - 20| / 100 = 0.40 → score roughly 0.35*C + 0.35*0.40 + 0.30*H + assert result.status in {"ESCALATE", "OOD_FLAG"} + + def test_ood_very_low_score(self, verifier): + # Readings near top, IoT near bottom, historical near bottom + result = verifier.verify( + readings=[88.0, 90.0, 92.0], + iot_value=5.0, + gauge_range=[0, 100], + historical_baseline=5.0, + ) + # A = 1 - |90-5|/100 = 0.15; H = 1 - |90-5|/100 = 0.15 → score ≈ 0.35*C + 0.35*0.15 + 0.30*0.15 + assert result.status == "OOD_FLAG" + + +class TestFMAnnotations: + def test_fm7_annotation_when_a_lt_015(self, verifier): + # A = 1 - |90 - 3| / 100 = 0.13 < 0.15 → sensor-physical contradiction annotation fires + result = verifier.verify( + readings=[88.0, 90.0, 92.0], + iot_value=3.0, + gauge_range=[0, 100], + ) + assert any("FM-7" in ann for ann in result.fm_annotations) + + def test_fm7c_annotation_when_h_lt_015(self, verifier): + # H = 1 - |90 - 3| / 100 = 0.13 < 0.15 → historical outlier annotation fires; IoT agrees (A fine) + result = verifier.verify( + readings=[88.0, 90.0, 92.0], + iot_value=90.0, + gauge_range=[0, 100], + historical_baseline=3.0, + ) + assert any("FM-7c" in ann for ann in result.fm_annotations) + + def test_no_fm_annotations_for_clean_commit(self, verifier): + result = verifier.verify( + readings=[49.5, 50.0, 50.5], + iot_value=50.0, + gauge_range=[0, 100], + historical_baseline=50.0, + ) + assert result.fm_annotations == [] + assert result.status == "COMMIT" + + diff --git a/src/servers/robot/verifier.py b/src/servers/robot/verifier.py new file mode 100644 index 000000000..afe98a1e1 --- /dev/null +++ b/src/servers/robot/verifier.py @@ -0,0 +1,133 @@ +"""MultiReadingVerifier — CP-calibrated commit gate. + +Formula (updated from research analysis): + score = 0.35*C + 0.35*A + 0.30*H + + C: multi-read consistency = 1 - std(readings) / gauge_range_span + A: gauge-vs-IoT agreement = 1 - |mean(readings) - iot_value| / span + H: historical consistency = 1 - |mean(readings) - hist_baseline| / span + (H = 0.50 neutral when no history is available) + +Hard gates (applied before scoring): + N < 3 → BLOCKED (FM-7b) + std(readings) < 0.1% range → PANEL_RECHECK (FM-1: sensor freeze) + +Thresholds (placeholder until field-visit calibration): + score ≥ 0.82 → COMMIT + score ≥ 0.65 → ESCALATE + else → OOD_FLAG + +Note on Q (agent self-confidence): + Q was removed from the formula because LLM self-confidence is poorly + calibrated and creating a gaming surface. Q is recorded diagnostically + in commit documents but does not influence the gate. +""" + +import statistics +from dataclasses import dataclass, field +from typing import Optional + + +@dataclass +class VerifierResult: + status: str # COMMIT | BLOCKED | ESCALATE | OOD_FLAG | PANEL_RECHECK + score: float + C: float + A: float + H: float + fm_flag: Optional[str] + fm_annotations: list + reason: str + + +class MultiReadingVerifier: + TAU_COMMIT = 0.82 + TAU_ESCALATE = 0.65 + FREEZE_EPSILON = 0.001 # std < 0.1% of range → sensor freeze + + def verify( + self, + readings: list, + iot_value: float, + gauge_range: list, + historical_baseline: Optional[float] = None, + ) -> VerifierResult: + range_span = float(gauge_range[1]) - float(gauge_range[0]) + if range_span <= 0: + range_span = 1.0 + + # Hard gate: insufficient readings + if len(readings) < 3: + return VerifierResult( + status="BLOCKED", + score=0.0, + C=0.0, A=0.0, H=0.5, + fm_flag="FM-7b", + fm_annotations=["FM-7b: fewer than 3 readings before commit"], + reason=( + f"N={len(readings)} readings supplied; " + "minimum 3 required before commit" + ), + ) + + # Hard gate: sensor freeze (zero variance → stuck gauge or panel) + std_val = statistics.stdev(readings) if len(readings) > 1 else 0.0 + if std_val < self.FREEZE_EPSILON * range_span: + return VerifierResult( + status="PANEL_RECHECK", + score=0.0, + C=0.0, A=0.0, H=0.5, + fm_flag="FM-1", + fm_annotations=[ + "FM-1: sensor freeze — readings show no variance across attempts" + ], + reason="Zero variance across readings; panel may be stuck or gauge frozen", + ) + + mean_r = statistics.mean(readings) + + C = max(0.0, 1.0 - std_val / range_span) + A = max(0.0, 1.0 - abs(mean_r - iot_value) / range_span) + H = ( + max(0.0, 1.0 - abs(mean_r - historical_baseline) / range_span) + if historical_baseline is not None + else 0.5 + ) + + score = round(0.35 * C + 0.35 * A + 0.30 * H, 4) + + fm_annotations = [] + if A < 0.15: + fm_annotations.append( + "FM-7: sensor-physical contradiction (reading vs IoT sensor)" + ) + if historical_baseline is not None and H < 0.15: + fm_annotations.append( + "FM-7c: historical outlier (reading contradicts asset baseline)" + ) + + fm_flag = fm_annotations[0].split(":")[0] if fm_annotations else None + + if score >= self.TAU_COMMIT: + status = "COMMIT" + elif score >= self.TAU_ESCALATE: + status = "ESCALATE" + else: + status = "OOD_FLAG" + if not fm_flag: + fm_flag = "FM-7" + + reason = f"score={score:.3f} (C={C:.2f}, A={A:.2f}, H={H:.2f})" + if fm_annotations: + reason += "; " + "; ".join(fm_annotations) + + return VerifierResult( + status=status, + score=score, + C=round(C, 4), + A=round(A, 4), + H=round(H, 4), + fm_flag=fm_flag, + fm_annotations=fm_annotations, + reason=reason, + )