diff --git a/optillm/mars/agent.py b/optillm/mars/agent.py index 704c33d3..e9ebd275 100644 --- a/optillm/mars/agent.py +++ b/optillm/mars/agent.py @@ -5,17 +5,17 @@ import logging from typing import Dict, Any, Tuple from datetime import datetime -import random from .prompts import ( MATHEMATICAL_SYSTEM_PROMPT, AGENT_EXPLORATION_PROMPT, VERIFICATION_PROMPT, - IMPROVEMENT_PROMPT + IMPROVEMENT_PROMPT, ) from .workspace import AgentSolution, VerificationResult logger = logging.getLogger(__name__) + class MARSAgent: """Individual agent for mathematical reasoning with OpenRouter reasoning API""" @@ -40,28 +40,33 @@ def _get_reasoning_effort(self) -> str: else: return "high" # ~80% of max_tokens for reasoning - def generate_solution(self, problem: str, request_id: str = None) -> Tuple[AgentSolution, int]: + def generate_solution( + self, problem: str, request_id: str = None + ) -> Tuple[AgentSolution, int]: """Generate a solution for the given problem using reasoning API""" import time + start_time = time.time() - logger.info(f"🤖 AGENT {self.agent_id}: Starting solution generation (temp: {self.temperature}, effort: {self._get_reasoning_effort()})") - logger.info(f"🤖 AGENT {self.agent_id}: Problem length: {len(problem)} characters") + logger.info( + f"🤖 AGENT {self.agent_id}: Starting solution generation (temp: {self.temperature}, effort: {self._get_reasoning_effort()})" + ) + logger.info( + f"🤖 AGENT {self.agent_id}: Problem length: {len(problem)} characters" + ) # Prepare the prompt exploration_prompt = AGENT_EXPLORATION_PROMPT.format( - agent_id=self.agent_id, - temperature=self.temperature, - problem=problem + agent_id=self.agent_id, temperature=self.temperature, problem=problem ) # Configure reasoning parameters - simplified with effort only reasoning_effort = self._get_reasoning_effort() - max_tokens = self.config['max_tokens'] - logger.info(f"🤖 AGENT {self.agent_id}: Using max_tokens={max_tokens}, reasoning_effort={reasoning_effort}") + max_tokens = self.config["max_tokens"] + logger.info( + f"🤖 AGENT {self.agent_id}: Using max_tokens={max_tokens}, reasoning_effort={reasoning_effort}" + ) - reasoning_config = { - "effort": reasoning_effort - } + reasoning_config = {"effort": reasoning_effort} try: # Make API call with reasoning via extra_body for OpenRouter compatibility @@ -71,52 +76,74 @@ def generate_solution(self, problem: str, request_id: str = None) -> Tuple[Agent model=self.model, messages=[ {"role": "system", "content": MATHEMATICAL_SYSTEM_PROMPT}, - {"role": "user", "content": exploration_prompt} + {"role": "user", "content": exploration_prompt}, ], max_tokens=max_tokens, temperature=self.temperature, timeout=300, # 5 minute timeout for complex problems - extra_body={ - "reasoning": reasoning_config - } + extra_body={"reasoning": reasoning_config}, ) api_duration = time.time() - api_start - logger.info(f"🤖 AGENT {self.agent_id}: API call completed in {api_duration:.2f}s") + logger.info( + f"🤖 AGENT {self.agent_id}: API call completed in {api_duration:.2f}s" + ) + if not response.choices: + raise ValueError( + f"LLM returned empty response in generate_solution (agent {self.agent_id})" + ) solution_text = response.choices[0].message.content.strip() # ENHANCED LOGGING: Log solution details solution_length = len(solution_text) word_count = len(solution_text.split()) has_boxed = "\\boxed{" in solution_text - has_proof_words = any(word in solution_text.lower() for word in ['therefore', 'thus', 'proof', 'qed']) + has_proof_words = any( + word in solution_text.lower() + for word in ["therefore", "thus", "proof", "qed"] + ) logger.info(f"🤖 AGENT {self.agent_id}: Solution analysis:") logger.info(f" 📝 Length: {solution_length:,} chars, {word_count:,} words") logger.info(f" 📦 Has boxed answer: {has_boxed}") logger.info(f" 🔍 Has proof indicators: {has_proof_words}") - logger.info(f" 📄 Preview: {solution_text[:200]}{'...' if len(solution_text) > 200 else ''}") - logger.info(f" 📄 Last 100 chars: ...{solution_text[-100:] if solution_length > 100 else solution_text}") + logger.info( + f" 📄 Preview: {solution_text[:200]}{'...' if len(solution_text) > 200 else ''}" + ) + logger.info( + f" 📄 Last 100 chars: ...{solution_text[-100:] if solution_length > 100 else solution_text}" + ) # Extract reasoning tokens from the correct nested structure reasoning_tokens = 0 total_tokens = 0 - if hasattr(response, 'usage') and response.usage: - total_tokens = getattr(response.usage, 'total_tokens', 0) + if hasattr(response, "usage") and response.usage: + total_tokens = getattr(response.usage, "total_tokens", 0) # Check completion_tokens_details first (OpenRouter structure) - if hasattr(response.usage, 'completion_tokens_details') and response.usage.completion_tokens_details: - reasoning_tokens = getattr(response.usage.completion_tokens_details, 'reasoning_tokens', 0) + if ( + hasattr(response.usage, "completion_tokens_details") + and response.usage.completion_tokens_details + ): + reasoning_tokens = getattr( + response.usage.completion_tokens_details, "reasoning_tokens", 0 + ) # Fallback to direct usage field (standard OpenAI structure) if reasoning_tokens == 0: - reasoning_tokens = getattr(response.usage, 'reasoning_tokens', 0) + reasoning_tokens = getattr(response.usage, "reasoning_tokens", 0) - reasoning_ratio = (reasoning_tokens / total_tokens * 100) if total_tokens > 0 else 0 - logger.info(f"🤖 AGENT {self.agent_id}: Token usage: reasoning={reasoning_tokens:,}, total={total_tokens:,} ({reasoning_ratio:.1f}% reasoning)") + reasoning_ratio = ( + (reasoning_tokens / total_tokens * 100) if total_tokens > 0 else 0 + ) + logger.info( + f"🤖 AGENT {self.agent_id}: Token usage: reasoning={reasoning_tokens:,}, total={total_tokens:,} ({reasoning_ratio:.1f}% reasoning)" + ) # Extract confidence from solution (heuristic based on response characteristics) confidence = self._estimate_confidence(solution_text) - logger.info(f"🤖 AGENT {self.agent_id}: Estimated confidence: {confidence:.3f}") + logger.info( + f"🤖 AGENT {self.agent_id}: Estimated confidence: {confidence:.3f}" + ) # Create agent solution object with enhanced metadata agent_solution = AgentSolution( @@ -126,17 +153,23 @@ def generate_solution(self, problem: str, request_id: str = None) -> Tuple[Agent reasoning_tokens=reasoning_tokens, total_tokens=total_tokens, solution_length=solution_length, - temperature=self.temperature + temperature=self.temperature, ) total_duration = time.time() - start_time - logger.info(f"🤖 AGENT {self.agent_id}: ✅ Solution generated in {total_duration:.2f}s (API: {api_duration:.2f}s, processing: {total_duration-api_duration:.2f}s)") + logger.info( + f"🤖 AGENT {self.agent_id}: ✅ Solution generated in {total_duration:.2f}s (API: {api_duration:.2f}s, processing: {total_duration-api_duration:.2f}s)" + ) return agent_solution, reasoning_tokens except Exception as e: error_duration = time.time() - start_time - logger.error(f"🤖 AGENT {self.agent_id}: ❌ Error generating solution after {error_duration:.2f}s: {str(e)}") - logger.error(f"🤖 AGENT {self.agent_id}: Model: {self.model}, Temperature: {self.temperature}, Max tokens: {max_tokens}") + logger.error( + f"🤖 AGENT {self.agent_id}: ❌ Error generating solution after {error_duration:.2f}s: {str(e)}" + ) + logger.error( + f"🤖 AGENT {self.agent_id}: Model: {self.model}, Temperature: {self.temperature}, Max tokens: {max_tokens}" + ) # Return empty solution with error indication error_message = f"Error generating solution: {str(e)}" error_solution = AgentSolution( @@ -146,24 +179,35 @@ def generate_solution(self, problem: str, request_id: str = None) -> Tuple[Agent reasoning_tokens=0, total_tokens=0, solution_length=len(error_message), - temperature=self.temperature + temperature=self.temperature, ) return error_solution, 0 - def verify_solution(self, problem: str, solution: str, verifier_id: int, solution_agent_id: int, request_id: str = None) -> VerificationResult: + def verify_solution( + self, + problem: str, + solution: str, + verifier_id: int, + solution_agent_id: int, + request_id: str = None, + ) -> VerificationResult: """Verify a solution using mathematical reasoning""" import time + start_time = time.time() - logger.info(f"🔍 VERIFIER {self.agent_id}: Starting verification (target: Agent {solution_agent_id}, verifier_id: {verifier_id})") - logger.info(f"🔍 VERIFIER {self.agent_id}: Solution length: {len(solution):,} chars") + logger.info( + f"🔍 VERIFIER {self.agent_id}: Starting verification (target: Agent {solution_agent_id}, verifier_id: {verifier_id})" + ) + logger.info( + f"🔍 VERIFIER {self.agent_id}: Solution length: {len(solution):,} chars" + ) verification_prompt = VERIFICATION_PROMPT.format( - problem=problem, - solution=solution + problem=problem, solution=solution ) # Use simplified verification with effort parameter - max_tokens = self.config['max_tokens'] + max_tokens = self.config["max_tokens"] try: api_start = time.time() @@ -172,7 +216,7 @@ def verify_solution(self, problem: str, solution: str, verifier_id: int, solutio model=self.model, messages=[ {"role": "system", "content": MATHEMATICAL_SYSTEM_PROMPT}, - {"role": "user", "content": verification_prompt} + {"role": "user", "content": verification_prompt}, ], max_tokens=max_tokens, temperature=0.1, # Low temperature for consistent verification @@ -181,17 +225,29 @@ def verify_solution(self, problem: str, solution: str, verifier_id: int, solutio "reasoning": { "effort": "low" # Low effort for verification consistency } - } + }, ) api_duration = time.time() - api_start - logger.info(f"🔍 VERIFIER {self.agent_id}: Verification API call completed in {api_duration:.2f}s") + logger.info( + f"🔍 VERIFIER {self.agent_id}: Verification API call completed in {api_duration:.2f}s" + ) + if not response.choices: + raise ValueError( + f"LLM returned empty response in verify_solution (agent {self.agent_id})" + ) verification_text = response.choices[0].message.content.strip() # Parse verification result - assessment, confidence, issues, suggestions = self._parse_verification(verification_text) - logger.info(f"🔍 VERIFIER {self.agent_id}: Assessment: {assessment}, Confidence: {confidence:.3f}") - logger.info(f"🔍 VERIFIER {self.agent_id}: Issues found: {len(issues)}, Suggestions: {len(suggestions)}") + assessment, confidence, issues, suggestions = self._parse_verification( + verification_text + ) + logger.info( + f"🔍 VERIFIER {self.agent_id}: Assessment: {assessment}, Confidence: {confidence:.3f}" + ) + logger.info( + f"🔍 VERIFIER {self.agent_id}: Issues found: {len(issues)}, Suggestions: {len(suggestions)}" + ) if issues: logger.info(f"🔍 VERIFIER {self.agent_id}: Key issues: {issues[:2]}") @@ -203,16 +259,20 @@ def verify_solution(self, problem: str, solution: str, verifier_id: int, solutio issues=issues, suggestions=suggestions, detailed_report=verification_text, - timestamp=datetime.now() + timestamp=datetime.now(), ) total_duration = time.time() - start_time - logger.info(f"🔍 VERIFIER {self.agent_id}: ✅ Verification completed in {total_duration:.2f}s") + logger.info( + f"🔍 VERIFIER {self.agent_id}: ✅ Verification completed in {total_duration:.2f}s" + ) return result except Exception as e: error_duration = time.time() - start_time - logger.error(f"🔍 VERIFIER {self.agent_id}: ❌ Verification error after {error_duration:.2f}s: {str(e)}") + logger.error( + f"🔍 VERIFIER {self.agent_id}: ❌ Verification error after {error_duration:.2f}s: {str(e)}" + ) return VerificationResult( verifier_id=verifier_id, solution_id=f"agent_{solution_agent_id}_iter_0", @@ -221,26 +281,36 @@ def verify_solution(self, problem: str, solution: str, verifier_id: int, solutio issues=[f"Verification error: {str(e)}"], suggestions=["Retry verification"], detailed_report=f"Error during verification: {str(e)}", - timestamp=datetime.now() + timestamp=datetime.now(), ) - def improve_solution(self, problem: str, current_solution: str, feedback: str, issues: list, request_id: str = None) -> Tuple[str, int]: + def improve_solution( + self, + problem: str, + current_solution: str, + feedback: str, + issues: list, + request_id: str = None, + ) -> Tuple[str, int]: """Improve a solution based on verification feedback""" import time + start_time = time.time() logger.info(f"🔧 IMPROVER {self.agent_id}: Starting solution improvement") - logger.info(f"🔧 IMPROVER {self.agent_id}: Current solution: {len(current_solution):,} chars") + logger.info( + f"🔧 IMPROVER {self.agent_id}: Current solution: {len(current_solution):,} chars" + ) logger.info(f"🔧 IMPROVER {self.agent_id}: Issues to address: {len(issues)}") improvement_prompt = IMPROVEMENT_PROMPT.format( problem=problem, current_solution=current_solution, feedback=feedback, - issues="\n".join(f"- {issue}" for issue in issues) + issues="\n".join(f"- {issue}" for issue in issues), ) # Use simplified improvement with high effort - max_tokens = self.config['max_tokens'] + max_tokens = self.config["max_tokens"] try: api_start = time.time() @@ -249,36 +319,51 @@ def improve_solution(self, problem: str, current_solution: str, feedback: str, i model=self.model, messages=[ {"role": "system", "content": MATHEMATICAL_SYSTEM_PROMPT}, - {"role": "user", "content": improvement_prompt} + {"role": "user", "content": improvement_prompt}, ], max_tokens=max_tokens, - temperature=self.temperature * 0.8, # Slightly lower temperature for improvement + temperature=self.temperature + * 0.8, # Slightly lower temperature for improvement timeout=300, extra_body={ - "reasoning": { - "effort": "high" # High effort for improvements - } - } + "reasoning": {"effort": "high"} # High effort for improvements + }, ) api_duration = time.time() - api_start - logger.info(f"🔧 IMPROVER {self.agent_id}: Improvement API call completed in {api_duration:.2f}s") + logger.info( + f"🔧 IMPROVER {self.agent_id}: Improvement API call completed in {api_duration:.2f}s" + ) + if not response.choices: + raise ValueError( + f"LLM returned empty response in improve_solution (agent {self.agent_id})" + ) improved_solution = response.choices[0].message.content.strip() - reasoning_tokens = getattr(response.usage, 'reasoning_tokens', 0) + reasoning_tokens = getattr(response.usage, "reasoning_tokens", 0) # Log improvement analysis length_change = len(improved_solution) - len(current_solution) - logger.info(f"🔧 IMPROVER {self.agent_id}: Solution length change: {length_change:+,} chars") - logger.info(f"🔧 IMPROVER {self.agent_id}: Improved solution preview: {improved_solution[:200]}{'...' if len(improved_solution) > 200 else ''}") + logger.info( + f"🔧 IMPROVER {self.agent_id}: Solution length change: {length_change:+,} chars" + ) + logger.info( + f"🔧 IMPROVER {self.agent_id}: Improved solution preview: {improved_solution[:200]}{'...' if len(improved_solution) > 200 else ''}" + ) total_duration = time.time() - start_time - logger.info(f"🔧 IMPROVER {self.agent_id}: ✅ Solution improved in {total_duration:.2f}s with {reasoning_tokens:,} reasoning tokens") + logger.info( + f"🔧 IMPROVER {self.agent_id}: ✅ Solution improved in {total_duration:.2f}s with {reasoning_tokens:,} reasoning tokens" + ) return improved_solution, reasoning_tokens except Exception as e: error_duration = time.time() - start_time - logger.error(f"🔧 IMPROVER {self.agent_id}: ❌ Improvement error after {error_duration:.2f}s: {str(e)}") - logger.warning(f"🔧 IMPROVER {self.agent_id}: Returning original solution due to error") + logger.error( + f"🔧 IMPROVER {self.agent_id}: ❌ Improvement error after {error_duration:.2f}s: {str(e)}" + ) + logger.warning( + f"🔧 IMPROVER {self.agent_id}: Returning original solution due to error" + ) return current_solution, 0 # Return original solution if improvement fails def _estimate_confidence(self, solution: str) -> float: @@ -313,10 +398,14 @@ def _estimate_confidence(self, solution: str) -> float: uncertainty_factors.append("explicit_uncertainty") final_confidence = max(0.1, min(1.0, confidence)) - logger.debug(f"🤖 AGENT {self.agent_id}: Confidence factors: +{confidence_factors}, -{uncertainty_factors} → {final_confidence:.3f}") + logger.debug( + f"🤖 AGENT {self.agent_id}: Confidence factors: +{confidence_factors}, -{uncertainty_factors} → {final_confidence:.3f}" + ) return final_confidence - def _parse_verification(self, verification_text: str) -> Tuple[str, float, list, list]: + def _parse_verification( + self, verification_text: str + ) -> Tuple[str, float, list, list]: """Parse verification result to extract structured information""" assessment = "INCOMPLETE" # Default confidence = 0.5 @@ -338,23 +427,32 @@ def _parse_verification(self, verification_text: str) -> Tuple[str, float, list, # Extract confidence if explicitly mentioned import re - confidence_match = re.search(r'confidence.*?(\d+).*?(?:out of|/)\s*(\d+)', text_lower) + + confidence_match = re.search( + r"confidence.*?(\d+).*?(?:out of|/)\s*(\d+)", text_lower + ) if confidence_match: conf_score = float(confidence_match.group(1)) conf_total = float(confidence_match.group(2)) confidence = conf_score / conf_total # Extract issues (simple heuristic) - lines = verification_text.split('\n') + lines = verification_text.split("\n") for line in lines: line_lower = line.lower() - if any(word in line_lower for word in ['error', 'mistake', 'incorrect', 'wrong', 'issue']): + if any( + word in line_lower + for word in ["error", "mistake", "incorrect", "wrong", "issue"] + ): issues.append(line.strip()) # Extract suggestions (simple heuristic) for line in lines: line_lower = line.lower() - if any(word in line_lower for word in ['suggest', 'recommend', 'should', 'could improve']): + if any( + word in line_lower + for word in ["suggest", "recommend", "should", "could improve"] + ): suggestions.append(line.strip()) - return assessment, confidence, issues, suggestions \ No newline at end of file + return assessment, confidence, issues, suggestions diff --git a/optillm/plugins/coc_plugin.py b/optillm/plugins/coc_plugin.py index 3db92e91..83104486 100644 --- a/optillm/plugins/coc_plugin.py +++ b/optillm/plugins/coc_plugin.py @@ -1,7 +1,7 @@ """ Chain of Code (CoC) plugin for OptILLM. -This plugin implements a chain-of-code approach that combines Chain-of-Thought (CoT) +This plugin implements a chain-of-code approach that combines Chain-of-Thought (CoT) reasoning with code execution and LLM-based code simulation. SAFETY NOTE: This plugin has been refactored to use Jupyter notebook kernel execution @@ -18,12 +18,8 @@ import re import logging -from typing import Tuple, Dict, Any, List +from typing import Tuple, Any, List import ast -import traceback -import math -import importlib -import json import nbformat from nbconvert.preprocessors import ExecutePreprocessor import os @@ -38,7 +34,7 @@ MAX_FIX_ATTEMPTS = 3 # Initial code generation prompt -CHAIN_OF_CODE_PROMPT = ''' +CHAIN_OF_CODE_PROMPT = """ Write Python code to solve this problem. The code should: 1. Break down the problem into clear computational steps 2. Use standard Python features and math operations @@ -50,10 +46,10 @@ ```python [Your complete Python program here] ``` -''' +""" # Code fix prompt -CODE_FIX_PROMPT = ''' +CODE_FIX_PROMPT = """ The following Python code failed to execute. Fix the code to make it work. Original code: ```python @@ -73,10 +69,10 @@ ```python [Your fixed code here] ``` -''' +""" # Simulation prompt -SIMULATION_PROMPT = ''' +SIMULATION_PROMPT = """ The following Python code could not be executed directly. Analyze the code and determine what the answer would be. Pay special attention to: 1. The core computational logic, ignoring any visualization or display code @@ -92,11 +88,12 @@ {error} Return ONLY the final value that would be in the 'answer' variable. Return just the value, no explanations. -''' +""" + def extract_code_blocks(text: str) -> List[str]: """Extract Python code blocks from text.""" - pattern = r'```python\s*(.*?)\s*```' + pattern = r"```python\s*(.*?)\s*```" matches = re.findall(pattern, text, re.DOTALL) blocks = [m.strip() for m in matches] logger.info(f"Extracted {len(blocks)} code blocks") @@ -104,34 +101,39 @@ def extract_code_blocks(text: str) -> List[str]: logger.info(f"Code block {i+1}:\n{block}") return blocks + def sanitize_code(code: str) -> str: """Prepare code for safe execution by removing problematic visualization code.""" # Remove or modify problematic visualization code - lines = code.split('\n') + lines = code.split("\n") safe_lines = [] for line in lines: # Skip matplotlib-related imports and plotting commands that could cause issues - if any(x in line.lower() for x in ['matplotlib', 'plt.', '.plot(', '.show(', 'figure', 'subplot']): + if any( + x in line.lower() + for x in ["matplotlib", "plt.", ".plot(", ".show(", "figure", "subplot"] + ): # Replace with a comment to maintain code structure safe_lines.append(f"# {line} # Removed for safety") else: # Keep the line if it's not visualization-related safe_lines.append(line) - - return '\n'.join(safe_lines) + + return "\n".join(safe_lines) + def execute_code(code: str) -> Tuple[Any, str]: """Attempt to execute the code using Jupyter notebook kernel and return result or error.""" logger.info("Attempting to execute code in notebook kernel") logger.info(f"Code:\n{code}") - + try: # Sanitize the code first sanitized_code = sanitize_code(code) - + # Create a notebook with the code notebook = nbformat.v4.new_notebook() - + # Add code that captures the answer variable enhanced_code = f""" {sanitized_code} @@ -142,70 +144,74 @@ def execute_code(code: str) -> Tuple[Any, str]: else: print("ANSWER_RESULT: No answer variable found") """ - - notebook['cells'] = [nbformat.v4.new_code_cell(enhanced_code)] - + + notebook["cells"] = [nbformat.v4.new_code_cell(enhanced_code)] + # Convert notebook to JSON string and then to bytes notebook_json = nbformat.writes(notebook) - notebook_bytes = notebook_json.encode('utf-8') + notebook_bytes = notebook_json.encode("utf-8") # Create temporary notebook file - with tempfile.NamedTemporaryFile(mode='wb', suffix='.ipynb', delete=False) as tmp: + with tempfile.NamedTemporaryFile( + mode="wb", suffix=".ipynb", delete=False + ) as tmp: tmp.write(notebook_bytes) tmp.flush() tmp_name = tmp.name try: # Read and execute the notebook - with open(tmp_name, 'r', encoding='utf-8') as f: + with open(tmp_name, "r", encoding="utf-8") as f: nb = nbformat.read(f, as_version=4) - + # Execute with timeout and isolation - ep = ExecutePreprocessor(timeout=30, kernel_name='python3') - ep.preprocess(nb, {'metadata': {'path': './'}}) + ep = ExecutePreprocessor(timeout=30, kernel_name="python3") + ep.preprocess(nb, {"metadata": {"path": "./"}}) # Extract the output output = "" error_output = "" - + for cell in nb.cells: - if cell.cell_type == 'code' and cell.outputs: + if cell.cell_type == "code" and cell.outputs: for output_item in cell.outputs: - if output_item.output_type == 'stream': - if output_item.name == 'stdout': + if output_item.output_type == "stream": + if output_item.name == "stdout": output += output_item.text - elif output_item.name == 'stderr': + elif output_item.name == "stderr": error_output += output_item.text - elif output_item.output_type == 'execute_result': - output += str(output_item.data.get('text/plain', '')) - elif output_item.output_type == 'error': + elif output_item.output_type == "execute_result": + output += str(output_item.data.get("text/plain", "")) + elif output_item.output_type == "error": error_output += f"{output_item.ename}: {output_item.evalue}" - + # Check for errors first if error_output: logger.error(f"Execution failed: {error_output}") return None, error_output - + # Parse the answer from output output = output.strip() - + # Look for our special ANSWER_RESULT marker if "ANSWER_RESULT:" in output: - answer_line = [line for line in output.split('\n') if 'ANSWER_RESULT:' in line][-1] - answer_str = answer_line.split('ANSWER_RESULT:', 1)[1].strip() - + answer_line = [ + line for line in output.split("\n") if "ANSWER_RESULT:" in line + ][-1] + answer_str = answer_line.split("ANSWER_RESULT:", 1)[1].strip() + if answer_str == "No answer variable found": error = "Code executed but did not produce an answer variable" logger.warning(error) return None, error - + try: # Try to evaluate the answer to convert it to proper type answer = ast.literal_eval(answer_str) except (ValueError, SyntaxError): # If literal_eval fails, keep as string answer = answer_str - + logger.info(f"Execution successful. Answer: {answer}") return answer, None else: @@ -217,37 +223,44 @@ def execute_code(code: str) -> Tuple[Any, str]: error = "Code executed but produced no output" logger.warning(error) return None, error - + finally: # Clean up temporary file try: os.unlink(tmp_name) except: pass - + except Exception as e: error = f"Notebook execution failed: {str(e)}" logger.error(error) return None, error -def generate_fixed_code(original_code: str, error: str, client, model: str) -> Tuple[str, int]: + +def generate_fixed_code( + original_code: str, error: str, client, model: str +) -> Tuple[str, int]: """Ask LLM to fix the broken code.""" logger.info("Requesting code fix from LLM") logger.info(f"Original error: {error}") - + response = client.chat.completions.create( model=model, messages=[ - {"role": "system", "content": CODE_FIX_PROMPT.format( - code=original_code, error=error)}, - {"role": "user", "content": "Fix the code to make it work."} + { + "role": "system", + "content": CODE_FIX_PROMPT.format(code=original_code, error=error), + }, + {"role": "user", "content": "Fix the code to make it work."}, ], - temperature=0.2 + temperature=0.2, ) - + + if not response.choices: + raise ValueError("LLM returned empty response in generate_fixed_code") fixed_code = response.choices[0].message.content code_blocks = extract_code_blocks(fixed_code) - + if code_blocks: logger.info("Received fixed code from LLM") return code_blocks[0], response.usage.completion_tokens @@ -255,21 +268,29 @@ def generate_fixed_code(original_code: str, error: str, client, model: str) -> T logger.warning("No code block found in LLM response") return None, response.usage.completion_tokens + def simulate_execution(code: str, error: str, client, model: str) -> Tuple[Any, int]: """Ask LLM to simulate code execution.""" logger.info("Attempting code simulation with LLM") - + response = client.chat.completions.create( model=model, messages=[ - {"role": "system", "content": SIMULATION_PROMPT.format( - code=code, error=error)}, - {"role": "user", "content": "Simulate this code and return the final answer value."} + { + "role": "system", + "content": SIMULATION_PROMPT.format(code=code, error=error), + }, + { + "role": "user", + "content": "Simulate this code and return the final answer value.", + }, ], - temperature=0.2 + temperature=0.2, ) - + try: + if not response.choices: + raise ValueError("LLM returned empty response in simulate_execution") result = response.choices[0].message.content.strip() # Try to convert to appropriate type try: @@ -282,25 +303,26 @@ def simulate_execution(code: str, error: str, client, model: str) -> Tuple[Any, logger.error(f"Failed to parse simulation result: {str(e)}") return None, response.usage.completion_tokens + def run(system_prompt: str, initial_query: str, client, model: str) -> Tuple[str, int]: """Main Chain of Code execution function.""" logger.info("Starting Chain of Code execution") logger.info(f"Query: {initial_query}") - + # Initial code generation messages = [ {"role": "system", "content": system_prompt + "\n" + CHAIN_OF_CODE_PROMPT}, - {"role": "user", "content": initial_query} + {"role": "user", "content": initial_query}, ] - + response = client.chat.completions.create( - model=model, - messages=messages, - temperature=0.7 + model=model, messages=messages, temperature=0.7 ) total_tokens = response.usage.completion_tokens - + # Extract initial code + if not response.choices: + raise ValueError("LLM returned empty response in run (initial code generation)") code_blocks = extract_code_blocks(response.choices[0].message.content) if not code_blocks: logger.warning("No code blocks found in response") @@ -309,47 +331,52 @@ def run(system_prompt: str, initial_query: str, client, model: str) -> Tuple[str current_code = code_blocks[0] fix_attempts = 0 last_error = None - + # Strategy 1: Direct execution and fix attempts while fix_attempts < MAX_FIX_ATTEMPTS: fix_attempts += 1 logger.info(f"Execution attempt {fix_attempts}/{MAX_FIX_ATTEMPTS}") - + # Try to execute current code answer, error = execute_code(current_code) - + # If successful, return the answer if error is None: logger.info(f"Successful execution on attempt {fix_attempts}") return str(answer), total_tokens - + last_error = error - + # If we hit max attempts, break to try simulation if fix_attempts >= MAX_FIX_ATTEMPTS: logger.warning(f"Failed after {fix_attempts} fix attempts") break - + # Otherwise, try to get fixed code from LLM logger.info(f"Requesting code fix, attempt {fix_attempts}") fixed_code, fix_tokens = generate_fixed_code(current_code, error, client, model) total_tokens += fix_tokens - + if fixed_code: current_code = fixed_code else: logger.error("Failed to get fixed code from LLM") break - + # Strategy 2: If all execution attempts failed, try simulation logger.info("All execution attempts failed, trying simulation") - simulated_answer, sim_tokens = simulate_execution(current_code, last_error, client, model) + simulated_answer, sim_tokens = simulate_execution( + current_code, last_error, client, model + ) total_tokens += sim_tokens - + if simulated_answer is not None: logger.info("Successfully got answer from simulation") return str(simulated_answer), total_tokens - + # If we get here, everything failed logger.warning("All strategies failed") - return f"Error: Could not solve problem after all attempts. Last error: {last_error}", total_tokens \ No newline at end of file + return ( + f"Error: Could not solve problem after all attempts. Last error: {last_error}", + total_tokens, + ) diff --git a/optillm/plugins/deep_research/research_engine.py b/optillm/plugins/deep_research/research_engine.py index d36cb6bb..da7decca 100644 --- a/optillm/plugins/deep_research/research_engine.py +++ b/optillm/plugins/deep_research/research_engine.py @@ -8,130 +8,133 @@ through denoising and retrieval, generating comprehensive research reports. """ -import asyncio -import json import re -from typing import Tuple, List, Dict, Optional, Any +from typing import Tuple, List, Dict, Any from datetime import datetime -from collections import defaultdict -from optillm.plugins.web_search_plugin import run as web_search_run, BrowserSessionManager +from optillm.plugins.web_search_plugin import run as web_search_run from optillm.plugins.readurls_plugin import run as readurls_run -from optillm.plugins.deep_research.session_state import get_session_manager, close_session +from optillm.plugins.deep_research.session_state import ( + get_session_manager, + close_session, +) import uuid def clean_reasoning_tags(text: str) -> str: """ Remove reasoning tags from model responses for clean final output. - + Removes common reasoning tags like: - - - - - + Args: text: Raw model response text - + Returns: Cleaned text with reasoning tags removed """ if not text: return text - + # List of reasoning tag patterns to remove reasoning_patterns = [ - r'.*?', - r'.*?', - r'.*?', - r'.*?', - r'.*?', - r'.*?', + r".*?", + r".*?", + r".*?", + r".*?", + r".*?", + r".*?", ] - + cleaned_text = text for pattern in reasoning_patterns: # Use DOTALL flag to match across newlines - cleaned_text = re.sub(pattern, '', cleaned_text, flags=re.DOTALL | re.IGNORECASE) - + cleaned_text = re.sub( + pattern, "", cleaned_text, flags=re.DOTALL | re.IGNORECASE + ) + # Clean up any extra whitespace left behind, but preserve markdown formatting - cleaned_text = re.sub(r'\n\s*\n\s*\n+', '\n\n', cleaned_text) # Multiple empty lines to double - cleaned_text = re.sub(r' +', ' ', cleaned_text) # Multiple spaces to single space (but preserve intentional double spaces) + cleaned_text = re.sub( + r"\n\s*\n\s*\n+", "\n\n", cleaned_text + ) # Multiple empty lines to double + cleaned_text = re.sub( + r" +", " ", cleaned_text + ) # Multiple spaces to single space (but preserve intentional double spaces) cleaned_text = cleaned_text.strip() - + return cleaned_text def cleanup_placeholder_tags(text: str) -> str: """ Remove any remaining placeholder tags from the final report. - + This is a final cleanup step to ensure no incomplete research tags remain in the published report. - + Args: text: Research report text - + Returns: Text with all placeholder tags removed """ if not text: return text - + # Comprehensive patterns for research placeholder tags placeholder_patterns = [ # Research placeholders - r'\[NEEDS RESEARCH[^\]]*\]', - r'\[SOURCE NEEDED[^\]]*\]', - r'\[RESEARCH NEEDED[^\]]*\]', - r'\[CITATION NEEDED[^\]]*\]', - r'\[MORE RESEARCH NEEDED[^\]]*\]', - r'\[REQUIRES INVESTIGATION[^\]]*\]', - r'\[TO BE RESEARCHED[^\]]*\]', - r'\[VERIFY[^\]]*\]', - r'\[CHECK[^\]]*\]', - + r"\[NEEDS RESEARCH[^\]]*\]", + r"\[SOURCE NEEDED[^\]]*\]", + r"\[RESEARCH NEEDED[^\]]*\]", + r"\[CITATION NEEDED[^\]]*\]", + r"\[MORE RESEARCH NEEDED[^\]]*\]", + r"\[REQUIRES INVESTIGATION[^\]]*\]", + r"\[TO BE RESEARCHED[^\]]*\]", + r"\[VERIFY[^\]]*\]", + r"\[CHECK[^\]]*\]", # Citation placeholders (like your example) - r'\[Placeholder for[^\]]+\]', - r'\[\d+\]\s*\[Placeholder[^\]]+\]', - r'\[Insert citation[^\]]*\]', - r'\[Add reference[^\]]*\]', - r'\[Reference needed[^\]]*\]', - + r"\[Placeholder for[^\]]+\]", + r"\[\d+\]\s*\[Placeholder[^\]]+\]", + r"\[Insert citation[^\]]*\]", + r"\[Add reference[^\]]*\]", + r"\[Reference needed[^\]]*\]", # Content placeholders - r'\[To be completed[^\]]*\]', - r'\[Under development[^\]]*\]', - r'\[Coming soon[^\]]*\]', - r'\[TBD[^\]]*\]', - r'\[TODO[^\]]*\]', - + r"\[To be completed[^\]]*\]", + r"\[Under development[^\]]*\]", + r"\[Coming soon[^\]]*\]", + r"\[TBD[^\]]*\]", + r"\[TODO[^\]]*\]", # Question placeholders and incomplete sections - r'\[Question \d+[^\]]*\]', - r'\[Research question[^\]]*\]', + r"\[Question \d+[^\]]*\]", + r"\[Research question[^\]]*\]", ] - + cleaned_text = text for pattern in placeholder_patterns: # Remove the placeholder tags - cleaned_text = re.sub(pattern, '', cleaned_text, flags=re.IGNORECASE) - + cleaned_text = re.sub(pattern, "", cleaned_text, flags=re.IGNORECASE) + # Also remove any sentences that are entirely placeholder-based - lines = cleaned_text.split('\n') + lines = cleaned_text.split("\n") filtered_lines = [] - + for line in lines: # Skip lines that are mostly just removed placeholders (now empty or just punctuation) stripped = line.strip() - if stripped and not re.match(r'^[\s\-\*\.\,\;\:]*$', stripped): + if stripped and not re.match(r"^[\s\-\*\.\,\;\:]*$", stripped): filtered_lines.append(line) elif not stripped: # Keep empty lines for formatting filtered_lines.append(line) - + # Rejoin and clean up extra whitespace - result = '\n'.join(filtered_lines) - result = re.sub(r'\n\s*\n\s*\n+', '\n\n', result) # Multiple empty lines to double + result = "\n".join(filtered_lines) + result = re.sub(r"\n\s*\n\s*\n+", "\n\n", result) # Multiple empty lines to double result = result.strip() - + return result @@ -155,7 +158,7 @@ def validate_citation_usage(text: str, total_citations: int) -> Dict[str, Any]: "citations_total": total_citations, "usage_percentage": 0.0, "unused_citations": list(range(1, total_citations + 1)), - "warning": "Empty report text" + "warning": "Empty report text", } # Find all citation references in the body text (before References section) @@ -163,7 +166,7 @@ def validate_citation_usage(text: str, total_citations: int) -> Dict[str, Any]: # Extract all citation numbers from body text using regex citations_in_body = set() - citation_pattern = r'\[(\d+)\]' + citation_pattern = r"\[(\d+)\]" for match in re.finditer(citation_pattern, body_text): citation_num = int(match.group(1)) citations_in_body.add(citation_num) @@ -172,21 +175,27 @@ def validate_citation_usage(text: str, total_citations: int) -> Dict[str, Any]: all_citations = set(range(1, total_citations + 1)) unused_citations = sorted(all_citations - citations_in_body) - usage_percentage = (len(citations_in_body) / total_citations * 100) if total_citations > 0 else 0 + usage_percentage = ( + (len(citations_in_body) / total_citations * 100) if total_citations > 0 else 0 + ) result = { "citations_used": len(citations_in_body), "citations_total": total_citations, "usage_percentage": usage_percentage, "unused_citations": unused_citations, - "used_citations": sorted(citations_in_body) + "used_citations": sorted(citations_in_body), } # Add warnings if citation usage is low if usage_percentage < 30: - result["warning"] = f"Very low citation usage ({usage_percentage:.1f}%). Most sources are not cited in the text." + result["warning"] = ( + f"Very low citation usage ({usage_percentage:.1f}%). Most sources are not cited in the text." + ) elif usage_percentage < 50: - result["warning"] = f"Low citation usage ({usage_percentage:.1f}%). Many sources are not cited in the text." + result["warning"] = ( + f"Low citation usage ({usage_percentage:.1f}%). Many sources are not cited in the text." + ) return result @@ -194,79 +203,99 @@ def validate_citation_usage(text: str, total_citations: int) -> Dict[str, Any]: def validate_report_completeness(text: str) -> Dict[str, Any]: """ Validate that the research report is complete and ready for publication. - + Checks for: - Placeholder citations - Incomplete sections - Unfinished research questions - Missing content indicators - + Returns: Dict with validation results and suggestions for fixes """ if not text: return {"is_complete": False, "issues": ["Empty report"], "suggestions": []} - + issues = [] suggestions = [] - + # Check for placeholder citations placeholder_citation_patterns = [ - r'\[Placeholder for[^\]]+\]', - r'\[\d+\]\s*\[Placeholder[^\]]+\]', - r'\[Insert citation[^\]]*\]', - r'\[Reference needed[^\]]*\]', + r"\[Placeholder for[^\]]+\]", + r"\[\d+\]\s*\[Placeholder[^\]]+\]", + r"\[Insert citation[^\]]*\]", + r"\[Reference needed[^\]]*\]", ] - + for pattern in placeholder_citation_patterns: matches = re.findall(pattern, text, re.IGNORECASE) if matches: issues.append(f"Found {len(matches)} placeholder citations: {matches[:3]}") - suggestions.append("Replace placeholder citations with actual sources or remove incomplete claims") - + suggestions.append( + "Replace placeholder citations with actual sources or remove incomplete claims" + ) + # Check for incomplete research questions sections if "Research Questions for Investigation" in text: # Look for sections that seem to be lists of questions without answers - question_section_match = re.search(r'## Research Questions for Investigation.*?(?=##|$)', text, re.DOTALL) + question_section_match = re.search( + r"## Research Questions for Investigation.*?(?=##|$)", text, re.DOTALL + ) if question_section_match: question_content = question_section_match.group(0) # Count questions vs answers - question_lines = [line for line in question_content.split('\n') if line.strip().startswith('*') or line.strip().startswith('-')] + question_lines = [ + line + for line in question_content.split("\n") + if line.strip().startswith("*") or line.strip().startswith("-") + ] if len(question_lines) > 3: # Many unanswered questions issues.append("Report contains unanswered research questions section") - suggestions.append("Convert research questions into answered findings or remove incomplete section") - + suggestions.append( + "Convert research questions into answered findings or remove incomplete section" + ) + # Check for incomplete sections (sections with only placeholders) - section_pattern = r'##\s+([^#\n]+)\n(.*?)(?=##|$)' + section_pattern = r"##\s+([^#\n]+)\n(.*?)(?=##|$)" sections = re.findall(section_pattern, text, re.DOTALL) - + for section_title, section_content in sections: # Check if section is mostly placeholders - placeholder_count = len(re.findall(r'\[[^\]]*(?:placeholder|needed|research|todo|tbd)[^\]]*\]', section_content, re.IGNORECASE)) - content_lines = [line.strip() for line in section_content.split('\n') if line.strip()] - + placeholder_count = len( + re.findall( + r"\[[^\]]*(?:placeholder|needed|research|todo|tbd)[^\]]*\]", + section_content, + re.IGNORECASE, + ) + ) + content_lines = [ + line.strip() for line in section_content.split("\n") if line.strip() + ] + if placeholder_count > len(content_lines) / 3: # More than 1/3 placeholders issues.append(f"Section '{section_title.strip()}' is mostly placeholders") - suggestions.append(f"Complete content for '{section_title.strip()}' section or remove it") - + suggestions.append( + f"Complete content for '{section_title.strip()}' section or remove it" + ) + # Check for incomplete reference lists - if text.count('[') - text.count(']') != 0: + if text.count("[") - text.count("]") != 0: issues.append("Unmatched brackets detected - possible incomplete citations") suggestions.append("Review and fix citation formatting") - + # Check for very short sections that might be incomplete if len(text.split()) < 500: # Very short report issues.append("Report appears to be very short, possibly incomplete") suggestions.append("Ensure all research areas are adequately covered") - + is_complete = len(issues) == 0 - + return { "is_complete": is_complete, "issues": issues, "suggestions": suggestions, "word_count": len(text.split()), - "section_count": len(sections) + "section_count": len(sections), } @@ -289,22 +318,22 @@ class DeepResearcher: Based on: https://arxiv.org/abs/2507.16075v1 """ - - def __init__(self, client, model: str, max_iterations: int = 5, max_sources: int = 30): + + def __init__( + self, client, model: str, max_iterations: int = 5, max_sources: int = 30 + ): self.client = client self.model = model self.max_iterations = max_iterations self.max_sources = max_sources self.session_id = str(uuid.uuid4()) # Unique session ID for this research self.session_manager = None # Will be set when research starts - self.research_state = { - "iteration": 0 # Track current iteration for metadata - } + self.research_state = {"iteration": 0} # Track current iteration for metadata self.total_tokens = 0 self.citations = {} # Map citation number to source info self.citation_counter = 0 self.source_content_map = {} # Map URL to content for citations - + # TTD-DR specific components self.current_draft = "" # Persistent evolving draft self.draft_history = [] # Track draft evolution @@ -312,59 +341,57 @@ def __init__(self, client, model: str, max_iterations: int = 5, max_sources: int "search_strategy": 1.0, "synthesis_quality": 1.0, "gap_detection": 1.0, - "integration_ability": 1.0 + "integration_ability": 1.0, } self.gap_analysis_history = [] # Track identified gaps over time self.session_manager = None # Browser session manager for web searches - + def cleanup_placeholder_tags(self, text: str) -> str: """ Remove any remaining placeholder tags from the final report. - + This is a final cleanup step to ensure no incomplete research tags remain in the published report. - + Args: text: Research report text - + Returns: Text with all placeholder tags removed """ return cleanup_placeholder_tags(text) - - def fix_incomplete_report(self, report: str, validation: Dict[str, Any], original_query: str) -> str: + + def fix_incomplete_report( + self, report: str, validation: Dict[str, Any], original_query: str + ) -> str: """ Attempt to fix an incomplete report by removing problematic sections and ensuring a coherent final document. - + This is a fallback when the report contains placeholders or incomplete sections. """ print("🔧 Attempting to fix incomplete report...") - + # Start with the basic cleanup fixed_report = cleanup_placeholder_tags(report) - + # Remove sections that are mostly placeholders or incomplete if "Research Questions for Investigation" in fixed_report: # Remove unanswered research questions sections fixed_report = re.sub( - r'## Research Questions for Investigation.*?(?=##|$)', - '', - fixed_report, - flags=re.DOTALL + r"## Research Questions for Investigation.*?(?=##|$)", + "", + fixed_report, + flags=re.DOTALL, ) print(" - Removed incomplete research questions section") - - # Remove citation placeholders from reference section - fixed_report = re.sub( - r'\[\d+\]\s*\[Placeholder[^\]]+\]\n?', - '', - fixed_report - ) - + + # Remove citation placeholders from reference section + fixed_report = re.sub(r"\[\d+\]\s*\[Placeholder[^\]]+\]\n?", "", fixed_report) + # Clean up any empty sections - fixed_report = re.sub(r'##\s+([^#\n]+)\n\s*(?=##)', '', fixed_report) - + fixed_report = re.sub(r"##\s+([^#\n]+)\n\s*(?=##)", "", fixed_report) + # If report is still very short, add a completion note if len(fixed_report.split()) < 300: completion_note = f""" @@ -377,25 +404,29 @@ def fix_incomplete_report(self, report: str, validation: Dict[str, Any], origina """ # Insert before references section if it exists if "## References" in fixed_report: - fixed_report = fixed_report.replace("## References", completion_note + "\n## References") + fixed_report = fixed_report.replace( + "## References", completion_note + "\n## References" + ) else: fixed_report += completion_note - + print(" - Added completion note due to short report length") - + # Final cleanup - fixed_report = re.sub(r'\n\s*\n\s*\n+', '\n\n', fixed_report) + fixed_report = re.sub(r"\n\s*\n\s*\n+", "\n\n", fixed_report) fixed_report = fixed_report.strip() - + # Validate the fix new_validation = validate_report_completeness(fixed_report) if new_validation["is_complete"]: print("✅ Report successfully fixed and validated") else: - print(f"⚠️ Report still has {len(new_validation['issues'])} issues after fixing") - + print( + f"⚠️ Report still has {len(new_validation['issues'])} issues after fixing" + ) + return fixed_report - + def decompose_query(self, system_prompt: str, initial_query: str) -> List[str]: """ Decompose complex research query into focused sub-queries @@ -414,83 +445,95 @@ def decompose_query(self, system_prompt: str, initial_query: str) -> List[str]: Make each sub-query specific and searchable. Focus on different aspects of the main topic. """ - + try: response = self.client.chat.completions.create( model=self.model, messages=[ {"role": "system", "content": system_prompt}, - {"role": "user", "content": decomposition_prompt} + {"role": "user", "content": decomposition_prompt}, ], temperature=0.7, - max_tokens=1000 + max_tokens=1000, ) - + + if not response.choices: + raise ValueError("LLM returned empty response in decompose_query") content = response.choices[0].message.content.strip() # Clean reasoning tags from query decomposition response content = clean_reasoning_tags(content) self.total_tokens += response.usage.completion_tokens - + # Extract numbered queries queries = [] - for line in content.split('\n'): + for line in content.split("\n"): line = line.strip() - if re.match(r'^\d+\.', line): - query = re.sub(r'^\d+\.\s*\[?(.*?)\]?$', r'\1', line).strip() + if re.match(r"^\d+\.", line): + query = re.sub(r"^\d+\.\s*\[?(.*?)\]?$", r"\1", line).strip() if query: queries.append(query) - + return queries[:5] # Limit to 5 sub-queries - - except Exception as e: + + except Exception: # Fallback: use original query return [initial_query] - + def perform_web_search(self, queries: List[str]) -> str: """ Perform web search for multiple queries using the web_search plugin """ all_results = [] - + # Check if session manager is available - if not hasattr(self, 'session_manager') or self.session_manager is None: + if not hasattr(self, "session_manager") or self.session_manager is None: # Log warning - this shouldn't happen in normal flow - print(f"⚠️ Warning: session_manager not available in perform_web_search (session_id: {getattr(self, 'session_id', 'N/A')})") + print( + f"⚠️ Warning: session_manager not available in perform_web_search (session_id: {getattr(self, 'session_id', 'N/A')})" + ) self.session_manager = None else: - print(f"📊 Using existing session manager for web search (session_id: {self.session_id}, manager: {id(self.session_manager)})") - + print( + f"📊 Using existing session manager for web search (session_id: {self.session_id}, manager: {id(self.session_manager)})" + ) + # Perform individual searches for each query to avoid truncation issues for i, query in enumerate(queries): try: # Format as a clean search query search_query = f"search for {query.strip()}" - + # Perform search with reduced results per query to stay within limits results_per_query = max(1, self.max_sources // len(queries)) - - enhanced_query, _ = web_search_run("", search_query, None, None, { - "num_results": results_per_query, - "delay_seconds": None, # Use default random delay (4-32 seconds) - "headless": False, # Allow CAPTCHA solving if needed - "session_manager": self.session_manager # Use shared browser session - }) - + + enhanced_query, _ = web_search_run( + "", + search_query, + None, + None, + { + "num_results": results_per_query, + "delay_seconds": None, # Use default random delay (4-32 seconds) + "headless": False, # Allow CAPTCHA solving if needed + "session_manager": self.session_manager, # Use shared browser session + }, + ) + if enhanced_query and "Web Search Results" in enhanced_query: all_results.append(enhanced_query) - + except Exception as e: # Continue with other queries if one fails all_results.append(f"Search failed for query '{query}': {str(e)}") continue - + if not all_results: return "Web search failed: No results obtained from any query" - + # Combine all search results combined_results = "\n\n".join(all_results) return combined_results - + def extract_and_fetch_urls(self, search_results: str) -> Tuple[str, List[Dict]]: """ Extract URLs from search results and fetch their content using readurls plugin @@ -499,53 +542,56 @@ def extract_and_fetch_urls(self, search_results: str) -> Tuple[str, List[Dict]]: try: # First extract URLs and metadata from search results sources = [] - + # Pattern to match search result blocks - result_pattern = r'(\d+)\.\s*\*\*(.+?)\*\*\s*\n\s*URL:\s*(.+?)\n' + result_pattern = r"(\d+)\.\s*\*\*(.+?)\*\*\s*\n\s*URL:\s*(.+?)\n" matches = re.findall(result_pattern, search_results, re.MULTILINE) - + for match in matches: source = { - 'number': match[0], - 'title': match[1].strip(), - 'url': match[2].strip(), - 'access_date': datetime.now().strftime('%Y-%m-%d') + "number": match[0], + "title": match[1].strip(), + "url": match[2].strip(), + "access_date": datetime.now().strftime("%Y-%m-%d"), } sources.append(source) - + # If regex doesn't work, try line-by-line parsing if not sources: - lines = search_results.split('\n') + lines = search_results.split("\n") current_source = {} - + for i, line in enumerate(lines): # Check for numbered item with title - title_match = re.match(r'^(\d+)\.\s*\*\*(.+?)\*\*', line.strip()) + title_match = re.match(r"^(\d+)\.\s*\*\*(.+?)\*\*", line.strip()) if title_match: - if current_source and 'url' in current_source: + if current_source and "url" in current_source: sources.append(current_source) current_source = { - 'number': title_match.group(1), - 'title': title_match.group(2).strip() + "number": title_match.group(1), + "title": title_match.group(2).strip(), } # Check for URL line - elif line.strip().startswith('URL:') and current_source: + elif line.strip().startswith("URL:") and current_source: url = line.strip()[4:].strip() - current_source['url'] = url - current_source['access_date'] = datetime.now().strftime('%Y-%m-%d') - - if current_source and 'url' in current_source: + current_source["url"] = url + current_source["access_date"] = datetime.now().strftime( + "%Y-%m-%d" + ) + + if current_source and "url" in current_source: sources.append(current_source) - + # Fetch content for all URLs content_with_urls, _ = readurls_run("", search_results, None, None) - + return content_with_urls, sources except Exception as e: return f"URL fetching failed: {str(e)}", [] - - - def evaluate_completeness(self, system_prompt: str, query: str, current_synthesis: str) -> Tuple[bool, List[str]]: + + def evaluate_completeness( + self, system_prompt: str, query: str, current_synthesis: str + ) -> Tuple[bool, List[str]]: """ Evaluate if the current research is complete or needs more information Returns (is_complete, list_of_missing_aspects) @@ -565,39 +611,47 @@ def evaluate_completeness(self, system_prompt: str, query: str, current_synthesi COMPLETE: [YES/NO] MISSING: [list any missing aspects, one per line, or "None" if complete] """ - + try: response = self.client.chat.completions.create( model=self.model, messages=[ {"role": "system", "content": system_prompt}, - {"role": "user", "content": evaluation_prompt} + {"role": "user", "content": evaluation_prompt}, ], temperature=0.3, - max_tokens=500 + max_tokens=500, ) - + + if not response.choices: + raise ValueError("LLM returned empty response in evaluate_completeness") content = response.choices[0].message.content.strip() # Clean reasoning tags from completeness evaluation response content = clean_reasoning_tags(content) self.total_tokens += response.usage.completion_tokens - + # Parse response is_complete = "COMPLETE: YES" in content.upper() - + missing_aspects = [] if "MISSING:" in content.upper(): missing_section = content.split("MISSING:")[-1].strip() if missing_section.upper() != "NONE": - missing_aspects = [line.strip() for line in missing_section.split('\n') if line.strip()] - + missing_aspects = [ + line.strip() + for line in missing_section.split("\n") + if line.strip() + ] + return is_complete, missing_aspects - - except Exception as e: + + except Exception: # Default to not complete on error return False, ["Error in evaluation"] - - def generate_focused_queries(self, missing_aspects: List[str], original_query: str) -> List[str]: + + def generate_focused_queries( + self, missing_aspects: List[str], original_query: str + ) -> List[str]: """ Generate focused search queries to address missing aspects """ @@ -606,9 +660,9 @@ def generate_focused_queries(self, missing_aspects: List[str], original_query: s # Create a focused query combining the original topic with the missing aspect focused_query = f"{original_query} {aspect}" focused_queries.append(focused_query) - + return focused_queries[:3] # Limit to 3 additional queries per iteration - + def generate_preliminary_draft(self, system_prompt: str, initial_query: str) -> str: """ Generate the preliminary draft (updatable skeleton) from LLM internal knowledge @@ -640,28 +694,34 @@ def generate_preliminary_draft(self, system_prompt: str, initial_query: str) -> Include AT LEAST 5-10 [NEEDS RESEARCH] or [SOURCE NEEDED] tags throughout the draft. Be explicit about what you don't know and what needs external validation. """ - + try: response = self.client.chat.completions.create( model=self.model, messages=[ {"role": "system", "content": system_prompt}, - {"role": "user", "content": draft_prompt} + {"role": "user", "content": draft_prompt}, ], temperature=0.7, - max_tokens=2000 + max_tokens=2000, ) - + + if not response.choices: + raise ValueError( + "LLM returned empty response in generate_preliminary_draft" + ) draft = response.choices[0].message.content.strip() draft = clean_reasoning_tags(draft) self.total_tokens += response.usage.completion_tokens - + return draft - + except Exception as e: return f"Failed to generate preliminary draft: {str(e)}" - - def analyze_draft_gaps(self, current_draft: str, original_query: str) -> List[Dict[str, str]]: + + def analyze_draft_gaps( + self, current_draft: str, original_query: str + ) -> List[Dict[str, str]]: """ Analyze the current draft to identify gaps, weaknesses, and areas needing research This guides the next retrieval iteration (draft-guided search) @@ -702,105 +762,128 @@ def analyze_draft_gaps(self, current_draft: str, original_query: str) -> List[Di Even well-written sections can benefit from additional evidence, examples, or perspectives. Push for depth, accuracy, and comprehensiveness in the research. """ - + try: response = self.client.chat.completions.create( model=self.model, messages=[ - {"role": "system", "content": "You are an expert research analyst."}, - {"role": "user", "content": gap_analysis_prompt} + { + "role": "system", + "content": "You are an expert research analyst.", + }, + {"role": "user", "content": gap_analysis_prompt}, ], temperature=0.3, - max_tokens=1000 + max_tokens=1000, ) - + + if not response.choices: + raise ValueError("LLM returned empty response in analyze_draft_gaps") content = response.choices[0].message.content.strip() content = clean_reasoning_tags(content) self.total_tokens += response.usage.completion_tokens - + # Parse the gaps gaps = [] current_gap = {} - - for line in content.split('\n'): + + for line in content.split("\n"): line = line.strip() - if line.startswith('GAP_ID:'): + if line.startswith("GAP_ID:"): if current_gap: gaps.append(current_gap) - current_gap = {'id': line.split(':', 1)[1].strip()} - elif line.startswith('SECTION:'): - current_gap['section'] = line.split(':', 1)[1].strip() - elif line.startswith('GAP_TYPE:'): - current_gap['gap_type'] = line.split(':', 1)[1].strip() - elif line.startswith('SPECIFIC_NEED:'): - current_gap['specific_need'] = line.split(':', 1)[1].strip() - elif line.startswith('SEARCH_QUERY:'): - current_gap['search_query'] = line.split(':', 1)[1].strip() - elif line.startswith('PRIORITY:'): - current_gap['priority'] = line.split(':', 1)[1].strip() - + current_gap = {"id": line.split(":", 1)[1].strip()} + elif line.startswith("SECTION:"): + current_gap["section"] = line.split(":", 1)[1].strip() + elif line.startswith("GAP_TYPE:"): + current_gap["gap_type"] = line.split(":", 1)[1].strip() + elif line.startswith("SPECIFIC_NEED:"): + current_gap["specific_need"] = line.split(":", 1)[1].strip() + elif line.startswith("SEARCH_QUERY:"): + current_gap["search_query"] = line.split(":", 1)[1].strip() + elif line.startswith("PRIORITY:"): + current_gap["priority"] = line.split(":", 1)[1].strip() + if current_gap: gaps.append(current_gap) - + return gaps - - except Exception as e: + + except Exception: # Fallback: create basic gaps from the draft - return [{ - 'id': '1', - 'section': 'General', - 'gap_type': 'MISSING_INFO', - 'specific_need': 'More detailed information needed', - 'search_query': original_query - }] - + return [ + { + "id": "1", + "section": "General", + "gap_type": "MISSING_INFO", + "specific_need": "More detailed information needed", + "search_query": original_query, + } + ] + def perform_gap_targeted_search(self, gaps: List[Dict[str, str]]) -> str: """ Perform targeted searches based on identified gaps in the current draft Prioritizes HIGH priority gaps (placeholder tags) first """ all_results = [] - + # Check if session manager is available - if not hasattr(self, 'session_manager') or self.session_manager is None: + if not hasattr(self, "session_manager") or self.session_manager is None: # Log warning - this shouldn't happen in normal flow print("⚠️ Warning: session_manager not available in perform_web_search") self.session_manager = None - + # Sort gaps by priority - HIGH priority first (placeholder tags) - sorted_gaps = sorted(gaps, key=lambda g: ( - 0 if g.get('priority', '').upper() == 'HIGH' else - 1 if g.get('priority', '').upper() == 'MEDIUM' else 2 - )) - + sorted_gaps = sorted( + gaps, + key=lambda g: ( + 0 + if g.get("priority", "").upper() == "HIGH" + else 1 if g.get("priority", "").upper() == "MEDIUM" else 2 + ), + ) + for gap in sorted_gaps: - search_query = gap.get('search_query', '') + search_query = gap.get("search_query", "") if not search_query: continue - + try: # Format as a clean search query search_query = f"search for {search_query.strip()}" - + # Perform search with context about what gap we're filling - enhanced_query, _ = web_search_run("", search_query, None, None, { - "num_results": max(1, self.max_sources // len(gaps)), - "delay_seconds": None, # Use default random delay (4-32 seconds) - "headless": False, - "session_manager": self.session_manager # Use shared browser session - }) - + enhanced_query, _ = web_search_run( + "", + search_query, + None, + None, + { + "num_results": max(1, self.max_sources // len(gaps)), + "delay_seconds": None, # Use default random delay (4-32 seconds) + "headless": False, + "session_manager": self.session_manager, # Use shared browser session + }, + ) + if enhanced_query and "Web Search Results" in enhanced_query: # Tag results with gap context gap_context = f"[ADDRESSING GAP: {gap.get('section', 'Unknown')} - {gap.get('specific_need', 'General research')}]\n" all_results.append(gap_context + enhanced_query) - - except Exception as e: + + except Exception: continue - - return "\n\n".join(all_results) if all_results else "No gap-targeted search results obtained" - - def denoise_draft_with_retrieval(self, current_draft: str, retrieval_content: str, original_query: str) -> str: + + return ( + "\n\n".join(all_results) + if all_results + else "No gap-targeted search results obtained" + ) + + def denoise_draft_with_retrieval( + self, current_draft: str, retrieval_content: str, original_query: str + ) -> str: """ Core denoising step: integrate retrieved information with current draft This is the heart of the diffusion process @@ -847,28 +930,37 @@ def denoise_draft_with_retrieval(self, current_draft: str, retrieval_content: st Return the improved draft with integrated information and comprehensive citations. """ - + try: response = self.client.chat.completions.create( model=self.model, messages=[ - {"role": "system", "content": "You are an expert research synthesizer performing draft denoising."}, - {"role": "user", "content": denoising_prompt} + { + "role": "system", + "content": "You are an expert research synthesizer performing draft denoising.", + }, + {"role": "user", "content": denoising_prompt}, ], temperature=0.6, - max_tokens=3000 + max_tokens=3000, ) - + + if not response.choices: + raise ValueError( + "LLM returned empty response in denoise_draft_with_retrieval" + ) denoised_draft = response.choices[0].message.content.strip() denoised_draft = clean_reasoning_tags(denoised_draft) self.total_tokens += response.usage.completion_tokens - + return denoised_draft - + except Exception as e: return f"Denoising failed: {str(e)}\n\nFalling back to current draft:\n{current_draft}" - - def evaluate_draft_quality(self, draft: str, previous_draft: str, original_query: str) -> Dict[str, float]: + + def evaluate_draft_quality( + self, draft: str, previous_draft: str, original_query: str + ) -> Dict[str, float]: """ Evaluate the quality improvement of the current draft vs previous iteration Used for termination decisions and component fitness updates @@ -901,46 +993,53 @@ def evaluate_draft_quality(self, draft: str, previous_draft: str, original_query CITATIONS: [score] IMPROVEMENT: [score] """ - + try: response = self.client.chat.completions.create( model=self.model, messages=[ - {"role": "system", "content": "You are an expert research quality evaluator."}, - {"role": "user", "content": evaluation_prompt} + { + "role": "system", + "content": "You are an expert research quality evaluator.", + }, + {"role": "user", "content": evaluation_prompt}, ], temperature=0.2, - max_tokens=500 + max_tokens=500, ) - + + if not response.choices: + raise ValueError( + "LLM returned empty response in evaluate_draft_quality" + ) content = response.choices[0].message.content.strip() content = clean_reasoning_tags(content) self.total_tokens += response.usage.completion_tokens - + # Parse scores scores = {} - for line in content.split('\n'): - if ':' in line: - key, value = line.split(':', 1) + for line in content.split("\n"): + if ":" in line: + key, value = line.split(":", 1) key = key.strip().lower() try: scores[key] = float(value.strip()) except ValueError: scores[key] = 0.5 # Default score - + return scores - - except Exception as e: + + except Exception: # Default scores return { - 'completeness': 0.5, - 'accuracy': 0.5, - 'depth': 0.5, - 'coherence': 0.5, - 'citations': 0.5, - 'improvement': 0.1 + "completeness": 0.5, + "accuracy": 0.5, + "depth": 0.5, + "coherence": 0.5, + "citations": 0.5, + "improvement": 0.1, } - + def update_component_fitness(self, quality_scores: Dict[str, float]): """ Update component fitness based on performance (self-evolution) @@ -950,20 +1049,22 @@ def update_component_fitness(self, quality_scores: Dict[str, float]): component-wise self-evolutionary optimization as described in the TTD-DR paper. """ # Update fitness based on quality improvements - improvement = quality_scores.get('improvement', 0.0) - + improvement = quality_scores.get("improvement", 0.0) + if improvement > 0.1: # Significant improvement - self.component_fitness['search_strategy'] *= 1.1 - self.component_fitness['synthesis_quality'] *= 1.1 - self.component_fitness['integration_ability'] *= 1.1 + self.component_fitness["search_strategy"] *= 1.1 + self.component_fitness["synthesis_quality"] *= 1.1 + self.component_fitness["integration_ability"] *= 1.1 elif improvement < 0.05: # Poor improvement - self.component_fitness['search_strategy'] *= 0.95 - self.component_fitness['synthesis_quality'] *= 0.95 - + self.component_fitness["search_strategy"] *= 0.95 + self.component_fitness["synthesis_quality"] *= 0.95 + # Cap fitness values for key in self.component_fitness: - self.component_fitness[key] = max(0.1, min(2.0, self.component_fitness[key])) - + self.component_fitness[key] = max( + 0.1, min(2.0, self.component_fitness[key]) + ) + def research(self, system_prompt: str, initial_query: str) -> Tuple[str, int]: """ TTD-DR (Test-Time Diffusion Deep Researcher) main algorithm @@ -978,35 +1079,46 @@ def research(self, system_prompt: str, initial_query: str) -> Tuple[str, int]: Note: Component-wise self-evolutionary optimization is tracked but not yet used to modify behavior (future enhancement). """ - + # Get or create a browser session for this research session - self.session_manager = get_session_manager(self.session_id, headless=False, timeout=30) + self.session_manager = get_session_manager( + self.session_id, headless=False, timeout=30 + ) if self.session_manager: - print(f"🔬 Starting deep research with session ID: {self.session_id} (DeepResearcher instance: {id(self)})") + print( + f"🔬 Starting deep research with session ID: {self.session_id} (DeepResearcher instance: {id(self)})" + ) else: print("⚠️ Failed to create browser session, proceeding without web search") - + try: # PHASE 1: INITIALIZATION - Generate preliminary draft (updatable skeleton) print("TTD-DR: Generating preliminary draft...") - self.current_draft = self.generate_preliminary_draft(system_prompt, initial_query) + self.current_draft = self.generate_preliminary_draft( + system_prompt, initial_query + ) self.draft_history.append(self.current_draft) - + # PHASE 1.5: INITIAL RESEARCH - Ensure we always gather external sources print("TTD-DR: Performing initial research...") initial_queries = self.decompose_query(system_prompt, initial_query) if initial_queries: print(f" - Searching for {len(initial_queries)} initial topics...") initial_search_results = self.perform_web_search(initial_queries) - + # Extract and fetch URLs from initial search - if initial_search_results and "Web Search Results" in initial_search_results: + if ( + initial_search_results + and "Web Search Results" in initial_search_results + ): print(" - Extracting initial sources...") - initial_content, initial_sources = self.extract_and_fetch_urls(initial_search_results) - + initial_content, initial_sources = self.extract_and_fetch_urls( + initial_search_results + ) + # Register initial sources for citations for source in initial_sources: - if 'url' in source: + if "url" in source: self.citation_counter += 1 self.citations[self.citation_counter] = source @@ -1017,44 +1129,55 @@ def research(self, system_prompt: str, initial_query: str) -> Tuple[str, int]: print(" - Warning: Could not decompose query for initial research") # Fallback: Create simple search queries from the original query print(" - Using fallback search strategy...") - fallback_queries = [initial_query] # At minimum, search for the original query + fallback_queries = [ + initial_query + ] # At minimum, search for the original query fallback_search_results = self.perform_web_search(fallback_queries) - if fallback_search_results and "Web Search Results" in fallback_search_results: - fallback_content, fallback_sources = self.extract_and_fetch_urls(fallback_search_results) + if ( + fallback_search_results + and "Web Search Results" in fallback_search_results + ): + fallback_content, fallback_sources = self.extract_and_fetch_urls( + fallback_search_results + ) for source in fallback_sources: - if 'url' in source: + if "url" in source: self.citation_counter += 1 self.citations[self.citation_counter] = source print(f" - Fallback search found {len(fallback_sources)} sources") - + # PHASE 2: ITERATIVE DENOISING LOOP for iteration in range(self.max_iterations): self.research_state["iteration"] = iteration + 1 - print(f"TTD-DR: Denoising iteration {iteration + 1}/{self.max_iterations}") - + print( + f"TTD-DR: Denoising iteration {iteration + 1}/{self.max_iterations}" + ) + # STEP 1: Analyze current draft for gaps (draft-guided search) print(" - Analyzing draft gaps...") gaps = self.analyze_draft_gaps(self.current_draft, initial_query) self.gap_analysis_history.append(gaps) - + if not gaps: print(" - No significant gaps found, research complete") break - + # STEP 2: Perform gap-targeted retrieval print(f" - Performing targeted search for {len(gaps)} gaps...") retrieval_content = self.perform_gap_targeted_search(gaps) - + # STEP 3: Extract and fetch URLs from search results print(" - Extracting and fetching content...") - content_with_urls, sources = self.extract_and_fetch_urls(retrieval_content) - + content_with_urls, sources = self.extract_and_fetch_urls( + retrieval_content + ) + # Register sources for citations for source in sources: - if 'url' in source: + if "url" in source: self.citation_counter += 1 self.citations[self.citation_counter] = source - + # STEP 4: DENOISING - Integrate retrieved info with current draft print(" - Performing denoising step...") previous_draft = self.current_draft @@ -1062,22 +1185,24 @@ def research(self, system_prompt: str, initial_query: str) -> Tuple[str, int]: self.current_draft, content_with_urls, initial_query ) self.draft_history.append(self.current_draft) - + # STEP 5: Evaluate quality improvement print(" - Evaluating draft quality...") quality_scores = self.evaluate_draft_quality( self.current_draft, previous_draft, initial_query ) - + # STEP 6: Component self-evolution based on feedback self.update_component_fitness(quality_scores) - + # STEP 7: Check termination conditions - completeness = quality_scores.get('completeness', 0.0) - improvement = quality_scores.get('improvement', 0.0) - - print(f" - Quality scores: Completeness={completeness:.2f}, Improvement={improvement:.2f}") - + completeness = quality_scores.get("completeness", 0.0) + improvement = quality_scores.get("improvement", 0.0) + + print( + f" - Quality scores: Completeness={completeness:.2f}, Improvement={improvement:.2f}" + ) + # Terminate if high quality achieved or minimal improvement # More lenient termination to ensure complete research if completeness > 0.9 or (improvement < 0.03 and completeness > 0.7): @@ -1086,26 +1211,30 @@ def research(self, system_prompt: str, initial_query: str) -> Tuple[str, int]: # PHASE 3: FINALIZATION - Polish the final draft print("TTD-DR: Finalizing research report...") - + # Ensure we have gathered some sources if len(self.citations) == 0: print("⚠️ Warning: No external sources found during research!") print(" Deep research should always consult external sources.") else: print(f"✅ Research completed with {len(self.citations)} sources") - - final_report = self.finalize_research_report(system_prompt, initial_query, self.current_draft) - + + final_report = self.finalize_research_report( + system_prompt, initial_query, self.current_draft + ) + return final_report, self.total_tokens - + finally: # Clean up browser session if self.session_manager: print(f"🏁 Closing research session: {self.session_id}") close_session(self.session_id) self.session_manager = None - - def finalize_research_report(self, system_prompt: str, original_query: str, final_draft: str) -> str: + + def finalize_research_report( + self, system_prompt: str, original_query: str, final_draft: str + ) -> str: """ Apply final polishing to the research report """ @@ -1152,80 +1281,109 @@ def finalize_research_report(self, system_prompt: str, original_query: str, fina Return the final polished research report with comprehensive citations. """ - + try: response = self.client.chat.completions.create( model=self.model, messages=[ {"role": "system", "content": system_prompt}, - {"role": "user", "content": finalization_prompt} + {"role": "user", "content": finalization_prompt}, ], temperature=0.5, - max_tokens=3000 + max_tokens=3000, ) - + + if not response.choices: + raise ValueError( + "LLM returned empty response in finalize_research_report" + ) polished_report = response.choices[0].message.content.strip() polished_report = clean_reasoning_tags(polished_report) - + # Final cleanup: Remove any remaining placeholder tags polished_report = self.cleanup_placeholder_tags(polished_report) - + # Validate report completeness validation = validate_report_completeness(polished_report) - + if not validation["is_complete"]: - print(f"⚠️ Report validation found {len(validation['issues'])} issues:") - for issue in validation['issues']: + print( + f"⚠️ Report validation found {len(validation['issues'])} issues:" + ) + for issue in validation["issues"]: print(f" - {issue}") - + # Attempt to fix incomplete report - polished_report = self.fix_incomplete_report(polished_report, validation, original_query) + polished_report = self.fix_incomplete_report( + polished_report, validation, original_query + ) else: print("✅ Report validation passed - report is complete") - + self.total_tokens += response.usage.completion_tokens # Remove any References section the LLM might have created - polished_report = re.sub(r'##\s*References.*?(?=##|\Z)', '', polished_report, flags=re.DOTALL) - polished_report = re.sub(r'(?m)^References\s*\n\s*(?:\[\d+\]\s*\n)+', '', polished_report) - polished_report = re.sub(r'\n\s*\n\s*\n+', '\n\n', polished_report) # Clean up extra newlines + polished_report = re.sub( + r"##\s*References.*?(?=##|\Z)", "", polished_report, flags=re.DOTALL + ) + polished_report = re.sub( + r"(?m)^References\s*\n\s*(?:\[\d+\]\s*\n)+", "", polished_report + ) + polished_report = re.sub( + r"\n\s*\n\s*\n+", "\n\n", polished_report + ) # Clean up extra newlines # Validate citation usage before adding references - citation_validation = validate_citation_usage(polished_report, len(self.citations)) - print(f"📊 Citation Statistics:") - print(f" - Used citations: {citation_validation['citations_used']}/{citation_validation['citations_total']}") - print(f" - Usage percentage: {citation_validation['usage_percentage']:.1f}%") + citation_validation = validate_citation_usage( + polished_report, len(self.citations) + ) + print("📊 Citation Statistics:") + print( + f" - Used citations: {citation_validation['citations_used']}/{citation_validation['citations_total']}" + ) + print( + f" - Usage percentage: {citation_validation['usage_percentage']:.1f}%" + ) if "warning" in citation_validation: print(f"⚠️ {citation_validation['warning']}") - if len(citation_validation['unused_citations']) > 0: - print(f" - Unused citations: {citation_validation['unused_citations'][:10]}" + - (f"... and {len(citation_validation['unused_citations']) - 10} more" - if len(citation_validation['unused_citations']) > 10 else "")) + if len(citation_validation["unused_citations"]) > 0: + print( + f" - Unused citations: {citation_validation['unused_citations'][:10]}" + + ( + f"... and {len(citation_validation['unused_citations']) - 10} more" + if len(citation_validation["unused_citations"]) > 10 + else "" + ) + ) # Add references section (only for citations that are actually used) references = "\n\n## References\n\n" - used_citations = set(citation_validation['used_citations']) + used_citations = set(citation_validation["used_citations"]) for num, source in sorted(self.citations.items()): # Only include citations that are actually used in the text if num in used_citations: - title = source.get('title', 'Untitled') - url = source['url'] - access_date = source.get('access_date', datetime.now().strftime('%Y-%m-%d')) + title = source.get("title", "Untitled") + url = source["url"] + access_date = source.get( + "access_date", datetime.now().strftime("%Y-%m-%d") + ) references += f"[{num}] {title}. Available at: <{url}> [Accessed: {access_date}]\n\n" - + # Add TTD-DR metadata metadata = "\n---\n\n**TTD-DR Research Metadata:**\n" - metadata += f"- Algorithm: Test-Time Diffusion Deep Researcher\n" + metadata += "- Algorithm: Test-Time Diffusion Deep Researcher\n" metadata += f"- Denoising iterations: {len(self.draft_history) - 1}\n" metadata += f"- Total gaps addressed: {sum(len(gaps) for gaps in self.gap_analysis_history)}\n" metadata += f"- Total sources consulted: {len(self.citations)}\n" metadata += f"- Citations used in text: {citation_validation['citations_used']} ({citation_validation['usage_percentage']:.1f}%)\n" - metadata += f"- Report generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" + metadata += ( + f"- Report generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" + ) metadata += f"- Total tokens used: {self.total_tokens}\n" return polished_report + references + metadata - + except Exception as e: - return f"Finalization failed: {str(e)}\n\nReturning current draft:\n{final_draft}" \ No newline at end of file + return f"Finalization failed: {str(e)}\n\nReturning current draft:\n{final_draft}" diff --git a/optillm/plugins/spl/evaluation.py b/optillm/plugins/spl/evaluation.py index 87d07f86..c386d9b7 100644 --- a/optillm/plugins/spl/evaluation.py +++ b/optillm/plugins/spl/evaluation.py @@ -4,73 +4,97 @@ import logging from datetime import datetime -from typing import Dict, List, Optional, Tuple, Any +from typing import Dict, List, Optional, Any from optillm.plugins.spl.strategy import Strategy from optillm.plugins.spl.utils import extract_thinking from optillm.plugins.spl.prompts import ( STRATEGY_EVALUATION_PROMPT, - STRATEGY_REFINEMENT_PROMPT + STRATEGY_REFINEMENT_PROMPT, ) from optillm.plugins.spl.config import ( DEFAULT_MAX_TOKENS, MAX_STRATEGIES_FOR_INFERENCE, - MIN_SUCCESS_RATE_FOR_INFERENCE + MIN_SUCCESS_RATE_FOR_INFERENCE, ) # Setup logging logger = logging.getLogger(__name__) -def select_relevant_strategies(query: str, problem_type: str, db: Any, learning_mode: bool = False, max_strategies: int = MAX_STRATEGIES_FOR_INFERENCE) -> List[Strategy]: + +def select_relevant_strategies( + query: str, + problem_type: str, + db: Any, + learning_mode: bool = False, + max_strategies: int = MAX_STRATEGIES_FOR_INFERENCE, +) -> List[Strategy]: """ Select the most relevant strategies for a given problem to be used during inference. This controls how many strategies are included in the system prompt augmentation. - + When in inference mode (not learning_mode), only strategies with: - - A matching problem type + - A matching problem type - Success rate >= MIN_SUCCESS_RATE_FOR_INFERENCE - At least 5 attempts are selected. - + In learning mode, strategies with fewer attempts are also considered. - + Args: query: The problem/query text problem_type: The type of problem db: Strategy database learning_mode: Whether we're in learning mode (affects filtering criteria) max_strategies: Maximum number of strategies to return - + Returns: List[Strategy]: The selected strategies (may be empty if none meet criteria) """ # First, get strategies specifically for this problem type type_specific = db.get_strategies_for_problem(problem_type) - logger.info(f"Found {len(type_specific)} strategies for problem type '{problem_type}'") - + logger.info( + f"Found {len(type_specific)} strategies for problem type '{problem_type}'" + ) + # Filter strategies by minimum success rate and attempts qualified_strategies = [] for strategy in type_specific: # In learning mode, we're more lenient with new strategies if learning_mode and strategy.total_attempts < 5: - logger.info(f"Strategy {strategy.strategy_id} included (learning mode - only {strategy.total_attempts} attempts so far)") + logger.info( + f"Strategy {strategy.strategy_id} included (learning mode - only {strategy.total_attempts} attempts so far)" + ) qualified_strategies.append(strategy) # For inference or well-tested strategies, we require minimum success rate - elif strategy.success_rate >= MIN_SUCCESS_RATE_FOR_INFERENCE and strategy.total_attempts >= 5: - logger.info(f"Strategy {strategy.strategy_id} qualified - success rate {strategy.success_rate:.2f} >= minimum {MIN_SUCCESS_RATE_FOR_INFERENCE:.2f} with {strategy.total_attempts} attempts") + elif ( + strategy.success_rate >= MIN_SUCCESS_RATE_FOR_INFERENCE + and strategy.total_attempts >= 5 + ): + logger.info( + f"Strategy {strategy.strategy_id} qualified - success rate {strategy.success_rate:.2f} >= minimum {MIN_SUCCESS_RATE_FOR_INFERENCE:.2f} with {strategy.total_attempts} attempts" + ) qualified_strategies.append(strategy) else: if strategy.total_attempts < 5: - logger.info(f"Strategy {strategy.strategy_id} skipped - insufficient attempts ({strategy.total_attempts} < 5) in inference mode") + logger.info( + f"Strategy {strategy.strategy_id} skipped - insufficient attempts ({strategy.total_attempts} < 5) in inference mode" + ) else: - logger.info(f"Strategy {strategy.strategy_id} skipped - success rate {strategy.success_rate:.2f} < minimum {MIN_SUCCESS_RATE_FOR_INFERENCE:.2f}") - + logger.info( + f"Strategy {strategy.strategy_id} skipped - success rate {strategy.success_rate:.2f} < minimum {MIN_SUCCESS_RATE_FOR_INFERENCE:.2f}" + ) + if not qualified_strategies: - logger.info(f"No strategies meet the minimum success rate threshold ({MIN_SUCCESS_RATE_FOR_INFERENCE:.2f}) for problem type '{problem_type}'") + logger.info( + f"No strategies meet the minimum success rate threshold ({MIN_SUCCESS_RATE_FOR_INFERENCE:.2f}) for problem type '{problem_type}'" + ) return [] - - logger.info(f"Found {len(qualified_strategies)} strategies that meet minimum success rate requirement") - + + logger.info( + f"Found {len(qualified_strategies)} strategies that meet minimum success rate requirement" + ) + # If we have more qualified strategies than needed, sort and select the best ones if len(qualified_strategies) > max_strategies: # Score each strategy based on success rate and recency @@ -81,103 +105,129 @@ def select_relevant_strategies(query: str, problem_type: str, db: Any, learning_ # Calculate days since last use last_used = datetime.fromisoformat(strategy.last_used) days_since = (datetime.now() - last_used).days - recency_score = max(0, 1.0 - min(1.0, days_since / 30.0)) # Higher for more recent - + recency_score = max( + 0, 1.0 - min(1.0, days_since / 30.0) + ) # Higher for more recent + # Combined score with success rate weighing more score = (0.7 * strategy.success_rate) + (0.3 * recency_score) scored_strategies.append((strategy, score)) - + # Sort by score (descending) and take top strategies scored_strategies.sort(key=lambda x: x[1], reverse=True) selected = [s[0] for s in scored_strategies[:max_strategies]] - + # Log which strategies we're using for i, strategy in enumerate(selected, 1): - logger.info(f"Selected strategy {i}/{max_strategies} for inference: {strategy.strategy_id} (success rate: {strategy.success_rate:.2f})") - + logger.info( + f"Selected strategy {i}/{max_strategies} for inference: {strategy.strategy_id} (success rate: {strategy.success_rate:.2f})" + ) + return selected - + # If we have fewer or equal to the maximum, use all qualified strategies for i, strategy in enumerate(qualified_strategies, 1): - logger.info(f"Selected strategy {i}/{len(qualified_strategies)} for inference: {strategy.strategy_id} (success rate: {strategy.success_rate:.2f})") - + logger.info( + f"Selected strategy {i}/{len(qualified_strategies)} for inference: {strategy.strategy_id} (success rate: {strategy.success_rate:.2f})" + ) + return qualified_strategies -def evaluate_strategy_effectiveness(response: str, thinking: Optional[str], selected_strategies: List[Strategy], client, model: str) -> Dict[str, bool]: + +def evaluate_strategy_effectiveness( + response: str, + thinking: Optional[str], + selected_strategies: List[Strategy], + client, + model: str, +) -> Dict[str, bool]: """ Evaluate how effective each strategy was in generating the response. - + Args: response: The LLM's final response to the query thinking: The LLM's reasoning process (if any) selected_strategies: The strategies that were used client: LLM client for making API calls model: Model identifier - + Returns: Dict[str, bool]: Mapping from strategy ID to effectiveness (True/False) """ if not selected_strategies: return {} - + results = {} - + try: for strategy in selected_strategies: # Include thinking in the evaluation if available full_response = thinking + "\n\n" + response if thinking else response - + messages = [ + {"role": "system", "content": STRATEGY_EVALUATION_PROMPT}, { - "role": "system", - "content": STRATEGY_EVALUATION_PROMPT - }, - { - "role": "user", + "role": "user", "content": ( f"Strategy:\n{strategy.strategy_text}\n\n" f"Response (including reasoning):\n{full_response}\n\n" f"Does the response show clear evidence that the strategy was effectively applied? " f"Answer with ONLY YES or NO." - ) - } + ), + }, ] - + eval_response = client.chat.completions.create( model=model, messages=messages, temperature=0.1, # Low temperature for more deterministic output - max_tokens=DEFAULT_MAX_TOKENS # Increased token limit for reasoning LLMs + max_tokens=DEFAULT_MAX_TOKENS, # Increased token limit for reasoning LLMs ) - + # Get the response and extract final answer (remove thinking blocks) + if not eval_response.choices: + raise ValueError( + "LLM returned empty response in evaluate_strategy_effectiveness" + ) result_text = eval_response.choices[0].message.content final_result, eval_thinking = extract_thinking(result_text) - + # Clean up and normalize the result final_result = final_result.strip().upper() - + logger.debug(f"Strategy evaluation - raw response: '{result_text}'") - logger.debug(f"Strategy evaluation - final result after removing thinking: '{final_result}'") - + logger.debug( + f"Strategy evaluation - final result after removing thinking: '{final_result}'" + ) + # Check for YES in the final answer (not in thinking blocks) is_effective = "YES" in final_result - + results[strategy.strategy_id] = is_effective - logger.info(f"Strategy {strategy.strategy_id} evaluation: {final_result} -> {is_effective}") - + logger.info( + f"Strategy {strategy.strategy_id} evaluation: {final_result} -> {is_effective}" + ) + except Exception as e: logger.error(f"Error evaluating strategy effectiveness: {str(e)}") # Default to neutral results if evaluation fails for strategy in selected_strategies: results[strategy.strategy_id] = False - + return results -def refine_strategy(strategy: Strategy, problem: str, response: str, thinking: Optional[str], client, model: str) -> Strategy: + +def refine_strategy( + strategy: Strategy, + problem: str, + response: str, + thinking: Optional[str], + client, + model: str, +) -> Strategy: """ Refine a strategy based on its application to a specific problem. - + Args: strategy: The strategy to refine problem: The problem that was solved @@ -185,48 +235,49 @@ def refine_strategy(strategy: Strategy, problem: str, response: str, thinking: O thinking: The LLM's reasoning process (if any) client: LLM client for making API calls model: Model identifier - + Returns: Strategy: The refined strategy """ try: # Include thinking in refinement if available full_response = thinking + "\n\n" + response if thinking else response - + messages = [ + {"role": "system", "content": STRATEGY_REFINEMENT_PROMPT}, { - "role": "system", - "content": STRATEGY_REFINEMENT_PROMPT - }, - { - "role": "user", + "role": "user", "content": ( f"Original strategy for {strategy.problem_type} problems:\n{strategy.strategy_text}\n\n" f"New problem:\n{problem}\n\n" f"Solution process (including reasoning):\n{full_response}\n\n" f"Provide a refined version of the original strategy that incorporates " f"any insights from this new example." - ) - } + ), + }, ] - + refine_response = client.chat.completions.create( model=model, messages=messages, temperature=0.5, - max_tokens=DEFAULT_MAX_TOKENS # Increased token limit for reasoning LLMs + max_tokens=DEFAULT_MAX_TOKENS, # Increased token limit for reasoning LLMs ) - + + if not refine_response.choices: + raise ValueError("LLM returned empty response in refine_strategy") response_text = refine_response.choices[0].message.content - + # Extract refined strategy and thinking refined_text, refinement_thinking = extract_thinking(response_text) if not refined_text.strip(): refined_text = response_text # Use full response if extraction failed - + logger.debug(f"Strategy refinement - raw response: '{response_text}'") - logger.debug(f"Strategy refinement - final text after removing thinking: '{refined_text}'") - + logger.debug( + f"Strategy refinement - final text after removing thinking: '{refined_text}'" + ) + # Create a copy of the strategy with the refined text refined_strategy = Strategy( strategy_id=strategy.strategy_id, @@ -240,15 +291,15 @@ def refine_strategy(strategy: Strategy, problem: str, response: str, thinking: O last_updated=datetime.now().isoformat(), confidence=strategy.confidence, tags=strategy.tags, - reasoning_examples=strategy.reasoning_examples.copy() + reasoning_examples=strategy.reasoning_examples.copy(), ) - + # Add the refinement thinking if available if refinement_thinking: refined_strategy.add_reasoning_example(refinement_thinking) - + return refined_strategy - + except Exception as e: logger.error(f"Error refining strategy: {str(e)}") # Return the original strategy if refinement fails