Skip to content

Commit 444efc7

Browse files
committed
feat: add /newchat command and topic precheck for groups
- Add /newchat local command to create new Topic and send first message - Add topic precheck when bot joins group (is_forum + can_manage_topics) - Send detailed setup instructions if precheck fails - Refactor consumer to handle newchat SQS message format
1 parent 328de52 commit 444efc7

3 files changed

Lines changed: 190 additions & 59 deletions

File tree

agent-sdk-client/config.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ commands = [
88
[local_commands]
99
# Local-only commands handled by the client
1010
help = "Hello World"
11+
newchat = "创建新对话"
1112

1213
[security]
1314
# User IDs allowed to add bot to groups and send private messages.

agent-sdk-client/consumer.py

Lines changed: 70 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -47,57 +47,68 @@ async def process_message(message_data: dict) -> None:
4747
config = Config.from_env()
4848
bot = Bot(config.telegram_token)
4949

50-
# Reconstruct Update object from stored data
51-
update = Update.de_json(message_data['telegram_update'], bot)
52-
message = update.message or update.edited_message
53-
54-
if not message:
55-
logger.warning("Received update with no message or edited_message")
56-
return
57-
58-
cmd = config.get_command(message.text)
59-
if cmd:
60-
if config.is_local_command(cmd):
61-
logger.info(
62-
"Handling local command in consumer (fallback path)",
63-
extra={'chat_id': message.chat_id, 'message_id': message.message_id},
64-
)
65-
try:
66-
await bot.send_message(
67-
chat_id=message.chat_id,
68-
text=config.local_response(cmd),
69-
message_thread_id=message.message_thread_id,
70-
reply_to_message_id=message.message_id,
71-
)
72-
except Exception:
73-
logger.warning("Failed to send local command response", exc_info=True)
50+
is_newchat = message_data.get('is_newchat', False)
51+
52+
if is_newchat:
53+
# newchat 消息直接使用 message_data
54+
chat_id = message_data['chat_id']
55+
thread_id = message_data['thread_id']
56+
user_message = message_data['text']
57+
message_id = message_data.get('message_id')
58+
else:
59+
# 正常消息解析 Update
60+
update = Update.de_json(message_data['telegram_update'], bot)
61+
message = update.message or update.edited_message
62+
63+
if not message:
64+
logger.warning("Received update with no message or edited_message")
7465
return
7566

76-
if not config.is_agent_command(cmd):
77-
# Defensive guard: producer should already block non-agent commands.
78-
logger.info(
79-
"Skipping non-agent command (consumer fallback)",
80-
extra={
81-
'chat_id': message.chat_id,
82-
'message_id': message.message_id,
83-
},
84-
)
85-
try:
86-
await bot.send_message(
87-
chat_id=message.chat_id,
88-
text=config.unknown_command_message(),
89-
message_thread_id=message.message_thread_id,
90-
reply_to_message_id=message.message_id,
67+
chat_id = message.chat_id
68+
thread_id = message.message_thread_id
69+
user_message = message.text
70+
message_id = message.message_id
71+
72+
cmd = config.get_command(user_message)
73+
if cmd:
74+
if config.is_local_command(cmd):
75+
logger.info(
76+
"Handling local command in consumer (fallback path)",
77+
extra={'chat_id': chat_id, 'message_id': message_id},
9178
)
92-
except Exception:
93-
logger.warning("Failed to send local command response", exc_info=True)
94-
return
79+
try:
80+
await bot.send_message(
81+
chat_id=chat_id,
82+
text=config.local_response(cmd),
83+
message_thread_id=thread_id,
84+
reply_to_message_id=message_id,
85+
)
86+
except Exception:
87+
logger.warning("Failed to send local command response", exc_info=True)
88+
return
89+
90+
if not config.is_agent_command(cmd):
91+
# Defensive guard: producer should already block non-agent commands.
92+
logger.info(
93+
"Skipping non-agent command (consumer fallback)",
94+
extra={'chat_id': chat_id, 'message_id': message_id},
95+
)
96+
try:
97+
await bot.send_message(
98+
chat_id=chat_id,
99+
text=config.unknown_command_message(),
100+
message_thread_id=thread_id,
101+
reply_to_message_id=message_id,
102+
)
103+
except Exception:
104+
logger.warning("Failed to send local command response", exc_info=True)
105+
return
95106

96107
# Send typing indicator
97108
await bot.send_chat_action(
98-
chat_id=message.chat_id,
109+
chat_id=chat_id,
99110
action=ChatAction.TYPING,
100-
message_thread_id=message.message_thread_id,
111+
message_thread_id=thread_id,
101112
)
102113

103114
# Initialize result with default error response
@@ -118,31 +129,31 @@ async def process_message(message_data: dict) -> None:
118129
'Content-Type': 'application/json',
119130
},
120131
json={
121-
'user_message': message.text,
122-
'chat_id': str(message.chat_id),
123-
'thread_id': str(message.message_thread_id) if message.message_thread_id else None,
132+
'user_message': user_message,
133+
'chat_id': str(chat_id),
134+
'thread_id': str(thread_id) if thread_id else None,
124135
},
125136
)
126137
response.raise_for_status()
127138
result = response.json()
128139

