diff --git a/chatbot_files.zip b/chatbot_files.zip new file mode 100644 index 000000000..b9e6e91fd Binary files /dev/null and b/chatbot_files.zip differ diff --git a/chatbot_updates.zip b/chatbot_updates.zip new file mode 100644 index 000000000..a7ae59528 Binary files /dev/null and b/chatbot_updates.zip differ diff --git a/library/config/.esim/workspace.txt b/library/config/.esim/workspace.txt new file mode 100644 index 000000000..49a209513 --- /dev/null +++ b/library/config/.esim/workspace.txt @@ -0,0 +1 @@ +0 C:\Users\Dell\eSim-Workspace \ No newline at end of file diff --git a/scratch3.py b/scratch3.py new file mode 100644 index 000000000..b2849a088 --- /dev/null +++ b/scratch3.py @@ -0,0 +1,19 @@ +import sys, base64 +from PyQt5.QtWidgets import QApplication, QMainWindow +from PyQt5.QtCore import QUrl + +app = QApplication(sys.argv) +from src.frontEnd.Chatbot import ChatbotGUI + +gui = ChatbotGUI() + +img_key = "test_key" +with open("images/chatbot.png", "rb") as f: + raw = f.read() +b64 = base64.b64encode(raw).decode('utf-8') +gui._images_store = {img_key: [("chatbot.png", b64)]} + +url = QUrl(f"imageview://{img_key}/0") +print("Emitting link click...") +gui._handle_link_click(url) +print("Done!") diff --git a/src/chatbot/__init__.py b/src/chatbot/__init__.py index 2157cc829..01fe96212 100644 --- a/src/chatbot/__init__.py +++ b/src/chatbot/__init__.py @@ -2,10 +2,4 @@ eSim Chatbot Package """ -from .chatbot_core import handle_input, ESIMCopilotWrapper, analyze_schematic - -__all__ = [ - 'handle_input', - 'ESIMCopilotWrapper', - 'analyze_schematic' -] +__all__ = [] diff --git a/src/chatbot/chatbot_core.py b/src/chatbot/chatbot_core.py deleted file mode 100644 index 24b8eef09..000000000 --- a/src/chatbot/chatbot_core.py +++ /dev/null @@ -1,701 +0,0 @@ -# chatbot_core.py - -import os -import re -import json -from typing import Dict, Any, Tuple, List -from .error_solutions import get_error_solution -from .image_handler import analyze_and_extract -from .ollama_runner import run_ollama -from .knowledge_base import search_knowledge -from .ollama_runner import get_embedding - -# ==================== ESIM WORKFLOW KNOWLEDGE ==================== - -ESIM_WORKFLOWS = """ -=== COMMON ESIM WORKFLOWS === - -HOW TO ADD GROUND: -1. In KiCad schematic, press 'A' key (Add Component) -2. Type "GND" in the search box -3. Select ground symbol from "power" library -4. Click to place it on schematic -5. Press 'W' to add wire and connect to circuit -6. Save (Ctrl+S) → eSim: Simulation → Convert KiCad to NgSpice - -HOW TO ADD ANY COMPONENT: -1. In KiCad schematic, press 'A' key -2. Type component name (e.g., "Q2N3904", "1N4148", "uA741") -3. Select from appropriate library (eSim_Devices, eSim_Subckt, etc.) -4. Place on schematic and connect with wires -5. Save → Convert KiCad to NgSpice - -HOW TO FIX MISSING SPICE MODELS (3 Methods): - -Method 1 - Direct Netlist Edit (FASTEST, but temporary): -1. eSim: Tools → Spice Editor (or Ctrl+E) -2. Open your_project.cir.out file -3. Scroll to bottom (before .end line) -4. Add model definition: - BJT: .model Q2N3904 NPN(Bf=200 Is=1e-14 Vaf=100) - Diode: .model 1N4148 D(Is=1e-14 Rs=1) - Zener: .model DZ5V1 D(Is=1e-14 Bv=5.1 Ibv=5m) -5. Save (Ctrl+S) → Run Simulation -NOTE: This gets overwritten when you "Convert KiCad to NgSpice" again - -Method 2 - Component Properties (PERMANENT): -1. Open KiCad schematic (double-click .proj in Project Explorer) -2. Find the component that uses the missing model (e.g., transistor Q1) -3. Right-click on it → Properties (or press E when hovering over it) -4. Click "Edit Spice Model" button in the Properties dialog -5. In the Spice Model field, paste the model definition: - .model Q2N3904 NPN(Bf=200 Is=1e-14 Vaf=100) -6. Click OK → Save schematic (Ctrl+S) -7. eSim: Simulation → Convert KiCad to NgSpice -NOTE: This permanently associates the model with the component - -Method 3 - Include Library: -1. Spice Editor → Open .cir.out -2. Add at top: .include /usr/share/ngspice/models/bjt.lib -3. Save → Simulate - -HOW TO FIX MISSING SUBCIRCUITS: -1. Spice Editor → Open .cir.out -2. Add before .end: - .subckt OPAMP_IDEAL inp inn out vdd vss - Rin inp inn 1Meg - E1 out 0 inp inn 100000 - Rout out 0 75 - .ends -3. Save → Simulate -OR: Replace with eSim library opamp (uA741, LM324) - -HOW TO FIX FLOATING NODES: -1. Open KiCad schematic -2. Find the unconnected pin/node -3. Either connect it with wire (press W) or delete component -4. For sense points: Add Rleak node 0 1Meg -5. Save → Convert to NgSpice - -KICAD SHORTCUTS: -A = Add component -W = Add wire -M = Move item -R = Rotate item -C = Copy item -Delete = Remove item -Ctrl+S = Save - -ESIM MENU PATHS: -Convert to NgSpice: Simulation → Convert KiCad to NgSpice -Run Simulation: Simulation → Simulate -Spice Editor: Tools → Spice Editor (Ctrl+E) -Model Editor: Tools → Model Editor -Open KiCad: Double-click .proj file in Project Explorer - -FILE LOCATIONS: -Project folder: ~/eSim-Workspace// -Netlist: .cir.out -Schematic: .proj -""" - -LAST_BOT_REPLY: str = "" -LAST_IMAGE_CONTEXT: Dict[str, Any] = {} -LAST_NETLIST_ISSUES: Dict[str, Any] = {} - - -def get_history() -> Dict[str, Any]: - return LAST_IMAGE_CONTEXT - - -def clear_history() -> None: - global LAST_IMAGE_CONTEXT, LAST_NETLIST_ISSUES - LAST_IMAGE_CONTEXT = {} - LAST_NETLIST_ISSUES = {} - -# ==================== ESIM ERROR LOGIC ==================== - -def answer_with_rag_fallback(user_input: str) -> str: - """ - Try to answer using eSim manuals (RAG). - If nothing relevant is found, fallback to Ollama. - """ - - rag_context = search_knowledge(user_input) - - if rag_context.strip(): - prompt = f""" -You are eSim Copilot. - -Use ONLY the following official eSim documentation -to answer the question. Do NOT invent information. - -{rag_context} - -Question: -{user_input} - -Answer clearly and step-by-step. -""" - return run_ollama(prompt) - - # Fallback: general LLM answer - prompt = f""" -Answer the following question clearly: - -{user_input} -""" - return run_ollama(prompt) - -def detect_esim_errors(image_context: Dict[str, Any], user_input: str) -> str: - """ - Display errors from hybrid analysis with SMART FILTERING to remove hallucinations. - """ - if not image_context: - return "" - - analysis = image_context.get("circuit_analysis", {}) - raw_errors = analysis.get("design_errors", []) - warnings = analysis.get("design_warnings", []) - - # === SMART FILTERING === - components_str = str(image_context.get("components", [])).lower() - summary_str = str(image_context.get("vision_summary", "")).lower() - context_text = components_str + summary_str - - filtered_errors: List[str] = [] - for err in raw_errors: - err_lower = err.lower() - - if "ground" in err_lower and ( - "gnd" in context_text or "ground" in context_text or " 0 " in context_text - ): - continue - - if "floating" in err_lower and ( - "vin" in err_lower or "vout" in err_lower or "label" in err_lower - ): - continue - - filtered_errors.append(err) - - output: List[str] = [] - - if filtered_errors: - output.append("**🚨 CRITICAL ERRORS:**") - for err in filtered_errors: - output.append(f"❌ {err}") - - if warnings: - output.append("\n**⚠️ WARNINGS:**") - for warn in warnings: - output.append(f"⚠️ {warn}") - - text = user_input.lower() - if "singular matrix" in text: - output.append("\n**🔧 FIX:** Add 1GΩ resistors to all nodes → GND") - if "timestep" in text: - output.append("\n**🔧 FIX:** Reduce timestep or add 0.1Ω series R") - - if not output: - return "**✅ No errors detected**" - - return "\n".join(output) - - -# ==================== UTILITIES ==================== - -VALID_EXTS = (".png", ".jpg", ".jpeg", ".bmp", ".tiff", ".gif") - - -def _is_image_file(path: str) -> bool: - if not path: - return False - clean = re.sub(r"\[Image:\s*(.*?)\]", r"\1", path).strip() - return clean.lower().endswith(VALID_EXTS) - - -def _is_image_query(user_input: str) -> bool: - if not user_input: - return False - if "[Image:" in user_input: - return True - if "|" in user_input: - parts = user_input.split("|", 1) - if len(parts) == 2 and _is_image_file(parts[1]): - return True - return _is_image_file(user_input) - - -def _parse_image_query(user_input: str) -> Tuple[str, str]: - user_input = user_input.strip() - - match = re.search(r"\[Image:\s*(.*?)\]", user_input) - if match: - return user_input.replace(match.group(0), "").strip(), match.group(1).strip() - - if "|" in user_input: - q, p = [x.strip() for x in user_input.split("|", 1)] - if _is_image_file(p): - return q, p - if _is_image_file(q): - return p, q - - if _is_image_file(user_input): - return "", user_input - - return user_input, "" - - -def clean_response_raw(raw: str) -> str: - cleaned = re.sub(r"<\|.*?\|>", "", raw.strip()) - cleaned = re.sub(r"\[Context:.*?\]", "", cleaned, flags=re.DOTALL) - cleaned = re.sub(r"\[FACT .*?\]", "", cleaned, flags=re.MULTILINE) - cleaned = re.sub( - r"\[ESIM_NETLIST_START\].*?\[ESIM_NETLIST_END\]", "", cleaned, flags=re.DOTALL - ) - return cleaned.strip() - - -def _history_to_text(history: List[Dict[str, str]] | None, max_turns: int = 6) -> str: - """Convert history to readable text with MORE context (6 turns).""" - if not history: - return "" - recent = history[-max_turns:] - lines: List[str] = [] - for i, t in enumerate(recent, 1): - u = (t.get("user") or "").strip() - b = (t.get("bot") or "").strip() - if u: - lines.append(f"[Turn {i}] User: {u}") - if b: - if len(b) > 300: - b = b[:300] + "..." - lines.append(f"[Turn {i}] Assistant: {b}") - return "\n".join(lines).strip() - - -def _is_follow_up_question(user_input: str, history: List[Dict[str, str]] | None) -> bool: - """ - Detect if this is a follow-up question that needs history context. - Returns True if question lacks standalone context. - """ - if not history: - return False - - user_lower = user_input.lower().strip() - words = user_lower.split() - - - if len(words) <= 7: - return True - - pronouns = ["it", "that", "this", "those", "these", "they", "them"] - if any(pronoun in words for pronoun in pronouns): - return True - - continuations = [ - "what next", "next step", "after that", "and then", "then what", - "what about", "how about", "what if", "but why", "why not" - ] - if any(phrase in user_lower for phrase in continuations): - return True - - question_starters = ["why", "how", "where", "when", "what", "which"] - if words[0] in question_starters and len(words) <= 5: - return True - - return False -import numpy as np - -def is_semantic_topic_switch( - user_input: str, - history: list, - threshold: float = 0.30 -) -> bool: - """ - Detect topic switch using embedding similarity. - Returns True if new question is unrelated to previous assistant reply. - """ - - if not history: - return False - - last_assistant_msg = None - for item in reversed(history): - if item.get("role") == "assistant": - last_assistant_msg = item.get("content") - break - - if not last_assistant_msg: - return False - - try: - emb_new = get_embedding(user_input) - emb_prev = get_embedding(last_assistant_msg) - - if not emb_new or not emb_prev: - return False - - emb_new = np.array(emb_new) - emb_prev = np.array(emb_prev) - - similarity = np.dot(emb_new, emb_prev) / ( - np.linalg.norm(emb_new) * np.linalg.norm(emb_prev) - ) - - print(f"[COPILOT] Semantic similarity = {similarity:.3f}") - - return similarity < threshold - - except Exception as e: - print(f"[COPILOT] Topic switch check failed: {e}") - return False - -# ==================== QUESTION CLASSIFICATION ==================== - -def classify_question_type(user_input: str, has_image_context: bool, - history: List[Dict[str, str]] | None = None) -> str: - """ - Classify question type for smart routing. - Returns: 'greeting', 'simple', 'esim', 'image_query', 'follow_up_image', - 'follow_up', 'netlist' - """ - user_lower = user_input.lower() - - if "[ESIM_NETLIST_START]" in user_input: - return "netlist" - - if _is_image_query(user_input): - return "image_query" - - if has_image_context: - follow_phrases = [ - "this circuit", "that circuit", "in this schematic", - "components here", "what is the value", "how many", - "the circuit", "this schematic","what","can","how" - ] - if any(p in user_lower for p in follow_phrases): - return "follow_up_image" - - greetings = ["hello", "hi", "hey", "howdy", "greetings"] - user_words = user_lower.strip().split() - if len(user_words) <= 3 and any(g in user_words for g in greetings): - return "greeting" - - is_followup = _is_follow_up_question(user_input, history) - if is_semantic_topic_switch(user_input, history): - print("[COPILOT] Topic switch detected (semantic)") - is_followup = False - - if not is_followup: - history.clear() - LAST_IMAGE_CONTEXT = None - - esim_keywords = [ - "esim", "kicad", "ngspice", "spice", "simulation", "netlist", - "schematic", "convert", "gnd", "ground", ".model", ".subckt", - "singular matrix", "floating", "timestep", "convergence" - ] - if any(keyword in user_lower for keyword in esim_keywords): - return "esim" - - error_keywords = [ - "error", "fix", "problem", "issue", "warning", "missing", - "not working", "failed", "crash" - ] - if any(keyword in user_lower for keyword in error_keywords): - return "esim" - - return "simple" - - -# ==================== HANDLERS ==================== - -def handle_greeting() -> str: - return ( - "Hello! I'm eSim Copilot. I can help you with:\n" - "• Circuit analysis and netlist debugging\n" - "• Electronics concepts and SPICE simulation\n" - "• Component selection and circuit design\n\n" - "What would you like to know?" - ) - - -def handle_simple_question(user_input: str) -> str: - """ - Handles standalone questions. - Uses RAG first, then falls back to Ollama. - keep in mind that your a copilot of eSim an EDA tool - """ - return answer_with_rag_fallback(user_input) - - -def handle_follow_up(user_input: str, - image_context: Dict[str, Any], - history: List[Dict[str, str]] | None = None) -> str: - """ - Handle follow-up questions that depend on conversation history. - This handler PRIORITIZES history over RAG. - """ - history_text = _history_to_text(history, max_turns=6) - - if not history_text: - return "I need more context. Could you provide more details about your question?" - - rag_context = "" - user_lower = user_input.lower() - if any(kw in user_lower for kw in ["model", "spice", "ground", "error", "netlist"]): - rag_context = search_knowledge(user_input, n_results=2) - - prompt = ( - "You are an eSim expert assistant. The user is asking a follow-up question.\n\n" - "=== CONVERSATION HISTORY (MOST IMPORTANT) ===\n" - f"{history_text}\n" - "=============================================\n\n" - f"=== CURRENT USER QUESTION (FOLLOW-UP) ===\n{user_input}\n\n" - ) - - if rag_context: - prompt += f"=== REFERENCE MANUAL (if needed) ===\n{rag_context}\n\n" - - if image_context: - prompt += ( - f"=== CURRENT CIRCUIT CONTEXT ===\n" - f"Type: {image_context.get('circuit_analysis', {}).get('circuit_type', 'Unknown')}\n" - f"Components: {image_context.get('components', [])}\n\n" - ) - - prompt += ( - "CRITICAL INSTRUCTIONS:\n" - "1. The user's question refers to the CONVERSATION HISTORY above.\n" - "2. Identify what 'it', 'that', 'this', or 'next step' refers to by reading the history.\n" - "3. Answer based on the conversation context first, then use manual/workflows if needed.\n" - "4. If the user asks 'why', explain based on what was just discussed.\n" - "5. If the user asks 'what next' or 'next step', continue from the last instruction.\n" - "6. Be specific and reference what you're talking about (e.g., 'In the previous step, I mentioned...').\n" - "7. Keep answer concise (max 150 words).\n\n" - "Answer:" - ) - - return run_ollama(prompt, mode="default") - - -def handle_esim_question(user_input: str, - image_context: Dict[str, Any], - history: List[Dict[str, str]] | None = None) -> str: - """ - Handle eSim-specific questions with RAG + conversation history. - """ - user_lower = user_input.lower() - - sol = get_error_solution(user_input) - if sol and sol.get("description") != "General schematic error": - fixes = "\n".join(f"- {f}" for f in sol.get("fixes", [])) - cmd = sol.get("eSim_command", "") - answer = ( - f"**Detected issue:** {sol['description']}\n" - f"**Severity:** {sol.get('severity', 'unknown')}\n\n" - f"**Recommended fixes:**\n{fixes}\n\n" - ) - if cmd: - answer += f"**eSim action:** {cmd}\n" - return answer_with_rag_fallback(user_input) - - history_text = _history_to_text(history, max_turns=6) - - rag_context = search_knowledge(user_input, n_results=5) - - image_context_str = "" - if image_context: - image_context_str = ( - f"\n=== CURRENT CIRCUIT ===\n" - f"Type: {image_context.get('circuit_analysis', {}).get('circuit_type', 'Unknown')}\n" - f"Components: {image_context.get('components', [])}\n" - f"Values: {image_context.get('values', {})}\n" - ) - - prompt = ( - "You are an eSim expert. Answer using the workflows, manual, and conversation history.\n\n" - f"{ESIM_WORKFLOWS}\n\n" - f"=== MANUAL CONTEXT ===\n{rag_context}\n" - f"{image_context_str}\n" - ) - - if history_text: - prompt += f"=== CONVERSATION HISTORY ===\n{history_text}\n\n" - - prompt += ( - f"USER QUESTION: {user_input}\n\n" - "INSTRUCTIONS:\n" - "1. If the question refers to previous conversation, use the history.\n" - "2. Use exact menu paths and shortcuts from the workflows when relevant.\n" - "3. If the manual context does not contain the answer, say you need to check the manual.\n" - "4. Keep the answer concise (max 150 words).\n\n" - "Answer:" - ) - - return run_ollama(prompt, mode="default") - - -def handle_image_query(user_input: str) -> Tuple[str, Dict[str, Any]]: - """ - Handle image analysis queries. - Returns: (response_text, image_context_dict) - """ - question, image_path = _parse_image_query(user_input) - image_path = image_path.strip("'\"").strip() - - if not image_path or not os.path.exists(image_path): - return f"Error: Image not found: {image_path}", {} - - extraction = analyze_and_extract(image_path) - - if extraction.get("error"): - return f"Analysis Failed: {extraction['error']}", {} - - if not question: - error_report = detect_esim_errors(extraction, "") - - summary = ( - "**Image Analysis Complete**\n" - f"**Type:** {extraction.get('circuit_analysis', {}).get('circuit_type', 'Unknown')}\n" - f"**Components:** {extraction.get('component_counts', {})}\n" - f"**Description:** {extraction.get('vision_summary', '')}\n\n" - ) - - if extraction.get("components"): - summary += f"**Detected Components:** {', '.join(extraction['components'])}\n" - - if extraction.get("values"): - summary += "**Component Values:**\n" - for comp, val in extraction["values"].items(): - summary += f" • {comp}: {val}\n" - - summary += ( - "\n**Note:** Vision analysis may have errors. Use 'Analyze netlist' for precise results.\n" - ) - - if "🚨" in error_report or "⚠️" in error_report: - summary += f"\n{error_report}" - - return summary, extraction - - return handle_follow_up_image_question(question, extraction), extraction - - -def handle_follow_up_image_question(user_input: str, - image_context: Dict[str, Any]) -> str: - """ - Answer questions about an analyzed image using ONLY extracted data. - """ - image_context_str = ( - f"**Circuit Type:** {image_context.get('circuit_analysis', {}).get('circuit_type', 'Unknown')}\n" - f"**Components Detected:** {image_context.get('components', [])}\n" - f"**Component Values:** {image_context.get('values', {})}\n" - f"**Component Counts:** {image_context.get('component_counts', {})}\n" - f"**Description:** {image_context.get('vision_summary', '')}\n" - ) - - prompt = ( - "You are analyzing a circuit schematic. Answer using ONLY the circuit data below.\n\n" - "=== ANALYZED CIRCUIT DATA ===\n" - f"{image_context_str}\n" - "==============================\n\n" - f"USER QUESTION: {user_input}\n\n" - "STRICT INSTRUCTIONS:\n" - "1. Answer ONLY using the circuit data above - DO NOT use external knowledge.\n" - "2. For counts: use 'Component Counts'.\n" - "3. For values: use 'Component Values'.\n" - "4. For lists: use 'Components Detected'.\n" - "5. If data is missing, answer: 'The image analysis did not detect that information.'\n" - "6. Keep answer brief (2-3 sentences).\n\n" - "Answer:" - ) - - return run_ollama(prompt, mode="default") - - -def handle_netlist_analysis(user_input: str) -> str: - """ - Handle netlist analysis prompts (FACT-based prompt from GUI). - """ - raw_reply = run_ollama(user_input) - return clean_response_raw(raw_reply) - - -# ==================== MAIN ROUTER ==================== - -def handle_input(user_input: str, - history: List[Dict[str, str]] | None = None) -> str: - """ - Main router. Accepts optional conversation history for follow-up understanding. - """ - global LAST_IMAGE_CONTEXT, LAST_BOT_REPLY - - user_input = (user_input or "").strip() - if not user_input: - return "Please enter a query." - - if "[ESIM_NETLIST_START]" in user_input: - raw_reply = run_ollama(user_input) - cleaned = clean_response_raw(raw_reply) - LAST_BOT_REPLY = cleaned - return cleaned - - question_type = classify_question_type( - user_input, bool(LAST_IMAGE_CONTEXT), history - ) - print(f"[COPILOT] Question type: {question_type}") - - try: - if question_type == "netlist": - response = handle_netlist_analysis(user_input) - - elif question_type == "greeting": - response = handle_greeting() - - elif question_type == "image_query": - response, LAST_IMAGE_CONTEXT = handle_image_query(user_input) - - elif question_type == "follow_up_image": - response = handle_follow_up_image_question(user_input, LAST_IMAGE_CONTEXT) - - elif question_type == "simple": - response = handle_simple_question(user_input) - - elif question_type == "follow_up" and history: - response = handle_follow_up(user_input, LAST_IMAGE_CONTEXT, history) - else: - response = handle_simple_question(user_input) - - LAST_BOT_REPLY = response - return response - - except Exception as e: - error_msg = f"Error processing question: {str(e)}" - print(f"[COPILOT ERROR] {error_msg}") - return error_msg - - -# ==================== WRAPPER ==================== - -class ESIMCopilotWrapper: - def __init__(self) -> None: - self.history: List[Dict[str, str]] = [] - - def handle_input(self, user_input: str) -> str: - reply = handle_input(user_input, self.history) - self.history.append({"user": user_input, "bot": reply}) - if len(self.history) > 12: - self.history = self.history[-12:] - return reply - - def analyze_schematic(self, query: str) -> str: - return self.handle_input(query) - -_GLOBAL_WRAPPER = ESIMCopilotWrapper() - - -def analyze_schematic(query: str) -> str: - return _GLOBAL_WRAPPER.handle_input(query) diff --git a/src/chatbot/chatbot_thread.py b/src/chatbot/chatbot_thread.py index f134d4f57..c8cce620b 100644 --- a/src/chatbot/chatbot_thread.py +++ b/src/chatbot/chatbot_thread.py @@ -28,7 +28,7 @@ # llava internally resizes images to 336×336 anyway. # Downscaling large images before sending saves encoding time and reduces # the number of tokens the model spends on the image. -_MAX_IMAGE_DIM = 512 # pixels on longest side — fast, good quality +_MAX_IMAGE_DIM = 336 # llava's native patch size — no benefit sending larger def _downscale_image_bytes(raw_bytes: bytes) -> bytes: @@ -51,23 +51,14 @@ def _downscale_image_bytes(raw_bytes: bytes) -> bytes: if img.mode not in ("RGB", "L"): img = img.convert("RGB") buf = _io.BytesIO() - img.save(buf, format="JPEG", quality=85) + img.save(buf, format="JPEG", quality=70) return buf.getvalue() except Exception: return raw_bytes # fall back to original on any error # ── Connectivity / runtime helpers ─────────────────────────────────────────── - -def _check_internet(host="8.8.8.8", port=53, timeout=2): - try: - socket.setdefaulttimeout(timeout) - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((host, port)) - sock.close() - return True - except Exception: - return False +# REMOVED: _check_internet() — dead code, never called anywhere in the codebase def get_stt_backend() -> str: @@ -139,6 +130,22 @@ def detect_topic_switch(prev_text: str, curr_text: str) -> bool: # ── Model / service background workers ─────────────────────────────────────── +def _ensure_ollama_running(worker) -> bool: + """Helper to start Ollama server if it isn't running. Returns True if ready.""" + if not is_ollama_running(): + worker.status_signal.emit("Starting Ollama server — please wait…") + started = start_ollama(stop_flag=lambda: worker._stop_requested) + if not started: + if not worker._stop_requested: + worker.response_signal.emit( + "❌ Could not start Ollama automatically.\n" + "Please open a terminal and run: ollama serve" + ) + return False + worker.status_signal.emit("Ollama started!") + time.sleep(1) + return True + class OllamaStatusWorker(QThread): result_signal = pyqtSignal(bool) @@ -146,30 +153,38 @@ def run(self): self.result_signal.emit(is_ollama_running()) +# EXTRACTED: shared model-name parser used by both ModelFetchWorker and _refresh_model_cache +def _fetch_model_names() -> list: + """Call ollama.list() and return a flat list of model name strings.""" + models_data = ollama.list() + raw = (models_data.get('models', []) + if isinstance(models_data, dict) + else getattr(models_data, 'models', [])) + + names = [] + for m in raw: + name = (m.get('name') or m.get('model', '') + if isinstance(m, dict) + else getattr(m, 'model', str(m))) + if name: + names.append(name) + return names + + class ModelFetchWorker(QThread): result_signal = pyqtSignal(list) def run(self): try: - models_data = ollama.list() - raw = (models_data.get('models', []) - if isinstance(models_data, dict) - else getattr(models_data, 'models', [])) - - names = [] - for m in raw: - name = (m.get('name') or m.get('model', '') - if isinstance(m, dict) - else getattr(m, 'model', str(m))) - if name: - names.append(name) + # MERGED: uses shared _fetch_model_names() instead of inline duplicate + names = _fetch_model_names() # Keep the vision model cache warm so image sends don't block _refresh_model_cache() - self.result_signal.emit(names if names else ['qwen2.5-coder:3b']) + self.result_signal.emit(names if names else []) except Exception: - self.result_signal.emit(['qwen2.5-coder:3b']) + self.result_signal.emit([]) # ── Smart token budget ─────────────────────────────────────────────────────── @@ -275,7 +290,7 @@ class OllamaWorker(QThread): response_signal = pyqtSignal(str) status_signal = pyqtSignal(str) - def __init__(self, chat_history, model="qwen2.5-coder:3b", + def __init__(self, chat_history, model="", temperature=0.25, num_predict=1024): super().__init__() self.chat_history = chat_history @@ -289,19 +304,9 @@ def stop(self): def run(self): try: - if not is_ollama_running(): - self.status_signal.emit("Starting Ollama server — please wait…") - started = start_ollama(stop_flag=lambda: self._stop_requested) - if not started: - if self._stop_requested: - return # user cancelled cleanly - self.response_signal.emit( - "❌ Could not start Ollama automatically.\n" - "Please open a terminal and run: ollama serve" - ) - return - self.status_signal.emit("Ollama started! Getting response…") - time.sleep(1) + if not _ensure_ollama_running(self): + return + self.status_signal.emit("Ollama is ready! Getting response…") # Keep last 10 history lines (5 turns). # Sending 20 lines fills most of the context window before the @@ -361,13 +366,17 @@ def run(self): # ── Vision model helpers ────────────────────────────────────────────────────── +# EXTRACTED: single source of truth for vision-model keywords. +# Imported by Chatbot.py so both files share the same list. +VISION_MODEL_KEYWORDS = ["llava", "bakllava", "vision", "moondream", "minicpm-v", "qwen2-vl"] + + def _is_vision_model(model_name: str) -> bool: if not model_name: return False m = model_name.lower() - return any(k in m for k in [ - "llava", "bakllava", "vision", "moondream", "minicpm-v", "qwen2-vl" - ]) + # MERGED: uses shared VISION_MODEL_KEYWORDS constant + return any(k in m for k in VISION_MODEL_KEYWORDS) # QThread reads/writes don't produce a data race. _cache_lock = threading.Lock() _installed_models_cache: list = [] @@ -377,17 +386,8 @@ def _is_vision_model(model_name: str) -> bool: def _refresh_model_cache(): global _installed_models_cache, _installed_models_cache_valid try: - models_data = ollama.list() - raw = (models_data.get('models', []) - if isinstance(models_data, dict) - else getattr(models_data, 'models', [])) - names = [] - for m in raw: - name = (m.get('name') or m.get('model', '') - if isinstance(m, dict) - else getattr(m, 'model', str(m))) - if name: - names.append(name) + # MERGED: uses shared _fetch_model_names() instead of inline duplicate + names = _fetch_model_names() with _cache_lock: _installed_models_cache = names _installed_models_cache_valid = True @@ -444,23 +444,14 @@ def _pick_best_vision_model(preferred: str = "") -> str: def _build_schematic_vision_prompt(extra_prompt: str, image_count: int) -> str: """ Build the prompt sent to the vision model alongside the image(s). - If the user typed a question, that question drives the response. - If no question was given, request a general circuit analysis. + Kept short to minimize prompt token processing time. """ - n = "this schematic" if image_count == 1 else f"these {image_count} schematics" if extra_prompt and extra_prompt.strip(): - # User asked something specific - make that the primary request. - return ( - f"Looking at {n}: {extra_prompt.strip()}\n\n" - "Base your answer on what is actually visible in the image." - ) + return extra_prompt.strip() else: - # No question given - do a general analysis. return ( - f"Please analyse {n}. " - "Identify the circuit's function, list all visible components with their " - "reference designators and values, name the nets and signal rails, " - "and flag any potential design issues you can see." + "Analyse this circuit image. Identify components, connections, " + "and its function. Flag any design issues." ) @@ -501,11 +492,13 @@ def _chat_once(self, model_name: str, prompt: str, image_bytes_list): ], stream=True, options={ - "temperature": 0.15, - # llava: ~576 tokens/image patch + ~200 prompt + 512 predict. - # 3072 gives comfortable headroom without the overhead of 4096. - "num_ctx": 3072, - "num_predict": 512, + "temperature": 0.1, + # Smaller context = faster KV-cache allocation on CPU. + # 2048 is enough for image patches + short prompt + response. + "num_ctx": 2048, + # Cap output to ~256 tokens for much faster responses. + # Most useful circuit analysis fits well within this budget. + "num_predict": 256, "repeat_penalty": 1.05, "keep_alive": "10m", } @@ -531,19 +524,8 @@ def _chat_once(self, model_name: str, prompt: str, image_bytes_list): def run(self): try: - if not is_ollama_running(): - self.status_signal.emit("Starting Ollama server — please wait…") - started = start_ollama(stop_flag=lambda: self._stop_requested) - if not started: - if self._stop_requested: - return - self.response_signal.emit( - "❌ Could not start Ollama automatically.\n" - "Please open a terminal and run: ollama serve" - ) - return - self.status_signal.emit("Ollama started!") - time.sleep(1) + if not _ensure_ollama_running(self): + return if not self.image_paths: self.response_signal.emit("❌ No image paths provided.") diff --git a/src/chatbot/error_solutions.py b/src/chatbot/error_solutions.py deleted file mode 100644 index 615a3d63c..000000000 --- a/src/chatbot/error_solutions.py +++ /dev/null @@ -1,106 +0,0 @@ -# error_solutions.py -from typing import Dict,Any - -ERROR_SOLUTIONS = { - "no ground": { - "description": "Missing ground reference (Node 0)", - "severity": "critical", - "fixes": [ - "Add GND symbol (0) to schematic", - "Ensure all nodes have DC path to ground", - "Add 1GΩ resistors from floating nodes to GND for simulation stability", - "Use GND symbol from eSim power library" - ], - "eSim_command": "Add 'GND' symbol from 'power' library" - }, - - "floating pins": { - "description": "Unconnected component pins", - "severity": "moderate", - "fixes": [ - "Connect all unused pins to appropriate nets", - "For unused inputs: tie to VCC or GND through resistors", - "For unused outputs: leave unconnected but label properly" - ], - "eSim_command": "Use 'Place Wire' tool to connect pins" - }, - - "disconnected wires": { - "description": "Wires not properly connected to pins", - "severity": "critical", - "fixes": [ - "Zoom in and check wire endpoints touch pins", - "Use junction dots at wire intersections", - "Re-route wires to ensure proper connections" - ], - "eSim_command": "Press 'J' to add junction dots" - }, - - "missing spice model": { - "description": "Component lacks SPICE model definition", - "severity": "critical", - "fixes": [ - "Add .lib statement: .lib /usr/share/esim/models.lib", - "Check IC availability in Components/ICs.pdf", - "Use eSim library components only", - "Create custom model using Model Editor" - ], - "eSim_command": "Add '.lib /usr/share/esim/models.lib' in schematic" - }, - - "singular matrix": { - "description": "Simulation convergence error", - "severity": "critical", - "fixes": [ - "Add 1GΩ resistors from ALL nodes → GND", - "Add .options gmin=1e-12 reltol=0.01", - "Use .nodeset for initial voltages", - "Add 0.1Ω series resistors to voltage sources" - ], - "eSim_command": "Add '.options gmin=1e-12 reltol=0.01' in .cir file" - }, - - "missing component values": { - "description": "Components without specified values", - "severity": "moderate", - "fixes": [ - "Double-click components to edit values", - "Set R, C, L values before simulation", - "For ICs: specify model number", - "For sources: set voltage/current values" - ], - "eSim_command": "Double-click component → Edit Properties → Set Value" - }, - - "no load after rectifier": { - "description": "Rectifier output has no load capacitor", - "severity": "warning", - "fixes": [ - "Add filter capacitor after rectifier (100-1000μF)", - "Add load resistor to establish DC operating point", - "Add voltage regulator for stable output" - ], - "eSim_command": "Add capacitor between rectifier output and GND" - } -} - -def get_error_solution(error_message: str) -> Dict[str, Any]: - """Get detailed solution for specific error.""" - error_lower = error_message.lower() - - for error_key, solution in ERROR_SOLUTIONS.items(): - if error_key in error_lower: - return solution - - # Default solution for unknown errors - return { - "description": "General schematic error", - "severity": "unknown", - "fixes": [ - "Check all connections are proper", - "Verify component values are set", - "Ensure ground symbol is present", - "Check for duplicate component IDs" - ], - "eSim_command": "Run Design Rule Check (DRC) in KiCad" - } diff --git a/src/chatbot/image_handler.py b/src/chatbot/image_handler.py deleted file mode 100644 index cd8744791..000000000 --- a/src/chatbot/image_handler.py +++ /dev/null @@ -1,247 +0,0 @@ -import os -import json -import base64 -import io -import time -from typing import Dict, Any -from PIL import Image -MAX_IMAGE_BYTES = int(0.5*1024 * 1024) -from .ollama_runner import run_ollama_vision - -# === IMPORT PADDLE OCR === -try: - from paddleocr import PaddleOCR - import logging - logging.getLogger("ppocr").setLevel(logging.ERROR) - - # CRITICAL FIX: Disabled MKLDNN and Angle Classification to prevent VM Crashes - ocr_engine = PaddleOCR( - use_angle_cls=False, # <--- MUST BE FALSE TO STOP SIGABRT - lang='en', - use_gpu=False, # Force CPU - enable_mkldnn=False, # <--- MUST BE FALSE FOR PADDLE v3 COMPATIBILITY - use_mp=False, # Disable multiprocessing - show_log=False - ) - HAS_PADDLE = True - print("[INIT] PaddleOCR initialized (Safe Mode).") -except Exception as e: - HAS_PADDLE = False - print(f"[INIT] PaddleOCR init failed: {e}") - print("[INIT] Vision analysis unavailable. Text and netlist analysis still work.") - - -def encode_image(image_path: str) -> str: - """Convert image to base64 string.""" - with open(image_path, "rb") as image_file: - return base64.b64encode(image_file.read()).decode("utf-8") - - -def optimize_image_for_vision(image_path: str) -> bytes: - """ - Resize large images to reduce vision model processing time. - Target: Max 1920x1080 while maintaining aspect ratio. - """ - try: - img = Image.open(image_path) - - if img.mode not in ('RGB', 'L'): - img = img.convert('RGB') - - max_width = 1920 - max_height = 1080 - - if img.width > max_width or img.height > max_height: - # Calculate scaling factor - scale = min(max_width / img.width, max_height / img.height) - new_size = (int(img.width * scale), int(img.height * scale)) - img = img.resize(new_size, Image.Resampling.LANCZOS) - print(f"[IMAGE] Resized from {img.width}x{img.height} to {new_size[0]}x{new_size[1]}") - - # Convert to bytes (PNG format prevents compression artifacts on text) - buffer = io.BytesIO() - img.save(buffer, format='PNG', optimize=True, quality=85) - return buffer.getvalue() - - except Exception as e: - print(f"[IMAGE] Optimization failed: {e}, using original") - with open(image_path, 'rb') as f: - return f.read() - - -def extract_text_with_paddle(image_path: str) -> str: - """Extract text using PaddleOCR (Handles rotated/vertical text excellently).""" - if not HAS_PADDLE: - return "" - try: - result = ocr_engine.ocr(image_path, cls=True) - detected_texts = [] - if result and result[0]: - for line in result[0]: - text = line[1][0] - conf = line[1][1] - - if conf > 0.6: - detected_texts.append(text) - - full_text = " ".join(detected_texts) - return full_text - - except Exception as e: - print(f"[OCR] PaddleOCR Failed: {e}") - return "" - -def analyze_and_extract(image_path: str) -> Dict[str, Any]: - """ - Analyze schematic with image optimization, PaddleOCR text injection, and timeout handling. - Rejects images larger than 0.5 MB. - """ - if not os.path.exists(image_path): - return { - "error": "Image file not found", - "vision_summary": "", - "component_counts": {}, - "circuit_analysis": { - "circuit_type": "Unknown", - "design_errors": [], - "design_warnings": [] - }, - "components": [], - "values": {} - } - - try: - file_size = os.path.getsize(image_path) - except OSError as e: - return { - "error": f"Could not read image size: {e}", - "vision_summary": "", - "component_counts": {}, - "circuit_analysis": { - "circuit_type": "Unknown", - "design_errors": [], - "design_warnings": [] - }, - "components": [], - "values": {} - } - - if file_size > MAX_IMAGE_BYTES: - size_mb = round(file_size / (1024 * 1024), 2) - return { - "error": f"Image too large ({size_mb} MB). Max allowed size is 0.5 MB.", - "vision_summary": "", - "component_counts": {}, - "circuit_analysis": { - "circuit_type": "Unknown", - "design_errors": ["Image file size exceeded 0.5 MB limit"], - "design_warnings": [] - }, - "components": [], - "values": {} - } - - # === OPTIMIZE IMAGE BEFORE SENDING === - print(f"[VISION] Processing image: {os.path.basename(image_path)}") - image_bytes = optimize_image_for_vision(image_path) - - # === EXTRACT OCR TEXT (CRITICAL STEP) === - ocr_text = extract_text_with_paddle(image_path) - - if ocr_text: - clean_ocr = ocr_text.strip() - print(f"[VISION] PaddleOCR Hints injected: {clean_ocr[:100]}...") - else: - clean_ocr = "No readable text detected." - - # === PROMPT WITH CONTEXT === - prompt = f""" -ANALYZE THIS ELECTRONICS SCHEMATIC IMAGE. - -CONTEXT FROM OCR SCAN (Text detected in image): -"{clean_ocr}" - -INSTRUCTIONS: -1. Use the OCR text to identify component labels (e.g., if you see "D1" text, there is a Diode, R1,R2,R3... for resistor). -2. Look for rotated text labels near symbols. -3. Identify the circuit topology. - -VERY IMPORTANT INSTRUCTIONS: -1. DON'T OVERCALCULATE MODEL COUNT LIKE MODEL COUNT + OCR COUNT -2. IF THERE IS ANY VALUE NOT PRESENT FOR ANY COMPONENT JUST ADD A QUESTION MARK IN FRONT OF IT - -OUTPUT RULES: -1. Return ONLY valid JSON. -2. Structure: - - -RESPOND WITH JSON ONLY. -""" - - max_retries = 2 - for attempt in range(max_retries): - try: - print(f"[VISION] Attempt {attempt + 1}/{max_retries}...") - - response_text = run_ollama_vision(prompt, image_bytes) - - cleaned_json = response_text.replace("```json", "").replace("```", "").strip() - - if "{" in cleaned_json and "}" in cleaned_json: - start = cleaned_json.index("{") - end = cleaned_json.rindex("}") + 1 - cleaned_json = cleaned_json[start:end] - - data = json.loads(cleaned_json) - - required_keys = ["vision_summary", "component_counts", "circuit_analysis", "components", "values"] - for key in required_keys: - if key not in data: - raise ValueError(f"Missing required key: {key}") - - if not isinstance(data.get("circuit_analysis"), dict): - data["circuit_analysis"] = {"circuit_type": "Unknown", "design_errors": [], "design_warnings": []} - - if "design_errors" not in data["circuit_analysis"]: - data["circuit_analysis"]["design_errors"] = [] - - if not data.get("component_counts") or all(v == 0 for v in data.get("component_counts", {}).values()): - counts = {"R": 0, "C": 0, "U": 0, "Q": 0, "D": 0, "L": 0, "Misc": 0} - for comp in data.get("components", []): - if isinstance(comp, str) and len(comp) > 0: - comp_type = comp[0].upper() - if comp_type in counts: - counts[comp_type] += 1 - elif "DIODE" in comp.upper() or comp.startswith("D"): - counts["D"] = counts.get("D", 0) + 1 - data["component_counts"] = counts - - if data.get("components"): - data["components"] = list(dict.fromkeys(data["components"])) - - print(f"[VISION] Success: {data.get('circuit_analysis', {}).get('circuit_type', 'Unknown')}") - return data - - except Exception as e: - print(f"[VISION] Attempt {attempt + 1} failed: {str(e)}") - if attempt == max_retries - 1: - return { - "error": f"Vision analysis failed: {str(e)}", - "vision_summary": "Unable to analyze circuit image", - "component_counts": {}, - "circuit_analysis": { - "circuit_type": "Unknown", - "design_errors": ["Analysis timed out or failed"], - "design_warnings": [] - }, - "components": [], - "values": {} - } - else: - import time - time.sleep(2) - - -def analyze_image(image_path: str, question: str | None = None, preprocess: bool = True) -> str: - """Helper for manual testing.""" - return str(analyze_and_extract(image_path)) \ No newline at end of file diff --git a/src/chatbot/knowledge_base.py b/src/chatbot/knowledge_base.py deleted file mode 100644 index 14ea4cc17..000000000 --- a/src/chatbot/knowledge_base.py +++ /dev/null @@ -1,144 +0,0 @@ -import os -import chromadb -from .ollama_runner import get_embedding - -# ==================== DATABASE SETUP ==================== - -def _default_db_path() -> str: - xdg_data_home = os.environ.get("XDG_DATA_HOME", "").strip() - if not xdg_data_home: - xdg_data_home = os.path.join(os.path.expanduser("~"), ".local", "share") - return os.path.join(xdg_data_home, "esim-copilot", "chroma") - -db_path = os.environ.get("ESIM_COPILOT_DB_PATH", "").strip() or _default_db_path() -os.makedirs(db_path, exist_ok=True) -chroma_client = chromadb.PersistentClient(path=db_path) - -collection = chroma_client.get_or_create_collection(name="esim_manuals") - -# ==================== INGESTION ==================== -def ingest_pdfs(manuals_directory: str) -> None: - """ - Read the single master text file and index it. - Call this once from src/ingest.py. - """ - if not os.path.exists(manuals_directory): - print("Directory not found.") - return - - # Clear existing DB to ensure no duplicates from old files - print("Clearing old database...") - try: - chroma_client.delete_collection("esim_manuals") - global collection - collection = chroma_client.get_or_create_collection(name="esim_manuals") - except Exception as e: - print(f"Warning clearing DB: {e}") - - # Look for .txt files only - files = [f for f in os.listdir(manuals_directory) if f.lower().endswith(".txt")] - - if not files: - print("❌ No .txt files found to ingest!") - return - - for filename in files: - path = os.path.join(manuals_directory, filename) - print(f"\n📄 Processing Master File: {filename}") - - try: - with open(path, "r", encoding="utf-8") as f: - text = f.read() - - raw_sections = text.split("\n\n") - - documents, embeddings, metadatas, ids = [], [], [], [] - - chunk_counter = 0 - for section in raw_sections: - section = section.strip() - if len(section) < 50: - continue - - # Further split large sections by double newlines if needed - sub_chunks = [c.strip() for c in section.split("\n\n") if len(c) > 50] - - for chunk in sub_chunks: - embed = get_embedding(chunk) - if embed: - documents.append(chunk) - embeddings.append(embed) - metadatas.append({"source": filename, "type": "master_ref"}) - ids.append(f"{filename}_{chunk_counter}") - chunk_counter += 1 - - if documents: - collection.add( - documents=documents, - embeddings=embeddings, - metadatas=metadatas, - ids=ids, - ) - print(f" ✅ Indexed {len(documents)} chunks from {filename}") - else: - print(f" ⚠️ No valid chunks found in {filename}") - - except Exception as e: - print(f" ❌ Failed to process {filename}: {e}") - - -# ==================== SEARCH ==================== - -# Relevance threshold: ChromaDB returns distances (L2 or cosine). -# Lower distance = more similar. Filter out chunks with distance > threshold. -RELEVANCE_THRESHOLD = float(os.environ.get("ESIM_RAG_RELEVANCE_THRESHOLD", "1.0")) - - -def search_knowledge(query: str, n_results: int = 4) -> str: - """ - Semantic search with relevance threshold to reduce hallucination. - Filters out chunks with distance > RELEVANCE_THRESHOLD. - """ - try: - query_embed = get_embedding(query) - if not query_embed: - return "" - - results = collection.query( - query_embeddings=[query_embed], - n_results=n_results, - include=["documents", "distances"], - ) - - docs_list = results.get("documents", [[]]) - distances_list = results.get("distances", [[]]) - - if not docs_list or not docs_list[0]: - return "" - - docs = docs_list[0] - distances = distances_list[0] if distances_list else [] - - # Filter by relevance threshold (lower distance = more similar) - if distances and len(distances) == len(docs): - filtered = [ - (doc, d) for doc, d in zip(docs, distances) - if d <= RELEVANCE_THRESHOLD - ] - if filtered: - selected_chunks = [doc for doc, _ in filtered] - else: - return "" - else: - selected_chunks = docs - - context_text = "\n\n...\n\n".join(selected_chunks) - if len(context_text) > 3500: - context_text = context_text[:3500] - - header = "=== ESIM OFFICIAL DOCUMENTATION ===\n" - return f"{header}{context_text}\n===================================\n" - - except Exception as e: - print(f"RAG Error: {e}") - return "" diff --git a/src/chatbot/ollama_runner.py b/src/chatbot/ollama_runner.py deleted file mode 100644 index ae754bd0b..000000000 --- a/src/chatbot/ollama_runner.py +++ /dev/null @@ -1,192 +0,0 @@ -import os -import ollama -import json -import time - -# ==================== CLIENT ==================== - -ollama_client = ollama.Client( - host="http://localhost:11434", - timeout=300.0, -) - -# ==================== SETTINGS ==================== - -_SETTINGS_DIR = os.path.join( - os.path.expanduser("~"), ".local", "share", "esim-copilot" -) -_SETTINGS_PATH = os.path.join(_SETTINGS_DIR, "settings.json") - -_DEFAULT_TEXT_MODEL = "qwen2.5:3b" -_DEFAULT_VISION_MODEL = "minicpm-v:latest" -EMBED_MODEL = "nomic-embed-text" - - -def load_model_settings() -> dict: - """Load persisted model preferences from disk.""" - try: - with open(_SETTINGS_PATH, "r", encoding="utf-8") as f: - return json.load(f) - except Exception: - return {} - - -def save_model_settings(text_model: str, vision_model: str) -> None: - """Persist model preferences to disk.""" - os.makedirs(_SETTINGS_DIR, exist_ok=True) - try: - with open(_SETTINGS_PATH, "w", encoding="utf-8") as f: - json.dump({"text_model": text_model, "vision_model": vision_model}, f, indent=2) - except Exception as e: - print(f"[SETTINGS] Failed to save: {e}") - - -def list_available_models() -> list: - """Query Ollama for installed models. Returns list of model name strings.""" - try: - resp = ollama_client.list() - names = [m["name"] for m in resp.get("models", [])] - return names if names else [_DEFAULT_TEXT_MODEL, _DEFAULT_VISION_MODEL] - except Exception: - return [_DEFAULT_TEXT_MODEL, _DEFAULT_VISION_MODEL, EMBED_MODEL] - - -# Load settings and initialise model dicts -_settings = load_model_settings() - -VISION_MODELS = {"primary": _settings.get("vision_model", _DEFAULT_VISION_MODEL)} -TEXT_MODELS = {"default": _settings.get("text_model", _DEFAULT_TEXT_MODEL)} - - -def reload_model_settings() -> None: - """Re-read settings from disk and update running dicts (called after save).""" - s = load_model_settings() - VISION_MODELS["primary"] = s.get("vision_model", _DEFAULT_VISION_MODEL) - TEXT_MODELS["default"] = s.get("text_model", _DEFAULT_TEXT_MODEL) - - -# ==================== VISION ==================== - -def run_ollama_vision(prompt: str, image_input) -> str: - """Call vision model with Chain-of-Thought for better accuracy.""" - model = VISION_MODELS["primary"] - - try: - import base64 - - image_b64 = "" - if isinstance(image_input, bytes): - image_b64 = base64.b64encode(image_input).decode("utf-8") - elif isinstance(image_input, str) and os.path.isfile(image_input): - with open(image_input, "rb") as f: - image_b64 = base64.b64encode(f.read()).decode("utf-8") - elif isinstance(image_input, str) and len(image_input) > 100: - image_b64 = image_input - else: - raise ValueError("Invalid image input format") - - system_prompt = ( - "You are an expert Electronics Engineer using eSim.\n" - "Analyze the schematic image carefully.\n\n" - "STEP 1: THINKING PROCESS\n" - "- List visible components (e.g., 'I see 4 diodes in a bridge...').\n" - "- Trace connections (e.g., 'Resistor R1 is in series...').\n" - "- Check against the OCR text provided.\n\n" - "STEP 2: JSON OUTPUT\n" - "After your analysis, output a SINGLE JSON object wrapped in ```json ... ```.\n" - "Structure:\n" - "{\n" - ' "vision_summary": "Summary string",\n' - ' "component_counts": {"R": 0, "C": 0, "D": 0, "Q": 0, "U": 0},\n' - ' "circuit_analysis": {\n' - ' "circuit_type": "Rectifier/Amplifier/etc",\n' - ' "design_errors": [],\n' - ' "design_warnings": []\n' - ' },\n' - ' "components": ["R1", "D1"],\n' - ' "values": {"R1": "1k"}\n' - "}\n" - ) - - resp = ollama_client.chat( - model=model, - messages=[ - {"role": "system", "content": system_prompt}, - { - "role": "user", - "content": prompt, - "images": [image_b64], - }, - ], - options={ - "temperature": 0.0, - "num_ctx": 8192, - "num_predict": 1024, - }, - ) - - content = resp["message"]["content"] - - import re - json_match = re.search(r'```json\s*(\{.*?\})\s*```', content, re.DOTALL) - if json_match: - return json_match.group(1) - - start = content.find('{') - end = content.rfind('}') + 1 - if start != -1 and end != -1: - return content[start:end] - - return "{}" - - except Exception as e: - print(f"[VISION ERROR] {e}") - return json.dumps({ - "vision_summary": f"Vision failed: {str(e)[:50]}", - "component_counts": {}, - "circuit_analysis": {"circuit_type": "Error", "design_errors": [], "design_warnings": []}, - "components": [], - "values": {}, - }) - - -# ==================== TEXT ==================== - -def run_ollama(prompt: str, mode: str = "default") -> str: - """Run text model with focused parameters.""" - model = TEXT_MODELS.get(mode, TEXT_MODELS["default"]) - - try: - resp = ollama_client.chat( - model=model, - messages=[ - { - "role": "system", - "content": "You are an eSim and electronics expert. Be concise, accurate, and practical.", - }, - {"role": "user", "content": prompt}, - ], - options={ - "temperature": 0.05, - "num_ctx": 2048, - "num_predict": 400, - "top_p": 0.9, - "repeat_penalty": 1.1, - }, - ) - return resp["message"]["content"].strip() - - except Exception as e: - return f"[Error] {str(e)}" - - -# ==================== EMBEDDINGS ==================== - -def get_embedding(text: str): - """Get text embeddings for RAG.""" - try: - r = ollama_client.embeddings(model=EMBED_MODEL, prompt=text) - return r["embedding"] - except Exception as e: - print(f"[EMBED ERROR] {e}") - return None diff --git a/src/chatbot/stt_handler.py b/src/chatbot/stt_handler.py deleted file mode 100644 index f2d536066..000000000 --- a/src/chatbot/stt_handler.py +++ /dev/null @@ -1,92 +0,0 @@ -import os -import json -import queue -import time - -try: - import sounddevice as sd - from vosk import Model, KaldiRecognizer - _HAS_STT = True -except Exception: - sd = None - Model = None - KaldiRecognizer = None - _HAS_STT = False - -_MODEL = None - -DEFAULT_VOSK_DIR = os.path.join( - os.path.expanduser("~"), ".local", "share", - "esim-copilot", "vosk-model-small-en-us-0.15", -) - -def _get_model(): - global _MODEL - if not _HAS_STT: - raise RuntimeError( - "Speech-to-text is not available (missing vosk/sounddevice)." - ) - model_path = os.environ.get("VOSK_MODEL_PATH", "").strip() - if not model_path: - model_path = DEFAULT_VOSK_DIR - if not os.path.isdir(model_path): - raise RuntimeError( - f"Vosk model path not found. Set VOSK_MODEL_PATH or install at: {model_path}" - ) - if _MODEL is None: - _MODEL = Model(model_path) - return _MODEL - -def listen_to_mic(should_stop=lambda: False, max_silence_sec=3, samplerate=16000, phrase_limit_sec=8) -> str: - """ - Offline STT using Vosk. - Returns recognized text, or "" if cancelled / timed out. - """ - if not _HAS_STT: - raise RuntimeError("Speech-to-text is not installed or failed to load.") - q = queue.Queue() - rec = KaldiRecognizer(_get_model(), samplerate) - - started = False - t0 = time.time() - t_speech = None - - def callback(indata, frames, time_info, status): - q.put(bytes(indata)) - - with sd.RawInputStream( - samplerate=samplerate, - channels=1, - dtype="int16", - blocksize=8000, - callback=callback, - ): - while True: - if should_stop(): - return "" - - now = time.time() - - # Stop after silence - if not started and (now - t0) >= max_silence_sec: - return "" - - if started and t_speech and (now - t_speech) >= phrase_limit_sec: - break - - try: - data = q.get(timeout=0.2) - except queue.Empty: - continue - - if rec.AcceptWaveform(data): - text = json.loads(rec.Result()).get("text", "").strip() - if text: - return text - else: - partial = json.loads(rec.PartialResult()).get("partial", "").strip() - if partial and not started: - started = True - t_speech = now - - return json.loads(rec.FinalResult()).get("text", "").strip() diff --git a/src/frontEnd/Application.py b/src/frontEnd/Application.py index 20a529731..14cf662bd 100644 --- a/src/frontEnd/Application.py +++ b/src/frontEnd/Application.py @@ -130,6 +130,7 @@ def initchatbot(self): self.chatbot_dock.visibilityChanged.connect( lambda _: self._reposition_chatbot_icon() ) + self.chatbot_dock.installEventFilter(self) # ── Floating icon button (bottom-right corner) ────────────────── self.chatboticon = QtWidgets.QPushButton( @@ -191,6 +192,14 @@ def _reposition_chatbot_icon(self): self.chatboticon.move(x, bottom_y) self.chatboticon.raise_() # Always keep on top + def eventFilter(self, obj, event): + """ + Detect resize events on the dock widget so the icon stays aligned. + """ + if obj == self.chatbot_dock and event.type() == QtCore.QEvent.Resize: + self._reposition_chatbot_icon() + return super().eventFilter(obj, event) + def resizeEvent(self, event): """ Adjust chatbot icon button position during window resize. diff --git a/src/frontEnd/Chatbot.py b/src/frontEnd/Chatbot.py index 1a3e75701..2339681de 100644 --- a/src/frontEnd/Chatbot.py +++ b/src/frontEnd/Chatbot.py @@ -1,7 +1,29 @@ -from chatbot.chatbot_thread import ( +import os +import sys + +# Import pathmagic first to ensure 'src' is in sys.path before any local imports +try: + import pathmagic # noqa:F401 +except ImportError: + try: + from frontEnd import pathmagic # noqa:F401 + except ImportError: + # Fallback: manually add the src directory relative to this file + current_dir = os.path.dirname(os.path.abspath(__file__)) + src_dir = os.path.abspath(os.path.join(current_dir, '..')) + if src_dir not in sys.path: + sys.path.insert(0, src_dir) + +if os.name == 'nt': + init_path = '' +else: + init_path = '../../' + +from chatbot.chatbot_thread import ( # type: ignore OllamaWorker, OllamaVisionWorker, MicWorker, OllamaStatusWorker, ModelFetchWorker, - detect_topic_switch, get_stt_backend + detect_topic_switch, get_stt_backend, + VISION_MODEL_KEYWORDS, # EXTRACTED: shared constant, avoids duplicate keyword list ) from PyQt5.QtWidgets import ( QWidget, QHBoxLayout, QTextBrowser, QVBoxLayout, @@ -14,18 +36,10 @@ from configuration.Appconfig import Appconfig from datetime import datetime import re -import os import json import uuid import base64 -if os.name == 'nt': - from frontEnd import pathmagic # noqa:F401 - init_path = '' -else: - import pathmagic # noqa:F401 - init_path = '../../' - # ── Storage paths ───────────────────────────────────────────────────────────── _ESIM_DIR = os.path.join(os.path.expanduser('~'), '.esim') _HISTORY_FILE = os.path.join(_ESIM_DIR, 'chatbot_history.json') @@ -217,11 +231,15 @@ def _image_thumbnail_html(b64_str: str, filename: str) -> str: def _user_bubble(text, timestamp): safe = _escape_text_preserve_breaks(text) + b64_text = base64.b64encode(text.encode('utf-8')).decode('utf-8') return ( '' '' '
' '' + '' '' - f'' '
' + f'✏️' + '' f'{safe}' '
You  ·  {timestamp}
' '
' @@ -243,45 +261,34 @@ def _approx_token_count(text: str) -> int: return max(1, len(text) // 4) -def _bot_bubble(text, timestamp, response_idx): +def _bot_bubble(text, timestamp, response_idx=None): + """Render a bot response bubble. If response_idx is given, include Retry/Copy links.""" rendered = _render_markdown(text) - copy_href = f'copy:///{response_idx}' - retry_href = f'retry:///{response_idx}' - token_est = _approx_token_count(text) - - return ( - '' - '' - '
' - '' - '' - '
' - f'{rendered}' - '
' - '' - f'' - f'' - '
' - f'eSim AI  ·  {timestamp}  ·  ~{token_est} tokens' - f'↻ Retry' - f'  ' - f'Copy
' - '
' - '
' - ) + if response_idx is not None: + copy_href = f'copy:///{response_idx}' + retry_href = f'retry:///{response_idx}' + token_est = _approx_token_count(text) + footer = ( + '' + '' + f'' + f'' + '
' + f'eSim AI  ·  {timestamp}  ·  ~{token_est} tokens' + f'↻ Retry' + f'  ' + f'Copy
' + '' + ) + else: + footer = ( + f'' + f'eSim AI  ·  {timestamp}' + ) -def _bot_bubble_simple(text, timestamp): - rendered = _render_markdown(text) return ( '' '' - f'' + f'{footer}' '
' @@ -296,8 +303,7 @@ def _bot_bubble_simple(text, timestamp): '">' f'{rendered}' '
' - f'eSim AI  ·  {timestamp}
' '' '' @@ -408,6 +414,10 @@ def _is_image_file(path: str) -> bool: # ── Smart input field ───────────────────────────────────────────────────────── class _HistoryLineEdit(QLineEdit): + """Input field with command history (↑/↓) and clipboard image paste (Ctrl+V).""" + + image_pasted = pyqtSignal(str) # emits temp file path of pasted image + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._sent_history = [] @@ -419,6 +429,29 @@ def add_to_history(self, text): self._hist_idx = -1 def keyPressEvent(self, event: QKeyEvent): + # ── Ctrl+V: check for clipboard image before default paste ──── + if event.key() == Qt.Key_V and event.modifiers() & Qt.ControlModifier: + clipboard = QApplication.clipboard() + mime = clipboard.mimeData() + if mime and mime.hasImage(): + image = clipboard.image() + if not image.isNull(): + import tempfile + tmp_dir = os.path.join( + os.path.expanduser('~'), '.esim', 'clipboard_images' + ) + os.makedirs(tmp_dir, exist_ok=True) + tmp_path = os.path.join( + tmp_dir, + f"paste_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" + ) + image.save(tmp_path, "PNG") + self.image_pasted.emit(tmp_path) + return + # Fall through to default paste for text + super().keyPressEvent(event) + return + if event.key() == Qt.Key_Up and self._sent_history: if self._hist_idx == -1: self._draft = self.text() @@ -514,7 +547,7 @@ def __init__(self, session: dict, parent=None): if line.startswith("User:"): html += _user_bubble(line[5:].strip(), "") elif line.startswith("Bot:"): - html += _bot_bubble_simple(line[4:].strip(), "") + html += _bot_bubble(line[4:].strip(), "") browser.setHtml(html if html else "

