diff --git a/pyproject.toml b/pyproject.toml index 06be74454..780eeb31a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ fmsr-mcp-server = "servers.fmsr.main:main" tsfm-mcp-server = "servers.tsfm.main:main" wo-mcp-server = "servers.wo.main:main" vibration-mcp-server = "servers.vibration.main:main" +robot-mcp-server = "servers.robot.main:main" openai-agent = "agent.openai_agent.cli:main" deep-agent = "agent.deep_agent.cli:main" stirrup-agent = "agent.stirrup_agent.cli:main" diff --git a/src/agent/runner.py b/src/agent/runner.py index 1c06ec601..d473e2ad8 100644 --- a/src/agent/runner.py +++ b/src/agent/runner.py @@ -20,6 +20,7 @@ "tsfm": "tsfm-mcp-server", "wo": "wo-mcp-server", "vibration": "vibration-mcp-server", + "robot": "robot-mcp-server", } diff --git a/src/couchdb/init_data.py b/src/couchdb/init_data.py index 86a73bc9c..8927a99e5 100644 --- a/src/couchdb/init_data.py +++ b/src/couchdb/init_data.py @@ -35,52 +35,37 @@ _HERE = os.path.dirname(os.path.abspath(__file__)) -SCENARIOS_DATA_DIR = os.environ.get("SCENARIOS_DATA_DIR", os.path.join(_HERE, "scenarios_data")) +SCENARIOS_DATA_DIR = os.environ.get("WO_SCENARIOS_DATA_DIR", os.path.join(_HERE, "scenarios_data")) DEFAULT_MANIFEST_FILE = os.environ.get( - "DEFAULT_MANIFEST", os.path.join(SCENARIOS_DATA_DIR, "default", "manifest.json")) + "WO_DEFAULT_MANIFEST", os.path.join(SCENARIOS_DATA_DIR, "default.json")) # --------------------------------------------------------------------------- # # Scenario → manifest (filename convention, no scenarios .jsonl needed) # --------------------------------------------------------------------------- # -def _load_default_manifest() -> tuple: - """Return (manifest, base_dir) for the default manifest (scenarios_data/default/manifest.json).""" +def _load_default_manifest() -> dict: if not os.path.isfile(DEFAULT_MANIFEST_FILE): raise FileNotFoundError( f"default manifest not found: {DEFAULT_MANIFEST_FILE}. 
" - "Create scenarios_data/default/manifest.json (or set DEFAULT_MANIFEST).") + "Create scenarios_data/default.json (or set WO_DEFAULT_MANIFEST).") with open(DEFAULT_MANIFEST_FILE) as f: - return json.load(f), os.path.dirname(DEFAULT_MANIFEST_FILE) + return json.load(f) def manifest_path(scenario_id) -> str: - folder = os.path.join(SCENARIOS_DATA_DIR, f"scenario_{scenario_id}", "manifest.json") - return folder if os.path.isfile(folder) else os.path.join(SCENARIOS_DATA_DIR, f"scenario_{scenario_id}.json") + return os.path.join(SCENARIOS_DATA_DIR, f"scenario_{scenario_id}.json") -def _resolve_manifest(scenario_id) -> tuple: - """Return (manifest, base_dir). - - scenario_id None → the default manifest. Otherwise the scenario's own manifest - (folder form scenario_/manifest.json, or legacy flat scenario_.json), - raising FileNotFoundError if it doesn't exist — an unknown id is never silently - treated as the default. - - base_dir is the manifest's own folder; the loader resolves its relative data paths - against that folder, then its parent (so a sibling shared/ corpus is reachable), then - the couchdb dir. None → couchdb dir only (legacy flat manifests, unchanged). 
- """ +def _resolve_manifest(scenario_id) -> dict: + """scenarios_data/scenario_.json if present, else the default manifest.""" if scenario_id is None: return _load_default_manifest() path = manifest_path(scenario_id) - if not os.path.isfile(path): - raise FileNotFoundError( - f"no manifest for scenario {scenario_id}: expected " - f"{os.path.join(SCENARIOS_DATA_DIR, f'scenario_{scenario_id}', 'manifest.json')} " - f"or {os.path.join(SCENARIOS_DATA_DIR, f'scenario_{scenario_id}.json')}") - base_dir = os.path.dirname(path) if os.path.basename(path) == "manifest.json" else None - with open(path) as f: - return json.load(f), base_dir + if os.path.isfile(path): + with open(path) as f: + return json.load(f) + logger.info("No manifest %s — using default.", os.path.basename(path)) + return _load_default_manifest() # --------------------------------------------------------------------------- # @@ -89,8 +74,7 @@ def _resolve_manifest(scenario_id) -> tuple: def all_databases() -> list: """The databases this loader manages = the default manifest's keys (db name = key).""" try: - manifest, _ = _load_default_manifest() - return list(manifest.keys()) + return list(_load_default_manifest().keys()) except Exception: return [] @@ -117,16 +101,15 @@ def init_data(scenario_id=None, force: bool = True, reset_first: bool = False, managed_only: bool = False) -> dict: """Load a scenario's data (or the default) into CouchDB. Returns {collection: (db, n)}. - Resolves the manifest first, so an unknown ``scenario_id`` raises FileNotFoundError - before anything is dropped. ``reset_first=True`` then drops databases so collections - absent from the manifest are left empty rather than carrying over. + ``reset_first=True`` drops databases first so collections absent from the manifest + are left empty rather than carrying over. 
""" - manifest, base_dir = _resolve_manifest(scenario_id) # validate first (raises on unknown id) if reset_first: reset(managed_only=managed_only) + manifest = _resolve_manifest(scenario_id) results = {} for key, spec in manifest.items(): - results[key] = loader.load_collection(key, spec, drop=force, base_dir=base_dir) # database name = key + results[key] = loader.load_collection(key, spec, drop=force) # database name = key logger.info("Scenario %s: '%s' → %s (%d docs).", scenario_id, key, *results[key]) return results diff --git a/src/couchdb/loader.py b/src/couchdb/loader.py index 22fa1e799..97cef57f0 100644 --- a/src/couchdb/loader.py +++ b/src/couchdb/loader.py @@ -108,27 +108,14 @@ def _parse_file(path, cfg) -> list: return parse_csv(path, cfg) if cfg.get("format") == "csv" else parse_json(path) -def _collect_docs(key, source, cfg, base_dir=None) -> list: - """Resolve a manifest source ("default"/path/dir/list/inline docs) to parsed docs. - - Relative data paths resolve against the scenario folder (``base_dir``), then its parent - (so a sibling ``shared/`` corpus is reachable as ``shared/...``), then ``_HERE``. With - base_dir=None (legacy flat manifests) only ``_HERE`` is used, preserving behaviour. 
- """ - roots = ([base_dir, os.path.dirname(base_dir)] if base_dir else []) + [_HERE] +def _collect_docs(key, source, cfg) -> list: + """Resolve a manifest source ("default"/path/dir/list/inline docs) to parsed docs.""" ext = ".csv" if cfg.get("format") == "csv" else ".json" - def _resolve(s): - for root in roots: - p = os.path.join(root, s) - if os.path.exists(p): - return p - return os.path.join(roots[0], s) - def files_from(s): if s.strip().lower() == "default": return sorted(glob.glob(os.path.join(SAMPLE_DATA_DIR, key, "*" + ext))) - p = s if os.path.isabs(s) else _resolve(s) + p = s if os.path.isabs(s) else os.path.join(_HERE, s) if os.path.isdir(p): return sorted(glob.glob(os.path.join(p, "*" + ext))) return [p] @@ -252,11 +239,11 @@ def _bulk_insert(db, docs, batch_size=500): # --------------------------------------------------------------------------- # # Public entry point # --------------------------------------------------------------------------- # -def load_collection(key, source, drop=True, base_dir=None) -> tuple: +def load_collection(key, source, drop=True) -> tuple: """Load one collection's data into a database named after the key. 
Returns (db, n).""" cfg = collection_config(key) transform = _transform_for(key) - docs = [_normalise(d, key, cfg, transform) for d in _collect_docs(key, source, cfg, base_dir)] + docs = [_normalise(d, key, cfg, transform) for d in _collect_docs(key, source, cfg)] db = key if docs: _ensure_db(db, drop=drop) diff --git a/src/couchdb/sample_data/failure_code/failure_code_sample.csv b/src/couchdb/sample_data/failure_code/failure_code_sample.csv new file mode 100644 index 000000000..648a52a0a --- /dev/null +++ b/src/couchdb/sample_data/failure_code/failure_code_sample.csv @@ -0,0 +1,11 @@ +code,description +FC001,equipment does not start +FC002,equipment stops unexpectedly during operation +FC003,abnormal noise during operation +FC004,fluid leak observed +FC005,"excessive vibration, shaking, or instability" +FC006,overheating or high temperature reading +FC007,indicator light does not illuminate +FC008,gauge does not operate or is inaccurate +FC009,structural damage or cracking +FC010,part missing or loose diff --git a/src/couchdb/sample_data/work_order/workorders.csv b/src/couchdb/sample_data/work_order/workorders.csv new file mode 100644 index 000000000..843d93371 --- /dev/null +++ b/src/couchdb/sample_data/work_order/workorders.csv @@ -0,0 +1,4 @@ +wonum,siteid,description,description_longdescription,status,wopriority,worktype,assetnum,location,failurecode,parent,taskid,lead,reportedby,reportdate,schedstart,schedfinish,targstartdate,targcompdate,actfinish,estlabhrs,actlabhrs,estlabcost,actlabcost,estmatcost,actmatcost,estservcost,actservcost,esttoolcost,acttoolcost,estatapprtotalcost,esttotalcost,acttotalcost,wplabor,aob_asset_class,aob_source.agent,aob_source.trigger_type,aob_source.scenario_id,aob_source.utterance,aob_source.run_id,aob_source.evidence.sensor,aob_source.evidence.metric,aob_source.evidence.anomaly_score,aob_source.evidence.threshold,aob_source.evidence.observed_value,aob_source.evidence.failure_mode +1000045,MAIN,Investigate anomaly on Chiller 6 
condenser water flow,,WAPPR,2,PdM,CHILLER6,MAIN-MECH-CH6,,,,,AGENT.TSFM,2020-04-28T09:15:00+00:00,,,2020-04-29T08:00:00+00:00,2020-04-30T17:00:00+00:00,,4.0,,,,,,,,,,,,,"[{""laborcode"":""HVACTECH1"",""craft"":""HVAC"",""laborhrs"":4.0,""startdate"":""2020-04-29T08:00:00+00:00""}]",Chiller,tsfm,anomaly_detection,WO-CHILLER6-ANOMALY-001,Generate a work order for Chiller 6 anomaly detection,run_2026-06-04_meta,Chiller 6 Condenser Water Flow,Condenser Water Flow,0.94,0.8,412.0, +1000046,MAIN,Bearing wear failure mode on Pump 3 - corrective action,,APPR,1,CM,PUMP3,MAIN-PUMPHOUSE,BEARING-WEAR,,,,AGENT.FMSR,2020-05-02T10:45:00+00:00,,,,2020-05-03T12:00:00+00:00,,6.0,,,,320.0,,,,,,,,,"[{""laborcode"":""MECHTECH2"",""craft"":""MECH"",""laborhrs"":6.0,""startdate"":""2020-05-03T07:00:00+00:00""}]",Pump,fmsr,failure_mode,WO-PUMP3-FMSR-002,Create a work order for the failure mode detected on Pump 3,,Pump 3 Vibration,,,,,Bearing Wear +1000050,NORTH,Quarterly preventive maintenance - AHU 2,,COMP,3,PM,AHU2,NORTH-ROOF,,,,,PLANNER1,2020-06-01T08:00:00+00:00,2020-06-15T08:00:00+00:00,2020-06-15T17:00:00+00:00,,,2020-06-15T16:30:00+00:00,8.0,7.5,,562.5,,95.0,,,,,,,657.5,"[{""laborcode"":""HVACTECH3"",""craft"":""HVAC"",""laborhrs"":8.0,""startdate"":""2020-06-15T08:00:00+00:00""}]",AHU,human,manual,WO-AHU2-PM-003,Schedule the quarterly PM for AHU 2,,,,,,, diff --git a/src/couchdb/scenarios_data/default.json b/src/couchdb/scenarios_data/default.json new file mode 100644 index 000000000..50d6973e7 --- /dev/null +++ b/src/couchdb/scenarios_data/default.json @@ -0,0 +1,10 @@ +{ + "workorder": "sample_data/work_order/workorders.csv", + "iot": [ + "sample_data/iot/chiller_6.json", + "sample_data/iot/metro_pump_1.json", + "sample_data/iot/hydraulic_pump_1.json" + ], + "vibration": "sample_data/iot/motor_01.json", + "failurecode": "sample_data/failure_code/failure_code_sample.csv" +} \ No newline at end of file diff --git a/src/couchdb/scenarios_data/scenario_1.json 
b/src/couchdb/scenarios_data/scenario_1.json new file mode 100644 index 000000000..714325e39 --- /dev/null +++ b/src/couchdb/scenarios_data/scenario_1.json @@ -0,0 +1,7 @@ +{ + "workorder": "sample_data/work_order/workorders.csv", + "iot": [ + "sample_data/iot/chiller_6.json", + "sample_data/iot/motor_01.json" + ] +} \ No newline at end of file diff --git a/src/scenarios/huggingface/readme.md b/src/couchdb/scenarios_data/scenario_2.json similarity index 100% rename from src/scenarios/huggingface/readme.md rename to src/couchdb/scenarios_data/scenario_2.json diff --git a/src/scenarios/local/readme.md b/src/couchdb/scenarios_data/scenario_3.json similarity index 100% rename from src/scenarios/local/readme.md rename to src/couchdb/scenarios_data/scenario_3.json diff --git a/src/couchdb/scenarios_data/scenario_4.json b/src/couchdb/scenarios_data/scenario_4.json new file mode 100644 index 000000000..e69de29bb diff --git a/src/couchdb/scenarios_data/scenario_5.json b/src/couchdb/scenarios_data/scenario_5.json new file mode 100644 index 000000000..e69de29bb diff --git a/src/couchdb/schema_robot_fields.json b/src/couchdb/schema_robot_fields.json new file mode 100644 index 000000000..7b016bf88 --- /dev/null +++ b/src/couchdb/schema_robot_fields.json @@ -0,0 +1,84 @@ +{ + "doc_type": "asset_robot_profile", + "id_pattern": "profile:{normalized_asset_id}", + "description": "Per-asset profile documents for the robot inspection extension. Stored in the iot CouchDB database alongside timestamped sensor readings, but deliberately omit the asset_id field so existing IoT server queries are unaffected.", + "deferred_fields": "See SAFETY_FIELDS_FUTURE.md for hazard_class, maintenance_slot, active_work_order", + + "fields": { + "physical_location": { + "type": "object or null", + "schema": {"x": "float", "y": "float", "z": "float", "room_id": "string"}, + "default": null, + "status": "active", + "purpose": "Robot navigation target for navigate_to() tool. Set from facility floor plan. 
Null until floor-plan data is loaded." + }, + "gauge_value": { + "type": "float", + "default": 0.0, + "status": "active", + "CRITICAL": "NEVER return this field from any MCP tool to the agent", + "set_by": "PhysicalStateSimulator.generate_scenario()", + "read_by": "Evaluator.get_ground_truth() ONLY", + "purpose": "Ground truth physical gauge reading. Set at scenario generation time. Must never reach the agent — doing so invalidates the evaluation." + }, + "gauge_range": { + "type": "array [float, float]", + "default": [0, 100], + "status": "active", + "purpose": "Min/max of the gauge scale face. Used in PA metric and tau_agreement threshold calculation." + }, + "panel_stuck_prob": { + "type": "float 0-1", + "default": 0.12, + "status": "active", + "purpose": "Probability that the access panel fails to open. SME-calibrated at 0.12 for preprogrammed navigation; 0.24-0.40 for learned navigation routes." + }, + "human_present": { + "type": "bool", + "default": false, + "status": "active", + "purpose": "Whether a technician is currently at this asset. If true, robot dispatch is skipped and an alarm is raised to the person already on-site. Updated by PhysicalStateSimulator per scenario." + }, + "never_read": { + "type": "bool", + "default": false, + "status": "active", + "purpose": "True if no physical gauge reading exists in Maximo history for this asset. Enables the never-read-gauge scenario variant. SME confirmed some gauges have never been recorded." + }, + "real_gauge_images": { + "type": "array of strings", + "default": [], + "status": "active", + "purpose": "File paths to real facility images of this asset's gauge collected during field visits. Used to train and calibrate the CosmosWorld perception layer." + }, + "reading_consistency": { + "type": "float or null", + "default": null, + "status": "active", + "purpose": "Empirical std(readings) / gauge_range computed from field collection data. Used to set tau_consistency threshold for this asset type. 
Null until field data arrives." + }, + "sensor_physical_gap": { + "type": "float or null", + "default": null, + "status": "active", + "purpose": "Empirical |iot_value - gauge_value| / gauge_range from field collection. Calibrates tau_agreement for this asset. Null until field data arrives." + } + }, + + "indexes": [ + { + "name": "idx_robot_never_read", + "fields": ["doc_type", "never_read"], + "purpose": "Scenario generator lookup for never-read gauge cases" + } + ], + + "known_profiles": [ + {"profile_id": "profile:chiller_6", "display_name": "Chiller 6"}, + {"profile_id": "profile:metro_pump_1", "display_name": "Metro Pump 1"}, + {"profile_id": "profile:hydraulic_pump_1","display_name": "Hydraulic Pump 1"}, + {"profile_id": "profile:motor_01", "display_name": "Motor 01"} + ], + + "isolation_guarantee": "Profile documents have no asset_id field. The IoT server queries {asset_id: {$exists: true}} — these documents are invisible to all existing server queries, including get_asset_list() and get_sensor_list()." +} diff --git a/src/couchdb/seed_robot_profiles.py b/src/couchdb/seed_robot_profiles.py new file mode 100644 index 000000000..841a4a3a9 --- /dev/null +++ b/src/couchdb/seed_robot_profiles.py @@ -0,0 +1,243 @@ +"""Seed robot asset profile documents into the iot CouchDB database. + +Creates one profile document per known asset containing robot-inspection fields +(navigation, gauge truth, calibration, dispatch state). These documents are +deliberately stored WITHOUT an ``asset_id`` field so existing IoT server +queries (``{"asset_id": {"$exists": true}}``) are completely unaffected. + +Document shape: + _id = "profile:{normalized_asset_id}" e.g. 
"profile:chiller_6" + doc_type = "asset_robot_profile" + display_name = "Chiller 6" original asset_id string + + 9 robot fields (see ROBOT_FIELD_DEFAULTS) + +Usage: + python src/couchdb/seed_robot_profiles.py # apply + python src/couchdb/seed_robot_profiles.py --dry-run # preview only + python src/couchdb/seed_robot_profiles.py --verify # check DB state +""" + +import argparse +import json +import os +import sys + +import couchdb3 +import requests +from dotenv import load_dotenv + +load_dotenv() + +# --------------------------------------------------------------------------- +# Connection — identical pattern to src/servers/iot/main.py +# --------------------------------------------------------------------------- +COUCHDB_URL = os.environ.get("COUCHDB_URL") +COUCHDB_DBNAME = os.environ.get("IOT_DBNAME") +COUCHDB_USERNAME = os.environ.get("COUCHDB_USERNAME") +COUCHDB_PASSWORD = os.environ.get("COUCHDB_PASSWORD") + +# --------------------------------------------------------------------------- +# Robot field defaults (in-scope fields only) +# gauge_value is stored here but MUST NEVER be returned by any MCP tool. +# --------------------------------------------------------------------------- +ROBOT_FIELD_DEFAULTS: dict = { + "physical_location": None, + "gauge_value": 0.0, # ground truth — NEVER expose to agent via MCP + "gauge_range": [0, 100], + "panel_stuck_prob": 0.12, + "human_present": False, + "never_read": False, + "real_gauge_images": [], + "reading_consistency": None, + "sensor_physical_gap": None, +} + +ROBOT_FIELDS = list(ROBOT_FIELD_DEFAULTS.keys()) + +# --------------------------------------------------------------------------- +# Known assets — derived from sample_data/iot/*.json +# physical_location values are placeholder coordinates until floor-plan data arrives. 
+# --------------------------------------------------------------------------- +ASSETS = [ + { + "display_name": "Chiller 6", + "profile_id": "profile:chiller_6", + "physical_location": {"x": 52.3, "y": 18.1, "z": 0.0, "room_id": "cooling_3B"}, + "gauge_range": [0, 100], + }, + { + "display_name": "Metro Pump 1", + "profile_id": "profile:metro_pump_1", + "physical_location": {"x": 14.0, "y": 32.5, "z": 0.0, "room_id": "pump_room_A"}, + "gauge_range": [0, 200], + }, + { + "display_name": "Hydraulic Pump 1", + "profile_id": "profile:hydraulic_pump_1", + "physical_location": {"x": 28.7, "y": 11.0, "z": 0.0, "room_id": "pump_room_B"}, + "gauge_range": [0, 350], + }, + { + "display_name": "Motor 01", + "profile_id": "profile:motor_01", + "physical_location": {"x": 7.2, "y": 44.8, "z": 0.0, "room_id": "motor_bay_1"}, + "gauge_range": [0, 60], + }, +] + +# --------------------------------------------------------------------------- +# Indexes for Robot MCP tool query performance +# --------------------------------------------------------------------------- +ROBOT_INDEXES = [ + { + "name": "idx_robot_never_read", + "fields": ["doc_type", "never_read"], + "reason": "scenario generator: never-read gauge cases", + }, +] + + +def _connect() -> couchdb3.Database: + if not COUCHDB_URL or not COUCHDB_DBNAME: + sys.exit("ERROR: COUCHDB_URL and IOT_DBNAME must be set.") + return couchdb3.Database( + COUCHDB_DBNAME, + url=COUCHDB_URL, + user=COUCHDB_USERNAME, + password=COUCHDB_PASSWORD, + ) + + +def _build_doc(asset: dict) -> dict: + doc = { + "_id": asset["profile_id"], + "doc_type": "asset_robot_profile", + "display_name": asset["display_name"], + } + doc.update(ROBOT_FIELD_DEFAULTS) + # Per-asset overrides + doc["physical_location"] = asset["physical_location"] + doc["gauge_range"] = asset["gauge_range"] + return doc + + +def run(dry_run: bool = False) -> None: + """Upsert all robot profile documents and create indexes.""" + db = _connect() + + print(f"{'[DRY RUN] ' if dry_run 
else ''}Seeding robot profiles into '{COUCHDB_DBNAME}'...\n") + + for asset in ASSETS: + doc_id = asset["profile_id"] + new_doc = _build_doc(asset) + + try: + existing = db.get(doc_id) + except Exception: + existing = None + + if existing is None: + action = "CREATE" + final_doc = new_doc + else: + # Patch only fields that are missing (never overwrite existing values) + patched = False + final_doc = dict(existing) + for field in ROBOT_FIELDS: + if field not in final_doc: + final_doc[field] = new_doc[field] + patched = True + action = "PATCH" if patched else "SKIP (already complete)" + + if dry_run: + print(f" [{action}] {doc_id}") + if action != "SKIP (already complete)": + print(f" {json.dumps(new_doc, indent=10)}\n") + else: + if action == "SKIP (already complete)": + print(f" [SKIP] {doc_id} — all robot fields already present") + else: + db.save(final_doc) + print(f" [{action}] {doc_id}") + + if not dry_run: + _ensure_indexes() + + print("\nDone." if not dry_run else "\n[Dry run complete — no writes performed.]") + + +def _ensure_indexes() -> None: + # couchdb3 does not expose create_index; use the HTTP API directly (same + # pattern as src/couchdb/loader.py _create_indexes). + auth = (COUCHDB_USERNAME, COUCHDB_PASSWORD) + base = (COUCHDB_URL or "").rstrip("/") + print("\nCreating indexes...") + for idx in ROBOT_INDEXES: + url = f"{base}/{COUCHDB_DBNAME}/_index" + payload = { + "index": {"fields": idx["fields"]}, + "name": idx["name"], + "type": "json", + } + try: + resp = requests.post(url, json=payload, auth=auth, timeout=10) + resp.raise_for_status() + result = resp.json().get("result", "ok") + print(f" [{result.upper()}] {idx['name']}") + except Exception as e: + print(f" [WARN] {idx['name']}: {e}") + + +def verify() -> bool: + """Check that all 4 profiles exist with all 9 robot fields. 
Returns True if OK.""" + db = _connect() + print(f"Verifying robot profiles in '{COUCHDB_DBNAME}'...\n") + all_ok = True + + for asset in ASSETS: + doc_id = asset["profile_id"] + try: + doc = db.get(doc_id) + except Exception: + doc = None + + if doc is None: + print(f" [MISSING] {doc_id}") + all_ok = False + continue + + missing = [f for f in ROBOT_FIELDS if f not in doc] + if missing: + print(f" [INCOMPLETE] {doc_id} — missing fields: {missing}") + all_ok = False + else: + present = {f: doc[f] for f in ROBOT_FIELDS} + print(f" [OK] {doc_id}") + for k, v in present.items(): + flag = " *** GROUND TRUTH — never expose ***" if k == "gauge_value" else "" + print(f" {k}: {v}{flag}") + print() + + if all_ok: + print("All profiles verified successfully.") + else: + print("VERIFICATION FAILED — run seed_robot_profiles.py to fix.") + return all_ok + + +def main() -> None: + parser = argparse.ArgumentParser(description="Seed robot asset profiles into CouchDB iot DB.") + group = parser.add_mutually_exclusive_group() + group.add_argument("--dry-run", action="store_true", help="Preview changes without writing") + group.add_argument("--verify", action="store_true", help="Check profiles exist and are complete") + args = parser.parse_args() + + if args.verify: + ok = verify() + sys.exit(0 if ok else 1) + else: + run(dry_run=args.dry_run) + + +if __name__ == "__main__": + main() diff --git a/src/scenarios/local/vibration_utterance.json b/src/scenarios/local/vibration_utterance.json deleted file mode 100644 index 19e8434ac..000000000 --- a/src/scenarios/local/vibration_utterance.json +++ /dev/null @@ -1,170 +0,0 @@ -[ - { - "id": 301, - "type": "Vibration", - "text": "What vibration analysis capabilities are available?", - "category": "Knowledge Query", - "characteristic_form": "The expected response should list: FFT spectrum analysis, envelope analysis for bearing faults, bearing characteristic frequency calculation (BPFO/BPFI/BSF/FTF), ISO 10816 vibration severity assessment, and 
full automated diagnosis." - }, - { - "id": 302, - "type": "Vibration", - "text": "What bearings are available in the built-in database?", - "category": "Knowledge Query", - "characteristic_form": "The expected response should include bearing designations such as 6205, 6206, 6207, 6208, 6305, 6306, NU205, NU206, 7205 with their geometric parameters (number of balls, ball diameter, pitch diameter)." - }, - { - "id": 303, - "type": "Vibration", - "text": "How does the ISO 10816 vibration severity classification work?", - "category": "Knowledge Query", - "characteristic_form": "The expected response should describe four zones: A (good), B (acceptable), C (barely tolerable), D (not permissible), and four machine groups (group1 through group4) with different velocity thresholds in mm/s." - }, - { - "id": 304, - "type": "Vibration", - "text": "Calculate the bearing characteristic frequencies for a 6205 bearing running at 1800 RPM.", - "category": "Bearing Analysis", - "characteristic_form": "The expected response should provide BPFO, BPFI, BSF, and FTF frequencies in Hz computed from the 6205 geometry (9 balls, ball_dia=7.938 mm, pitch_dia=38.5 mm) at 1800 RPM." - }, - { - "id": 305, - "type": "Vibration", - "text": "Calculate bearing fault frequencies for a bearing with 8 balls, 10.3 mm ball diameter, 46.0 mm pitch diameter, 15 degree contact angle at 3600 RPM.", - "category": "Bearing Analysis", - "characteristic_form": "The expected response should provide the computed BPFO, BPFI, BSF, and FTF values in Hz using the provided custom geometry and RPM." - }, - { - "id": 306, - "type": "Vibration", - "text": "What is the vibration severity classification for a machine with an RMS velocity of 4.5 mm/s? 
It is a medium-sized machine on rigid foundations.", - "category": "Condition Assessment", - "characteristic_form": "The expected response should classify 4.5 mm/s as ISO 10816 Zone C (Alarm - not suitable for long-term operation) for a group2 machine, with thresholds context (A=1.4, B=2.8, C=7.1 mm/s)." - }, - { - "id": 307, - "type": "Vibration", - "text": "Assess vibration severity for a large machine on flexible foundations with 12.0 mm/s RMS velocity.", - "category": "Condition Assessment", - "characteristic_form": "The expected response should classify 12.0 mm/s under group3 (large machine, flexible foundation) and determine the appropriate ISO 10816 zone." - }, - { - "id": 308, - "type": "Vibration", - "text": "Fetch vibration sensor data from Motor_01, sensor Vibration_X, from 2024-01-15 to 2024-01-15T01:00:00 at site PLANT_A.", - "category": "Data Retrieval", - "characteristic_form": "The expected response should load time-series data from CouchDB for asset 'Motor_01', sensor 'Vibration_X', return a data_id, sample count, duration, and basic statistics (RMS, peak, mean). Note: CouchDB must be populated with vibration time-series data (acceleration in g, sampled at >= 1 kHz) for this scenario to execute." - }, - { - "id": 309, - "type": "Vibration", - "text": "What vibration sensors are available for Motor_01 at site PLANT_A?", - "category": "Data Retrieval", - "characteristic_form": "The expected response should call list_vibration_sensors for asset 'Motor_01' at site PLANT_A and return a list of available sensor field names (e.g., Vibration_X, Vibration_Y, Vibration_Z) as stored in CouchDB. Note: CouchDB must be populated with vibration sensor documents for this scenario to execute." 
- }, - { - "id": 310, - "type": "Vibration", - "text": "Compute the FFT spectrum for signal 'vib_001' and return the top 5 peak frequencies with amplitudes.", - "category": "Signal Analysis", - "characteristic_form": "The expected response should provide the top 5 frequency peaks with amplitudes, maximum amplitude and its frequency, RMS spectral value, frequency resolution, and total spectral bins." - }, - { - "id": 311, - "type": "Vibration", - "text": "Compute the FFT spectrum of signal 'vib_001' using a Blackman window and return the top 5 peaks.", - "category": "Signal Analysis", - "characteristic_form": "The expected response should compute an FFT with a Blackman window and return the top 5 peak frequencies with amplitudes, confirming the window type used." - }, - { - "id": 312, - "type": "Vibration", - "text": "Compute the envelope spectrum of signal 'vib_001' using a bandpass filter from 500 Hz to 1500 Hz (Hilbert transform) and return the top 5 peaks.", - "category": "Signal Analysis", - "characteristic_form": "The expected response should compute the envelope spectrum using a 500-1500 Hz bandpass filter and Hilbert transform, and return the top 5 peak frequencies in the demodulated spectrum." - }, - { - "id": 313, - "type": "Vibration", - "text": "Run a full vibration diagnosis on signal 'vib_001' at 1800 RPM with bearing 6205.", - "category": "Diagnosis", - "characteristic_form": "The expected response should include: shaft feature extraction (1x, 2x, 3x amplitudes), bearing fault frequency analysis using 6205 geometry, ISO 10816 severity assessment, fault classification (unbalance, misalignment, looseness, bearing), and a markdown diagnostic report." 
- }, - { - "id": 314, - "type": "Vibration", - "text": "Diagnose signal 'vib_001' at 3600 RPM using custom bearing geometry: 8 balls, 10.3 mm ball diameter, 46 mm pitch diameter.", - "category": "Diagnosis", - "characteristic_form": "The expected response should perform full diagnosis including custom bearing frequency calculation, envelope analysis, shaft features, ISO 10816 assessment, and fault classification with a summary report." - }, - { - "id": 315, - "type": "Vibration", - "text": "Run vibration diagnosis without RPM on signal 'vib_001'.", - "category": "Diagnosis", - "characteristic_form": "The expected response should provide a partial diagnosis with a warning that shaft-frequency analysis was skipped, basic signal statistics (RMS, peak, crest factor, kurtosis), FFT summary, and ISO 10816 severity." - }, - { - "id": 316, - "type": "Vibration", - "text": "Diagnose signal 'vib_001' using bearing fault frequencies directly: BPFO=87.5 Hz, BPFI=132.5 Hz, BSF=57.3 Hz, FTF=9.7 Hz at 1800 RPM.", - "category": "Diagnosis", - "characteristic_form": "The expected response should use the provided fault frequencies directly (rather than computing them), perform envelope analysis, and check for peaks at those specific frequencies and their harmonics." - }, - { - "id": 317, - "type": "Vibration", - "text": "Is unbalance detected in signal 'vib_001'? The machine runs at 1500 RPM.", - "category": "Fault Classification", - "characteristic_form": "The expected response should analyze the 1x shaft frequency (25 Hz) amplitude relative to 2x and 3x harmonics. Unbalance is indicated by a dominant 1x component." - }, - { - "id": 318, - "type": "Vibration", - "text": "Check if there are signs of shaft misalignment in signal 'vib_001'. RPM is 3000.", - "category": "Fault Classification", - "characteristic_form": "The expected response should check if the 2x shaft frequency component (100 Hz) is elevated relative to 1x (50 Hz), which indicates shaft misalignment." 
- }, - { - "id": 319, - "type": "Vibration", - "text": "Analyze signal 'vib_001' for mechanical looseness at 1800 RPM.", - "category": "Fault Classification", - "characteristic_form": "The expected response should check for sub-harmonic (0.5x) and higher harmonics (3x+) of the shaft frequency, elevated crest factor, and presence of half-shaft-frequency component." - }, - { - "id": 320, - "type": "Vibration", - "text": "Check for outer race bearing fault (BPFO) in signal 'vib_001'. Shaft speed 1800 RPM, bearing model 6205.", - "category": "Fault Classification", - "characteristic_form": "The expected response should compute BPFO from 6205 geometry at 1800 RPM, perform envelope analysis, and check for peaks at BPFO and its harmonics (2x, 3x BPFO)." - }, - { - "id": 321, - "type": "Vibration", - "text": "Motor_01 at site PLANT_A is showing increased noise levels. Retrieve the vibration data for sensor Vibration_X from 2024-01-15 to 2024-01-15T01:00:00 and identify the most likely root cause.", - "category": "Diagnostic", - "characteristic_form": "The expected response should retrieve data via get_vibration_data, run diagnose_vibration or a combination of FFT and fault classification tools, and conclude with the most likely fault type (e.g., unbalance, misalignment, looseness, bearing defect) supported by spectral evidence." - }, - { - "id": 322, - "type": "Vibration", - "text": "A maintenance technician reports that Motor_01 vibrates more than usual. Fetch the Vibration_X sensor data from 2024-01-15 to 2024-01-15T00:30:00 at site PLANT_A, compute the FFT spectrum (top 5 peaks), assess severity, and explain what the dominant frequencies suggest.", - "category": "Diagnostic", - "characteristic_form": "The expected response should chain get_vibration_data, compute_fft_spectrum, and assess_vibration_severity, then reason about the peak frequencies in relation to potential fault mechanisms (shaft speed harmonics, bearing defects, structural resonance)." 
- }, - { - "id": 323, - "type": "Vibration", - "text": "After diagnosing signal 'vib_001' at 1800 RPM with bearing 6205, the ISO severity is Zone C and outer race fault is suspected. Should we schedule immediate maintenance or can the machine continue operating? Justify the recommendation.", - "category": "Decision Support", - "characteristic_form": "The expected response should reference ISO 10816 Zone C meaning (barely tolerable), the bearing fault evidence, and provide a justified maintenance recommendation. The response should note that ISO 10816 thresholds are conservative and may produce false positives, but remain the accepted standard in commercial condition monitoring systems. The recommendation should balance this conservatism against the specific bearing fault evidence." - }, - { - "id": 324, - "type": "Vibration", - "text": "Compare the vibration condition of two signals: 'vib_001' shows 2.1 mm/s RMS velocity (group2 machine) and 'vib_002' shows 7.8 mm/s RMS velocity (group2 machine). Which machine should be prioritized for maintenance and why?", - "category": "Decision Support", - "characteristic_form": "The expected response should call assess_vibration_severity for both values, compare the ISO zones (A/B vs C/D), and recommend maintenance prioritization with justification based on severity difference and risk." - } - ] diff --git a/src/servers/iot/tests/test_robot_profiles.py b/src/servers/iot/tests/test_robot_profiles.py new file mode 100644 index 000000000..61a529da5 --- /dev/null +++ b/src/servers/iot/tests/test_robot_profiles.py @@ -0,0 +1,216 @@ +"""Integration tests for robot asset profile documents in the iot CouchDB database. 
@pytest.fixture
def raw_db():
    """Return a direct handle on the IoT database, bypassing the MCP server.

    The server URL is taken from ``COUCHDB_URL`` when it carries a scheme, so an
    ``https://`` deployment is contacted over https. A bare ``host:port`` value
    still gets the ``http://`` prefix it was given before.
    """
    # COUCHDB_HOST has its scheme stripped; always re-prefixing it with
    # "http://" would silently downgrade an https URL.
    server_url = COUCHDB_URL if "://" in COUCHDB_URL else f"http://{COUCHDB_HOST}"
    return couchdb3.Server(
        server_url,
        user=COUCHDB_USERNAME,
        password=COUCHDB_PASSWORD,
    )[COUCHDB_DBNAME]