129140
except httpx.TimeoutException:
130-
logger.warning(f"Agent Server timeout for chat_id={message.chat_id}")
141+
logger.warning(f"Agent Server timeout for chat_id={chat_id}")
131142
await bot.send_message(
132-
chat_id=message.chat_id,
143+
chat_id=chat_id,
133144
text="Request timed out.",
134-
message_thread_id=message.message_thread_id,
145+
message_thread_id=thread_id,
135146
)
136147
raise # Re-raise to trigger SQS retry for transient errors
137148

138149
except Exception as e:
139-
logger.exception(f"Agent Server error for chat_id={message.chat_id}")
150+
logger.exception(f"Agent Server error for chat_id={chat_id}")
140151
error_text = f"Error: {str(e)[:200]}"
141152
try:
142153
await bot.send_message(
143-
chat_id=message.chat_id,
154+
chat_id=chat_id,
144155
text=error_text,
145-
message_thread_id=message.message_thread_id,
156+
message_thread_id=thread_id,
146157
)
147158
except Exception as send_error:
148159
logger.error(f"Failed to send error message to Telegram: {send_error}")
@@ -160,21 +171,21 @@ async def process_message(message_data: dict) -> None:
160171
# Send response to Telegram
161172
try:
162173
await bot.send_message(
163-
chat_id=message.chat_id,
174+
chat_id=chat_id,
164175
text=text,
165176
parse_mode=ParseMode.MARKDOWN_V2,
166-
message_thread_id=message.message_thread_id,
167-
reply_to_message_id=message.message_id,
177+
message_thread_id=thread_id,
178+
reply_to_message_id=message_id,
168179
)
169180
except BadRequest as e:
170181
if "parse entities" in str(e).lower():
171182
safe_text = escape_markdown(text, version=2)
172183
await bot.send_message(
173-
chat_id=message.chat_id,
184+
chat_id=chat_id,
174185
text=safe_text,
175186
parse_mode=ParseMode.MARKDOWN_V2,
176-
message_thread_id=message.message_thread_id,
177-
reply_to_message_id=message.message_id,
187+
message_thread_id=thread_id,
188+
reply_to_message_id=message_id,
178189
)
179190
else:
180191
raise

agent-sdk-client/handler.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,91 @@ def _handle_local_command(bot: Bot, message, config: Config, cmd: str) -> bool:
149149
return True
150150

151151