No messages

") QTimer.singleShot(120, lambda: browser.verticalScrollBar().setValue(browser.verticalScrollBar().maximum())) root.addWidget(browser) @@ -1052,9 +1085,6 @@ def __init__(self): self._save_debounce_timer.setSingleShot(True) self._save_debounce_timer.timeout.connect(self._flush_save) - self._thinking_timer = QTimer(self) - self._thinking_timer.timeout.connect(self._animate_thinking) - self._typing_anim_timer = QTimer(self) self._typing_anim_timer.timeout.connect(self._animate_typing_bubble) @@ -1334,10 +1364,11 @@ def __init__(self): self.user_input = _HistoryLineEdit( self, placeholderText="Message eSim AI… (↑↓ for history)" ) + self.user_input.setMinimumHeight(42) self.user_input.setStyleSheet(""" QLineEdit { - font-size:13px; padding:9px 14px; - border:1.5px solid #e0e0e0; border-radius:22px; + font-size:14px; padding:10px 18px; + border:1.5px solid #e0e0e0; border-radius:21px; background:#f7f7f7; color:#1a1a2e; } QLineEdit:focus { @@ -1346,15 +1377,20 @@ def __init__(self): } """) self.user_input.returnPressed.connect(self.ask_ollama) + self.user_input.image_pasted.connect( + lambda path: self._stage_image_paths([path]) + ) input_layout.addWidget(self.user_input) - self.send_button = QPushButton("Send") - self.send_button.setFixedHeight(38) + self.send_button = QPushButton("➤") + self.send_button.setFixedSize(40, 40) + self.send_button.setToolTip("Send Message") self.send_button.setStyleSheet(""" QPushButton { - font-size:13px; font-weight:600; padding:5px 20px; + font-size:18px; font-weight:600; background-color:#0095f6; color:white; - border:none; border-radius:19px; + border:none; border-radius:20px; + padding-left: 2px; } QPushButton:hover { background-color:#0082d8; } """) @@ -1374,18 +1410,7 @@ def __init__(self): self.stop_button.hide() input_layout.addWidget(self.stop_button) - self.clear_button = QPushButton("Clear") - self.clear_button.setFixedHeight(38) - self.clear_button.setStyleSheet(""" - QPushButton { - font-size:13px; padding:5px 14px; - background-color:#f0f0f0; color:#666; - border:none; border-radius:19px; - } - QPushButton:hover { background-color:#ffe0e0; color:#cc0000; } - """) - self.clear_button.clicked.connect(self.clear_session) - input_layout.addWidget(self.clear_button) + chat_layout.addLayout(input_layout) @@ -1822,19 +1847,8 @@ def _new_chat(self): # clear_session() — that method deletes the session file, which # would erase the chat we just saved above. self.chat_display.setHtml(WELCOME_MESSAGE) - self.chat_history = [] - self._retry_history = [] - self._bot_responses = {} - self._response_counter = 0 - self._last_user_text = "" - self._viewing_past_session = False - self._clear_staged_images() - self._images_store = {} - self._last_image_paths = [] - self._current_session_kind = "text" - self._session_title_override = None - self._current_session_id = str(uuid.uuid4()) - self._session_created_at = datetime.now().strftime("%Y-%m-%d %H:%M") + # MERGED: combined all state resetting into one reusable helper + self._reset_session_state() try: if os.path.exists(_HISTORY_FILE): @@ -1843,9 +1857,6 @@ def _new_chat(self): pass self._sidebar.populate() - self._current_session_kind = "text" - self._session_title_override = None - self._sidebar.populate() def _on_session_deleted(self, deleted_id: str): if deleted_id == self._current_session_id or self._viewing_past_session: @@ -1856,18 +1867,8 @@ def _on_session_deleted(self, deleted_id: str): self._save_debounce_timer.stop() self._save_pending = False - self._current_session_id = str(uuid.uuid4()) - self._session_created_at = datetime.now().strftime("%Y-%m-%d %H:%M") - self._current_session_kind = "text" - self._session_title_override = None - self._viewing_past_session = False - self.chat_history = [] - self._retry_history = [] - self._bot_responses = {} - self._response_counter = 0 - self._last_user_text = "" - self._images_store = {} - self._last_image_paths = [] + # MERGED: combined all state resetting into one reusable helper + self._reset_session_state() try: if os.path.exists(_HISTORY_FILE): os.remove(_HISTORY_FILE) @@ -1933,8 +1934,6 @@ def _on_status_result(self, running: bool): """) self._was_ollama_offline = True - # ── Typing bubble ───────────────────────────────────────────────── - # ── Typing bubble (window-switch safe) ────────────────────────── # # (_typing_start_pos) and used it to select-and-replace the animated @@ -2051,6 +2050,19 @@ def _handle_link_click(self, url): return self._retry_response(idx) + elif scheme == 'edit': + if not parts: + return + try: + import base64 + b64_text = parts[-1] + text = base64.b64decode(b64_text).decode('utf-8') + self._editing_prompt_text = text + self.user_input.setText(text) + self.user_input.setFocus() + except Exception: + pass + elif scheme == 'clear': self.clear_session() @@ -2153,35 +2165,49 @@ def _clear_staged_images(self): self._staged_images.clear() self._refresh_staging_strip() - def _warn_or_switch_to_vision_model(self) -> bool: + def _auto_switch_model(self, keywords, preferred_names, label): """ - Ensure a vision-capable model is selected before sending images. - - Returns True if it is safe to proceed (a vision model is active), - or False if no vision model is installed and the request should be - blocked. Sending images to a text-only model causes it to fabricate - completely wrong answers because it cannot actually see the image. + Shared helper for auto-switching models. + Returns the index of the matched model, or -1 if none found. """ current = self.model_combo.currentText() - vision_keywords = ["llava", "bakllava", "vision", "moondream", "qwen2-vl", "minicpm-v"] - # Already on a vision model — good to go. - if any(k in current.lower() for k in vision_keywords): - return True + # Already on a matching model — no switch needed. + if any(k in current.lower() for k in keywords): + return self.model_combo.currentIndex() - # Try to auto-switch to any vision model the user has installed. - preferred_order = ["moondream", "llava:7b", "llava", "bakllava", "llava:13b"] + # Try preferred model names first (exact match). + for preferred in preferred_names: + idx = self.model_combo.findText(preferred) + if idx >= 0: + self.model_combo.setCurrentIndex(idx) + self.chat_display.append(_system_bubble( + f"🔄 Auto-switched to {label} model: {preferred}" + )) + self._scroll_to_bottom() + return idx + + # Fallback: any model containing one of the keywords. for i in range(self.model_combo.count()): name = self.model_combo.itemText(i) - if any(k in name.lower() for k in vision_keywords): + if any(k in name.lower() for k in keywords): self.model_combo.setCurrentIndex(i) self.chat_display.append(_system_bubble( - f"Switched to vision model: {name}" + f"🔄 Auto-switched to {label} model: {name}" )) self._scroll_to_bottom() - return True + return i + + return -1 - # No vision model found — block the request and explain clearly. + def _warn_or_switch_to_vision_model(self) -> bool: + """Ensure a vision model is active before sending images.""" + # MERGED: uses shared VISION_MODEL_KEYWORDS from chatbot_thread + preferred = ["llava:latest", "llava", "llava:7b", "llava:13b", "bakllava", "moondream"] + idx = self._auto_switch_model(VISION_MODEL_KEYWORDS, preferred, "vision") + if idx >= 0: + return True + # No vision model found — block and explain. self.chat_display.append(_system_bubble( "⚠️ No vision model installed. Image analysis is not possible with the " "current model — a text-only model cannot see images and will give " @@ -2193,6 +2219,10 @@ def _warn_or_switch_to_vision_model(self) -> bool: self._scroll_to_bottom() return False + def _switch_to_text_model(self): + """Auto-switch to qwen2.5 for text queries.""" + self._auto_switch_model(["qwen2.5"], [], "text") + # ── Mic ────────────────────────────────────────────────────────── def _on_temp_changed(self, value: int): @@ -2314,15 +2344,8 @@ def analyse_netlist(self, netlist_path: str): self._last_user_text = prompt self._start_thinking() - self.worker = OllamaWorker( - self.chat_history, - model=self.model_combo.currentText(), - temperature=self._temperature, - num_predict=self._num_predict, - ) - self.worker.response_signal.connect(self.display_response) - self.worker.status_signal.connect(self._on_status_update) - self.worker.start() + # EXTRACTED: helper method to launch OllamaWorker + self._launch_text_worker(self.chat_history) # ── Topic switch ───────────────────────────────────────────────── @@ -2448,22 +2471,41 @@ def _populate_models(self): def _on_models_fetched(self, model_names: list): self.model_combo.clear() + + if not model_names: + # No models found — Ollama may be offline or has no models pulled. + self.model_combo.addItem("No models found") + self.model_combo.setEnabled(False) + self.status_label.setText( + "⚠️ No Ollama models found. Run 'ollama pull qwen2.5-coder' " + "in a terminal to install one." + ) + return + for name in model_names: self.model_combo.addItem(name) - preferred_order = [ - 'qwen2.5-coder:3b', - 'llava:13b', - 'llava:7b', - 'llava', - 'bakllava', - ] + # Try to default to any qwen2.5 variant chosen_idx = -1 - for preferred in preferred_order: - idx = self.model_combo.findText(preferred) - if idx >= 0: - chosen_idx = idx + for i in range(self.model_combo.count()): + name = self.model_combo.itemText(i) + if "qwen2.5" in name.lower(): + chosen_idx = i break + + # If no qwen2.5, try some fallback preferred models + if chosen_idx == -1: + preferred_fallbacks = ['llava:13b', 'llava:7b', 'llava', 'bakllava'] + for preferred in preferred_fallbacks: + idx = self.model_combo.findText(preferred) + if idx >= 0: + chosen_idx = idx + break + + # If still nothing matched, just use the first available model + if chosen_idx == -1 and self.model_combo.count() > 0: + chosen_idx = 0 + if chosen_idx >= 0: self.model_combo.setCurrentIndex(chosen_idx) @@ -2471,9 +2513,6 @@ def _on_models_fetched(self, model_names: list): # ── Thinking / retry / regenerate ──────────────────────────────── - def _animate_thinking(self): - pass - def _start_thinking(self): self._is_generating = True self.user_input.setEnabled(False) @@ -2482,7 +2521,7 @@ def _start_thinking(self): self._staging_area.setEnabled(False) self.send_button.hide() self.stop_button.show() - self.clear_button.setEnabled(False) + self._show_typing_bubble() def _stop_thinking(self): @@ -2495,13 +2534,52 @@ def _stop_thinking(self): self._staging_area.setEnabled(True) self.stop_button.hide() self.send_button.show() - self.clear_button.setEnabled(True) + def _scroll_to_bottom(self): self.chat_display.verticalScrollBar().setValue( self.chat_display.verticalScrollBar().maximum() ) + def _reset_session_state(self): + """EXTRACTED: Common session state resets to avoid code duplication across handlers.""" + self.chat_history = [] + self._retry_history = [] + self._bot_responses = {} + self._response_counter = 0 + self._last_user_text = "" + self._viewing_past_session = False + self._clear_staged_images() + self._images_store = {} + self._last_image_paths = [] + self._current_session_kind = "text" + self._session_title_override = None + self._current_session_id = str(uuid.uuid4()) + self._session_created_at = datetime.now().strftime("%Y-%m-%d %H:%M") + + def _launch_text_worker(self, chat_history): + """EXTRACTED: Launch OllamaWorker with correct configuration and signal mappings.""" + self.worker = OllamaWorker( + chat_history, + model=self.model_combo.currentText(), + temperature=self._temperature, + num_predict=self._num_predict, + ) + self.worker.response_signal.connect(self.display_response) + self.worker.status_signal.connect(self._on_status_update) + self.worker.start() + + def _launch_vision_worker(self, image_paths, extra_prompt): + """EXTRACTED: Launch OllamaVisionWorker with correct configuration and signal mappings.""" + self.worker = OllamaVisionWorker( + image_paths=image_paths, + extra_prompt=extra_prompt, + model=self.model_combo.currentText(), + ) + self.worker.response_signal.connect(self.display_response) + self.worker.status_signal.connect(self._on_status_update) + self.worker.start() + def _stop_generating(self): if hasattr(self, 'worker') and self.worker.isRunning(): self.worker.stop() @@ -2555,26 +2633,13 @@ def _retry_response(self, response_idx: int): followup_paths = [p for p in self._last_image_paths if os.path.exists(p)] if followup_paths and "[Image analysis request:" in last_user: prompt = last_user.split("\n", 1)[-1].strip() if "\n" in last_user else "" - self.worker = OllamaVisionWorker( - image_paths=followup_paths, - extra_prompt=prompt, - model=self.model_combo.currentText(), - ) + # EXTRACTED: helper method to launch OllamaVisionWorker + self._launch_vision_worker(followup_paths, prompt) else: - self.worker = OllamaWorker( - self._retry_history, - model=self.model_combo.currentText(), - temperature=self._temperature, - num_predict=self._num_predict, - ) - self.worker.response_signal.connect(self.display_response) - self.worker.status_signal.connect(self._on_status_update) - self.worker.start() + # EXTRACTED: helper method to launch OllamaWorker + self._launch_text_worker(self._retry_history) - def _retry_last(self): - """Legacy shim kept so any external callers don't break.""" - if self.chat_history: - self._retry_response(self._response_counter - 1) + # REMOVED: _retry_last() — legacy shim with no callers found in codebase def _regenerate_last_response(self): if not self.chat_history: @@ -2595,15 +2660,8 @@ def _regenerate_last_response(self): self._rebuild_chat_html_from_history() self._start_thinking() - self.worker = OllamaWorker( - self._retry_history, - model=self.model_combo.currentText(), - temperature=self._temperature, - num_predict=self._num_predict, - ) - self.worker.response_signal.connect(self.display_response) - self.worker.status_signal.connect(self._on_status_update) - self.worker.start() + # EXTRACTED: helper method to launch OllamaWorker + self._launch_text_worker(self._retry_history) def _on_status_update(self, msg: str): self.status_label.setText(msg) @@ -2624,11 +2682,32 @@ def ask_ollama(self): if self._is_generating: return + # Guard: prevent sending when no valid model is available + selected = self.model_combo.currentText() + if not selected or selected == "No models found" or selected == "Loading models…": + self.status_label.setText( + "⚠️ No model available. Make sure Ollama is running " + "and you have pulled a model." + ) + self._populate_models() + return + if self._viewing_past_session: # chat_history was already synced when the session was loaded, # so no rebuild is needed — just clear the read-only flag. self._viewing_past_session = False + editing_text = getattr(self, '_editing_prompt_text', None) + if editing_text: + # We are editing an existing prompt. Find it in history and truncate. + for i in range(len(self.chat_history) - 1, -1, -1): + msg = self.chat_history[i] + if msg == f"User: {editing_text}" or msg.endswith(f"\n{editing_text}"): + self.chat_history = self.chat_history[:i] + self._rebuild_chat_html_from_history() + break + self._editing_prompt_text = None + ts = _get_time() if staged_paths: @@ -2708,18 +2787,20 @@ def ask_ollama(self): self._clear_staged_images() self._start_thinking() - self.worker = OllamaVisionWorker( - image_paths=staged_paths, - extra_prompt=vision_extra_prompt, - model=self.model_combo.currentText(), - ) - self.worker.response_signal.connect(self.display_response) - self.worker.status_signal.connect(self._on_status_update) - self.worker.start() + # EXTRACTED: helper method to launch OllamaVisionWorker + self._launch_vision_worker(staged_paths, vision_extra_prompt) return - self._current_session_kind = "text" self._check_topic_switch(user_text) + + # The user explicitly requested that any text-only search should + # switch to the Qwen model. Since Qwen cannot process images, + # we must drop any previous image context and use the text worker. + self._last_image_paths.clear() + + self._current_session_kind = "text" + self._switch_to_text_model() + self.chat_history = (self.chat_history + [f"User: {user_text}"])[-20:] self.chat_display.append(_user_bubble(user_text, ts)) self._scroll_to_bottom() @@ -2730,27 +2811,8 @@ def ask_ollama(self): self._retry_history = list(self.chat_history) self._start_thinking() - # If the user is following up on an image session, re-send the last - # images so the model has visual context for its answer. - followup_image_paths = [ - p for p in self._last_image_paths if os.path.exists(p) - ] - if followup_image_paths and self._current_session_kind in ("image", "text"): - self.worker = OllamaVisionWorker( - image_paths=followup_image_paths, - extra_prompt=user_text, - model=self.model_combo.currentText(), - ) - else: - self.worker = OllamaWorker( - self.chat_history, - model=self.model_combo.currentText(), - temperature=self._temperature, - num_predict=self._num_predict, - ) - self.worker.response_signal.connect(self.display_response) - self.worker.status_signal.connect(self._on_status_update) - self.worker.start() + # EXTRACTED: helper method to launch OllamaWorker + self._launch_text_worker(self.chat_history) # ── Window / response / clear ──────────────────────────────────── @@ -2806,22 +2868,8 @@ def clear_session(self): pass self.chat_display.setHtml(WELCOME_MESSAGE) - self.chat_history = [] - self._retry_history = [] - self._bot_responses = {} - self._response_counter = 0 - self._last_user_text = "" - self._viewing_past_session = False - self._clear_staged_images() - self._images_store = {} - self._last_image_paths = [] - self._viewing_past_session = False - self._current_session_kind = "text" - self._session_title_override = None - - # Assign a fresh session ID so the next conversation starts clean - self._current_session_id = str(uuid.uuid4()) - self._session_created_at = datetime.now().strftime("%Y-%m-%d %H:%M") + # MERGED: combined all state resetting into one reusable helper + self._reset_session_state() try: if os.path.exists(_HISTORY_FILE): @@ -2847,15 +2895,8 @@ def debug_ollama(self): self._scroll_to_bottom() self._retry_history = list(self.chat_history) self._start_thinking() - self.worker = OllamaWorker( - self.chat_history, - model=self.model_combo.currentText(), - temperature=self._temperature, - num_predict=self._num_predict, - ) - self.worker.response_signal.connect(self.display_response) - self.worker.status_signal.connect(self._on_status_update) - self.worker.start() + # EXTRACTED: helper method to launch OllamaWorker + self._launch_text_worker(self.chat_history) self.user_input.clear() def debug_error(self, log): diff --git a/src/frontEnd/DockArea.py b/src/frontEnd/DockArea.py index 32d0682fb..a63c87379 100755 --- a/src/frontEnd/DockArea.py +++ b/src/frontEnd/DockArea.py @@ -12,7 +12,6 @@ from PyQt5.QtWidgets import QLineEdit, QLabel, QPushButton, QVBoxLayout, QHBoxLayout from PyQt5.QtCore import Qt import os -from frontEnd.Chatbot import create_chatbot_dock from converter.pspiceToKicad import PspiceConverter from converter.ltspiceToKicad import LTspiceConverter from converter.LtspiceLibConverter import LTspiceLibConverter @@ -607,17 +606,3 @@ def closeDock(self): self.temp = self.obj_appconfig.current_project['ProjectName'] for dockwidget in self.obj_appconfig.dock_dict[self.temp]: dockwidget.close() - - def chatbotEditor(self): - """ - Creates the eSim Copilot (Chatbot) dock. - """ - global count - - self.chatbot_dock = create_chatbot_dock(self) - - self.addDockWidget(QtCore.Qt.BottomDockWidgetArea, self.chatbot_dock) - - self.chatbot_dock.setVisible(True) - self.chatbot_dock.raise_() - diff --git a/src/frontEnd/pathmagic.py b/src/frontEnd/pathmagic.py index 5f0d712c3..30e1516a2 100755 --- a/src/frontEnd/pathmagic.py +++ b/src/frontEnd/pathmagic.py @@ -1,7 +1,15 @@ import os import sys -# Setting PYTHONPATH -cwd = os.getcwd() -(setPath, fronEnd) = os.path.split(cwd) -sys.path.append(setPath) +# Calculate absolute path to the 'src' directory relative to this file +# __file__ is src/frontEnd/pathmagic.py, its parent is 'src/frontEnd', and its grandparent is 'src' +current_dir = os.path.dirname(os.path.abspath(__file__)) +src_dir = os.path.abspath(os.path.join(current_dir, '..')) + +if src_dir not in sys.path: + sys.path.insert(0, src_dir) + +# Also add the project root directory to sys.path +root_dir = os.path.abspath(os.path.join(src_dir, '..')) +if root_dir not in sys.path: + sys.path.append(root_dir)