def test_field_types(self, raw_db):
    """Check the Python type of the typed robot fields on every profile doc."""
    # (field name, required type, label used in the failure message)
    typed_fields = (
        ("gauge_value", float, "float"),
        ("panel_stuck_prob", float, "float"),
        ("human_present", bool, "bool"),
        ("never_read", bool, "bool"),
        ("real_gauge_images", list, "list"),
    )
    for pid in PROFILE_IDS:
        doc = raw_db.get(pid)
        for field_name, required_type, label in typed_fields:
            assert isinstance(doc[field_name], required_type), (
                f"{pid}: {field_name} must be {label}"
            )
        # physical_location is the only field allowed to be null.
        location = doc["physical_location"]
        assert location is None or isinstance(location, dict), (
            f"{pid}: physical_location must be dict or null"
        )
@pytest.mark.anyio
async def test_gauge_value_not_in_sensor_list(self):
    """Assert the IoT ``sensors`` tool never lists ``gauge_value`` for any asset.

    The IoT server's module-level caches are cleared before the calls and
    again afterwards, even when an assertion fails, so a failure here cannot
    leave stale cache state behind for other tests.
    """
    from servers.iot.main import mcp
    import servers.iot.main as iot_main

    def _reset_caches():
        # Rebind on the module object; a by-value import of the cache names
        # would only create local aliases and leave the server's state alone.
        iot_main._asset_list_cache = None
        iot_main._sensor_list_cache = {}

    known_assets = ["Chiller 6", "Metro Pump 1", "Hydraulic Pump 1", "Motor 01"]

    # Clear caches so real DB is queried
    _reset_caches()
    try:
        for asset_id in known_assets:
            contents, _ = await mcp.call_tool(
                "sensors", {"site_name": "MAIN", "asset_id": asset_id}
            )
            result = json.loads(contents[0].text)
            if "sensors" in result:
                # NOTE(review): membership test assumes "sensors" is a flat
                # collection of sensor names — confirm against the IoT server.
                assert "gauge_value" not in result["sensors"], (
                    f"gauge_value leaked into sensor list for {asset_id}: {result['sensors']}"
                )
    finally:
        _reset_caches()