152+
async def _check_forum_requirements(bot: Bot, chat_id: int) -> tuple[bool, str]:
153+
"""检查群组 Topic 功能要求。
154+
155+
Returns:
156+
(is_ok, error_message) - 如果满足要求返回 (True, ""),否则返回 (False, 错误提示)
157+
"""
158+
try:
159+
chat = await bot.get_chat(chat_id)
160+
if not chat.is_forum:
161+
return False, (
162+
"⚠️ 群组未开启 Topics 功能\n\n"
163+
"请按以下步骤开启:\n"
164+
"1. 打开群组设置\n"
165+
"2. 点击「Topics」\n"
166+
"3. 开启 Topics 功能\n"
167+
"4. 重新添加 Bot"
168+
)
169+
170+
me = await bot.get_me()
171+
member = await bot.get_chat_member(chat_id, me.id)
172+
if not getattr(member, 'can_manage_topics', False):
173+
return False, (
174+
"⚠️ Bot 缺少「管理 Topics」权限\n\n"
175+
"请按以下步骤授权:\n"
176+
"1. 打开群组设置 > 管理员\n"
177+
"2. 选择此 Bot\n"
178+
"3. 开启「Manage Topics」权限"
179+
)
180+
return True, ""
181+
except Exception as e:
182+
logger.warning(f"Failed to check forum requirements: {e}")
183+
return False, f"检查权限失败: {str(e)[:100]}"
184+
185+
186+
async def _handle_newchat_command(
187+
bot: Bot, message, config: Config, sqs, prompts: str
188+
) -> bool:
189+
"""处理 /newchat - 创建 Topic 并发送消息到 SQS。
190+
191+
Args:
192+
bot: Telegram Bot 实例
193+
message: Telegram Message 对象
194+
config: 配置对象
195+
sqs: SQS 客户端
196+
prompts: 用户输入的消息内容
197+
198+
Returns:
199+
True 如果成功,False 如果失败
200+
"""
201+
from datetime import datetime
202+
203+
chat_id = message.chat_id
204+
topic_name = f"Chat {datetime.now().strftime('%m/%d %H:%M')}"
205+
206+
try:
207+
forum_topic = await bot.create_forum_topic(chat_id=chat_id, name=topic_name)
208+
new_thread_id = forum_topic.message_thread_id
209+
210+
message_body = {
211+
'chat_id': chat_id,
212+
'message_id': message.message_id,
213+
'text': prompts,
214+
'thread_id': new_thread_id,
215+
'is_newchat': True,
216+
}
217+
218+
success = _send_to_sqs_safe(sqs, config.queue_url, message_body)
219+
if not success:
220+
await bot.send_message(
221+
chat_id=chat_id,
222+
text="发送消息失败,请重试",
223+
message_thread_id=new_thread_id,
224+
)
225+
return success
226+
227+
except Exception as e:
228+
logger.warning(f"Failed to create forum topic: {e}")
229+
await bot.send_message(
230+
chat_id=chat_id,
231+
text=f"创建 Topic 失败: {str(e)[:100]}",
232+
message_thread_id=message.message_thread_id,
233+
)
234+
return False
235+
236+
152237
def lambda_handler(event: dict, context: Any) -> dict:
153238
"""Lambda entry point - Producer.
154239
@@ -182,6 +267,23 @@ def lambda_handler(event: dict, context: Any) -> dict:
182267
extra={'chat_id': chat_id, 'inviter_id': inviter_id},
183268
)
184269
_send_metric('SecurityBlock.UnauthorizedGroup')
270+
else:
271+
# 授权群组的 Topic 预检
272+
member_update = update.my_chat_member
273+
old_status = member_update.old_chat_member.status
274+
new_status = member_update.new_chat_member.status
275+
276+
if old_status in ('left', 'kicked') and new_status in (
277+
'member',
278+
'administrator',
279+
):
280+
chat_id = member_update.chat.id
281+
is_ok, error_msg = asyncio.run(
282+
_check_forum_requirements(bot, chat_id)
283+
)
284+
if not is_ok:
285+
asyncio.run(bot.send_message(chat_id=chat_id, text=error_msg))
286+
_send_metric('TopicPrecheck.Failed')
185287
return {'statusCode': 200}
186288

187289
message = update.message or update.edited_message
@@ -201,6 +303,23 @@ def lambda_handler(event: dict, context: Any) -> dict:
201303
return {'statusCode': 200}
202304

203305
cmd = config.get_command(message.text)
306+
307+
# /newchat 特殊处理 - 创建 Topic 后发 SQS
308+
if cmd == '/newchat':
309+
prompts = message.text[len('/newchat') :].strip()
310+
if not prompts:
311+
bot.send_message(
312+
chat_id=message.chat_id,
313+
text="用法: /newchat <消息内容>",
314+
message_thread_id=message.message_thread_id,
315+
)
316+
return {'statusCode': 200}
317+
318+
sqs = _get_sqs_client()
319+
asyncio.run(_handle_newchat_command(bot, message, config, sqs, prompts))
320+
return {'statusCode': 200}
321+
322+
# 其他 local command 正常处理
204323
if cmd and config.is_local_command(cmd):
205324
_handle_local_command(bot, message, config, cmd)
206325
return {'statusCode': 200}

0 commit comments

Comments
 (0)