diff --git a/optillm/mars/agent.py b/optillm/mars/agent.py
index 704c33d3..e9ebd275 100644
--- a/optillm/mars/agent.py
+++ b/optillm/mars/agent.py
@@ -5,17 +5,17 @@
import logging
from typing import Dict, Any, Tuple
from datetime import datetime
-import random
from .prompts import (
MATHEMATICAL_SYSTEM_PROMPT,
AGENT_EXPLORATION_PROMPT,
VERIFICATION_PROMPT,
- IMPROVEMENT_PROMPT
+ IMPROVEMENT_PROMPT,
)
from .workspace import AgentSolution, VerificationResult
logger = logging.getLogger(__name__)
+
class MARSAgent:
"""Individual agent for mathematical reasoning with OpenRouter reasoning API"""
@@ -40,28 +40,33 @@ def _get_reasoning_effort(self) -> str:
else:
return "high" # ~80% of max_tokens for reasoning
- def generate_solution(self, problem: str, request_id: str = None) -> Tuple[AgentSolution, int]:
+ def generate_solution(
+ self, problem: str, request_id: str = None
+ ) -> Tuple[AgentSolution, int]:
"""Generate a solution for the given problem using reasoning API"""
import time
+
start_time = time.time()
- logger.info(f"🤖 AGENT {self.agent_id}: Starting solution generation (temp: {self.temperature}, effort: {self._get_reasoning_effort()})")
- logger.info(f"🤖 AGENT {self.agent_id}: Problem length: {len(problem)} characters")
+ logger.info(
+ f"🤖 AGENT {self.agent_id}: Starting solution generation (temp: {self.temperature}, effort: {self._get_reasoning_effort()})"
+ )
+ logger.info(
+ f"🤖 AGENT {self.agent_id}: Problem length: {len(problem)} characters"
+ )
# Prepare the prompt
exploration_prompt = AGENT_EXPLORATION_PROMPT.format(
- agent_id=self.agent_id,
- temperature=self.temperature,
- problem=problem
+ agent_id=self.agent_id, temperature=self.temperature, problem=problem
)
# Configure reasoning parameters - simplified with effort only
reasoning_effort = self._get_reasoning_effort()
- max_tokens = self.config['max_tokens']
- logger.info(f"🤖 AGENT {self.agent_id}: Using max_tokens={max_tokens}, reasoning_effort={reasoning_effort}")
+ max_tokens = self.config["max_tokens"]
+ logger.info(
+ f"🤖 AGENT {self.agent_id}: Using max_tokens={max_tokens}, reasoning_effort={reasoning_effort}"
+ )
- reasoning_config = {
- "effort": reasoning_effort
- }
+ reasoning_config = {"effort": reasoning_effort}
try:
# Make API call with reasoning via extra_body for OpenRouter compatibility
@@ -71,52 +76,74 @@ def generate_solution(self, problem: str, request_id: str = None) -> Tuple[Agent
model=self.model,
messages=[
{"role": "system", "content": MATHEMATICAL_SYSTEM_PROMPT},
- {"role": "user", "content": exploration_prompt}
+ {"role": "user", "content": exploration_prompt},
],
max_tokens=max_tokens,
temperature=self.temperature,
timeout=300, # 5 minute timeout for complex problems
- extra_body={
- "reasoning": reasoning_config
- }
+ extra_body={"reasoning": reasoning_config},
)
api_duration = time.time() - api_start
- logger.info(f"🤖 AGENT {self.agent_id}: API call completed in {api_duration:.2f}s")
+ logger.info(
+ f"🤖 AGENT {self.agent_id}: API call completed in {api_duration:.2f}s"
+ )
+ if not response.choices:
+ raise ValueError(
+ f"LLM returned empty response in generate_solution (agent {self.agent_id})"
+ )
solution_text = response.choices[0].message.content.strip()
# ENHANCED LOGGING: Log solution details
solution_length = len(solution_text)
word_count = len(solution_text.split())
has_boxed = "\\boxed{" in solution_text
- has_proof_words = any(word in solution_text.lower() for word in ['therefore', 'thus', 'proof', 'qed'])
+ has_proof_words = any(
+ word in solution_text.lower()
+ for word in ["therefore", "thus", "proof", "qed"]
+ )
logger.info(f"🤖 AGENT {self.agent_id}: Solution analysis:")
logger.info(f" 📝 Length: {solution_length:,} chars, {word_count:,} words")
logger.info(f" 📦 Has boxed answer: {has_boxed}")
logger.info(f" 🔍 Has proof indicators: {has_proof_words}")
- logger.info(f" 📄 Preview: {solution_text[:200]}{'...' if len(solution_text) > 200 else ''}")
- logger.info(f" 📄 Last 100 chars: ...{solution_text[-100:] if solution_length > 100 else solution_text}")
+ logger.info(
+ f" 📄 Preview: {solution_text[:200]}{'...' if len(solution_text) > 200 else ''}"
+ )
+ logger.info(
+ f" 📄 Last 100 chars: ...{solution_text[-100:] if solution_length > 100 else solution_text}"
+ )
# Extract reasoning tokens from the correct nested structure
reasoning_tokens = 0
total_tokens = 0
- if hasattr(response, 'usage') and response.usage:
- total_tokens = getattr(response.usage, 'total_tokens', 0)
+ if hasattr(response, "usage") and response.usage:
+ total_tokens = getattr(response.usage, "total_tokens", 0)
# Check completion_tokens_details first (OpenRouter structure)
- if hasattr(response.usage, 'completion_tokens_details') and response.usage.completion_tokens_details:
- reasoning_tokens = getattr(response.usage.completion_tokens_details, 'reasoning_tokens', 0)
+ if (
+ hasattr(response.usage, "completion_tokens_details")
+ and response.usage.completion_tokens_details
+ ):
+ reasoning_tokens = getattr(
+ response.usage.completion_tokens_details, "reasoning_tokens", 0
+ )
# Fallback to direct usage field (standard OpenAI structure)
if reasoning_tokens == 0:
- reasoning_tokens = getattr(response.usage, 'reasoning_tokens', 0)
+ reasoning_tokens = getattr(response.usage, "reasoning_tokens", 0)
- reasoning_ratio = (reasoning_tokens / total_tokens * 100) if total_tokens > 0 else 0
- logger.info(f"🤖 AGENT {self.agent_id}: Token usage: reasoning={reasoning_tokens:,}, total={total_tokens:,} ({reasoning_ratio:.1f}% reasoning)")
+ reasoning_ratio = (
+ (reasoning_tokens / total_tokens * 100) if total_tokens > 0 else 0
+ )
+ logger.info(
+ f"🤖 AGENT {self.agent_id}: Token usage: reasoning={reasoning_tokens:,}, total={total_tokens:,} ({reasoning_ratio:.1f}% reasoning)"
+ )
# Extract confidence from solution (heuristic based on response characteristics)
confidence = self._estimate_confidence(solution_text)
- logger.info(f"🤖 AGENT {self.agent_id}: Estimated confidence: {confidence:.3f}")
+ logger.info(
+ f"🤖 AGENT {self.agent_id}: Estimated confidence: {confidence:.3f}"
+ )
# Create agent solution object with enhanced metadata
agent_solution = AgentSolution(
@@ -126,17 +153,23 @@ def generate_solution(self, problem: str, request_id: str = None) -> Tuple[Agent
reasoning_tokens=reasoning_tokens,
total_tokens=total_tokens,
solution_length=solution_length,
- temperature=self.temperature
+ temperature=self.temperature,
)
total_duration = time.time() - start_time
- logger.info(f"🤖 AGENT {self.agent_id}: ✅ Solution generated in {total_duration:.2f}s (API: {api_duration:.2f}s, processing: {total_duration-api_duration:.2f}s)")
+ logger.info(
+ f"🤖 AGENT {self.agent_id}: ✅ Solution generated in {total_duration:.2f}s (API: {api_duration:.2f}s, processing: {total_duration-api_duration:.2f}s)"
+ )
return agent_solution, reasoning_tokens
except Exception as e:
error_duration = time.time() - start_time
- logger.error(f"🤖 AGENT {self.agent_id}: ❌ Error generating solution after {error_duration:.2f}s: {str(e)}")
- logger.error(f"🤖 AGENT {self.agent_id}: Model: {self.model}, Temperature: {self.temperature}, Max tokens: {max_tokens}")
+ logger.error(
+ f"🤖 AGENT {self.agent_id}: ❌ Error generating solution after {error_duration:.2f}s: {str(e)}"
+ )
+ logger.error(
+ f"🤖 AGENT {self.agent_id}: Model: {self.model}, Temperature: {self.temperature}, Max tokens: {max_tokens}"
+ )
# Return empty solution with error indication
error_message = f"Error generating solution: {str(e)}"
error_solution = AgentSolution(
@@ -146,24 +179,35 @@ def generate_solution(self, problem: str, request_id: str = None) -> Tuple[Agent
reasoning_tokens=0,
total_tokens=0,
solution_length=len(error_message),
- temperature=self.temperature
+ temperature=self.temperature,
)
return error_solution, 0
- def verify_solution(self, problem: str, solution: str, verifier_id: int, solution_agent_id: int, request_id: str = None) -> VerificationResult:
+ def verify_solution(
+ self,
+ problem: str,
+ solution: str,
+ verifier_id: int,
+ solution_agent_id: int,
+ request_id: str = None,
+ ) -> VerificationResult:
"""Verify a solution using mathematical reasoning"""
import time
+
start_time = time.time()
- logger.info(f"🔍 VERIFIER {self.agent_id}: Starting verification (target: Agent {solution_agent_id}, verifier_id: {verifier_id})")
- logger.info(f"🔍 VERIFIER {self.agent_id}: Solution length: {len(solution):,} chars")
+ logger.info(
+ f"🔍 VERIFIER {self.agent_id}: Starting verification (target: Agent {solution_agent_id}, verifier_id: {verifier_id})"
+ )
+ logger.info(
+ f"🔍 VERIFIER {self.agent_id}: Solution length: {len(solution):,} chars"
+ )
verification_prompt = VERIFICATION_PROMPT.format(
- problem=problem,
- solution=solution
+ problem=problem, solution=solution
)
# Use simplified verification with effort parameter
- max_tokens = self.config['max_tokens']
+ max_tokens = self.config["max_tokens"]
try:
api_start = time.time()
@@ -172,7 +216,7 @@ def verify_solution(self, problem: str, solution: str, verifier_id: int, solutio
model=self.model,
messages=[
{"role": "system", "content": MATHEMATICAL_SYSTEM_PROMPT},
- {"role": "user", "content": verification_prompt}
+ {"role": "user", "content": verification_prompt},
],
max_tokens=max_tokens,
temperature=0.1, # Low temperature for consistent verification
@@ -181,17 +225,29 @@ def verify_solution(self, problem: str, solution: str, verifier_id: int, solutio
"reasoning": {
"effort": "low" # Low effort for verification consistency
}
- }
+ },
)
api_duration = time.time() - api_start
- logger.info(f"🔍 VERIFIER {self.agent_id}: Verification API call completed in {api_duration:.2f}s")
+ logger.info(
+ f"🔍 VERIFIER {self.agent_id}: Verification API call completed in {api_duration:.2f}s"
+ )
+ if not response.choices:
+ raise ValueError(
+ f"LLM returned empty response in verify_solution (agent {self.agent_id})"
+ )
verification_text = response.choices[0].message.content.strip()
# Parse verification result
- assessment, confidence, issues, suggestions = self._parse_verification(verification_text)
- logger.info(f"🔍 VERIFIER {self.agent_id}: Assessment: {assessment}, Confidence: {confidence:.3f}")
- logger.info(f"🔍 VERIFIER {self.agent_id}: Issues found: {len(issues)}, Suggestions: {len(suggestions)}")
+ assessment, confidence, issues, suggestions = self._parse_verification(
+ verification_text
+ )
+ logger.info(
+ f"🔍 VERIFIER {self.agent_id}: Assessment: {assessment}, Confidence: {confidence:.3f}"
+ )
+ logger.info(
+ f"🔍 VERIFIER {self.agent_id}: Issues found: {len(issues)}, Suggestions: {len(suggestions)}"
+ )
if issues:
logger.info(f"🔍 VERIFIER {self.agent_id}: Key issues: {issues[:2]}")
@@ -203,16 +259,20 @@ def verify_solution(self, problem: str, solution: str, verifier_id: int, solutio
issues=issues,
suggestions=suggestions,
detailed_report=verification_text,
- timestamp=datetime.now()
+ timestamp=datetime.now(),
)
total_duration = time.time() - start_time
- logger.info(f"🔍 VERIFIER {self.agent_id}: ✅ Verification completed in {total_duration:.2f}s")
+ logger.info(
+ f"🔍 VERIFIER {self.agent_id}: ✅ Verification completed in {total_duration:.2f}s"
+ )
return result
except Exception as e:
error_duration = time.time() - start_time
- logger.error(f"🔍 VERIFIER {self.agent_id}: ❌ Verification error after {error_duration:.2f}s: {str(e)}")
+ logger.error(
+ f"🔍 VERIFIER {self.agent_id}: ❌ Verification error after {error_duration:.2f}s: {str(e)}"
+ )
return VerificationResult(
verifier_id=verifier_id,
solution_id=f"agent_{solution_agent_id}_iter_0",
@@ -221,26 +281,36 @@ def verify_solution(self, problem: str, solution: str, verifier_id: int, solutio
issues=[f"Verification error: {str(e)}"],
suggestions=["Retry verification"],
detailed_report=f"Error during verification: {str(e)}",
- timestamp=datetime.now()
+ timestamp=datetime.now(),
)
- def improve_solution(self, problem: str, current_solution: str, feedback: str, issues: list, request_id: str = None) -> Tuple[str, int]:
+ def improve_solution(
+ self,
+ problem: str,
+ current_solution: str,
+ feedback: str,
+ issues: list,
+ request_id: str = None,
+ ) -> Tuple[str, int]:
"""Improve a solution based on verification feedback"""
import time
+
start_time = time.time()
logger.info(f"🔧 IMPROVER {self.agent_id}: Starting solution improvement")
- logger.info(f"🔧 IMPROVER {self.agent_id}: Current solution: {len(current_solution):,} chars")
+ logger.info(
+ f"🔧 IMPROVER {self.agent_id}: Current solution: {len(current_solution):,} chars"
+ )
logger.info(f"🔧 IMPROVER {self.agent_id}: Issues to address: {len(issues)}")
improvement_prompt = IMPROVEMENT_PROMPT.format(
problem=problem,
current_solution=current_solution,
feedback=feedback,
- issues="\n".join(f"- {issue}" for issue in issues)
+ issues="\n".join(f"- {issue}" for issue in issues),
)
# Use simplified improvement with high effort
- max_tokens = self.config['max_tokens']
+ max_tokens = self.config["max_tokens"]
try:
api_start = time.time()
@@ -249,36 +319,51 @@ def improve_solution(self, problem: str, current_solution: str, feedback: str, i
model=self.model,
messages=[
{"role": "system", "content": MATHEMATICAL_SYSTEM_PROMPT},
- {"role": "user", "content": improvement_prompt}
+ {"role": "user", "content": improvement_prompt},
],
max_tokens=max_tokens,
- temperature=self.temperature * 0.8, # Slightly lower temperature for improvement
+ temperature=self.temperature
+ * 0.8, # Slightly lower temperature for improvement
timeout=300,
extra_body={
- "reasoning": {
- "effort": "high" # High effort for improvements
- }
- }
+ "reasoning": {"effort": "high"} # High effort for improvements
+ },
)
api_duration = time.time() - api_start
- logger.info(f"🔧 IMPROVER {self.agent_id}: Improvement API call completed in {api_duration:.2f}s")
+ logger.info(
+ f"🔧 IMPROVER {self.agent_id}: Improvement API call completed in {api_duration:.2f}s"
+ )
+ if not response.choices:
+ raise ValueError(
+ f"LLM returned empty response in improve_solution (agent {self.agent_id})"
+ )
improved_solution = response.choices[0].message.content.strip()
- reasoning_tokens = getattr(response.usage, 'reasoning_tokens', 0)
+ reasoning_tokens = getattr(response.usage, "reasoning_tokens", 0)
# Log improvement analysis
length_change = len(improved_solution) - len(current_solution)
- logger.info(f"🔧 IMPROVER {self.agent_id}: Solution length change: {length_change:+,} chars")
- logger.info(f"🔧 IMPROVER {self.agent_id}: Improved solution preview: {improved_solution[:200]}{'...' if len(improved_solution) > 200 else ''}")
+ logger.info(
+ f"🔧 IMPROVER {self.agent_id}: Solution length change: {length_change:+,} chars"
+ )
+ logger.info(
+ f"🔧 IMPROVER {self.agent_id}: Improved solution preview: {improved_solution[:200]}{'...' if len(improved_solution) > 200 else ''}"
+ )
total_duration = time.time() - start_time
- logger.info(f"🔧 IMPROVER {self.agent_id}: ✅ Solution improved in {total_duration:.2f}s with {reasoning_tokens:,} reasoning tokens")
+ logger.info(
+ f"🔧 IMPROVER {self.agent_id}: ✅ Solution improved in {total_duration:.2f}s with {reasoning_tokens:,} reasoning tokens"
+ )
return improved_solution, reasoning_tokens
except Exception as e:
error_duration = time.time() - start_time
- logger.error(f"🔧 IMPROVER {self.agent_id}: ❌ Improvement error after {error_duration:.2f}s: {str(e)}")
- logger.warning(f"🔧 IMPROVER {self.agent_id}: Returning original solution due to error")
+ logger.error(
+ f"🔧 IMPROVER {self.agent_id}: ❌ Improvement error after {error_duration:.2f}s: {str(e)}"
+ )
+ logger.warning(
+ f"🔧 IMPROVER {self.agent_id}: Returning original solution due to error"
+ )
return current_solution, 0 # Return original solution if improvement fails
def _estimate_confidence(self, solution: str) -> float:
@@ -313,10 +398,14 @@ def _estimate_confidence(self, solution: str) -> float:
uncertainty_factors.append("explicit_uncertainty")
final_confidence = max(0.1, min(1.0, confidence))
- logger.debug(f"🤖 AGENT {self.agent_id}: Confidence factors: +{confidence_factors}, -{uncertainty_factors} → {final_confidence:.3f}")
+ logger.debug(
+ f"🤖 AGENT {self.agent_id}: Confidence factors: +{confidence_factors}, -{uncertainty_factors} → {final_confidence:.3f}"
+ )
return final_confidence
- def _parse_verification(self, verification_text: str) -> Tuple[str, float, list, list]:
+ def _parse_verification(
+ self, verification_text: str
+ ) -> Tuple[str, float, list, list]:
"""Parse verification result to extract structured information"""
assessment = "INCOMPLETE" # Default
confidence = 0.5
@@ -338,23 +427,32 @@ def _parse_verification(self, verification_text: str) -> Tuple[str, float, list,
# Extract confidence if explicitly mentioned
import re
- confidence_match = re.search(r'confidence.*?(\d+).*?(?:out of|/)\s*(\d+)', text_lower)
+
+ confidence_match = re.search(
+ r"confidence.*?(\d+).*?(?:out of|/)\s*(\d+)", text_lower
+ )
if confidence_match:
conf_score = float(confidence_match.group(1))
conf_total = float(confidence_match.group(2))
confidence = conf_score / conf_total
# Extract issues (simple heuristic)
- lines = verification_text.split('\n')
+ lines = verification_text.split("\n")
for line in lines:
line_lower = line.lower()
- if any(word in line_lower for word in ['error', 'mistake', 'incorrect', 'wrong', 'issue']):
+ if any(
+ word in line_lower
+ for word in ["error", "mistake", "incorrect", "wrong", "issue"]
+ ):
issues.append(line.strip())
# Extract suggestions (simple heuristic)
for line in lines:
line_lower = line.lower()
- if any(word in line_lower for word in ['suggest', 'recommend', 'should', 'could improve']):
+ if any(
+ word in line_lower
+ for word in ["suggest", "recommend", "should", "could improve"]
+ ):
suggestions.append(line.strip())
- return assessment, confidence, issues, suggestions
\ No newline at end of file
+ return assessment, confidence, issues, suggestions
diff --git a/optillm/plugins/coc_plugin.py b/optillm/plugins/coc_plugin.py
index 3db92e91..83104486 100644
--- a/optillm/plugins/coc_plugin.py
+++ b/optillm/plugins/coc_plugin.py
@@ -1,7 +1,7 @@
"""
Chain of Code (CoC) plugin for OptILLM.
-This plugin implements a chain-of-code approach that combines Chain-of-Thought (CoT)
+This plugin implements a chain-of-code approach that combines Chain-of-Thought (CoT)
reasoning with code execution and LLM-based code simulation.
SAFETY NOTE: This plugin has been refactored to use Jupyter notebook kernel execution
@@ -18,12 +18,8 @@
import re
import logging
-from typing import Tuple, Dict, Any, List
+from typing import Tuple, Any, List
import ast
-import traceback
-import math
-import importlib
-import json
import nbformat
from nbconvert.preprocessors import ExecutePreprocessor
import os
@@ -38,7 +34,7 @@
MAX_FIX_ATTEMPTS = 3
# Initial code generation prompt
-CHAIN_OF_CODE_PROMPT = '''
+CHAIN_OF_CODE_PROMPT = """
Write Python code to solve this problem. The code should:
1. Break down the problem into clear computational steps
2. Use standard Python features and math operations
@@ -50,10 +46,10 @@
```python
[Your complete Python program here]
```
-'''
+"""
# Code fix prompt
-CODE_FIX_PROMPT = '''
+CODE_FIX_PROMPT = """
The following Python code failed to execute. Fix the code to make it work.
Original code:
```python
@@ -73,10 +69,10 @@
```python
[Your fixed code here]
```
-'''
+"""
# Simulation prompt
-SIMULATION_PROMPT = '''
+SIMULATION_PROMPT = """
The following Python code could not be executed directly. Analyze the code and determine what the answer would be.
Pay special attention to:
1. The core computational logic, ignoring any visualization or display code
@@ -92,11 +88,12 @@
{error}
Return ONLY the final value that would be in the 'answer' variable. Return just the value, no explanations.
-'''
+"""
+
def extract_code_blocks(text: str) -> List[str]:
"""Extract Python code blocks from text."""
- pattern = r'```python\s*(.*?)\s*```'
+ pattern = r"```python\s*(.*?)\s*```"
matches = re.findall(pattern, text, re.DOTALL)
blocks = [m.strip() for m in matches]
logger.info(f"Extracted {len(blocks)} code blocks")
@@ -104,34 +101,39 @@ def extract_code_blocks(text: str) -> List[str]:
logger.info(f"Code block {i+1}:\n{block}")
return blocks
+
def sanitize_code(code: str) -> str:
"""Prepare code for safe execution by removing problematic visualization code."""
# Remove or modify problematic visualization code
- lines = code.split('\n')
+ lines = code.split("\n")
safe_lines = []
for line in lines:
# Skip matplotlib-related imports and plotting commands that could cause issues
- if any(x in line.lower() for x in ['matplotlib', 'plt.', '.plot(', '.show(', 'figure', 'subplot']):
+ if any(
+ x in line.lower()
+ for x in ["matplotlib", "plt.", ".plot(", ".show(", "figure", "subplot"]
+ ):
# Replace with a comment to maintain code structure
safe_lines.append(f"# {line} # Removed for safety")
else:
# Keep the line if it's not visualization-related
safe_lines.append(line)
-
- return '\n'.join(safe_lines)
+
+ return "\n".join(safe_lines)
+
def execute_code(code: str) -> Tuple[Any, str]:
"""Attempt to execute the code using Jupyter notebook kernel and return result or error."""
logger.info("Attempting to execute code in notebook kernel")
logger.info(f"Code:\n{code}")
-
+
try:
# Sanitize the code first
sanitized_code = sanitize_code(code)
-
+
# Create a notebook with the code
notebook = nbformat.v4.new_notebook()
-
+
# Add code that captures the answer variable
enhanced_code = f"""
{sanitized_code}
@@ -142,70 +144,74 @@ def execute_code(code: str) -> Tuple[Any, str]:
else:
print("ANSWER_RESULT: No answer variable found")
"""
-
- notebook['cells'] = [nbformat.v4.new_code_cell(enhanced_code)]
-
+
+ notebook["cells"] = [nbformat.v4.new_code_cell(enhanced_code)]
+
# Convert notebook to JSON string and then to bytes
notebook_json = nbformat.writes(notebook)
- notebook_bytes = notebook_json.encode('utf-8')
+ notebook_bytes = notebook_json.encode("utf-8")
# Create temporary notebook file
- with tempfile.NamedTemporaryFile(mode='wb', suffix='.ipynb', delete=False) as tmp:
+ with tempfile.NamedTemporaryFile(
+ mode="wb", suffix=".ipynb", delete=False
+ ) as tmp:
tmp.write(notebook_bytes)
tmp.flush()
tmp_name = tmp.name
try:
# Read and execute the notebook
- with open(tmp_name, 'r', encoding='utf-8') as f:
+ with open(tmp_name, "r", encoding="utf-8") as f:
nb = nbformat.read(f, as_version=4)
-
+
# Execute with timeout and isolation
- ep = ExecutePreprocessor(timeout=30, kernel_name='python3')
- ep.preprocess(nb, {'metadata': {'path': './'}})
+ ep = ExecutePreprocessor(timeout=30, kernel_name="python3")
+ ep.preprocess(nb, {"metadata": {"path": "./"}})
# Extract the output
output = ""
error_output = ""
-
+
for cell in nb.cells:
- if cell.cell_type == 'code' and cell.outputs:
+ if cell.cell_type == "code" and cell.outputs:
for output_item in cell.outputs:
- if output_item.output_type == 'stream':
- if output_item.name == 'stdout':
+ if output_item.output_type == "stream":
+ if output_item.name == "stdout":
output += output_item.text
- elif output_item.name == 'stderr':
+ elif output_item.name == "stderr":
error_output += output_item.text
- elif output_item.output_type == 'execute_result':
- output += str(output_item.data.get('text/plain', ''))
- elif output_item.output_type == 'error':
+ elif output_item.output_type == "execute_result":
+ output += str(output_item.data.get("text/plain", ""))
+ elif output_item.output_type == "error":
error_output += f"{output_item.ename}: {output_item.evalue}"
-
+
# Check for errors first
if error_output:
logger.error(f"Execution failed: {error_output}")
return None, error_output
-
+
# Parse the answer from output
output = output.strip()
-
+
# Look for our special ANSWER_RESULT marker
if "ANSWER_RESULT:" in output:
- answer_line = [line for line in output.split('\n') if 'ANSWER_RESULT:' in line][-1]
- answer_str = answer_line.split('ANSWER_RESULT:', 1)[1].strip()
-
+ answer_line = [
+ line for line in output.split("\n") if "ANSWER_RESULT:" in line
+ ][-1]
+ answer_str = answer_line.split("ANSWER_RESULT:", 1)[1].strip()
+
if answer_str == "No answer variable found":
error = "Code executed but did not produce an answer variable"
logger.warning(error)
return None, error
-
+
try:
# Try to evaluate the answer to convert it to proper type
answer = ast.literal_eval(answer_str)
except (ValueError, SyntaxError):
# If literal_eval fails, keep as string
answer = answer_str
-
+
logger.info(f"Execution successful. Answer: {answer}")
return answer, None
else:
@@ -217,37 +223,44 @@ def execute_code(code: str) -> Tuple[Any, str]:
error = "Code executed but produced no output"
logger.warning(error)
return None, error
-
+
finally:
# Clean up temporary file
try:
os.unlink(tmp_name)
except:
pass
-
+
except Exception as e:
error = f"Notebook execution failed: {str(e)}"
logger.error(error)
return None, error
-def generate_fixed_code(original_code: str, error: str, client, model: str) -> Tuple[str, int]:
+
+def generate_fixed_code(
+ original_code: str, error: str, client, model: str
+) -> Tuple[str, int]:
"""Ask LLM to fix the broken code."""
logger.info("Requesting code fix from LLM")
logger.info(f"Original error: {error}")
-
+
response = client.chat.completions.create(
model=model,
messages=[
- {"role": "system", "content": CODE_FIX_PROMPT.format(
- code=original_code, error=error)},
- {"role": "user", "content": "Fix the code to make it work."}
+ {
+ "role": "system",
+ "content": CODE_FIX_PROMPT.format(code=original_code, error=error),
+ },
+ {"role": "user", "content": "Fix the code to make it work."},
],
- temperature=0.2
+ temperature=0.2,
)
-
+
+ if not response.choices:
+ raise ValueError("LLM returned empty response in generate_fixed_code")
fixed_code = response.choices[0].message.content
code_blocks = extract_code_blocks(fixed_code)
-
+
if code_blocks:
logger.info("Received fixed code from LLM")
return code_blocks[0], response.usage.completion_tokens
@@ -255,21 +268,29 @@ def generate_fixed_code(original_code: str, error: str, client, model: str) -> T
logger.warning("No code block found in LLM response")
return None, response.usage.completion_tokens
+
def simulate_execution(code: str, error: str, client, model: str) -> Tuple[Any, int]:
"""Ask LLM to simulate code execution."""
logger.info("Attempting code simulation with LLM")
-
+
response = client.chat.completions.create(
model=model,
messages=[
- {"role": "system", "content": SIMULATION_PROMPT.format(
- code=code, error=error)},
- {"role": "user", "content": "Simulate this code and return the final answer value."}
+ {
+ "role": "system",
+ "content": SIMULATION_PROMPT.format(code=code, error=error),
+ },
+ {
+ "role": "user",
+ "content": "Simulate this code and return the final answer value.",
+ },
],
- temperature=0.2
+ temperature=0.2,
)
-
+
try:
+ if not response.choices:
+ raise ValueError("LLM returned empty response in simulate_execution")
result = response.choices[0].message.content.strip()
# Try to convert to appropriate type
try:
@@ -282,25 +303,26 @@ def simulate_execution(code: str, error: str, client, model: str) -> Tuple[Any,
logger.error(f"Failed to parse simulation result: {str(e)}")
return None, response.usage.completion_tokens
+
def run(system_prompt: str, initial_query: str, client, model: str) -> Tuple[str, int]:
"""Main Chain of Code execution function."""
logger.info("Starting Chain of Code execution")
logger.info(f"Query: {initial_query}")
-
+
# Initial code generation
messages = [
{"role": "system", "content": system_prompt + "\n" + CHAIN_OF_CODE_PROMPT},
- {"role": "user", "content": initial_query}
+ {"role": "user", "content": initial_query},
]
-
+
response = client.chat.completions.create(
- model=model,
- messages=messages,
- temperature=0.7
+ model=model, messages=messages, temperature=0.7
)
total_tokens = response.usage.completion_tokens
-
+
# Extract initial code
+ if not response.choices:
+ raise ValueError("LLM returned empty response in run (initial code generation)")
code_blocks = extract_code_blocks(response.choices[0].message.content)
if not code_blocks:
logger.warning("No code blocks found in response")
@@ -309,47 +331,52 @@ def run(system_prompt: str, initial_query: str, client, model: str) -> Tuple[str
current_code = code_blocks[0]
fix_attempts = 0
last_error = None
-
+
# Strategy 1: Direct execution and fix attempts
while fix_attempts < MAX_FIX_ATTEMPTS:
fix_attempts += 1
logger.info(f"Execution attempt {fix_attempts}/{MAX_FIX_ATTEMPTS}")
-
+
# Try to execute current code
answer, error = execute_code(current_code)
-
+
# If successful, return the answer
if error is None:
logger.info(f"Successful execution on attempt {fix_attempts}")
return str(answer), total_tokens
-
+
last_error = error
-
+
# If we hit max attempts, break to try simulation
if fix_attempts >= MAX_FIX_ATTEMPTS:
logger.warning(f"Failed after {fix_attempts} fix attempts")
break
-
+
# Otherwise, try to get fixed code from LLM
logger.info(f"Requesting code fix, attempt {fix_attempts}")
fixed_code, fix_tokens = generate_fixed_code(current_code, error, client, model)
total_tokens += fix_tokens
-
+
if fixed_code:
current_code = fixed_code
else:
logger.error("Failed to get fixed code from LLM")
break
-
+
# Strategy 2: If all execution attempts failed, try simulation
logger.info("All execution attempts failed, trying simulation")
- simulated_answer, sim_tokens = simulate_execution(current_code, last_error, client, model)
+ simulated_answer, sim_tokens = simulate_execution(
+ current_code, last_error, client, model
+ )
total_tokens += sim_tokens
-
+
if simulated_answer is not None:
logger.info("Successfully got answer from simulation")
return str(simulated_answer), total_tokens
-
+
# If we get here, everything failed
logger.warning("All strategies failed")
- return f"Error: Could not solve problem after all attempts. Last error: {last_error}", total_tokens
\ No newline at end of file
+ return (
+ f"Error: Could not solve problem after all attempts. Last error: {last_error}",
+ total_tokens,
+ )
diff --git a/optillm/plugins/deep_research/research_engine.py b/optillm/plugins/deep_research/research_engine.py
index d36cb6bb..da7decca 100644
--- a/optillm/plugins/deep_research/research_engine.py
+++ b/optillm/plugins/deep_research/research_engine.py
@@ -8,130 +8,133 @@
through denoising and retrieval, generating comprehensive research reports.
"""
-import asyncio
-import json
import re
-from typing import Tuple, List, Dict, Optional, Any
+from typing import Tuple, List, Dict, Any
from datetime import datetime
-from collections import defaultdict
-from optillm.plugins.web_search_plugin import run as web_search_run, BrowserSessionManager
+from optillm.plugins.web_search_plugin import run as web_search_run
from optillm.plugins.readurls_plugin import run as readurls_run
-from optillm.plugins.deep_research.session_state import get_session_manager, close_session
+from optillm.plugins.deep_research.session_state import (
+ get_session_manager,
+ close_session,
+)
import uuid
def clean_reasoning_tags(text: str) -> str:
"""
Remove reasoning tags from model responses for clean final output.
-
+
Removes common reasoning tags like:
-
-
-
-
-
+
Args:
text: Raw model response text
-
+
Returns:
Cleaned text with reasoning tags removed
"""
if not text:
return text
-
+
# List of reasoning tag patterns to remove
reasoning_patterns = [
- r'.*?',
- r'.*?',
- r'.*?',
- r'.*?',
- r'.*?',
- r'.*?',
+ r".*?",
+ r".*?",
+ r".*?",
+ r".*?",
+ r".*?",
+ r".*?",
]
-
+
cleaned_text = text
for pattern in reasoning_patterns:
# Use DOTALL flag to match across newlines
- cleaned_text = re.sub(pattern, '', cleaned_text, flags=re.DOTALL | re.IGNORECASE)
-
+ cleaned_text = re.sub(
+ pattern, "", cleaned_text, flags=re.DOTALL | re.IGNORECASE
+ )
+
# Clean up any extra whitespace left behind, but preserve markdown formatting
- cleaned_text = re.sub(r'\n\s*\n\s*\n+', '\n\n', cleaned_text) # Multiple empty lines to double
- cleaned_text = re.sub(r' +', ' ', cleaned_text) # Multiple spaces to single space (but preserve intentional double spaces)
+ cleaned_text = re.sub(
+ r"\n\s*\n\s*\n+", "\n\n", cleaned_text
+ ) # Multiple empty lines to double
+ cleaned_text = re.sub(
+ r" +", " ", cleaned_text
+ ) # Multiple spaces to single space (but preserve intentional double spaces)
cleaned_text = cleaned_text.strip()
-
+
return cleaned_text
def cleanup_placeholder_tags(text: str) -> str:
"""
Remove any remaining placeholder tags from the final report.
-
+
This is a final cleanup step to ensure no incomplete research tags remain
in the published report.
-
+
Args:
text: Research report text
-
+
Returns:
Text with all placeholder tags removed
"""
if not text:
return text
-
+
# Comprehensive patterns for research placeholder tags
placeholder_patterns = [
# Research placeholders
- r'\[NEEDS RESEARCH[^\]]*\]',
- r'\[SOURCE NEEDED[^\]]*\]',
- r'\[RESEARCH NEEDED[^\]]*\]',
- r'\[CITATION NEEDED[^\]]*\]',
- r'\[MORE RESEARCH NEEDED[^\]]*\]',
- r'\[REQUIRES INVESTIGATION[^\]]*\]',
- r'\[TO BE RESEARCHED[^\]]*\]',
- r'\[VERIFY[^\]]*\]',
- r'\[CHECK[^\]]*\]',
-
+ r"\[NEEDS RESEARCH[^\]]*\]",
+ r"\[SOURCE NEEDED[^\]]*\]",
+ r"\[RESEARCH NEEDED[^\]]*\]",
+ r"\[CITATION NEEDED[^\]]*\]",
+ r"\[MORE RESEARCH NEEDED[^\]]*\]",
+ r"\[REQUIRES INVESTIGATION[^\]]*\]",
+ r"\[TO BE RESEARCHED[^\]]*\]",
+ r"\[VERIFY[^\]]*\]",
+ r"\[CHECK[^\]]*\]",
# Citation placeholders (like your example)
- r'\[Placeholder for[^\]]+\]',
- r'\[\d+\]\s*\[Placeholder[^\]]+\]',
- r'\[Insert citation[^\]]*\]',
- r'\[Add reference[^\]]*\]',
- r'\[Reference needed[^\]]*\]',
-
+ r"\[Placeholder for[^\]]+\]",
+ r"\[\d+\]\s*\[Placeholder[^\]]+\]",
+ r"\[Insert citation[^\]]*\]",
+ r"\[Add reference[^\]]*\]",
+ r"\[Reference needed[^\]]*\]",
# Content placeholders
- r'\[To be completed[^\]]*\]',
- r'\[Under development[^\]]*\]',
- r'\[Coming soon[^\]]*\]',
- r'\[TBD[^\]]*\]',
- r'\[TODO[^\]]*\]',
-
+ r"\[To be completed[^\]]*\]",
+ r"\[Under development[^\]]*\]",
+ r"\[Coming soon[^\]]*\]",
+ r"\[TBD[^\]]*\]",
+ r"\[TODO[^\]]*\]",
# Question placeholders and incomplete sections
- r'\[Question \d+[^\]]*\]',
- r'\[Research question[^\]]*\]',
+ r"\[Question \d+[^\]]*\]",
+ r"\[Research question[^\]]*\]",
]
-
+
cleaned_text = text
for pattern in placeholder_patterns:
# Remove the placeholder tags
- cleaned_text = re.sub(pattern, '', cleaned_text, flags=re.IGNORECASE)
-
+ cleaned_text = re.sub(pattern, "", cleaned_text, flags=re.IGNORECASE)
+
# Also remove any sentences that are entirely placeholder-based
- lines = cleaned_text.split('\n')
+ lines = cleaned_text.split("\n")
filtered_lines = []
-
+
for line in lines:
# Skip lines that are mostly just removed placeholders (now empty or just punctuation)
stripped = line.strip()
- if stripped and not re.match(r'^[\s\-\*\.\,\;\:]*$', stripped):
+ if stripped and not re.match(r"^[\s\-\*\.\,\;\:]*$", stripped):
filtered_lines.append(line)
elif not stripped: # Keep empty lines for formatting
filtered_lines.append(line)
-
+
# Rejoin and clean up extra whitespace
- result = '\n'.join(filtered_lines)
- result = re.sub(r'\n\s*\n\s*\n+', '\n\n', result) # Multiple empty lines to double
+ result = "\n".join(filtered_lines)
+ result = re.sub(r"\n\s*\n\s*\n+", "\n\n", result) # Multiple empty lines to double
result = result.strip()
-
+
return result
@@ -155,7 +158,7 @@ def validate_citation_usage(text: str, total_citations: int) -> Dict[str, Any]:
"citations_total": total_citations,
"usage_percentage": 0.0,
"unused_citations": list(range(1, total_citations + 1)),
- "warning": "Empty report text"
+ "warning": "Empty report text",
}
# Find all citation references in the body text (before References section)
@@ -163,7 +166,7 @@ def validate_citation_usage(text: str, total_citations: int) -> Dict[str, Any]:
# Extract all citation numbers from body text using regex
citations_in_body = set()
- citation_pattern = r'\[(\d+)\]'
+ citation_pattern = r"\[(\d+)\]"
for match in re.finditer(citation_pattern, body_text):
citation_num = int(match.group(1))
citations_in_body.add(citation_num)
@@ -172,21 +175,27 @@ def validate_citation_usage(text: str, total_citations: int) -> Dict[str, Any]:
all_citations = set(range(1, total_citations + 1))
unused_citations = sorted(all_citations - citations_in_body)
- usage_percentage = (len(citations_in_body) / total_citations * 100) if total_citations > 0 else 0
+ usage_percentage = (
+ (len(citations_in_body) / total_citations * 100) if total_citations > 0 else 0
+ )
result = {
"citations_used": len(citations_in_body),
"citations_total": total_citations,
"usage_percentage": usage_percentage,
"unused_citations": unused_citations,
- "used_citations": sorted(citations_in_body)
+ "used_citations": sorted(citations_in_body),
}
# Add warnings if citation usage is low
if usage_percentage < 30:
- result["warning"] = f"Very low citation usage ({usage_percentage:.1f}%). Most sources are not cited in the text."
+ result["warning"] = (
+ f"Very low citation usage ({usage_percentage:.1f}%). Most sources are not cited in the text."
+ )
elif usage_percentage < 50:
- result["warning"] = f"Low citation usage ({usage_percentage:.1f}%). Many sources are not cited in the text."
+ result["warning"] = (
+ f"Low citation usage ({usage_percentage:.1f}%). Many sources are not cited in the text."
+ )
return result
@@ -194,79 +203,99 @@ def validate_citation_usage(text: str, total_citations: int) -> Dict[str, Any]:
def validate_report_completeness(text: str) -> Dict[str, Any]:
"""
Validate that the research report is complete and ready for publication.
-
+
Checks for:
- Placeholder citations
- Incomplete sections
- Unfinished research questions
- Missing content indicators
-
+
Returns:
Dict with validation results and suggestions for fixes
"""
if not text:
return {"is_complete": False, "issues": ["Empty report"], "suggestions": []}
-
+
issues = []
suggestions = []
-
+
# Check for placeholder citations
placeholder_citation_patterns = [
- r'\[Placeholder for[^\]]+\]',
- r'\[\d+\]\s*\[Placeholder[^\]]+\]',
- r'\[Insert citation[^\]]*\]',
- r'\[Reference needed[^\]]*\]',
+ r"\[Placeholder for[^\]]+\]",
+ r"\[\d+\]\s*\[Placeholder[^\]]+\]",
+ r"\[Insert citation[^\]]*\]",
+ r"\[Reference needed[^\]]*\]",
]
-
+
for pattern in placeholder_citation_patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
if matches:
issues.append(f"Found {len(matches)} placeholder citations: {matches[:3]}")
- suggestions.append("Replace placeholder citations with actual sources or remove incomplete claims")
-
+ suggestions.append(
+ "Replace placeholder citations with actual sources or remove incomplete claims"
+ )
+
# Check for incomplete research questions sections
if "Research Questions for Investigation" in text:
# Look for sections that seem to be lists of questions without answers
- question_section_match = re.search(r'## Research Questions for Investigation.*?(?=##|$)', text, re.DOTALL)
+ question_section_match = re.search(
+ r"## Research Questions for Investigation.*?(?=##|$)", text, re.DOTALL
+ )
if question_section_match:
question_content = question_section_match.group(0)
# Count questions vs answers
- question_lines = [line for line in question_content.split('\n') if line.strip().startswith('*') or line.strip().startswith('-')]
+ question_lines = [
+ line
+ for line in question_content.split("\n")
+ if line.strip().startswith("*") or line.strip().startswith("-")
+ ]
if len(question_lines) > 3: # Many unanswered questions
issues.append("Report contains unanswered research questions section")
- suggestions.append("Convert research questions into answered findings or remove incomplete section")
-
+ suggestions.append(
+ "Convert research questions into answered findings or remove incomplete section"
+ )
+
# Check for incomplete sections (sections with only placeholders)
- section_pattern = r'##\s+([^#\n]+)\n(.*?)(?=##|$)'
+ section_pattern = r"##\s+([^#\n]+)\n(.*?)(?=##|$)"
sections = re.findall(section_pattern, text, re.DOTALL)
-
+
for section_title, section_content in sections:
# Check if section is mostly placeholders
- placeholder_count = len(re.findall(r'\[[^\]]*(?:placeholder|needed|research|todo|tbd)[^\]]*\]', section_content, re.IGNORECASE))
- content_lines = [line.strip() for line in section_content.split('\n') if line.strip()]
-
+ placeholder_count = len(
+ re.findall(
+ r"\[[^\]]*(?:placeholder|needed|research|todo|tbd)[^\]]*\]",
+ section_content,
+ re.IGNORECASE,
+ )
+ )
+ content_lines = [
+ line.strip() for line in section_content.split("\n") if line.strip()
+ ]
+
if placeholder_count > len(content_lines) / 3: # More than 1/3 placeholders
issues.append(f"Section '{section_title.strip()}' is mostly placeholders")
- suggestions.append(f"Complete content for '{section_title.strip()}' section or remove it")
-
+ suggestions.append(
+ f"Complete content for '{section_title.strip()}' section or remove it"
+ )
+
# Check for incomplete reference lists
- if text.count('[') - text.count(']') != 0:
+ if text.count("[") - text.count("]") != 0:
issues.append("Unmatched brackets detected - possible incomplete citations")
suggestions.append("Review and fix citation formatting")
-
+
# Check for very short sections that might be incomplete
if len(text.split()) < 500: # Very short report
issues.append("Report appears to be very short, possibly incomplete")
suggestions.append("Ensure all research areas are adequately covered")
-
+
is_complete = len(issues) == 0
-
+
return {
"is_complete": is_complete,
"issues": issues,
"suggestions": suggestions,
"word_count": len(text.split()),
- "section_count": len(sections)
+ "section_count": len(sections),
}
@@ -289,22 +318,22 @@ class DeepResearcher:
Based on: https://arxiv.org/abs/2507.16075v1
"""
-
- def __init__(self, client, model: str, max_iterations: int = 5, max_sources: int = 30):
+
+ def __init__(
+ self, client, model: str, max_iterations: int = 5, max_sources: int = 30
+ ):
self.client = client
self.model = model
self.max_iterations = max_iterations
self.max_sources = max_sources
self.session_id = str(uuid.uuid4()) # Unique session ID for this research
self.session_manager = None # Will be set when research starts
- self.research_state = {
- "iteration": 0 # Track current iteration for metadata
- }
+ self.research_state = {"iteration": 0} # Track current iteration for metadata
self.total_tokens = 0
self.citations = {} # Map citation number to source info
self.citation_counter = 0
self.source_content_map = {} # Map URL to content for citations
-
+
# TTD-DR specific components
self.current_draft = "" # Persistent evolving draft
self.draft_history = [] # Track draft evolution
@@ -312,59 +341,57 @@ def __init__(self, client, model: str, max_iterations: int = 5, max_sources: int
"search_strategy": 1.0,
"synthesis_quality": 1.0,
"gap_detection": 1.0,
- "integration_ability": 1.0
+ "integration_ability": 1.0,
}
self.gap_analysis_history = [] # Track identified gaps over time
self.session_manager = None # Browser session manager for web searches
-
+
def cleanup_placeholder_tags(self, text: str) -> str:
"""
Remove any remaining placeholder tags from the final report.
-
+
This is a final cleanup step to ensure no incomplete research tags remain
in the published report.
-
+
Args:
text: Research report text
-
+
Returns:
Text with all placeholder tags removed
"""
return cleanup_placeholder_tags(text)
-
- def fix_incomplete_report(self, report: str, validation: Dict[str, Any], original_query: str) -> str:
+
+ def fix_incomplete_report(
+ self, report: str, validation: Dict[str, Any], original_query: str
+ ) -> str:
"""
Attempt to fix an incomplete report by removing problematic sections
and ensuring a coherent final document.
-
+
This is a fallback when the report contains placeholders or incomplete sections.
"""
print("🔧 Attempting to fix incomplete report...")
-
+
# Start with the basic cleanup
fixed_report = cleanup_placeholder_tags(report)
-
+
# Remove sections that are mostly placeholders or incomplete
if "Research Questions for Investigation" in fixed_report:
# Remove unanswered research questions sections
fixed_report = re.sub(
- r'## Research Questions for Investigation.*?(?=##|$)',
- '',
- fixed_report,
- flags=re.DOTALL
+ r"## Research Questions for Investigation.*?(?=##|$)",
+ "",
+ fixed_report,
+ flags=re.DOTALL,
)
print(" - Removed incomplete research questions section")
-
- # Remove citation placeholders from reference section
- fixed_report = re.sub(
- r'\[\d+\]\s*\[Placeholder[^\]]+\]\n?',
- '',
- fixed_report
- )
-
+
+ # Remove citation placeholders from reference section
+ fixed_report = re.sub(r"\[\d+\]\s*\[Placeholder[^\]]+\]\n?", "", fixed_report)
+
# Clean up any empty sections
- fixed_report = re.sub(r'##\s+([^#\n]+)\n\s*(?=##)', '', fixed_report)
-
+ fixed_report = re.sub(r"##\s+([^#\n]+)\n\s*(?=##)", "", fixed_report)
+
# If report is still very short, add a completion note
if len(fixed_report.split()) < 300:
completion_note = f"""
@@ -377,25 +404,29 @@ def fix_incomplete_report(self, report: str, validation: Dict[str, Any], origina
"""
# Insert before references section if it exists
if "## References" in fixed_report:
- fixed_report = fixed_report.replace("## References", completion_note + "\n## References")
+ fixed_report = fixed_report.replace(
+ "## References", completion_note + "\n## References"
+ )
else:
fixed_report += completion_note
-
+
print(" - Added completion note due to short report length")
-
+
# Final cleanup
- fixed_report = re.sub(r'\n\s*\n\s*\n+', '\n\n', fixed_report)
+ fixed_report = re.sub(r"\n\s*\n\s*\n+", "\n\n", fixed_report)
fixed_report = fixed_report.strip()
-
+
# Validate the fix
new_validation = validate_report_completeness(fixed_report)
if new_validation["is_complete"]:
print("✅ Report successfully fixed and validated")
else:
- print(f"⚠️ Report still has {len(new_validation['issues'])} issues after fixing")
-
+ print(
+ f"⚠️ Report still has {len(new_validation['issues'])} issues after fixing"
+ )
+
return fixed_report
-
+
def decompose_query(self, system_prompt: str, initial_query: str) -> List[str]:
"""
Decompose complex research query into focused sub-queries
@@ -414,83 +445,95 @@ def decompose_query(self, system_prompt: str, initial_query: str) -> List[str]:
Make each sub-query specific and searchable. Focus on different aspects of the main topic.
"""
-
+
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": system_prompt},
- {"role": "user", "content": decomposition_prompt}
+ {"role": "user", "content": decomposition_prompt},
],
temperature=0.7,
- max_tokens=1000
+ max_tokens=1000,
)
-
+
+ if not response.choices:
+ raise ValueError("LLM returned empty response in decompose_query")
content = response.choices[0].message.content.strip()
# Clean reasoning tags from query decomposition response
content = clean_reasoning_tags(content)
self.total_tokens += response.usage.completion_tokens
-
+
# Extract numbered queries
queries = []
- for line in content.split('\n'):
+ for line in content.split("\n"):
line = line.strip()
- if re.match(r'^\d+\.', line):
- query = re.sub(r'^\d+\.\s*\[?(.*?)\]?$', r'\1', line).strip()
+ if re.match(r"^\d+\.", line):
+ query = re.sub(r"^\d+\.\s*\[?(.*?)\]?$", r"\1", line).strip()
if query:
queries.append(query)
-
+
return queries[:5] # Limit to 5 sub-queries
-
- except Exception as e:
+
+ except Exception:
# Fallback: use original query
return [initial_query]
-
+
def perform_web_search(self, queries: List[str]) -> str:
"""
Perform web search for multiple queries using the web_search plugin
"""
all_results = []
-
+
# Check if session manager is available
- if not hasattr(self, 'session_manager') or self.session_manager is None:
+ if not hasattr(self, "session_manager") or self.session_manager is None:
# Log warning - this shouldn't happen in normal flow
- print(f"⚠️ Warning: session_manager not available in perform_web_search (session_id: {getattr(self, 'session_id', 'N/A')})")
+ print(
+ f"⚠️ Warning: session_manager not available in perform_web_search (session_id: {getattr(self, 'session_id', 'N/A')})"
+ )
self.session_manager = None
else:
- print(f"📊 Using existing session manager for web search (session_id: {self.session_id}, manager: {id(self.session_manager)})")
-
+ print(
+ f"📊 Using existing session manager for web search (session_id: {self.session_id}, manager: {id(self.session_manager)})"
+ )
+
# Perform individual searches for each query to avoid truncation issues
for i, query in enumerate(queries):
try:
# Format as a clean search query
search_query = f"search for {query.strip()}"
-
+
# Perform search with reduced results per query to stay within limits
results_per_query = max(1, self.max_sources // len(queries))
-
- enhanced_query, _ = web_search_run("", search_query, None, None, {
- "num_results": results_per_query,
- "delay_seconds": None, # Use default random delay (4-32 seconds)
- "headless": False, # Allow CAPTCHA solving if needed
- "session_manager": self.session_manager # Use shared browser session
- })
-
+
+ enhanced_query, _ = web_search_run(
+ "",
+ search_query,
+ None,
+ None,
+ {
+ "num_results": results_per_query,
+ "delay_seconds": None, # Use default random delay (4-32 seconds)
+ "headless": False, # Allow CAPTCHA solving if needed
+ "session_manager": self.session_manager, # Use shared browser session
+ },
+ )
+
if enhanced_query and "Web Search Results" in enhanced_query:
all_results.append(enhanced_query)
-
+
except Exception as e:
# Continue with other queries if one fails
all_results.append(f"Search failed for query '{query}': {str(e)}")
continue
-
+
if not all_results:
return "Web search failed: No results obtained from any query"
-
+
# Combine all search results
combined_results = "\n\n".join(all_results)
return combined_results
-
+
def extract_and_fetch_urls(self, search_results: str) -> Tuple[str, List[Dict]]:
"""
Extract URLs from search results and fetch their content using readurls plugin
@@ -499,53 +542,56 @@ def extract_and_fetch_urls(self, search_results: str) -> Tuple[str, List[Dict]]:
try:
# First extract URLs and metadata from search results
sources = []
-
+
# Pattern to match search result blocks
- result_pattern = r'(\d+)\.\s*\*\*(.+?)\*\*\s*\n\s*URL:\s*(.+?)\n'
+ result_pattern = r"(\d+)\.\s*\*\*(.+?)\*\*\s*\n\s*URL:\s*(.+?)\n"
matches = re.findall(result_pattern, search_results, re.MULTILINE)
-
+
for match in matches:
source = {
- 'number': match[0],
- 'title': match[1].strip(),
- 'url': match[2].strip(),
- 'access_date': datetime.now().strftime('%Y-%m-%d')
+ "number": match[0],
+ "title": match[1].strip(),
+ "url": match[2].strip(),
+ "access_date": datetime.now().strftime("%Y-%m-%d"),
}
sources.append(source)
-
+
# If regex doesn't work, try line-by-line parsing
if not sources:
- lines = search_results.split('\n')
+ lines = search_results.split("\n")
current_source = {}
-
+
for i, line in enumerate(lines):
# Check for numbered item with title
- title_match = re.match(r'^(\d+)\.\s*\*\*(.+?)\*\*', line.strip())
+ title_match = re.match(r"^(\d+)\.\s*\*\*(.+?)\*\*", line.strip())
if title_match:
- if current_source and 'url' in current_source:
+ if current_source and "url" in current_source:
sources.append(current_source)
current_source = {
- 'number': title_match.group(1),
- 'title': title_match.group(2).strip()
+ "number": title_match.group(1),
+ "title": title_match.group(2).strip(),
}
# Check for URL line
- elif line.strip().startswith('URL:') and current_source:
+ elif line.strip().startswith("URL:") and current_source:
url = line.strip()[4:].strip()
- current_source['url'] = url
- current_source['access_date'] = datetime.now().strftime('%Y-%m-%d')
-
- if current_source and 'url' in current_source:
+ current_source["url"] = url
+ current_source["access_date"] = datetime.now().strftime(
+ "%Y-%m-%d"
+ )
+
+ if current_source and "url" in current_source:
sources.append(current_source)
-
+
# Fetch content for all URLs
content_with_urls, _ = readurls_run("", search_results, None, None)
-
+
return content_with_urls, sources
except Exception as e:
return f"URL fetching failed: {str(e)}", []
-
-
- def evaluate_completeness(self, system_prompt: str, query: str, current_synthesis: str) -> Tuple[bool, List[str]]:
+
+ def evaluate_completeness(
+ self, system_prompt: str, query: str, current_synthesis: str
+ ) -> Tuple[bool, List[str]]:
"""
Evaluate if the current research is complete or needs more information
Returns (is_complete, list_of_missing_aspects)
@@ -565,39 +611,47 @@ def evaluate_completeness(self, system_prompt: str, query: str, current_synthesi
COMPLETE: [YES/NO]
MISSING: [list any missing aspects, one per line, or "None" if complete]
"""
-
+
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": system_prompt},
- {"role": "user", "content": evaluation_prompt}
+ {"role": "user", "content": evaluation_prompt},
],
temperature=0.3,
- max_tokens=500
+ max_tokens=500,
)
-
+
+ if not response.choices:
+ raise ValueError("LLM returned empty response in evaluate_completeness")
content = response.choices[0].message.content.strip()
# Clean reasoning tags from completeness evaluation response
content = clean_reasoning_tags(content)
self.total_tokens += response.usage.completion_tokens
-
+
# Parse response
is_complete = "COMPLETE: YES" in content.upper()
-
+
missing_aspects = []
if "MISSING:" in content.upper():
missing_section = content.split("MISSING:")[-1].strip()
if missing_section.upper() != "NONE":
- missing_aspects = [line.strip() for line in missing_section.split('\n') if line.strip()]
-
+ missing_aspects = [
+ line.strip()
+ for line in missing_section.split("\n")
+ if line.strip()
+ ]
+
return is_complete, missing_aspects
-
- except Exception as e:
+
+ except Exception:
# Default to not complete on error
return False, ["Error in evaluation"]
-
- def generate_focused_queries(self, missing_aspects: List[str], original_query: str) -> List[str]:
+
+ def generate_focused_queries(
+ self, missing_aspects: List[str], original_query: str
+ ) -> List[str]:
"""
Generate focused search queries to address missing aspects
"""
@@ -606,9 +660,9 @@ def generate_focused_queries(self, missing_aspects: List[str], original_query: s
# Create a focused query combining the original topic with the missing aspect
focused_query = f"{original_query} {aspect}"
focused_queries.append(focused_query)
-
+
return focused_queries[:3] # Limit to 3 additional queries per iteration
-
+
def generate_preliminary_draft(self, system_prompt: str, initial_query: str) -> str:
"""
Generate the preliminary draft (updatable skeleton) from LLM internal knowledge
@@ -640,28 +694,34 @@ def generate_preliminary_draft(self, system_prompt: str, initial_query: str) ->
Include AT LEAST 5-10 [NEEDS RESEARCH] or [SOURCE NEEDED] tags throughout the draft.
Be explicit about what you don't know and what needs external validation.
"""
-
+
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": system_prompt},
- {"role": "user", "content": draft_prompt}
+ {"role": "user", "content": draft_prompt},
],
temperature=0.7,
- max_tokens=2000
+ max_tokens=2000,
)
-
+
+ if not response.choices:
+ raise ValueError(
+ "LLM returned empty response in generate_preliminary_draft"
+ )
draft = response.choices[0].message.content.strip()
draft = clean_reasoning_tags(draft)
self.total_tokens += response.usage.completion_tokens
-
+
return draft
-
+
except Exception as e:
return f"Failed to generate preliminary draft: {str(e)}"
-
- def analyze_draft_gaps(self, current_draft: str, original_query: str) -> List[Dict[str, str]]:
+
+ def analyze_draft_gaps(
+ self, current_draft: str, original_query: str
+ ) -> List[Dict[str, str]]:
"""
Analyze the current draft to identify gaps, weaknesses, and areas needing research
This guides the next retrieval iteration (draft-guided search)
@@ -702,105 +762,128 @@ def analyze_draft_gaps(self, current_draft: str, original_query: str) -> List[Di
Even well-written sections can benefit from additional evidence, examples, or perspectives.
Push for depth, accuracy, and comprehensiveness in the research.
"""
-
+
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[
- {"role": "system", "content": "You are an expert research analyst."},
- {"role": "user", "content": gap_analysis_prompt}
+ {
+ "role": "system",
+ "content": "You are an expert research analyst.",
+ },
+ {"role": "user", "content": gap_analysis_prompt},
],
temperature=0.3,
- max_tokens=1000
+ max_tokens=1000,
)
-
+
+ if not response.choices:
+ raise ValueError("LLM returned empty response in analyze_draft_gaps")
content = response.choices[0].message.content.strip()
content = clean_reasoning_tags(content)
self.total_tokens += response.usage.completion_tokens
-
+
# Parse the gaps
gaps = []
current_gap = {}
-
- for line in content.split('\n'):
+
+ for line in content.split("\n"):
line = line.strip()
- if line.startswith('GAP_ID:'):
+ if line.startswith("GAP_ID:"):
if current_gap:
gaps.append(current_gap)
- current_gap = {'id': line.split(':', 1)[1].strip()}
- elif line.startswith('SECTION:'):
- current_gap['section'] = line.split(':', 1)[1].strip()
- elif line.startswith('GAP_TYPE:'):
- current_gap['gap_type'] = line.split(':', 1)[1].strip()
- elif line.startswith('SPECIFIC_NEED:'):
- current_gap['specific_need'] = line.split(':', 1)[1].strip()
- elif line.startswith('SEARCH_QUERY:'):
- current_gap['search_query'] = line.split(':', 1)[1].strip()
- elif line.startswith('PRIORITY:'):
- current_gap['priority'] = line.split(':', 1)[1].strip()
-
+ current_gap = {"id": line.split(":", 1)[1].strip()}
+ elif line.startswith("SECTION:"):
+ current_gap["section"] = line.split(":", 1)[1].strip()
+ elif line.startswith("GAP_TYPE:"):
+ current_gap["gap_type"] = line.split(":", 1)[1].strip()
+ elif line.startswith("SPECIFIC_NEED:"):
+ current_gap["specific_need"] = line.split(":", 1)[1].strip()
+ elif line.startswith("SEARCH_QUERY:"):
+ current_gap["search_query"] = line.split(":", 1)[1].strip()
+ elif line.startswith("PRIORITY:"):
+ current_gap["priority"] = line.split(":", 1)[1].strip()
+
if current_gap:
gaps.append(current_gap)
-
+
return gaps
-
- except Exception as e:
+
+ except Exception:
# Fallback: create basic gaps from the draft
- return [{
- 'id': '1',
- 'section': 'General',
- 'gap_type': 'MISSING_INFO',
- 'specific_need': 'More detailed information needed',
- 'search_query': original_query
- }]
-
+ return [
+ {
+ "id": "1",
+ "section": "General",
+ "gap_type": "MISSING_INFO",
+ "specific_need": "More detailed information needed",
+ "search_query": original_query,
+ }
+ ]
+
def perform_gap_targeted_search(self, gaps: List[Dict[str, str]]) -> str:
"""
Perform targeted searches based on identified gaps in the current draft
Prioritizes HIGH priority gaps (placeholder tags) first
"""
all_results = []
-
+
# Check if session manager is available
- if not hasattr(self, 'session_manager') or self.session_manager is None:
+ if not hasattr(self, "session_manager") or self.session_manager is None:
# Log warning - this shouldn't happen in normal flow
print("⚠️ Warning: session_manager not available in perform_web_search")
self.session_manager = None
-
+
# Sort gaps by priority - HIGH priority first (placeholder tags)
- sorted_gaps = sorted(gaps, key=lambda g: (
- 0 if g.get('priority', '').upper() == 'HIGH' else
- 1 if g.get('priority', '').upper() == 'MEDIUM' else 2
- ))
-
+ sorted_gaps = sorted(
+ gaps,
+ key=lambda g: (
+ 0
+ if g.get("priority", "").upper() == "HIGH"
+ else 1 if g.get("priority", "").upper() == "MEDIUM" else 2
+ ),
+ )
+
for gap in sorted_gaps:
- search_query = gap.get('search_query', '')
+ search_query = gap.get("search_query", "")
if not search_query:
continue
-
+
try:
# Format as a clean search query
search_query = f"search for {search_query.strip()}"
-
+
# Perform search with context about what gap we're filling
- enhanced_query, _ = web_search_run("", search_query, None, None, {
- "num_results": max(1, self.max_sources // len(gaps)),
- "delay_seconds": None, # Use default random delay (4-32 seconds)
- "headless": False,
- "session_manager": self.session_manager # Use shared browser session
- })
-
+ enhanced_query, _ = web_search_run(
+ "",
+ search_query,
+ None,
+ None,
+ {
+ "num_results": max(1, self.max_sources // len(gaps)),
+ "delay_seconds": None, # Use default random delay (4-32 seconds)
+ "headless": False,
+ "session_manager": self.session_manager, # Use shared browser session
+ },
+ )
+
if enhanced_query and "Web Search Results" in enhanced_query:
# Tag results with gap context
gap_context = f"[ADDRESSING GAP: {gap.get('section', 'Unknown')} - {gap.get('specific_need', 'General research')}]\n"
all_results.append(gap_context + enhanced_query)
-
- except Exception as e:
+
+ except Exception:
continue
-
- return "\n\n".join(all_results) if all_results else "No gap-targeted search results obtained"
-
- def denoise_draft_with_retrieval(self, current_draft: str, retrieval_content: str, original_query: str) -> str:
+
+ return (
+ "\n\n".join(all_results)
+ if all_results
+ else "No gap-targeted search results obtained"
+ )
+
+ def denoise_draft_with_retrieval(
+ self, current_draft: str, retrieval_content: str, original_query: str
+ ) -> str:
"""
Core denoising step: integrate retrieved information with current draft
This is the heart of the diffusion process
@@ -847,28 +930,37 @@ def denoise_draft_with_retrieval(self, current_draft: str, retrieval_content: st
Return the improved draft with integrated information and comprehensive citations.
"""
-
+
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[
- {"role": "system", "content": "You are an expert research synthesizer performing draft denoising."},
- {"role": "user", "content": denoising_prompt}
+ {
+ "role": "system",
+ "content": "You are an expert research synthesizer performing draft denoising.",
+ },
+ {"role": "user", "content": denoising_prompt},
],
temperature=0.6,
- max_tokens=3000
+ max_tokens=3000,
)
-
+
+ if not response.choices:
+ raise ValueError(
+ "LLM returned empty response in denoise_draft_with_retrieval"
+ )
denoised_draft = response.choices[0].message.content.strip()
denoised_draft = clean_reasoning_tags(denoised_draft)
self.total_tokens += response.usage.completion_tokens
-
+
return denoised_draft
-
+
except Exception as e:
return f"Denoising failed: {str(e)}\n\nFalling back to current draft:\n{current_draft}"
-
- def evaluate_draft_quality(self, draft: str, previous_draft: str, original_query: str) -> Dict[str, float]:
+
+ def evaluate_draft_quality(
+ self, draft: str, previous_draft: str, original_query: str
+ ) -> Dict[str, float]:
"""
Evaluate the quality improvement of the current draft vs previous iteration
Used for termination decisions and component fitness updates
@@ -901,46 +993,53 @@ def evaluate_draft_quality(self, draft: str, previous_draft: str, original_query
CITATIONS: [score]
IMPROVEMENT: [score]
"""
-
+
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[
- {"role": "system", "content": "You are an expert research quality evaluator."},
- {"role": "user", "content": evaluation_prompt}
+ {
+ "role": "system",
+ "content": "You are an expert research quality evaluator.",
+ },
+ {"role": "user", "content": evaluation_prompt},
],
temperature=0.2,
- max_tokens=500
+ max_tokens=500,
)
-
+
+ if not response.choices:
+ raise ValueError(
+ "LLM returned empty response in evaluate_draft_quality"
+ )
content = response.choices[0].message.content.strip()
content = clean_reasoning_tags(content)
self.total_tokens += response.usage.completion_tokens
-
+
# Parse scores
scores = {}
- for line in content.split('\n'):
- if ':' in line:
- key, value = line.split(':', 1)
+ for line in content.split("\n"):
+ if ":" in line:
+ key, value = line.split(":", 1)
key = key.strip().lower()
try:
scores[key] = float(value.strip())
except ValueError:
scores[key] = 0.5 # Default score
-
+
return scores
-
- except Exception as e:
+
+ except Exception:
# Default scores
return {
- 'completeness': 0.5,
- 'accuracy': 0.5,
- 'depth': 0.5,
- 'coherence': 0.5,
- 'citations': 0.5,
- 'improvement': 0.1
+ "completeness": 0.5,
+ "accuracy": 0.5,
+ "depth": 0.5,
+ "coherence": 0.5,
+ "citations": 0.5,
+ "improvement": 0.1,
}
-
+
def update_component_fitness(self, quality_scores: Dict[str, float]):
"""
Update component fitness based on performance (self-evolution)
@@ -950,20 +1049,22 @@ def update_component_fitness(self, quality_scores: Dict[str, float]):
component-wise self-evolutionary optimization as described in the TTD-DR paper.
"""
# Update fitness based on quality improvements
- improvement = quality_scores.get('improvement', 0.0)
-
+ improvement = quality_scores.get("improvement", 0.0)
+
if improvement > 0.1: # Significant improvement
- self.component_fitness['search_strategy'] *= 1.1
- self.component_fitness['synthesis_quality'] *= 1.1
- self.component_fitness['integration_ability'] *= 1.1
+ self.component_fitness["search_strategy"] *= 1.1
+ self.component_fitness["synthesis_quality"] *= 1.1
+ self.component_fitness["integration_ability"] *= 1.1
elif improvement < 0.05: # Poor improvement
- self.component_fitness['search_strategy'] *= 0.95
- self.component_fitness['synthesis_quality'] *= 0.95
-
+ self.component_fitness["search_strategy"] *= 0.95
+ self.component_fitness["synthesis_quality"] *= 0.95
+
# Cap fitness values
for key in self.component_fitness:
- self.component_fitness[key] = max(0.1, min(2.0, self.component_fitness[key]))
-
+ self.component_fitness[key] = max(
+ 0.1, min(2.0, self.component_fitness[key])
+ )
+
def research(self, system_prompt: str, initial_query: str) -> Tuple[str, int]:
"""
TTD-DR (Test-Time Diffusion Deep Researcher) main algorithm
@@ -978,35 +1079,46 @@ def research(self, system_prompt: str, initial_query: str) -> Tuple[str, int]:
Note: Component-wise self-evolutionary optimization is tracked but not yet
used to modify behavior (future enhancement).
"""
-
+
# Get or create a browser session for this research session
- self.session_manager = get_session_manager(self.session_id, headless=False, timeout=30)
+ self.session_manager = get_session_manager(
+ self.session_id, headless=False, timeout=30
+ )
if self.session_manager:
- print(f"🔬 Starting deep research with session ID: {self.session_id} (DeepResearcher instance: {id(self)})")
+ print(
+ f"🔬 Starting deep research with session ID: {self.session_id} (DeepResearcher instance: {id(self)})"
+ )
else:
print("⚠️ Failed to create browser session, proceeding without web search")
-
+
try:
# PHASE 1: INITIALIZATION - Generate preliminary draft (updatable skeleton)
print("TTD-DR: Generating preliminary draft...")
- self.current_draft = self.generate_preliminary_draft(system_prompt, initial_query)
+ self.current_draft = self.generate_preliminary_draft(
+ system_prompt, initial_query
+ )
self.draft_history.append(self.current_draft)
-
+
# PHASE 1.5: INITIAL RESEARCH - Ensure we always gather external sources
print("TTD-DR: Performing initial research...")
initial_queries = self.decompose_query(system_prompt, initial_query)
if initial_queries:
print(f" - Searching for {len(initial_queries)} initial topics...")
initial_search_results = self.perform_web_search(initial_queries)
-
+
# Extract and fetch URLs from initial search
- if initial_search_results and "Web Search Results" in initial_search_results:
+ if (
+ initial_search_results
+ and "Web Search Results" in initial_search_results
+ ):
print(" - Extracting initial sources...")
- initial_content, initial_sources = self.extract_and_fetch_urls(initial_search_results)
-
+ initial_content, initial_sources = self.extract_and_fetch_urls(
+ initial_search_results
+ )
+
# Register initial sources for citations
for source in initial_sources:
- if 'url' in source:
+ if "url" in source:
self.citation_counter += 1
self.citations[self.citation_counter] = source
@@ -1017,44 +1129,55 @@ def research(self, system_prompt: str, initial_query: str) -> Tuple[str, int]:
print(" - Warning: Could not decompose query for initial research")
# Fallback: Create simple search queries from the original query
print(" - Using fallback search strategy...")
- fallback_queries = [initial_query] # At minimum, search for the original query
+ fallback_queries = [
+ initial_query
+ ] # At minimum, search for the original query
fallback_search_results = self.perform_web_search(fallback_queries)
- if fallback_search_results and "Web Search Results" in fallback_search_results:
- fallback_content, fallback_sources = self.extract_and_fetch_urls(fallback_search_results)
+ if (
+ fallback_search_results
+ and "Web Search Results" in fallback_search_results
+ ):
+ fallback_content, fallback_sources = self.extract_and_fetch_urls(
+ fallback_search_results
+ )
for source in fallback_sources:
- if 'url' in source:
+ if "url" in source:
self.citation_counter += 1
self.citations[self.citation_counter] = source
print(f" - Fallback search found {len(fallback_sources)} sources")
-
+
# PHASE 2: ITERATIVE DENOISING LOOP
for iteration in range(self.max_iterations):
self.research_state["iteration"] = iteration + 1
- print(f"TTD-DR: Denoising iteration {iteration + 1}/{self.max_iterations}")
-
+ print(
+ f"TTD-DR: Denoising iteration {iteration + 1}/{self.max_iterations}"
+ )
+
# STEP 1: Analyze current draft for gaps (draft-guided search)
print(" - Analyzing draft gaps...")
gaps = self.analyze_draft_gaps(self.current_draft, initial_query)
self.gap_analysis_history.append(gaps)
-
+
if not gaps:
print(" - No significant gaps found, research complete")
break
-
+
# STEP 2: Perform gap-targeted retrieval
print(f" - Performing targeted search for {len(gaps)} gaps...")
retrieval_content = self.perform_gap_targeted_search(gaps)
-
+
# STEP 3: Extract and fetch URLs from search results
print(" - Extracting and fetching content...")
- content_with_urls, sources = self.extract_and_fetch_urls(retrieval_content)
-
+ content_with_urls, sources = self.extract_and_fetch_urls(
+ retrieval_content
+ )
+
# Register sources for citations
for source in sources:
- if 'url' in source:
+ if "url" in source:
self.citation_counter += 1
self.citations[self.citation_counter] = source
-
+
# STEP 4: DENOISING - Integrate retrieved info with current draft
print(" - Performing denoising step...")
previous_draft = self.current_draft
@@ -1062,22 +1185,24 @@ def research(self, system_prompt: str, initial_query: str) -> Tuple[str, int]:
self.current_draft, content_with_urls, initial_query
)
self.draft_history.append(self.current_draft)
-
+
# STEP 5: Evaluate quality improvement
print(" - Evaluating draft quality...")
quality_scores = self.evaluate_draft_quality(
self.current_draft, previous_draft, initial_query
)
-
+
# STEP 6: Component self-evolution based on feedback
self.update_component_fitness(quality_scores)
-
+
# STEP 7: Check termination conditions
- completeness = quality_scores.get('completeness', 0.0)
- improvement = quality_scores.get('improvement', 0.0)
-
- print(f" - Quality scores: Completeness={completeness:.2f}, Improvement={improvement:.2f}")
-
+ completeness = quality_scores.get("completeness", 0.0)
+ improvement = quality_scores.get("improvement", 0.0)
+
+ print(
+ f" - Quality scores: Completeness={completeness:.2f}, Improvement={improvement:.2f}"
+ )
+
# Terminate if high quality achieved or minimal improvement
# More lenient termination to ensure complete research
if completeness > 0.9 or (improvement < 0.03 and completeness > 0.7):
@@ -1086,26 +1211,30 @@ def research(self, system_prompt: str, initial_query: str) -> Tuple[str, int]:
# PHASE 3: FINALIZATION - Polish the final draft
print("TTD-DR: Finalizing research report...")
-
+
# Ensure we have gathered some sources
if len(self.citations) == 0:
print("⚠️ Warning: No external sources found during research!")
print(" Deep research should always consult external sources.")
else:
print(f"✅ Research completed with {len(self.citations)} sources")
-
- final_report = self.finalize_research_report(system_prompt, initial_query, self.current_draft)
-
+
+ final_report = self.finalize_research_report(
+ system_prompt, initial_query, self.current_draft
+ )
+
return final_report, self.total_tokens
-
+
finally:
# Clean up browser session
if self.session_manager:
print(f"🏁 Closing research session: {self.session_id}")
close_session(self.session_id)
self.session_manager = None
-
- def finalize_research_report(self, system_prompt: str, original_query: str, final_draft: str) -> str:
+
+ def finalize_research_report(
+ self, system_prompt: str, original_query: str, final_draft: str
+ ) -> str:
"""
Apply final polishing to the research report
"""
@@ -1152,80 +1281,109 @@ def finalize_research_report(self, system_prompt: str, original_query: str, fina
Return the final polished research report with comprehensive citations.
"""
-
+
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": system_prompt},
- {"role": "user", "content": finalization_prompt}
+ {"role": "user", "content": finalization_prompt},
],
temperature=0.5,
- max_tokens=3000
+ max_tokens=3000,
)
-
+
+ if not response.choices:
+ raise ValueError(
+ "LLM returned empty response in finalize_research_report"
+ )
polished_report = response.choices[0].message.content.strip()
polished_report = clean_reasoning_tags(polished_report)
-
+
# Final cleanup: Remove any remaining placeholder tags
polished_report = self.cleanup_placeholder_tags(polished_report)
-
+
# Validate report completeness
validation = validate_report_completeness(polished_report)
-
+
if not validation["is_complete"]:
- print(f"⚠️ Report validation found {len(validation['issues'])} issues:")
- for issue in validation['issues']:
+ print(
+ f"⚠️ Report validation found {len(validation['issues'])} issues:"
+ )
+ for issue in validation["issues"]:
print(f" - {issue}")
-
+
# Attempt to fix incomplete report
- polished_report = self.fix_incomplete_report(polished_report, validation, original_query)
+ polished_report = self.fix_incomplete_report(
+ polished_report, validation, original_query
+ )
else:
print("✅ Report validation passed - report is complete")
-
+
self.total_tokens += response.usage.completion_tokens
# Remove any References section the LLM might have created
- polished_report = re.sub(r'##\s*References.*?(?=##|\Z)', '', polished_report, flags=re.DOTALL)
- polished_report = re.sub(r'(?m)^References\s*\n\s*(?:\[\d+\]\s*\n)+', '', polished_report)
- polished_report = re.sub(r'\n\s*\n\s*\n+', '\n\n', polished_report) # Clean up extra newlines
+ polished_report = re.sub(
+ r"##\s*References.*?(?=##|\Z)", "", polished_report, flags=re.DOTALL
+ )
+ polished_report = re.sub(
+ r"(?m)^References\s*\n\s*(?:\[\d+\]\s*\n)+", "", polished_report
+ )
+ polished_report = re.sub(
+ r"\n\s*\n\s*\n+", "\n\n", polished_report
+ ) # Clean up extra newlines
# Validate citation usage before adding references
- citation_validation = validate_citation_usage(polished_report, len(self.citations))
- print(f"📊 Citation Statistics:")
- print(f" - Used citations: {citation_validation['citations_used']}/{citation_validation['citations_total']}")
- print(f" - Usage percentage: {citation_validation['usage_percentage']:.1f}%")
+ citation_validation = validate_citation_usage(
+ polished_report, len(self.citations)
+ )
+ print("📊 Citation Statistics:")
+ print(
+ f" - Used citations: {citation_validation['citations_used']}/{citation_validation['citations_total']}"
+ )
+ print(
+ f" - Usage percentage: {citation_validation['usage_percentage']:.1f}%"
+ )
if "warning" in citation_validation:
print(f"⚠️ {citation_validation['warning']}")
- if len(citation_validation['unused_citations']) > 0:
- print(f" - Unused citations: {citation_validation['unused_citations'][:10]}" +
- (f"... and {len(citation_validation['unused_citations']) - 10} more"
- if len(citation_validation['unused_citations']) > 10 else ""))
+ if len(citation_validation["unused_citations"]) > 0:
+ print(
+ f" - Unused citations: {citation_validation['unused_citations'][:10]}"
+ + (
+ f"... and {len(citation_validation['unused_citations']) - 10} more"
+ if len(citation_validation["unused_citations"]) > 10
+ else ""
+ )
+ )
# Add references section (only for citations that are actually used)
references = "\n\n## References\n\n"
- used_citations = set(citation_validation['used_citations'])
+ used_citations = set(citation_validation["used_citations"])
for num, source in sorted(self.citations.items()):
# Only include citations that are actually used in the text
if num in used_citations:
- title = source.get('title', 'Untitled')
- url = source['url']
- access_date = source.get('access_date', datetime.now().strftime('%Y-%m-%d'))
+ title = source.get("title", "Untitled")
+ url = source["url"]
+ access_date = source.get(
+ "access_date", datetime.now().strftime("%Y-%m-%d")
+ )
references += f"[{num}] {title}. Available at: <{url}> [Accessed: {access_date}]\n\n"
-
+
# Add TTD-DR metadata
metadata = "\n---\n\n**TTD-DR Research Metadata:**\n"
- metadata += f"- Algorithm: Test-Time Diffusion Deep Researcher\n"
+ metadata += "- Algorithm: Test-Time Diffusion Deep Researcher\n"
metadata += f"- Denoising iterations: {len(self.draft_history) - 1}\n"
metadata += f"- Total gaps addressed: {sum(len(gaps) for gaps in self.gap_analysis_history)}\n"
metadata += f"- Total sources consulted: {len(self.citations)}\n"
metadata += f"- Citations used in text: {citation_validation['citations_used']} ({citation_validation['usage_percentage']:.1f}%)\n"
- metadata += f"- Report generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
+ metadata += (
+ f"- Report generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
+ )
metadata += f"- Total tokens used: {self.total_tokens}\n"
return polished_report + references + metadata
-
+
except Exception as e:
- return f"Finalization failed: {str(e)}\n\nReturning current draft:\n{final_draft}"
\ No newline at end of file
+ return f"Finalization failed: {str(e)}\n\nReturning current draft:\n{final_draft}"
diff --git a/optillm/plugins/spl/evaluation.py b/optillm/plugins/spl/evaluation.py
index 87d07f86..c386d9b7 100644
--- a/optillm/plugins/spl/evaluation.py
+++ b/optillm/plugins/spl/evaluation.py
@@ -4,73 +4,97 @@
import logging
from datetime import datetime
-from typing import Dict, List, Optional, Tuple, Any
+from typing import Dict, List, Optional, Any
from optillm.plugins.spl.strategy import Strategy
from optillm.plugins.spl.utils import extract_thinking
from optillm.plugins.spl.prompts import (
STRATEGY_EVALUATION_PROMPT,
- STRATEGY_REFINEMENT_PROMPT
+ STRATEGY_REFINEMENT_PROMPT,
)
from optillm.plugins.spl.config import (
DEFAULT_MAX_TOKENS,
MAX_STRATEGIES_FOR_INFERENCE,
- MIN_SUCCESS_RATE_FOR_INFERENCE
+ MIN_SUCCESS_RATE_FOR_INFERENCE,
)
# Setup logging
logger = logging.getLogger(__name__)
-def select_relevant_strategies(query: str, problem_type: str, db: Any, learning_mode: bool = False, max_strategies: int = MAX_STRATEGIES_FOR_INFERENCE) -> List[Strategy]:
+
+def select_relevant_strategies(
+ query: str,
+ problem_type: str,
+ db: Any,
+ learning_mode: bool = False,
+ max_strategies: int = MAX_STRATEGIES_FOR_INFERENCE,
+) -> List[Strategy]:
"""
Select the most relevant strategies for a given problem to be used during inference.
This controls how many strategies are included in the system prompt augmentation.
-
+
When in inference mode (not learning_mode), only strategies with:
- - A matching problem type
+ - A matching problem type
- Success rate >= MIN_SUCCESS_RATE_FOR_INFERENCE
- At least 5 attempts
are selected.
-
+
In learning mode, strategies with fewer attempts are also considered.
-
+
Args:
query: The problem/query text
problem_type: The type of problem
db: Strategy database
learning_mode: Whether we're in learning mode (affects filtering criteria)
max_strategies: Maximum number of strategies to return
-
+
Returns:
List[Strategy]: The selected strategies (may be empty if none meet criteria)
"""
# First, get strategies specifically for this problem type
type_specific = db.get_strategies_for_problem(problem_type)
- logger.info(f"Found {len(type_specific)} strategies for problem type '{problem_type}'")
-
+ logger.info(
+ f"Found {len(type_specific)} strategies for problem type '{problem_type}'"
+ )
+
# Filter strategies by minimum success rate and attempts
qualified_strategies = []
for strategy in type_specific:
# In learning mode, we're more lenient with new strategies
if learning_mode and strategy.total_attempts < 5:
- logger.info(f"Strategy {strategy.strategy_id} included (learning mode - only {strategy.total_attempts} attempts so far)")
+ logger.info(
+ f"Strategy {strategy.strategy_id} included (learning mode - only {strategy.total_attempts} attempts so far)"
+ )
qualified_strategies.append(strategy)
# For inference or well-tested strategies, we require minimum success rate
- elif strategy.success_rate >= MIN_SUCCESS_RATE_FOR_INFERENCE and strategy.total_attempts >= 5:
- logger.info(f"Strategy {strategy.strategy_id} qualified - success rate {strategy.success_rate:.2f} >= minimum {MIN_SUCCESS_RATE_FOR_INFERENCE:.2f} with {strategy.total_attempts} attempts")
+ elif (
+ strategy.success_rate >= MIN_SUCCESS_RATE_FOR_INFERENCE
+ and strategy.total_attempts >= 5
+ ):
+ logger.info(
+ f"Strategy {strategy.strategy_id} qualified - success rate {strategy.success_rate:.2f} >= minimum {MIN_SUCCESS_RATE_FOR_INFERENCE:.2f} with {strategy.total_attempts} attempts"
+ )
qualified_strategies.append(strategy)
else:
if strategy.total_attempts < 5:
- logger.info(f"Strategy {strategy.strategy_id} skipped - insufficient attempts ({strategy.total_attempts} < 5) in inference mode")
+ logger.info(
+ f"Strategy {strategy.strategy_id} skipped - insufficient attempts ({strategy.total_attempts} < 5) in inference mode"
+ )
else:
- logger.info(f"Strategy {strategy.strategy_id} skipped - success rate {strategy.success_rate:.2f} < minimum {MIN_SUCCESS_RATE_FOR_INFERENCE:.2f}")
-
+ logger.info(
+ f"Strategy {strategy.strategy_id} skipped - success rate {strategy.success_rate:.2f} < minimum {MIN_SUCCESS_RATE_FOR_INFERENCE:.2f}"
+ )
+
if not qualified_strategies:
- logger.info(f"No strategies meet the minimum success rate threshold ({MIN_SUCCESS_RATE_FOR_INFERENCE:.2f}) for problem type '{problem_type}'")
+ logger.info(
+ f"No strategies meet the minimum success rate threshold ({MIN_SUCCESS_RATE_FOR_INFERENCE:.2f}) for problem type '{problem_type}'"
+ )
return []
-
- logger.info(f"Found {len(qualified_strategies)} strategies that meet minimum success rate requirement")
-
+
+ logger.info(
+ f"Found {len(qualified_strategies)} strategies that meet minimum success rate requirement"
+ )
+
# If we have more qualified strategies than needed, sort and select the best ones
if len(qualified_strategies) > max_strategies:
# Score each strategy based on success rate and recency
@@ -81,103 +105,129 @@ def select_relevant_strategies(query: str, problem_type: str, db: Any, learning_
# Calculate days since last use
last_used = datetime.fromisoformat(strategy.last_used)
days_since = (datetime.now() - last_used).days
- recency_score = max(0, 1.0 - min(1.0, days_since / 30.0)) # Higher for more recent
-
+ recency_score = max(
+ 0, 1.0 - min(1.0, days_since / 30.0)
+ ) # Higher for more recent
+
# Combined score with success rate weighing more
score = (0.7 * strategy.success_rate) + (0.3 * recency_score)
scored_strategies.append((strategy, score))
-
+
# Sort by score (descending) and take top strategies
scored_strategies.sort(key=lambda x: x[1], reverse=True)
selected = [s[0] for s in scored_strategies[:max_strategies]]
-
+
# Log which strategies we're using
for i, strategy in enumerate(selected, 1):
- logger.info(f"Selected strategy {i}/{max_strategies} for inference: {strategy.strategy_id} (success rate: {strategy.success_rate:.2f})")
-
+ logger.info(
+ f"Selected strategy {i}/{max_strategies} for inference: {strategy.strategy_id} (success rate: {strategy.success_rate:.2f})"
+ )
+
return selected
-
+
# If we have fewer or equal to the maximum, use all qualified strategies
for i, strategy in enumerate(qualified_strategies, 1):
- logger.info(f"Selected strategy {i}/{len(qualified_strategies)} for inference: {strategy.strategy_id} (success rate: {strategy.success_rate:.2f})")
-
+ logger.info(
+ f"Selected strategy {i}/{len(qualified_strategies)} for inference: {strategy.strategy_id} (success rate: {strategy.success_rate:.2f})"
+ )
+
return qualified_strategies
-def evaluate_strategy_effectiveness(response: str, thinking: Optional[str], selected_strategies: List[Strategy], client, model: str) -> Dict[str, bool]:
+
+def evaluate_strategy_effectiveness(
+ response: str,
+ thinking: Optional[str],
+ selected_strategies: List[Strategy],
+ client,
+ model: str,
+) -> Dict[str, bool]:
"""
Evaluate how effective each strategy was in generating the response.
-
+
Args:
response: The LLM's final response to the query
thinking: The LLM's reasoning process (if any)
selected_strategies: The strategies that were used
client: LLM client for making API calls
model: Model identifier
-
+
Returns:
Dict[str, bool]: Mapping from strategy ID to effectiveness (True/False)
"""
if not selected_strategies:
return {}
-
+
results = {}
-
+
try:
for strategy in selected_strategies:
# Include thinking in the evaluation if available
full_response = thinking + "\n\n" + response if thinking else response
-
+
messages = [
+ {"role": "system", "content": STRATEGY_EVALUATION_PROMPT},
{
- "role": "system",
- "content": STRATEGY_EVALUATION_PROMPT
- },
- {
- "role": "user",
+ "role": "user",
"content": (
f"Strategy:\n{strategy.strategy_text}\n\n"
f"Response (including reasoning):\n{full_response}\n\n"
f"Does the response show clear evidence that the strategy was effectively applied? "
f"Answer with ONLY YES or NO."
- )
- }
+ ),
+ },
]
-
+
eval_response = client.chat.completions.create(
model=model,
messages=messages,
temperature=0.1, # Low temperature for more deterministic output
- max_tokens=DEFAULT_MAX_TOKENS # Increased token limit for reasoning LLMs
+ max_tokens=DEFAULT_MAX_TOKENS, # Increased token limit for reasoning LLMs
)
-
+
# Get the response and extract final answer (remove thinking blocks)
+ if not eval_response.choices:
+ raise ValueError(
+ "LLM returned empty response in evaluate_strategy_effectiveness"
+ )
result_text = eval_response.choices[0].message.content
final_result, eval_thinking = extract_thinking(result_text)
-
+
# Clean up and normalize the result
final_result = final_result.strip().upper()
-
+
logger.debug(f"Strategy evaluation - raw response: '{result_text}'")
- logger.debug(f"Strategy evaluation - final result after removing thinking: '{final_result}'")
-
+ logger.debug(
+ f"Strategy evaluation - final result after removing thinking: '{final_result}'"
+ )
+
# Check for YES in the final answer (not in thinking blocks)
is_effective = "YES" in final_result
-
+
results[strategy.strategy_id] = is_effective
- logger.info(f"Strategy {strategy.strategy_id} evaluation: {final_result} -> {is_effective}")
-
+ logger.info(
+ f"Strategy {strategy.strategy_id} evaluation: {final_result} -> {is_effective}"
+ )
+
except Exception as e:
logger.error(f"Error evaluating strategy effectiveness: {str(e)}")
# Default to neutral results if evaluation fails
for strategy in selected_strategies:
results[strategy.strategy_id] = False
-
+
return results
-def refine_strategy(strategy: Strategy, problem: str, response: str, thinking: Optional[str], client, model: str) -> Strategy:
+
+def refine_strategy(
+ strategy: Strategy,
+ problem: str,
+ response: str,
+ thinking: Optional[str],
+ client,
+ model: str,
+) -> Strategy:
"""
Refine a strategy based on its application to a specific problem.
-
+
Args:
strategy: The strategy to refine
problem: The problem that was solved
@@ -185,48 +235,49 @@ def refine_strategy(strategy: Strategy, problem: str, response: str, thinking: O
thinking: The LLM's reasoning process (if any)
client: LLM client for making API calls
model: Model identifier
-
+
Returns:
Strategy: The refined strategy
"""
try:
# Include thinking in refinement if available
full_response = thinking + "\n\n" + response if thinking else response
-
+
messages = [
+ {"role": "system", "content": STRATEGY_REFINEMENT_PROMPT},
{
- "role": "system",
- "content": STRATEGY_REFINEMENT_PROMPT
- },
- {
- "role": "user",
+ "role": "user",
"content": (
f"Original strategy for {strategy.problem_type} problems:\n{strategy.strategy_text}\n\n"
f"New problem:\n{problem}\n\n"
f"Solution process (including reasoning):\n{full_response}\n\n"
f"Provide a refined version of the original strategy that incorporates "
f"any insights from this new example."
- )
- }
+ ),
+ },
]
-
+
refine_response = client.chat.completions.create(
model=model,
messages=messages,
temperature=0.5,
- max_tokens=DEFAULT_MAX_TOKENS # Increased token limit for reasoning LLMs
+ max_tokens=DEFAULT_MAX_TOKENS, # Increased token limit for reasoning LLMs
)
-
+
+ if not refine_response.choices:
+ raise ValueError("LLM returned empty response in refine_strategy")
response_text = refine_response.choices[0].message.content
-
+
# Extract refined strategy and thinking
refined_text, refinement_thinking = extract_thinking(response_text)
if not refined_text.strip():
refined_text = response_text # Use full response if extraction failed
-
+
logger.debug(f"Strategy refinement - raw response: '{response_text}'")
- logger.debug(f"Strategy refinement - final text after removing thinking: '{refined_text}'")
-
+ logger.debug(
+ f"Strategy refinement - final text after removing thinking: '{refined_text}'"
+ )
+
# Create a copy of the strategy with the refined text
refined_strategy = Strategy(
strategy_id=strategy.strategy_id,
@@ -240,15 +291,15 @@ def refine_strategy(strategy: Strategy, problem: str, response: str, thinking: O
last_updated=datetime.now().isoformat(),
confidence=strategy.confidence,
tags=strategy.tags,
- reasoning_examples=strategy.reasoning_examples.copy()
+ reasoning_examples=strategy.reasoning_examples.copy(),
)
-
+
# Add the refinement thinking if available
if refinement_thinking:
refined_strategy.add_reasoning_example(refinement_thinking)
-
+
return refined_strategy
-
+
except Exception as e:
logger.error(f"Error refining strategy: {str(e)}")
# Return the original strategy if refinement fails