Skip to content

Commit e6e64cb

Browse files
committed
fix: 增强异常处理和Issue#12修复
- consumer.py: * 添加lambda_handler的JSON解析异常处理 * 包装asyncio.run的异常捕获和重新抛出 * 初始化result默认值防止未定义错误 * 添加message为None的检查 * 增强异常处理的日志记录 * 修复Issue#12:使用 or 运算符处理空response - handler.py: * 包装asyncio.run的异常处理,返回结构化错误响应 * 添加session操作的异常处理 * 关键操作失败返回500,非关键操作log后继续 * 完整的日志记录便于调试 确保系统在异常情况下不会导致SQS消息卡死,并正确处理Agent返回空响应的场景。
1 parent d18e3bd commit e6e64cb

2 files changed

Lines changed: 89 additions & 20 deletions

File tree

agent-sdk-client/consumer.py

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,58 @@
1818
def lambda_handler(event: dict, context: Any) -> dict:
1919
"""SQS Consumer Lambda entry point."""
2020
for record in event['Records']:
21-
message_data = json.loads(record['body'])
22-
asyncio.run(process_message(message_data))
21+
try:
22+
message_data = json.loads(record['body'])
23+
except json.JSONDecodeError as e:
24+
# Invalid message format - log and skip
25+
import logging
26+
logger = logging.getLogger()
27+
logger.error(f"Failed to parse SQS message: {e}")
28+
continue
29+
30+
try:
31+
asyncio.run(process_message(message_data))
32+
except Exception as e:
33+
# Log and let SQS retry on failure
34+
import logging
35+
logger = logging.getLogger()
36+
logger.exception(f"Failed to process message: {e}")
37+
raise # Re-raise to fail the batch item
2338

2439
return {'statusCode': 200}
2540

2641

2742
async def process_message(message_data: dict) -> None:
2843
"""Process single message from SQS queue."""
44+
import logging
45+
logger = logging.getLogger()
46+
2947
config = Config.from_env()
3048
bot = Bot(config.telegram_token)
3149

3250
# Reconstruct Update object from stored data
3351
update = Update.de_json(message_data['telegram_update'], bot)
3452
message = update.message or update.edited_message
3553

54+
if not message:
55+
logger.warning("Received update with no message or edited_message")
56+
return
57+
3658
# Send typing indicator
3759
await bot.send_chat_action(
3860
chat_id=message.chat_id,
3961
action=ChatAction.TYPING,
4062
message_thread_id=message.message_thread_id,
4163
)
4264

65+
# Initialize result with default error response
66+
# This ensures result is always defined, even if Agent Server call fails
67+
result = {
68+
'response': '',
69+
'is_error': True,
70+
'error_message': 'Failed to get response from Agent Server'
71+
}
72+
4373
# Call Agent Server
4474
try:
4575
async with httpx.AsyncClient(timeout=600.0) as client:
@@ -59,6 +89,7 @@ async def process_message(message_data: dict) -> None:
5989
result = response.json()
6090

6191
except httpx.TimeoutException:
92+
logger.warning(f"Agent Server timeout for chat_id={message.chat_id}")
6293
await bot.send_message(
6394
chat_id=message.chat_id,
6495
text="Request timed out.",
@@ -67,19 +98,23 @@ async def process_message(message_data: dict) -> None:
6798
raise # Re-raise to trigger SQS retry for transient errors
6899

69100
except Exception as e:
70-
await bot.send_message(
71-
chat_id=message.chat_id,
72-
text=f"Error: {str(e)[:200]}",
73-
message_thread_id=message.message_thread_id,
74-
)
75-
# Don't re-raise for general exceptions - error message already sent
76-
# to user, retrying would cause duplicate messages
101+
logger.exception(f"Agent Server error for chat_id={message.chat_id}")
102+
error_text = f"Error: {str(e)[:200]}"
103+
try:
104+
await bot.send_message(
105+
chat_id=message.chat_id,
106+
text=error_text,
107+
message_thread_id=message.message_thread_id,
108+
)
109+
except Exception as send_error:
110+
logger.error(f"Failed to send error message to Telegram: {send_error}")
111+
# Don't re-raise - error message already sent to user, retrying would cause duplicate messages
77112

78-
# Format response
113+
# Format response (result is guaranteed to be defined now)
79114
if result.get('is_error'):
80115
text = f"Agent error: {result.get('error_message', 'Unknown')}"
81116
else:
82-
text = result.get('response', 'No response')
117+
text = result.get('response') or 'No response'
83118

84119
if len(text) > 4000:
85120
text = text[:4000] + "\n\n... (truncated)"

agent-sdk-server/handler.py

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@
44
"""
55
import asyncio
66
import json
7+
import logging
78
from typing import Any
89

910
from config import Config
1011
from session_store import SessionStore
1112
from agent_session import process_message
1213

14+
logger = logging.getLogger()
15+
logger.setLevel(logging.INFO)
16+
1317

1418
def lambda_handler(event: dict, context: Any) -> dict:
1519
"""Lambda entry point.
@@ -76,26 +80,56 @@ def lambda_handler(event: dict, context: Any) -> dict:
7680

7781
# Download session files if resuming
7882
if session_id:
79-
store.download_session_files(session_id)
83+
try:
84+
store.download_session_files(session_id)
85+
except Exception as e:
86+
logger.error(f"Failed to download session files for session_id={session_id}: {e}")
87+
return {
88+
'statusCode': 500,
89+
'body': json.dumps({'error': 'Failed to download session'})
90+
}
8091

8192
# Process message with Agent SDK
82-
result = asyncio.run(process_message(
83-
user_message=user_message,
84-
session_id=session_id,
85-
model=model,
86-
))
93+
try:
94+
result = asyncio.run(process_message(
95+
user_message=user_message,
96+
session_id=session_id,
97+
model=model,
98+
))
99+
except Exception as e:
100+
logger.exception(f"Agent SDK processing failed for chat_id={chat_id}")
101+
return {
102+
'statusCode': 500,
103+
'headers': {'Content-Type': 'application/json'},
104+
'body': json.dumps({
105+
'response': '',
106+
'session_id': session_id,
107+
'cost_usd': 0.0,
108+
'num_turns': 0,
109+
'is_error': True,
110+
'error_message': f'Agent processing error: {str(e)[:200]}'
111+
})
112+
}
87113

88114
# Get session_id from result (SDK generates it for new sessions)
89115
result_session_id = result.get('session_id', '')
90116

91117
# Save session mapping if new session
92118
if result_session_id and result_session_id != session_id:
93-
store.save_session_id(chat_id, thread_id, result_session_id)
119+
try:
120+
store.save_session_id(chat_id, thread_id, result_session_id)
121+
except Exception as e:
122+
logger.error(f"Failed to save session mapping for chat_id={chat_id}: {e}")
123+
# Continue anyway - session can still work with S3 files
94124

95125
# Upload session files
96126
if result_session_id:
97-
store.upload_session_files(result_session_id)
98-
store.update_session_timestamp(chat_id, thread_id)
127+
try:
128+
store.upload_session_files(result_session_id)
129+
store.update_session_timestamp(chat_id, thread_id)
130+
except Exception as e:
131+
logger.error(f"Failed to upload session files for session_id={result_session_id}: {e}")
132+
# Continue anyway - result is already available to return
99133

100134
return {
101135
'statusCode': 200,

0 commit comments

Comments
 (0)