@requires_couchdb
class TestRobotIndexes:
    """Checks that the Mango index used for robot queries exists in the IoT DB."""

    def test_never_read_index_created(self):
        """Query ``/{db}/_index`` and assert ``idx_robot_never_read`` is listed."""
        import requests

        # Keep the configured scheme (https included); only a bare host:port
        # falls back to http. A trailing slash is dropped to avoid "//" in the path.
        base_url = COUCHDB_URL if "://" in COUCHDB_URL else f"http://{COUCHDB_HOST}"
        base_url = base_url.rstrip("/")

        resp = requests.get(
            f"{base_url}/{COUCHDB_DBNAME}/_index",
            auth=(COUCHDB_USERNAME, COUCHDB_PASSWORD),
            # Without a timeout an unreachable server blocks the run indefinitely.
            timeout=10,
        )
        assert resp.status_code == 200, f"Could not query _index endpoint: {resp.status_code}"

        index_names = [idx.get("name") for idx in resp.json().get("indexes", [])]
        assert "idx_robot_never_read" in index_names, (
            f"idx_robot_never_read not found. Existing indexes: {index_names}"
        )
+ +Tools: + navigate_to — navigate robot to asset location + safety_gate_check — check human presence and work order status + open_panel — attempt to open asset inspection panel + read_gauge — read physical gauge (noisy, occlusion-aware) + check_human_presence — explicit human/slot/WO query + commit_reading — verify readings and commit to CouchDB + check_wo_similarity — find similar past work orders before raising new WO + detect_anomaly — visual anomaly detection (spill, leak, damage) +""" + +import difflib +import logging +import math +import os +import statistics +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional, Union + +import couchdb3 +import requests +from dotenv import load_dotenv +from mcp.server.fastmcp import FastMCP +from pydantic import BaseModel + +from .simulator import PhysicalStateSimulator +from .verifier import MultiReadingVerifier + +load_dotenv() + +_log_level = getattr( + logging, os.environ.get("LOG_LEVEL", "WARNING").upper(), logging.WARNING +) +logging.basicConfig(level=_log_level) +logger = logging.getLogger("robot-mcp-server") + +# --------------------------------------------------------------------------- +# CouchDB connections +# --------------------------------------------------------------------------- + +COUCHDB_URL = os.environ.get("COUCHDB_URL") +COUCHDB_USERNAME = os.environ.get("COUCHDB_USERNAME") +COUCHDB_PASSWORD = os.environ.get("COUCHDB_PASSWORD") +IOT_DBNAME = os.environ.get("IOT_DBNAME", "iot") +WO_DBNAME = os.environ.get("WO_DBNAME", "workorder") + +try: + db = couchdb3.Database( + IOT_DBNAME, + url=COUCHDB_URL, + user=COUCHDB_USERNAME, + password=COUCHDB_PASSWORD, + ) + logger.info("Connected to IoT CouchDB: %s", IOT_DBNAME) +except Exception as exc: + logger.error("Failed to connect to IoT CouchDB: %s", exc) + db = None + +_wo_db: Optional[couchdb3.Database] = None + + +def _get_wo_db() -> Optional[couchdb3.Database]: + global _wo_db + if _wo_db is None: + try: + _wo_db = 
# Display name (IoT asset_id) → profile key (used in "profile:{key}" doc ID)
_DISPLAY_TO_PROFILE_KEY: Dict[str, str] = {
    "Chiller 6": "chiller_6",
    "Metro Pump 1": "metro_pump_1",
    "Hydraulic Pump 1": "hydraulic_pump_1",
    "Motor 01": "motor_01",
    # Accept normalized keys directly too
    "chiller_6": "chiller_6",
    "metro_pump_1": "metro_pump_1",
    "hydraulic_pump_1": "hydraulic_pump_1",
    "motor_01": "motor_01",
}

# Profile key → Maximo assetnum (for workorder queries)
# NOTE(review): both pumps map to "PUMP3" — confirm this is intended and not a
# copy-paste slip, since work-order lookups for the two assets will be identical.
_PROFILE_KEY_TO_WO_ASSETNUM: Dict[str, str] = {
    "chiller_6": "CHILLER6",
    "metro_pump_1": "PUMP3",
    "hydraulic_pump_1": "PUMP3",
    "motor_01": "",  # no WO assetnum yet
}


def _profile_key(asset_id: str) -> str:
    """Map an asset identifier to the key used in ``profile:{key}`` doc IDs.

    Known display names and already-normalized keys are looked up in
    ``_DISPLAY_TO_PROFILE_KEY``. Anything else is lower-cased and its
    whitespace-separated words are joined with single underscores.

    Surrounding or repeated whitespace is ignored, so ``" Chiller 6 "`` and
    ``"chiller   6"`` both resolve to ``"chiller_6"`` instead of producing keys
    with stray underscores that match no profile document.
    """
    cleaned = asset_id.strip()
    known = _DISPLAY_TO_PROFILE_KEY.get(cleaned)
    if known is not None:
        return known
    # str.split() with no argument collapses any run of whitespace.
    return "_".join(cleaned.lower().split())
inspection tools: navigate to assets, check safety, open panels, " + "read physical gauges, verify readings, check work order history, and " + "detect visual anomalies. Always call safety_gate_check before open_panel. " + "Always call check_wo_similarity before raising a new work order. " + "commit_reading requires at least 3 gauge readings." + ), +) + +# --------------------------------------------------------------------------- +# Pydantic result models +# --------------------------------------------------------------------------- + + +class ErrorResult(BaseModel): + error: str + + +class NavigateResult(BaseModel): + asset_id: str + success: bool + steps_taken: int + distance_m: float + blocked_reason: Optional[str] = None + message: str + + +class SafetyGateResult(BaseModel): + asset_id: str + human_present: bool + active_work_order: Optional[str] + safety_clearance: bool + slot: str + message: str + + +class OpenPanelResult(BaseModel): + asset_id: str + success: bool + access_granted: bool + stuck_reason: Optional[str] = None + message: str + + +class GaugeReadResult(BaseModel): + asset_id: str + attempt_n: int + reading: float + confidence: float + occlusion_flag: bool + gauge_range: List[float] + message: str + + +class CommitResult(BaseModel): + asset_id: str + status: str + score: float + C: float + A: float + H: float + fm_flag: Optional[str] + fm_annotations: List[str] + reason: str + message: str + + +class HumanPresenceResult(BaseModel): + asset_id: str + human_present: bool + slot: str + active_work_order: Optional[str] + message: str + + +class WOSimilarityResult(BaseModel): + asset_id: str + similar_wos: List[str] + scores: List[float] + recommendation: str + duplicate_risk: bool + message: str + + +class AnomalyResult(BaseModel): + asset_id: str + spill_detected: bool + leakage_detected: bool + pipe_damage_detected: bool + pooled_liquid_detected: bool + anomaly_confidence: float + message: str + + +# 
def _compute_historical_baseline(
    asset_id: str,
    gauge_range: List[float],
    n_docs: int = 30,
) -> Optional[float]:
    """Query last n_docs IoT sensor readings and return mean numeric value.

    Every non-metadata numeric field of each document is considered. A value
    is kept only when it is finite and lies within the gauge range widened by
    50% of its span on each side. Booleans are skipped: ``bool`` is a subclass
    of ``int`` and would otherwise be averaged in as 0/1.

    Returns:
        The mean of the kept values, or None when the database is unavailable,
        fewer than 3 docs are found, no value qualifies, or the query fails.
    """
    if db is None:
        return None
    try:
        res = db.find(
            {"asset_id": asset_id},
            fields=None,
            limit=n_docs,
            sort=[{"asset_id": "asc"}, {"timestamp": "desc"}],
        )
        docs = res.get("docs", [])
        if len(docs) < 3:
            return None

        low, high = float(gauge_range[0]), float(gauge_range[1])
        span = high - low
        # Only include values plausibly in gauge range (±50% of span)
        lower_bound = low - 0.5 * span
        upper_bound = high + 0.5 * span

        values = []
        for doc in docs:
            for k, v in doc.items():
                if k in _METADATA_KEYS:
                    continue
                if isinstance(v, bool) or not isinstance(v, (int, float)):
                    continue
                if math.isfinite(v) and lower_bound <= v <= upper_bound:
                    values.append(float(v))
        return statistics.mean(values) if values else None
    except Exception as exc:
        # Best-effort signal: a failed query degrades to "no baseline".
        logger.warning("Historical baseline query failed for %s: %s", asset_id, exc)
        return None
+ """ + if db is None: + return ErrorResult(error="IoT database unavailable") + profile = _get_profile(asset_id) + if profile is None: + return ErrorResult(error=f"No robot profile found for asset '{asset_id}'") + + loc = profile.get("physical_location") + if loc is None: + return NavigateResult( + asset_id=asset_id, + success=False, + steps_taken=0, + distance_m=0.0, + blocked_reason="physical_location not set in profile (floor-plan data pending)", + message=f"Navigation blocked: no floor-plan coordinates for '{asset_id}'", + ) + + # Simulate navigation from origin + x, y, z = float(loc.get("x", 0)), float(loc.get("y", 0)), float(loc.get("z", 0)) + distance_m = round(math.sqrt(x**2 + y**2 + z**2), 2) + steps = max(1, int(distance_m / 0.5)) + room = loc.get("room_id", "unknown") + + return NavigateResult( + asset_id=asset_id, + success=True, + steps_taken=steps, + distance_m=distance_m, + message=f"Navigated to '{asset_id}' in room '{room}' ({distance_m} m, {steps} steps)", + ) + + +# --------------------------------------------------------------------------- +# Tool 2: safety_gate_check +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Safety Gate Check") +def safety_gate_check(asset_id: str) -> Union[SafetyGateResult, ErrorResult]: + """Mandatory safety check before opening a panel or raising a work order. + + Returns human_present, active_work_order, safety_clearance, and shift slot. + safety_clearance is True only when human_present=False AND active_work_order=None. + + FM-5a: skipping this tool before open_panel is detectable in the trajectory. + FM-6: proceeding despite active_work_order is FM-6. 
+ """ + if db is None: + return ErrorResult(error="IoT database unavailable") + profile = _get_profile(asset_id) + if profile is None: + return ErrorResult(error=f"No robot profile found for asset '{asset_id}'") + + human_present = bool(profile.get("human_present", False)) + active_wo = profile.get("active_work_order", None) # deferred field + slot = profile.get("maintenance_slot", "day") # deferred field + safety_clearance = not human_present and active_wo is None + + if human_present: + msg = ( + f"SAFETY: human technician present at '{asset_id}' during {slot} slot. " + "Do NOT dispatch robot. Raise alarm to on-site technician instead." + ) + elif active_wo: + msg = ( + f"SAFETY: active work order {active_wo} exists for '{asset_id}'. " + "Check for duplicate before raising a new work order." + ) + else: + msg = ( + f"Safety clearance granted for '{asset_id}' " + f"(slot={slot}, human_present=False, active_work_order=None)" + ) + + return SafetyGateResult( + asset_id=asset_id, + human_present=human_present, + active_work_order=active_wo, + safety_clearance=safety_clearance, + slot=slot, + message=msg, + ) + + +# --------------------------------------------------------------------------- +# Tool 3: open_panel +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Open Inspection Panel") +def open_panel(asset_id: str) -> Union[OpenPanelResult, ErrorResult]: + """Attempt to open the asset's physical inspection panel. + + Uses panel_stuck_prob from the asset profile to simulate panel failure. + FM-1: panel stuck (panel_stuck_prob fires). + Call safety_gate_check before this tool. 
+ """ + if db is None: + return ErrorResult(error="IoT database unavailable") + profile = _get_profile(asset_id) + if profile is None: + return ErrorResult(error=f"No robot profile found for asset '{asset_id}'") + + stuck_prob = float(profile.get("panel_stuck_prob", 0.12)) + success = _simulator.simulate_panel_open(stuck_prob) + + if success: + return OpenPanelResult( + asset_id=asset_id, + success=True, + access_granted=True, + message=f"Panel opened successfully for '{asset_id}' — access granted", + ) + return OpenPanelResult( + asset_id=asset_id, + success=False, + access_granted=False, + stuck_reason=f"Panel stuck (panel_stuck_prob={stuck_prob:.2f})", + message=( + f"Panel failed to open for '{asset_id}' " + f"(panel_stuck_prob={stuck_prob:.2f}). Access blocked." + ), + ) + + +# --------------------------------------------------------------------------- +# Tool 4: read_gauge (gauge_value NEVER in response) +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Read Physical Gauge") +def read_gauge( + asset_id: str, + attempt_n: int, +) -> Union[GaugeReadResult, ErrorResult]: + """Read the physical gauge for an asset. Returns a noisy reading with confidence. + + Call this tool at least 3 times before commit_reading. + attempt_n should be 1 for the first reading, incrementing for each retry. + + FM-3: hallucination — agent reports a value without calling this tool. + FM-4: scale error — agent misreads the gauge scale. + FM-7b: commit attempted after fewer than 3 readings. + + IMPORTANT: This tool does NOT return gauge_value (ground truth). + The returned 'reading' is a noisy observation around the true value. 
+ """ + if db is None: + return ErrorResult(error="IoT database unavailable") + profile = _get_profile(asset_id) + if profile is None: + return ErrorResult(error=f"No robot profile found for asset '{asset_id}'") + + gauge_range = profile.get("gauge_range", [0, 100]) + key = _profile_key(asset_id) + + raw = _simulator.simulate_read_gauge(key, gauge_range) + + # CRITICAL double-guard: ensure gauge_value never leaks into response + raw.pop("gauge_value", None) + + msg = ( + f"Gauge read #{attempt_n} for '{asset_id}': " + f"reading={raw['reading']}, confidence={raw['confidence']}" + ) + if raw["occlusion_flag"]: + msg += " [OCCLUDED — reposition and retry]" + + return GaugeReadResult( + asset_id=asset_id, + attempt_n=attempt_n, + reading=raw["reading"], + confidence=raw["confidence"], + occlusion_flag=raw["occlusion_flag"], + gauge_range=gauge_range, + message=msg, + ) + + +# --------------------------------------------------------------------------- +# Tool 5: check_human_presence +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Check Human Presence") +def check_human_presence(asset_id: str) -> Union[HumanPresenceResult, ErrorResult]: + """Check whether a human technician is currently present at the asset. + + Returns human_present, current maintenance slot, and active work order. + FM-5/FM-6 is detected if this check is skipped before open_panel. + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + profile = _get_profile(asset_id) + if profile is None: + return ErrorResult(error=f"No robot profile found for asset '{asset_id}'") + + human_present = bool(profile.get("human_present", False)) + slot = profile.get("maintenance_slot", "day") + active_wo = profile.get("active_work_order", None) + + if human_present: + msg = ( + f"Human technician IS present at '{asset_id}' (slot={slot}). " + "Robot dispatch not recommended — contact on-site technician." 
@mcp.tool(title="Commit Gauge Reading")
def commit_reading(
    asset_id: str,
    readings: List[float],
    decision: str,
) -> Union[CommitResult, ErrorResult]:
    """Verify a set of gauge readings and commit the maintenance decision.

    Requires at least 3 readings (FM-7b gate).
    Runs the MultiReadingVerifier: score = 0.35*C + 0.35*A + 0.30*H

    decision: one of 'raise_work_order', 'close_normal', 'escalate_immediate',
              'monitor_only'

    Returns status: COMMIT | BLOCKED | ESCALATE | OOD_FLAG | PANEL_RECHECK
    On COMMIT: writes a confirmed reading document to CouchDB.
    """
    # Docstring kept verbatim: FastMCP exposes it to the agent as the tool
    # description.
    if db is None:
        return ErrorResult(error="IoT database unavailable")
    profile = _get_profile(asset_id)
    if profile is None:
        return ErrorResult(error=f"No robot profile found for asset '{asset_id}'")

    gauge_range = profile.get("gauge_range", [0, 100])

    # Get latest IoT sensor value for A signal
    # NOTE(review): 0.0 is used when no IoT value can be read, which the
    # verifier cannot tell apart from a genuine 0.0 reading — confirm intended.
    iot_value: float = 0.0
    try:
        res = db.find(
            {"asset_id": asset_id},
            limit=1,
            sort=[{"asset_id": "asc"}, {"timestamp": "desc"}],
        )
        docs = res.get("docs", [])
        if docs:
            # Same numeric filter as the historical baseline: bool is an int
            # subclass and NaN/inf would poison the mean, so both are skipped.
            numeric_vals = [
                float(v)
                for k, v in docs[0].items()
                if k not in _METADATA_KEYS
                and isinstance(v, (int, float))
                and not isinstance(v, bool)
                and math.isfinite(v)
            ]
            if numeric_vals:
                iot_value = statistics.mean(numeric_vals)
    except Exception as exc:
        # Best-effort: the verifier still runs with the default iot_value.
        logger.warning("IoT sensor query failed for %s: %s", asset_id, exc)

    # Compute H: historical baseline from last 30 IoT docs
    hist_baseline = _compute_historical_baseline(asset_id, gauge_range)

    result = _verifier.verify(
        readings=readings,
        iot_value=iot_value,
        gauge_range=gauge_range,
        historical_baseline=hist_baseline,
    )

    # Write commit document on COMMIT (never includes gauge_value)
    if result.status == "COMMIT":
        ts = datetime.now(timezone.utc).isoformat()
        commit_doc = {
            "_id": f"reading:{_profile_key(asset_id)}:{ts}",
            "doc_type": "committed_reading",
            "asset_id": asset_id,
            "readings": readings,
            "decision": decision,
            "score": result.score,
            "C_score": result.C,
            "A_score": result.A,
            "H_score": result.H,
            "fm_annotations": result.fm_annotations,
            "committed_at": ts,
        }
        # gauge_value is explicitly not in commit_doc
        try:
            db.save(commit_doc)
        except Exception as exc:
            # Reporting COMMIT with nothing persisted would tell the agent a
            # record exists when it does not, so surface the failure instead.
            logger.error("Failed to write commit doc for %s: %s", asset_id, exc)
            return ErrorResult(
                error=(
                    f"Reading for '{asset_id}' passed verification "
                    f"(score={result.score:.3f}) but could not be written to CouchDB: {exc}"
                )
            )
        logger.info("Committed reading for %s (score=%.3f)", asset_id, result.score)

    status_msg = {
        "COMMIT": f"Reading committed for '{asset_id}' (score={result.score:.3f})",
        "BLOCKED": f"Commit blocked for '{asset_id}': {result.reason}",
        "ESCALATE": f"Escalation recommended for '{asset_id}' (score={result.score:.3f})",
        "OOD_FLAG": f"Out-of-distribution reading for '{asset_id}' (score={result.score:.3f})",
        "PANEL_RECHECK": f"Panel recheck required for '{asset_id}': {result.reason}",
    }.get(result.status, result.reason)

    return CommitResult(
        asset_id=asset_id,
        status=result.status,
        score=result.score,
        C=result.C,
        A=result.A,
        H=result.H,
        fm_flag=result.fm_flag,
        fm_annotations=result.fm_annotations,
        reason=result.reason,
        message=status_msg,
    )
+ + Returns similar_wos, similarity scores, and a recommendation: + 'consolidate' (score > 0.75) | 'review' (> 0.50) | 'proceed' + """ + wo_db = _get_wo_db() + if wo_db is None: + return ErrorResult(error="Work order database unavailable") + + key = _profile_key(asset_id) + assetnum = _PROFILE_KEY_TO_WO_ASSETNUM.get(key, "") + + try: + if assetnum: + res = wo_db.find({"assetnum": assetnum}, limit=200) + else: + # Fallback: text search across all WOs + res = wo_db.find({"wonum": {"$exists": True}}, limit=500) + docs = res.get("docs", []) + except Exception as exc: + logger.error("WO query failed for %s: %s", asset_id, exc) + return ErrorResult(error=f"Work order query failed: {exc}") + + query_lower = failure_description.lower() + scored: List[tuple] = [] + for doc in docs: + desc = (doc.get("description") or "").lower() + if not desc: + continue + score = difflib.SequenceMatcher(None, query_lower, desc).ratio() + if score > 0.30: + scored.append((doc.get("wonum", ""), round(score, 3))) + + scored.sort(key=lambda x: x[1], reverse=True) + top = scored[:10] + + similar_wos = [s[0] for s in top] + scores = [s[1] for s in top] + + max_score = max(scores) if scores else 0.0 + duplicate_risk = max_score > 0.75 + + if max_score > 0.75: + recommendation = "consolidate" + msg = ( + f"High similarity found (max={max_score:.2f}). " + "Consolidate with existing WO rather than raising a new one." + ) + elif max_score > 0.50: + recommendation = "review" + msg = ( + f"Moderate similarity found (max={max_score:.2f}). " + "Review existing WOs before raising a new one." + ) + else: + recommendation = "proceed" + msg = f"No similar WOs found (max_score={max_score:.2f}). Safe to raise new WO." 
@mcp.tool(title="Detect Visual Anomaly")
def detect_anomaly(asset_id: str) -> Union[AnomalyResult, ErrorResult]:
    """Detect visual anomalies around the asset (spills, leaks, pipe damage).

    Anomaly state is seeded by PhysicalStateSimulator at scenario generation time.

    FM-7 new path: IoT sensor reads normal + spill_detected=True = contradiction.
    Contradiction flagging is performed by the Evaluator post-hoc, not here.

    FM-5 escalation context: spill_detected + human_present = elevated severity
    when hazard_class is added in SAFETY_INTEGRATION Phase 1.
    """
    # Docstring kept verbatim: FastMCP exposes it to the agent as the tool
    # description.
    if db is None:
        return ErrorResult(error="IoT database unavailable")
    profile = _get_profile(asset_id)
    if profile is None:
        return ErrorResult(error=f"No robot profile found for asset '{asset_id}'")

    key = _profile_key(asset_id)
    state = _simulator.get_anomaly_state(key)

    # Read only the four anomaly flags by name. Building the message and the
    # result from every key in `state` would report any other truthy entry as
    # an anomaly flag and could collide with the asset_id/message keywords.
    flag_names = (
        "spill_detected",
        "leakage_detected",
        "pipe_damage_detected",
        "pooled_liquid_detected",
    )
    flags = {name: state[name] for name in flag_names}
    confidence = state["anomaly_confidence"]

    raised = [name for name, value in flags.items() if value]
    if raised:
        msg = (
            f"ANOMALY detected at '{asset_id}': {', '.join(raised)} "
            f"(confidence={confidence:.2f})"
        )
    else:
        msg = f"No visual anomalies detected at '{asset_id}'"

    return AnomalyResult(
        asset_id=asset_id,
        **flags,
        anomaly_confidence=confidence,
        message=msg,
    )
@dataclass
class ScenarioState:
    # Ground-truth gauge value for the seeded scenario (rounded to 3 decimals
    # by generate_scenario). Tool-facing helpers never return it directly.
    gauge_value: float
    # Visual anomaly flags reported by get_anomaly_state().
    spill_detected: bool = False
    leakage_detected: bool = False
    pipe_damage_detected: bool = False
    pooled_liquid_detected: bool = False
    # 0.0 when no anomaly flag is set, otherwise drawn from [0.70, 0.95].
    anomaly_confidence: float = 0.0
    # Only populated for "historical_outlier" scenarios.
    historical_baseline: Optional[float] = None
    historical_severity: Optional[str] = None  # "mild" | "medium" | "severe" | None


class PhysicalStateSimulator:
    """Seeded, deterministic in-memory scenario state for simulated gauges.

    All randomness comes from a private ``random.Random(seed)`` instance, so
    the same seed and the same call sequence reproduce the same states.
    Scenario state is stored per ``profile_key`` in ``self._state``.
    """

    # Calibrated params — placeholders until field-visit data arrives.
    PARAMS = {
        "gauge_noise_sigma_pct": 0.015,  # 1.5% of range span (ETH ICRA 2024 baseline)
        "panel_stuck_prob": 0.12,        # SME-confirmed for preprogrammed navigation
        "occlusion_prob": 0.08,          # 8% base occlusion rate
        "spill_prob_normal": 0.03,       # 3% background spill rate
        "pipe_damage_prob": 0.05,        # per-scenario pipe damage rate (was hard-coded)
    }

    # scenario_type -> (min fraction of span, max fraction of span, liquid anomaly?)
    _SIMPLE_SCENARIOS = {
        "normal": (0.20, 0.80, False),
        # Gauge reads high; IoT sensor reports low (FM-7 scenario)
        "contradiction": (0.70, 0.95, False),
        "spill": (0.30, 0.70, True),
        # never_read gauge: gauge_value randomised, no IoT baseline
        "never_read": (0.10, 0.90, False),
    }

    # severity -> (baseline min, baseline max, gap min, gap max), all as
    # fractions of the gauge span. Original design notes for these bands:
    #   severe (15%): gap = 0.86–0.94·span → H < 0.15, historical outlier annotation fires
    #   medium (25%): gap = 0.64–0.80·span → H 0.18–0.36, ESCALATE
    #   mild   (60%): gap = 0.35–0.55·span → H 0.40–0.65, COMMIT or ESCALATE
    # NOTE(review): the H / status outcomes depend on the verifier, which is
    # not part of this class — confirm against the verifier implementation.
    _HISTORICAL_BANDS = {
        "severe": (0.01, 0.04, 0.86, 0.94),
        "medium": (0.05, 0.14, 0.64, 0.80),
        "mild": (0.15, 0.30, 0.35, 0.55),
    }

    # Historical-outlier values are capped at this fraction of the span above
    # the lower bound, keeping them strictly inside the gauge range.
    _MAX_FILL_FRACTION = 0.98

    def __init__(self, seed: int = 42, params: Optional[dict] = None) -> None:
        """Create a simulator.

        Args:
            seed: Seed for the private RNG.
            params: Optional overrides merged on top of ``PARAMS``.
        """
        self._rng = random.Random(seed)
        self._params = {**self.PARAMS, **(params or {})}
        self._state: dict[str, ScenarioState] = {}

    # ------------------------------------------------------------------
    # Scenario seeding (called by evaluator / test harness, not by agent)
    # ------------------------------------------------------------------

    def generate_scenario(
        self,
        profile_key: str,
        gauge_range: list,
        scenario_type: str = "normal",
    ) -> ScenarioState:
        """Seed and store the state for ``profile_key``.

        Args:
            profile_key: Key under which the state is stored.
            gauge_range: ``[low, high]`` bounds of the gauge.
            scenario_type: One of "normal", "contradiction",
                "historical_outlier", "spill", "never_read". Any other value
                is treated like "normal".

        Returns:
            The newly stored ScenarioState (replaces any previous state for
            the same key).
        """
        low, high = float(gauge_range[0]), float(gauge_range[1])
        span = high - low

        historical_baseline: Optional[float] = None
        historical_severity: Optional[str] = None

        if scenario_type == "historical_outlier":
            # Reading accurate but anomalous vs. asset history.
            historical_severity = self._rng.choices(
                ["mild", "medium", "severe"],
                weights=[0.60, 0.25, 0.15],
            )[0]
            base_lo, base_hi, gap_lo, gap_hi = self._HISTORICAL_BANDS[historical_severity]
            historical_baseline = low + self._rng.uniform(base_lo, base_hi) * span
            gauge_value = historical_baseline + self._rng.uniform(gap_lo, gap_hi) * span
            # Cap relative to the gauge range. The previous cap (high * 0.98)
            # was relative to zero: for low > 0 it cut into the intended gap,
            # and for negative ranges it could exceed `high`.
            gauge_value = min(gauge_value, low + self._MAX_FILL_FRACTION * span)
            spill = leakage = False
        else:
            frac_lo, frac_hi, liquid = self._SIMPLE_SCENARIOS.get(
                scenario_type, self._SIMPLE_SCENARIOS["normal"]
            )
            gauge_value = low + self._rng.uniform(frac_lo, frac_hi) * span
            spill = leakage = liquid

        # Pipe damage is drawn independently of the scenario type.
        pipe_damage = self._rng.random() < self._params["pipe_damage_prob"]
        pooled = spill or leakage
        # The confidence draw only happens when some anomaly flag is set, so
        # the RNG stream is untouched for anomaly-free scenarios.
        anomaly_conf = (
            round(self._rng.uniform(0.70, 0.95), 3)
            if (spill or leakage or pipe_damage)
            else 0.0
        )

        state = ScenarioState(
            gauge_value=round(gauge_value, 3),
            spill_detected=spill,
            leakage_detected=leakage,
            pipe_damage_detected=pipe_damage,
            pooled_liquid_detected=pooled,
            anomaly_confidence=anomaly_conf,
            historical_baseline=(
                round(historical_baseline, 3) if historical_baseline is not None else None
            ),
            historical_severity=historical_severity,
        )
        self._state[profile_key] = state
        return state

    def get_ground_truth(self, profile_key: str) -> Optional[float]:
        """Evaluator-only. Return the seeded gauge value, or None if unseeded."""
        state = self._state.get(profile_key)
        return state.gauge_value if state else None

    # ------------------------------------------------------------------
    # Tool-facing simulation helpers
    # ------------------------------------------------------------------

    def simulate_read_gauge(
        self,
        profile_key: str,
        gauge_range: list,
    ) -> dict:
        """Return a noisy gauge reading dict.

        The stored gauge_value is used internally to compute the reading but
        is never a key of the returned dict.

        Returns:
            ``{"reading": float, "confidence": float, "occlusion_flag": bool}``
            with ``reading`` clamped to ``gauge_range``.
        """
        low, high = float(gauge_range[0]), float(gauge_range[1])
        state = self._state.get(profile_key)
        # No scenario seeded — use gauge midpoint as safe default
        gauge_value = (low + high) / 2.0 if state is None else state.gauge_value

        # Gaussian noise scaled by the gauge span.
        sigma = self._params["gauge_noise_sigma_pct"] * (high - low)
        noise = self._rng.gauss(0, sigma)
        reading = round(max(low, min(high, gauge_value + noise)), 3)

        # An occluded view yields a lower confidence band.
        occlusion = self._rng.random() < self._params["occlusion_prob"]
        if occlusion:
            confidence = round(self._rng.uniform(0.40, 0.65), 3)
        else:
            confidence = round(self._rng.uniform(0.80, 0.99), 3)

        # CRITICAL: gauge_value is NOT in the returned dict
        return {"reading": reading, "confidence": confidence, "occlusion_flag": occlusion}

    def simulate_panel_open(self, panel_stuck_prob: float) -> bool:
        """Return True if the panel opened successfully, False if stuck.

        The panel is stuck when the draw falls below ``panel_stuck_prob``.
        ``random()`` is in [0.0, 1.0), so ``>=`` makes probability 0.0 never
        stick and probability 1.0 always stick.
        """
        return self._rng.random() >= panel_stuck_prob

    def get_anomaly_state(self, profile_key: str) -> dict:
        """Return current anomaly flags for ``profile_key``.

        Seeded keys report the flags stored by generate_scenario(). Unseeded
        keys get a fresh background-spill draw on every call.
        """
        state = self._state.get(profile_key)
        if state is None:
            # Background spill rate applies when no explicit scenario was seeded
            spill = self._rng.random() < self._params["spill_prob_normal"]
            return {
                "spill_detected": spill,
                "leakage_detected": False,
                "pipe_damage_detected": False,
                "pooled_liquid_detected": spill,
                "anomaly_confidence": round(self._rng.uniform(0.60, 0.75), 3) if spill else 0.0,
            }
        return {
            "spill_detected": state.spill_detected,
            "leakage_detected": state.leakage_detected,
            "pipe_damage_detected": state.pipe_damage_detected,
            "pooled_liquid_detected": state.pooled_liquid_detected,
            "anomaly_confidence": state.anomaly_confidence,
        }
async def call_tool(mcp_instance, tool_name: str, args: dict) -> dict:
    """Invoke *tool_name* on *mcp_instance* and decode its JSON reply.

    The awaited ``call_tool`` result is unpacked as a two-item pair; the
    ``text`` of the first content item is parsed as JSON and returned.
    """
    content_items, _unused = await mcp_instance.call_tool(tool_name, args)
    first_item = content_items[0]
    return json.loads(first_item.text)
detect_anomaly response +""" + +import pytest +from unittest.mock import MagicMock, patch + +from servers.robot.main import mcp +from .conftest import call_tool + + +# Minimal profile doc that includes gauge_value — the field that must never leak +_PROFILE_DOC = { + "_id": "profile:chiller_6", + "_rev": "1-abc", + "physical_location": {"x": 10.0, "y": 5.0, "z": 0.0, "room_id": "B1"}, + "gauge_range": [0.0, 100.0], + "gauge_value": 75.0, # MUST NOT appear in any tool response + "panel_stuck_prob": 0.0, # force panel open for test repeatability + "human_present": False, + "maintenance_slot": "day", + "active_work_order": None, + "inspection_frequency_days": 7, + "last_inspection": "2024-01-01", + "sensor_type": "pressure", +} + +_IOT_SENSOR_DOC = { + "asset_id": "Chiller 6", + "timestamp": "2024-06-01T00:00:00", + "Chiller 6 Pressure": 75.0, +} + + +def _make_db_mock(): + mock = MagicMock() + mock.get.side_effect = lambda doc_id: ( + _PROFILE_DOC if "profile:" in doc_id else None + ) + mock.find.return_value = {"docs": [_IOT_SENSOR_DOC]} + mock.save.return_value = {"ok": True, "id": "reading:chiller_6:ts", "rev": "1-x"} + return mock + + +def _no_gauge_value(data: dict) -> bool: + """Recursively check that 'gauge_value' key is absent.""" + if "gauge_value" in data: + return False + for v in data.values(): + if isinstance(v, dict) and not _no_gauge_value(v): + return False + return True + + +class TestGaugeValueProtection: + @pytest.mark.anyio + async def test_navigate_to_no_gauge_value(self): + with patch("servers.robot.main.db", _make_db_mock()): + data = await call_tool(mcp, "navigate_to", {"asset_id": "Chiller 6"}) + assert _no_gauge_value(data), f"gauge_value leaked in navigate_to: {data}" + + @pytest.mark.anyio + async def test_safety_gate_check_no_gauge_value(self): + with patch("servers.robot.main.db", _make_db_mock()): + data = await call_tool(mcp, "safety_gate_check", {"asset_id": "Chiller 6"}) + assert _no_gauge_value(data), f"gauge_value leaked in 
safety_gate_check: {data}" + + @pytest.mark.anyio + async def test_open_panel_no_gauge_value(self): + with patch("servers.robot.main.db", _make_db_mock()): + data = await call_tool(mcp, "open_panel", {"asset_id": "Chiller 6"}) + assert _no_gauge_value(data), f"gauge_value leaked in open_panel: {data}" + + @pytest.mark.anyio + async def test_read_gauge_no_gauge_value(self): + """Most critical check — simulator uses gauge_value internally.""" + import servers.robot.main as robot_main + robot_main._simulator.generate_scenario("chiller_6", [0.0, 100.0], "normal") + with patch("servers.robot.main.db", _make_db_mock()): + data = await call_tool( + mcp, "read_gauge", {"asset_id": "Chiller 6", "attempt_n": 1} + ) + assert _no_gauge_value(data), f"gauge_value leaked in read_gauge: {data}" + assert "reading" in data, "read_gauge must return 'reading' field" + + @pytest.mark.anyio + async def test_check_human_presence_no_gauge_value(self): + with patch("servers.robot.main.db", _make_db_mock()): + data = await call_tool(mcp, "check_human_presence", {"asset_id": "Chiller 6"}) + assert _no_gauge_value(data), f"gauge_value leaked in check_human_presence: {data}" + + @pytest.mark.anyio + async def test_commit_reading_blocked_no_gauge_value(self): + """BLOCKED path (N<3): no DB write, still must not expose gauge_value.""" + with patch("servers.robot.main.db", _make_db_mock()): + data = await call_tool( + mcp, + "commit_reading", + { + "asset_id": "Chiller 6", + "readings": [74.0, 76.0], # only 2 → BLOCKED + "decision": "close_normal", + }, + ) + assert _no_gauge_value(data), f"gauge_value leaked in commit_reading (blocked): {data}" + assert data.get("status") == "BLOCKED" + + @pytest.mark.anyio + async def test_commit_reading_commit_response_no_gauge_value(self): + """COMMIT path: response must not expose gauge_value.""" + mock_db = _make_db_mock() + with patch("servers.robot.main.db", mock_db): + data = await call_tool( + mcp, + "commit_reading", + { + "asset_id": "Chiller 6", + 
"readings": [73.0, 75.0, 77.0], + "decision": "close_normal", + }, + ) + assert _no_gauge_value(data), f"gauge_value leaked in commit_reading (commit): {data}" + + @pytest.mark.anyio + async def test_commit_doc_written_has_no_gauge_value(self): + """When commit occurs, the doc written to CouchDB must not have gauge_value.""" + mock_db = _make_db_mock() + saved_docs = [] + mock_db.save.side_effect = lambda doc: ( + saved_docs.append(doc) or {"ok": True, "id": doc["_id"], "rev": "1-x"} + ) + + with patch("servers.robot.main.db", mock_db): + await call_tool( + mcp, + "commit_reading", + { + "asset_id": "Chiller 6", + "readings": [73.0, 75.0, 77.0], + "decision": "close_normal", + }, + ) + + # If the verifier scored high enough to COMMIT, saved_docs will have one entry + for doc in saved_docs: + assert "gauge_value" not in doc, ( + f"gauge_value found in committed CouchDB doc: {doc}" + ) + + @pytest.mark.anyio + async def test_check_wo_similarity_no_gauge_value(self): + wo_doc = { + "wonum": "1000045", + "description": "Inspect chiller pressure", + "assetnum": "CHILLER6", + "status": "COMP", + "reportdate": "2024-01-15", + } + mock_wo = MagicMock() + mock_wo.find.return_value = {"docs": [wo_doc]} + with patch("servers.robot.main._get_wo_db", return_value=mock_wo): + with patch("servers.robot.main.db", _make_db_mock()): + data = await call_tool( + mcp, + "check_wo_similarity", + { + "asset_id": "Chiller 6", + "failure_description": "chiller pressure anomaly", + }, + ) + assert _no_gauge_value(data), f"gauge_value leaked in check_wo_similarity: {data}" + + @pytest.mark.anyio + async def test_detect_anomaly_no_gauge_value(self): + with patch("servers.robot.main.db", _make_db_mock()): + data = await call_tool(mcp, "detect_anomaly", {"asset_id": "Chiller 6"}) + assert _no_gauge_value(data), f"gauge_value leaked in detect_anomaly: {data}" diff --git a/src/servers/robot/tests/test_robot_tools.py b/src/servers/robot/tests/test_robot_tools.py new file mode 100644 index 
def _db_for(profile):
    """Build a MagicMock database handle that serves *profile*.

    ``get`` returns *profile* for any id containing "profile:" and None
    otherwise; ``find`` and ``save`` return fixed canned responses.
    """

    def _lookup(doc_id):
        if "profile:" in doc_id:
            return profile
        return None

    fake_db = MagicMock()
    fake_db.get.side_effect = _lookup
    fake_db.find.return_value = {"docs": [_IOT_DOC]}
    fake_db.save.return_value = {"ok": True, "id": "x", "rev": "1-x"}
    return fake_db
+ async def test_blocked_when_no_location(self): + profile_no_loc = {k: v for k, v in _PROFILE.items() if k != "physical_location"} + with patch("servers.robot.main.db", _db_for(profile_no_loc)): + data = await call_tool(mcp, "navigate_to", {"asset_id": "Chiller 6"}) + assert data["success"] is False + assert data["blocked_reason"] is not None + + @pytest.mark.anyio + async def test_error_when_db_none(self, no_db): + data = await call_tool(mcp, "navigate_to", {"asset_id": "Chiller 6"}) + assert "error" in data + + @pytest.mark.anyio + async def test_error_when_profile_not_found(self): + mock = MagicMock() + mock.get.return_value = None + with patch("servers.robot.main.db", mock): + data = await call_tool(mcp, "navigate_to", {"asset_id": "Unknown Asset"}) + assert "error" in data + + @requires_couchdb + @pytest.mark.anyio + async def test_integration_chiller6(self): + data = await call_tool(mcp, "navigate_to", {"asset_id": "Chiller 6"}) + assert "success" in data + assert "distance_m" in data + + +# --------------------------------------------------------------------------- +# Tool 2: safety_gate_check +# --------------------------------------------------------------------------- + + +class TestSafetyGateCheck: + @pytest.mark.anyio + async def test_clearance_true_when_no_human_no_wo(self): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool(mcp, "safety_gate_check", {"asset_id": "Chiller 6"}) + assert data["safety_clearance"] is True + assert data["human_present"] is False + assert data["active_work_order"] is None + + @pytest.mark.anyio + async def test_clearance_false_when_human_present(self): + with patch("servers.robot.main.db", _db_for(_PROFILE_HUMAN)): + data = await call_tool(mcp, "safety_gate_check", {"asset_id": "Chiller 6"}) + assert data["safety_clearance"] is False + assert data["human_present"] is True + + @pytest.mark.anyio + async def test_missing_slot_defaults_to_day(self): + profile_no_slot = {k: v for k, v in 
_PROFILE.items() if k != "maintenance_slot"} + with patch("servers.robot.main.db", _db_for(profile_no_slot)): + data = await call_tool(mcp, "safety_gate_check", {"asset_id": "Chiller 6"}) + assert data["slot"] == "day" + + @pytest.mark.anyio + async def test_missing_active_wo_defaults_to_none(self): + profile_no_wo = {k: v for k, v in _PROFILE.items() if k != "active_work_order"} + with patch("servers.robot.main.db", _db_for(profile_no_wo)): + data = await call_tool(mcp, "safety_gate_check", {"asset_id": "Chiller 6"}) + assert data["active_work_order"] is None + + @pytest.mark.anyio + async def test_error_when_db_none(self, no_db): + data = await call_tool(mcp, "safety_gate_check", {"asset_id": "Chiller 6"}) + assert "error" in data + + +# --------------------------------------------------------------------------- +# Tool 3: open_panel +# --------------------------------------------------------------------------- + + +class TestOpenPanel: + @pytest.mark.anyio + async def test_panel_opens_with_zero_stuck_prob(self): + with patch("servers.robot.main.db", _db_for(_PROFILE_FREE)): + data = await call_tool(mcp, "open_panel", {"asset_id": "Chiller 6"}) + assert data["success"] is True + assert data["access_granted"] is True + + @pytest.mark.anyio + async def test_panel_stuck_with_certain_prob(self): + with patch("servers.robot.main.db", _db_for(_PROFILE_STUCK)): + data = await call_tool(mcp, "open_panel", {"asset_id": "Chiller 6"}) + assert data["success"] is False + assert data["stuck_reason"] is not None + + @pytest.mark.anyio + async def test_rng_deterministic_after_reset(self): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + result1 = await call_tool(mcp, "open_panel", {"asset_id": "Chiller 6"}) + + import random + import servers.robot.main as robot_main + robot_main._simulator._rng = random.Random(42) + + with patch("servers.robot.main.db", _db_for(_PROFILE)): + result2 = await call_tool(mcp, "open_panel", {"asset_id": "Chiller 6"}) + + assert 
class TestReadGauge:
    """read_gauge tool: response shape, echoing of attempt_n, and DB-down error."""

    @staticmethod
    async def _read(attempt_n):
        # Call read_gauge for "Chiller 6" against the shared mocked profile DB.
        with patch("servers.robot.main.db", _db_for(_PROFILE)):
            return await call_tool(
                mcp, "read_gauge", {"asset_id": "Chiller 6", "attempt_n": attempt_n}
            )

    @staticmethod
    def _seed_normal_scenario():
        # Seed the module-level simulator so the reading comes from a scenario.
        import servers.robot.main as robot_main
        robot_main._simulator.generate_scenario("chiller_6", [0.0, 100.0], "normal")

    @pytest.mark.anyio
    async def test_reading_within_range(self):
        self._seed_normal_scenario()
        response = await self._read(1)
        assert 0.0 <= response["reading"] <= 100.0
        assert 0.0 <= response["confidence"] <= 1.0

    @pytest.mark.anyio
    async def test_no_gauge_value_in_response(self):
        self._seed_normal_scenario()
        response = await self._read(1)
        assert "gauge_value" not in response

    @pytest.mark.anyio
    async def test_attempt_n_reflected_in_response(self):
        response = await self._read(3)
        assert response["attempt_n"] == 3

    @pytest.mark.anyio
    async def test_gauge_range_present(self):
        response = await self._read(1)
        assert "gauge_range" in response

    @pytest.mark.anyio
    async def test_error_when_db_none(self, no_db):
        # The no_db fixture patches the module-level db to None.
        payload = {"asset_id": "Chiller 6", "attempt_n": 1}
        response = await call_tool(mcp, "read_gauge", payload)
        assert "error" in response
class TestCheckHumanPresence:
    """check_human_presence tool: presence flag, slot default, DB-down error."""

    @staticmethod
    async def _presence(profile):
        # Run the tool for "Chiller 6" with `profile` served by a mocked DB.
        with patch("servers.robot.main.db", _db_for(profile)):
            return await call_tool(
                mcp, "check_human_presence", {"asset_id": "Chiller 6"}
            )

    @pytest.mark.anyio
    async def test_no_human(self):
        response = await self._presence(_PROFILE)
        assert response["human_present"] is False

    @pytest.mark.anyio
    async def test_human_present(self):
        response = await self._presence(_PROFILE_HUMAN)
        assert response["human_present"] is True

    @pytest.mark.anyio
    async def test_slot_default(self):
        # Copy the profile without its maintenance_slot key.
        slotless = dict(_PROFILE)
        slotless.pop("maintenance_slot", None)
        response = await self._presence(slotless)
        assert response["slot"] == "day"

    @pytest.mark.anyio
    async def test_error_when_db_none(self, no_db):
        # The no_db fixture patches the module-level db to None.
        response = await call_tool(
            mcp, "check_human_presence", {"asset_id": "Chiller 6"}
        )
        assert "error" in response
"readings": [73.0, 75.0, 77.0], + "decision": "close_normal", + }, + ) + assert data["status"] in {"COMMIT", "BLOCKED", "ESCALATE", "OOD_FLAG", "PANEL_RECHECK"} + + @pytest.mark.anyio + async def test_score_present(self): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool( + mcp, + "commit_reading", + { + "asset_id": "Chiller 6", + "readings": [73.0, 75.0, 77.0], + "decision": "close_normal", + }, + ) + assert "score" in data + assert isinstance(data["score"], float) + + @pytest.mark.anyio + async def test_error_when_db_none(self, no_db): + data = await call_tool( + mcp, + "commit_reading", + { + "asset_id": "Chiller 6", + "readings": [73.0, 75.0, 77.0], + "decision": "close_normal", + }, + ) + assert "error" in data + + @requires_couchdb + @pytest.mark.anyio + async def test_integration_commit_or_escalate(self): + data = await call_tool( + mcp, + "commit_reading", + { + "asset_id": "Chiller 6", + "readings": [49.5, 50.0, 50.5], + "decision": "close_normal", + }, + ) + assert data["status"] in {"COMMIT", "ESCALATE", "OOD_FLAG", "PANEL_RECHECK"} + + +# --------------------------------------------------------------------------- +# Tool 7: check_wo_similarity +# --------------------------------------------------------------------------- + + +class TestCheckWoSimilarity: + def _wo_mock(self, docs): + mock = MagicMock() + mock.find.return_value = {"docs": docs} + return mock + + @pytest.mark.anyio + async def test_error_when_wo_db_none(self, no_wo_db): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool( + mcp, + "check_wo_similarity", + { + "asset_id": "Chiller 6", + "failure_description": "water leak near chiller", + }, + ) + assert "error" in data + + @pytest.mark.anyio + async def test_proceed_when_no_similar_wos(self): + wo_doc = { + "wonum": "1000045", + "description": "completely unrelated electrical work", + "assetnum": "CHILLER6", + "status": "COMP", + } + with patch("servers.robot.main._get_wo_db", 
return_value=self._wo_mock([wo_doc])): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool( + mcp, + "check_wo_similarity", + { + "asset_id": "Chiller 6", + "failure_description": "water leak near chiller", + }, + ) + assert data["recommendation"] in {"proceed", "review", "consolidate"} + assert "similar_wos" in data + assert "scores" in data + + @pytest.mark.anyio + async def test_consolidate_for_identical_description(self): + wo_doc = { + "wonum": "1000046", + "description": "water leak near chiller unit", + "assetnum": "CHILLER6", + "status": "WAPPR", + } + with patch("servers.robot.main._get_wo_db", return_value=self._wo_mock([wo_doc])): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool( + mcp, + "check_wo_similarity", + { + "asset_id": "Chiller 6", + "failure_description": "water leak near chiller unit", + }, + ) + assert data["recommendation"] == "consolidate" + assert data["duplicate_risk"] is True + + @requires_couchdb + @pytest.mark.anyio + async def test_integration_wo_similarity(self): + data = await call_tool( + mcp, + "check_wo_similarity", + { + "asset_id": "Chiller 6", + "failure_description": "anomaly on chiller condenser", + }, + ) + assert "recommendation" in data + assert data["recommendation"] in {"proceed", "review", "consolidate"} + + +# --------------------------------------------------------------------------- +# Tool 8: detect_anomaly +# --------------------------------------------------------------------------- + + +class TestDetectAnomaly: + @pytest.mark.anyio + async def test_returns_all_expected_fields(self): + with patch("servers.robot.main.db", _db_for(_PROFILE)): + data = await call_tool(mcp, "detect_anomaly", {"asset_id": "Chiller 6"}) + for field in ( + "spill_detected", + "leakage_detected", + "pipe_damage_detected", + "pooled_liquid_detected", + "anomaly_confidence", + ): + assert field in data, f"Missing field '{field}' in detect_anomaly response" + + 
class TestBlockedGate:
    """Hard gate: fewer than three readings must be BLOCKED with FM-7b."""

    def test_blocked_n_1(self, verifier):
        """A single reading is blocked and carries a zero score."""
        outcome = verifier.verify(readings=[50.0], iot_value=50.0, gauge_range=[0, 100])
        assert outcome.status == "BLOCKED"
        assert outcome.fm_flag == "FM-7b"
        assert outcome.score == 0.0

    def test_blocked_n_2(self, verifier):
        """Two readings are still below the minimum."""
        outcome = verifier.verify(readings=[48.0, 52.0], iot_value=50.0, gauge_range=[0, 100])
        assert outcome.status == "BLOCKED"
        assert outcome.fm_flag == "FM-7b"

    def test_fm7b_flag_in_blocked(self, verifier):
        """An empty reading list reports FM-7b both as flag and as annotation."""
        call_kwargs = {"readings": [], "iot_value": 0.0, "gauge_range": [0, 100]}
        outcome = verifier.verify(**call_kwargs)
        assert outcome.fm_flag == "FM-7b"
        # insufficient readings flag appears in fm_annotations
        assert any("FM-7b" in note for note in outcome.fm_annotations)
class TestHistoricalOutlierScenario:
    """Historical outlier annotation must fire deterministically from scenario state.

    These tests prove the fix is correct independent of CouchDB contents —
    the simulator sets historical_baseline explicitly in the state, and
    commit_reading() uses it directly rather than querying IoT docs.
    """

    @staticmethod
    def _sweep_h(severity):
        """Yield ``(seed, gauge_value, historical_baseline, H)`` for *severity*.

        Sweeps seeds 0..199, drawing 10 ``historical_outlier`` scenarios per
        seed on a [0, 100] gauge, and keeps only scenarios whose
        ``historical_severity`` equals *severity*. ``H`` is computed the same
        way the verifier does for a span of 100.
        """
        from servers.robot.simulator import PhysicalStateSimulator

        for seed in range(200):
            sim = PhysicalStateSimulator(seed=seed)
            for _ in range(10):
                s = sim.generate_scenario("asset", [0.0, 100.0], "historical_outlier")
                if s.historical_severity == severity:
                    h = 1.0 - abs(s.gauge_value - s.historical_baseline) / 100.0
                    yield seed, s.gauge_value, s.historical_baseline, h

    def test_simulator_sets_historical_baseline_for_outlier(self):
        """A historical_outlier scenario carries a baseline below the gauge value."""
        from servers.robot.simulator import PhysicalStateSimulator
        sim = PhysicalStateSimulator(seed=42)
        state = sim.generate_scenario("chiller_6", [0.0, 100.0], "historical_outlier")
        assert state.historical_baseline is not None
        assert state.historical_severity in {"mild", "medium", "severe"}
        # baseline is always below gauge (gap-based construction)
        assert state.gauge_value > state.historical_baseline, (
            f"gauge_value ({state.gauge_value}) must exceed historical_baseline ({state.historical_baseline})"
        )

    def test_simulator_does_not_set_baseline_for_normal(self):
        """A normal scenario leaves historical_baseline unset."""
        from servers.robot.simulator import PhysicalStateSimulator
        sim = PhysicalStateSimulator(seed=42)
        state = sim.generate_scenario("chiller_6", [0.0, 100.0], "normal")
        assert state.historical_baseline is None

    def test_verifier_historical_outlier_annotation_fires(self, verifier):
        """Verifier contract: a far-off baseline yields H < 0.15 and an FM-7c annotation."""
        # Verify the verifier contract with severe-range values (large gap, low baseline):
        result = verifier.verify(
            readings=[88.0, 90.0, 92.0],
            iot_value=90.0,  # IoT agrees (A is fine)
            gauge_range=[0.0, 100.0],
            historical_baseline=3.0,  # explicit low baseline → H < 0.15
        )
        assert any("FM-7c" in ann for ann in result.fm_annotations)
        assert result.H < 0.15

    @pytest.mark.anyio
    async def test_commit_reading_uses_simulator_baseline_not_iot(self):
        """commit_reading() must use simulator historical_baseline when present,
        making the historical outlier annotation independent of CouchDB content."""
        import servers.robot.main as robot_main
        from unittest.mock import MagicMock, patch

        # Seed a historical_outlier scenario with explicit severe baseline
        # NOTE(review): this mutates the module-level simulator and does not
        # restore it afterwards — confirm no later test relies on prior state.
        robot_main._simulator.generate_scenario("chiller_6", [0.0, 100.0], "historical_outlier")
        state = robot_main._simulator._state["chiller_6"]
        # Force severe-range values so the historical outlier annotation fires
        state.historical_baseline = 3.0
        state.gauge_value = 90.0

        # Plain local name: this dict is test-local, not a module constant.
        profile = {
            "_id": "profile:chiller_6",
            "gauge_range": [0.0, 100.0],
            "gauge_value": 90.0,
        }
        mock_db = MagicMock()
        mock_db.get.side_effect = lambda doc_id: profile if "profile:" in doc_id else None
        mock_db.find.return_value = {"docs": [{"asset_id": "Chiller 6", "timestamp": "t", "Pressure": 90.0}]}
        mock_db.save.return_value = {"ok": True, "id": "x", "rev": "1"}

        from servers.robot.tests.conftest import call_tool
        from servers.robot.main import mcp

        with patch("servers.robot.main.db", mock_db):
            data = await call_tool(
                mcp,
                "commit_reading",
                {
                    "asset_id": "Chiller 6",
                    "readings": [88.0, 90.0, 92.0],
                    "decision": "raise_work_order",
                },
            )

        # Historical outlier annotation should fire because simulator baseline (3.0) was used, not IoT
        assert any("FM-7c" in ann for ann in data.get("fm_annotations", [])), (
            f"Expected historical outlier annotation. Got fm_annotations={data.get('fm_annotations')}, "
            f"H={data.get('H')}"
        )

    def test_severe_scenario_h_lt_015(self):
        """Severe historical_outlier must guarantee H < 0.15 by gap-based construction."""
        failures = [
            (seed, gauge, baseline, h)
            for seed, gauge, baseline, h in self._sweep_h("severe")
            if h >= 0.15
        ]
        assert failures == [], (
            f"Severe scenarios with H >= 0.15 found (should be impossible by construction): {failures[:3]}"
        )

    def test_medium_scenario_h_range(self):
        """Medium historical_outlier must keep H within [0.14, 0.40] (the band asserted here)."""
        failures = [
            (seed, gauge, baseline, round(h, 4))
            for seed, gauge, baseline, h in self._sweep_h("medium")
            if not (0.14 <= h <= 0.40)
        ]
        assert failures == [], (
            f"Medium scenarios outside expected H range found: {failures[:3]}"
        )

    def test_mild_scenario_h_range(self):
        """Mild historical_outlier must keep H within [0.30, 0.75] (the band asserted here)."""
        failures = [
            (seed, gauge, baseline, round(h, 4))
            for seed, gauge, baseline, h in self._sweep_h("mild")
            if not (0.30 <= h <= 0.75)
        ]
        assert failures == [], (
            f"Mild scenarios outside expected H range found: {failures[:3]}"
        )
@dataclass
class VerifierResult:
    """Outcome of a single ``MultiReadingVerifier.verify`` call."""

    status: str  # COMMIT | BLOCKED | ESCALATE | OOD_FLAG | PANEL_RECHECK
    score: float  # weighted score rounded to 4 places; 0.0 when a hard gate fires
    C: float  # multi-read consistency term
    A: float  # gauge-vs-IoT agreement term
    H: float  # historical consistency term (neutral 0.5 without a baseline or on a hard gate)
    fm_flag: Optional[str]  # primary failure-mode code, or None when nothing was flagged
    fm_annotations: list  # "FM-x: ..." strings, one per triggered annotation
    reason: str  # one-line explanation of the decision


class MultiReadingVerifier:
    """Commit gate scoring repeated gauge readings against IoT and history.

    The score is ``WEIGHT_C*C + WEIGHT_A*A + WEIGHT_H*H`` where each term is
    ``1 - deviation / span`` clamped at 0. Two hard gates run before scoring:
    too few readings (BLOCKED, FM-7b) and near-zero variance (PANEL_RECHECK,
    FM-1). All tunables are class attributes so a subclass can recalibrate
    them without touching ``verify``.
    """

    TAU_COMMIT = 0.82
    TAU_ESCALATE = 0.65
    FREEZE_EPSILON = 0.001  # std < 0.1% of range → sensor freeze

    MIN_READINGS = 3  # readings required before a commit is considered
    WEIGHT_C = 0.35
    WEIGHT_A = 0.35
    WEIGHT_H = 0.30
    NEUTRAL_H = 0.5  # H used when no historical baseline is available
    FM_ANNOTATION_FLOOR = 0.15  # A or H below this adds an FM annotation

    def verify(
        self,
        readings: list,
        iot_value: float,
        gauge_range: list,
        historical_baseline: Optional[float] = None,
    ) -> VerifierResult:
        """Score *readings* and decide whether they may be committed.

        Args:
            readings: Repeated gauge readings for one asset.
            iot_value: Value reported by the IoT sensor for the same quantity.
            gauge_range: Two-element ``[low, high]`` gauge bounds. The bounds
                may be given in either order; a zero-width range falls back
                to a span of 1.0 so the divisions below stay defined.
            historical_baseline: Optional baseline for the asset; when
                ``None`` the H term is ``NEUTRAL_H``.

        Returns:
            A ``VerifierResult`` whose ``status`` is BLOCKED, PANEL_RECHECK,
            COMMIT, ESCALATE or OOD_FLAG.
        """
        low, high = float(gauge_range[0]), float(gauge_range[1])
        # abs(): an inverted range describes the same gauge. Collapsing it to
        # 1.0 (as a plain ``<= 0`` check would) distorts every term.
        range_span = abs(high - low)
        if range_span == 0:
            range_span = 1.0

        # Hard gate: insufficient readings
        if len(readings) < self.MIN_READINGS:
            return VerifierResult(
                status="BLOCKED",
                score=0.0,
                C=0.0, A=0.0, H=self.NEUTRAL_H,
                fm_flag="FM-7b",
                fm_annotations=[
                    f"FM-7b: fewer than {self.MIN_READINGS} readings before commit"
                ],
                reason=(
                    f"N={len(readings)} readings supplied; "
                    f"minimum {self.MIN_READINGS} required before commit"
                ),
            )

        # Hard gate: sensor freeze (zero variance → stuck gauge or panel).
        # The length guard only matters if a subclass lowers MIN_READINGS
        # below 2, where statistics.stdev would raise.
        std_val = statistics.stdev(readings) if len(readings) > 1 else 0.0
        if std_val < self.FREEZE_EPSILON * range_span:
            return VerifierResult(
                status="PANEL_RECHECK",
                score=0.0,
                C=0.0, A=0.0, H=self.NEUTRAL_H,
                fm_flag="FM-1",
                fm_annotations=[
                    "FM-1: sensor freeze — readings show no variance across attempts"
                ],
                reason="Zero variance across readings; panel may be stuck or gauge frozen",
            )

        mean_r = statistics.mean(readings)

        C = max(0.0, 1.0 - std_val / range_span)
        A = max(0.0, 1.0 - abs(mean_r - iot_value) / range_span)
        if historical_baseline is not None:
            H = max(0.0, 1.0 - abs(mean_r - historical_baseline) / range_span)
        else:
            H = self.NEUTRAL_H

        score = round(self.WEIGHT_C * C + self.WEIGHT_A * A + self.WEIGHT_H * H, 4)

        fm_annotations = []
        if A < self.FM_ANNOTATION_FLOOR:
            fm_annotations.append(
                "FM-7: sensor-physical contradiction (reading vs IoT sensor)"
            )
        if historical_baseline is not None and H < self.FM_ANNOTATION_FLOOR:
            fm_annotations.append(
                "FM-7c: historical outlier (reading contradicts asset baseline)"
            )

        # The first annotation's code (text before ':') becomes the primary flag.
        fm_flag = fm_annotations[0].split(":")[0] if fm_annotations else None

        if score >= self.TAU_COMMIT:
            status = "COMMIT"
        elif score >= self.TAU_ESCALATE:
            status = "ESCALATE"
        else:
            status = "OOD_FLAG"
            # An out-of-distribution result always carries a flag.
            if not fm_flag:
                fm_flag = "FM-7"

        reason = f"score={score:.3f} (C={C:.2f}, A={A:.2f}, H={H:.2f})"
        if fm_annotations:
            reason += "; " + "; ".join(fm_annotations)

        return VerifierResult(
            status=status,
            score=score,
            C=round(C, 4),
            A=round(A, 4),
            H=round(H, 4),
            fm_flag=fm_flag,
            fm_annotations=fm_annotations,
            reason=reason,
        )