From 7ec56e01c46a09d5e4d1fb7ca632c877d80b4a28 Mon Sep 17 00:00:00 2001 From: yibie Date: Sat, 6 Jun 2026 16:00:46 +0800 Subject: [PATCH 1/4] fix: ContextVar cross-Context token error in ForkTapeStore.fork() Replace ContextVar.reset(token) with save/restore pattern to avoid 'Token was created in a different Context' ValueError when the async generator cleanup runs in a different asyncio Task (e.g. cancellation, TaskGroup exception propagation). --- src/bub/builtin/store.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/bub/builtin/store.py b/src/bub/builtin/store.py index 714cfc88..b30dd751 100644 --- a/src/bub/builtin/store.py +++ b/src/bub/builtin/store.py @@ -101,16 +101,22 @@ async def append(self, tape: str, entry: TapeEntry) -> None: @contextlib.asynccontextmanager async def fork(self, tape: str, merge_back: bool = True) -> AsyncGenerator[None, None]: store = InMemoryTapeStore() - token = current_store.set(store) - tape_token = current_fork_tape.set(tape) - reset_token = current_tape_was_reset.set(False) + # Save/restore instead of ContextVar.reset(token) to avoid + # "Token was created in a different Context" when cleanup + # runs in a different asyncio Task (e.g. cancellation, TaskGroup). + prev_store = current_store.get(_empty_store) + prev_fork_tape = current_fork_tape.get() + prev_was_reset = current_tape_was_reset.get() + current_store.set(store) + current_fork_tape.set(tape) + current_tape_was_reset.set(False) try: yield finally: was_reset = current_tape_was_reset.get() - current_store.reset(token) - current_fork_tape.reset(tape_token) - current_tape_was_reset.reset(reset_token) + current_store.set(prev_store) + current_fork_tape.set(prev_fork_tape) + current_tape_was_reset.set(prev_was_reset) if merge_back: if was_reset: await self._parent.reset(tape) From a619a9fa3839339683d3dedf60c6d751577ac798 Mon Sep 17 00:00:00 2001 From: yibie Date: Tue, 9 Jun 2026 07:40:51 +0800 Subject: [PATCH 2/4] feat: support multiple Telegram bots in a single gateway instance - Add BotConfig pydantic model for per-bot configuration - Add TelegramSettings.bots field (JSON array) for multi-bot mode - TelegramSettings.bot_configs() returns list of BotConfig - Backward compatible: BUB_TELEGRAM_TOKEN still works for single bot - TelegramChannel name is now dynamic: telegram-{name} or 'telegram' - provide_channels creates one TelegramChannel per BotConfig --- src/bub/builtin/hook_impl.py | 11 +++-- src/bub/channels/telegram.py | 87 +++++++++++++++++++++++++++------ tests/test_builtin_hook_impl.py | 7 ++- tests/test_channels.py | 12 ++--- tests/test_image_message.py | 6 +-- 5 files changed, 93 insertions(+), 30 deletions(-) diff --git a/src/bub/builtin/hook_impl.py b/src/bub/builtin/hook_impl.py index 63f3a74a..bff006fd 100644 --- a/src/bub/builtin/hook_impl.py +++ b/src/bub/builtin/hook_impl.py @@ -251,12 +251,13 @@ def system_prompt(self, prompt: str | list[dict], state: State) -> str: @hookimpl def provide_channels(self, message_handler: MessageHandler) -> list[Channel]: from bub.channels.cli import CliChannel - from bub.channels.telegram import TelegramChannel + from bub.channels.telegram import TelegramChannel, TelegramSettings - return [ - TelegramChannel(on_receive=message_handler), - CliChannel(on_receive=message_handler, agent=self._get_agent()), - ] + channels: list[Channel] = [] + for bot_config in TelegramSettings.bot_configs(): + channels.append(TelegramChannel(on_receive=message_handler, bot_config=bot_config)) + channels.append(CliChannel(on_receive=message_handler, agent=self._get_agent())) + return channels @hookimpl async def on_error(self, stage: str, error: Exception, message: Envelope | None) -> None: diff --git a/src/bub/channels/telegram.py b/src/bub/channels/telegram.py index c8f5d029..9aa6046f 100644 --- a/src/bub/channels/telegram.py +++ b/src/bub/channels/telegram.py @@ -7,7 +7,7 @@ from typing import Any, ClassVar from loguru import logger -from pydantic import Field +from pydantic import BaseModel, Field from pydantic_settings import SettingsConfigDict from telegram import Bot, Message, Update from telegram.ext import Application, CommandHandler, ContextTypes, filters @@ -22,22 +22,74 @@ from bub.utils import exclude_none +class BotConfig(BaseModel): + """Configuration for a single Telegram bot instance.""" + + name: str = Field(default="", description="Unique bot name used as channel name suffix.") + token: str = Field(..., description="Telegram bot token.") + allow_users: str | None = Field(default=None, description="Comma-separated allowed user IDs.") + allow_chats: str | None = Field(default=None, description="Comma-separated allowed chat IDs.") + proxy: str | None = Field( + default=None, + description="Optional proxy URL for connecting to Telegram API.", + ) + + @config(name="telegram") class TelegramSettings(Settings): model_config = SettingsConfigDict(env_prefix="BUB_TELEGRAM_", extra="ignore", env_file=".env") - token: str = Field(default="", description="Telegram bot token.") + token: str = Field(default="", description="Telegram bot token (backward compat, single-bot mode).") + bots: str = Field( + default="", + description="JSON array of bot configs for multi-bot mode, e.g. '[{\"name\":\"personal\",\"token\":\"xxx\"}]'.", + ) allow_users: str | None = Field( - default=None, description="Comma-separated list of allowed Telegram user IDs, or empty for no restriction." + default=None, description="Comma-separated list of allowed Telegram user IDs (single-bot mode)." ) allow_chats: str | None = Field( - default=None, description="Comma-separated list of allowed Telegram chat IDs, or empty for no restriction." + default=None, description="Comma-separated list of allowed Telegram chat IDs (single-bot mode)." ) proxy: str | None = Field( default=None, - description="Optional proxy URL for connecting to Telegram API, e.g. 'http://user:pass@host:port' or 'socks5://host:port'.", + description="Optional proxy URL for connecting to Telegram API (single-bot mode).", ) + @staticmethod + def bot_configs() -> list[BotConfig]: + """Return the list of bot configurations, supporting both single and multi-bot modes.""" + settings = ensure_config(TelegramSettings) + if settings.bots: + try: + import json as _json + raw = _json.loads(settings.bots) + if not isinstance(raw, list): + logger.warning("telegram settings: BUB_TELEGRAM_BOTS is not a JSON array, falling back to single-bot") + return TelegramSettings._single_bot_config(settings) + configs = [BotConfig(**item) for item in raw] + if not configs: + return [] + return configs + except Exception as exc: + logger.warning("telegram settings: failed to parse BUB_TELEGRAM_BOTS: %s", exc) + return TelegramSettings._single_bot_config(settings) + return TelegramSettings._single_bot_config(settings) + + @staticmethod + def _single_bot_config(settings: TelegramSettings) -> list[BotConfig]: + """Build a single BotConfig from the legacy single-bot settings.""" + if settings.token: + return [ + BotConfig( + name="", + token=settings.token, + allow_users=settings.allow_users, + allow_chats=settings.allow_chats, + proxy=settings.proxy, + ) + ] + return [] + NO_ACCESS_MESSAGE = "You are not allowed to chat with me. Please deploy your own instance of Bub." @@ -146,35 +198,39 @@ def _extract_media_items(metadata: dict[str, Any]) -> list[MediaItem]: class TelegramChannel(Channel): - name = "telegram" _app: Application - def __init__(self, on_receive: MessageHandler) -> None: + def __init__(self, on_receive: MessageHandler, bot_config: BotConfig) -> None: self._on_receive = on_receive - self._settings = ensure_config(TelegramSettings) - self._allow_users = {uid.strip() for uid in (self._settings.allow_users or "").split(",") if uid.strip()} - self._allow_chats = {cid.strip() for cid in (self._settings.allow_chats or "").split(",") if cid.strip()} + self._config = bot_config + self._allow_users = {uid.strip() for uid in (self._config.allow_users or "").split(",") if uid.strip()} + self._allow_chats = {cid.strip() for cid in (self._config.allow_chats or "").split(",") if cid.strip()} self._parser = TelegramMessageParser(bot_getter=lambda: self._app.bot) self._typing_tasks: dict[str, asyncio.Task] = {} + @property + def name(self) -> str: + return f"telegram-{self._config.name}" if self._config.name else "telegram" + @property def enabled(self) -> bool: - return bool(self._settings.token) + return bool(self._config.token) @property def needs_debounce(self) -> bool: return True async def start(self, stop_event: asyncio.Event) -> None: - proxy = self._settings.proxy + proxy = self._config.proxy logger.info( - "telegram.start allow_users_count={} allow_chats_count={} proxy_enabled={}", + "telegram.start channel={} allow_users_count={} allow_chats_count={} proxy_enabled={}", + self.name, len(self._allow_users), len(self._allow_chats), bool(proxy), ) get_updates_request = HTTPXRequest(read_timeout=30, proxy=proxy) - builder = Application.builder().token(self._settings.token).get_updates_request(get_updates_request) + builder = Application.builder().token(self._config.token).get_updates_request(get_updates_request) if proxy: builder = builder.proxy(proxy) self._app = builder.build() @@ -187,6 +243,7 @@ async def start(self, stop_event: asyncio.Event) -> None: if updater is None: return await updater.start_polling(drop_pending_updates=True, allowed_updates=["message"]) + logger.info("telegram.start polling channel={}", self.name) logger.info("telegram.start polling") async def stop(self) -> None: @@ -201,7 +258,7 @@ async def stop(self) -> None: with contextlib.suppress(asyncio.CancelledError): await task self._typing_tasks.clear() - logger.info("telegram.stopped") + logger.info("telegram.stopped channel={}", self.name) async def send(self, message: ChannelMessage) -> None: chat_id = message.chat_id diff --git a/tests/test_builtin_hook_impl.py b/tests/test_builtin_hook_impl.py index ca10d3a6..7e51edac 100644 --- a/tests/test_builtin_hook_impl.py +++ b/tests/test_builtin_hook_impl.py @@ -206,7 +206,7 @@ def __init__(self, on_receive, agent) -> None: class DummyTelegramChannel: name = "telegram" - def __init__(self, on_receive) -> None: + def __init__(self, on_receive, bot_config=None) -> None: self.on_receive = on_receive @property @@ -218,6 +218,11 @@ def enabled(self) -> bool: monkeypatch.setattr(bub.channels.cli, "CliChannel", DummyCliChannel) monkeypatch.setattr(bub.channels.telegram, "TelegramChannel", DummyTelegramChannel) + monkeypatch.setattr( + bub.channels.telegram.TelegramSettings, + "bot_configs", + staticmethod(lambda: [bub.channels.telegram.BotConfig(token="test")]), + ) def message_handler(message) -> None: return None diff --git a/tests/test_channels.py b/tests/test_channels.py index 3235f967..7c0156db 100644 --- a/tests/test_channels.py +++ b/tests/test_channels.py @@ -15,7 +15,7 @@ from bub.channels.handler import BufferedMessageHandler from bub.channels.manager import ChannelManager from bub.channels.message import ChannelMessage -from bub.channels.telegram import BubMessageFilter, TelegramChannel, TelegramMessageParser +from bub.channels.telegram import BubMessageFilter, BotConfig, TelegramChannel, TelegramMessageParser from bub.turn_admission import AdmitDecision, SessionTurnController, SteeringBuffer @@ -283,7 +283,7 @@ def test_channel_manager_selects_channels_by_runtime_role( def test_channel_manager_selects_real_channel_types(load_config) -> None: _load_channel_config(load_config, telegram_value="test-token") cli = CliChannel.__new__(CliChannel) - telegram = TelegramChannel(lambda message: None) + telegram = TelegramChannel(lambda message: None, bot_config=BotConfig(token="test_token")) manager = ChannelManager( FakeFramework({"cli": cli, "telegram": telegram}), enabled_channels=["all"], @@ -752,7 +752,7 @@ def test_bub_message_filter_accepts_group_mention() -> None: @pytest.mark.asyncio async def test_telegram_channel_send_extracts_json_message_and_skips_blank(load_config) -> None: _load_channel_config(load_config, telegram_value="test-token") - channel = TelegramChannel(lambda message: None) + channel = TelegramChannel(lambda message: None, bot_config=BotConfig(token="test_token")) sent: list[tuple[str, str]] = [] async def send_message(chat_id: str, text: str) -> None: @@ -774,7 +774,7 @@ async def test_telegram_channel_start_with_proxy_does_not_call_get_updates_proxy fake_builder = _FakeTelegramBuilder() monkeypatch.setattr("bub.channels.telegram.Application.builder", lambda: fake_builder) - channel = TelegramChannel(lambda message: None) + channel = TelegramChannel(lambda message: None, bot_config=BotConfig(token="test_token", proxy="http://127.0.0.1:1087")) await channel.start(asyncio.Event()) assert fake_builder.proxy_value == "http://127.0.0.1:1087" @@ -785,7 +785,7 @@ async def test_telegram_channel_start_with_proxy_does_not_call_get_updates_proxy @pytest.mark.asyncio async def test_telegram_channel_build_message_returns_command_directly(load_config) -> None: _load_channel_config(load_config, telegram_value="test-token") - channel = TelegramChannel(lambda message: None) + channel = TelegramChannel(lambda message: None, bot_config=BotConfig(token="test_token")) channel._parser = SimpleNamespace(parse=_async_return((",help", {"type": "text"})), get_reply=_async_return(None)) message = SimpleNamespace(chat_id=42) @@ -803,7 +803,7 @@ async def test_telegram_channel_build_message_wraps_payload_and_disables_outboun monkeypatch: pytest.MonkeyPatch, load_config ) -> None: _load_channel_config(load_config, telegram_value="test-token") - channel = TelegramChannel(lambda message: None) + channel = TelegramChannel(lambda message: None, bot_config=BotConfig(token="test_token")) parser = SimpleNamespace( parse=_async_return(("hello", {"type": "text", "sender_id": "7"})), get_reply=_async_return({"message": "prev", "type": "text"}), diff --git a/tests/test_image_message.py b/tests/test_image_message.py index 132e15b6..7c92f1ae 100644 --- a/tests/test_image_message.py +++ b/tests/test_image_message.py @@ -10,7 +10,7 @@ from bub.builtin.hook_impl import BuiltinImpl from bub.channels.message import ChannelMessage, MediaItem -from bub.channels.telegram import TelegramChannel, _extract_media_items +from bub.channels.telegram import BotConfig, TelegramChannel, _extract_media_items from bub.framework import BubFramework # --------------------------------------------------------------------------- @@ -186,7 +186,7 @@ async def _receive_message(_message) -> None: @pytest.mark.asyncio async def test_telegram_build_message_extracts_media_items(monkeypatch: pytest.MonkeyPatch, load_config) -> None: load_config("telegram:\n token: test-token") - channel = TelegramChannel(_receive_message) + channel = TelegramChannel(_receive_message, bot_config=BotConfig(token="test_token")) photo_metadata = { "type": "photo", "sender_id": "7", @@ -213,7 +213,7 @@ async def test_telegram_build_message_extracts_media_items(monkeypatch: pytest.M @pytest.mark.asyncio async def test_telegram_build_message_no_media_for_text(monkeypatch: pytest.MonkeyPatch, load_config) -> None: load_config("telegram:\n token: test-token") - channel = TelegramChannel(_receive_message) + channel = TelegramChannel(_receive_message, bot_config=BotConfig(token="test_token")) channel._parser = SimpleNamespace( # type: ignore[assignment] parse=_async_return(("hello", {"type": "text", "sender_id": "7"})), get_reply=_async_return(None), From 4c486212f0e8cc70609a52917b8a134c302ad745 Mon Sep 17 00:00:00 2001 From: yibie Date: Wed, 10 Jun 2026 12:16:38 +0800 Subject: [PATCH 3/4] fix: resolve ruff S106 and mypy ClassVar override for dynamic channel name --- pyproject.toml | 2 +- src/bub/channels/telegram.py | 7 ++++--- tests/test_channels.py | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index cc05424f..0ecd69cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -153,7 +153,7 @@ ignore = [ ] [tool.ruff.lint.per-file-ignores] -"tests/*" = ["S101"] +"tests/*" = ["S101", "S106"] [tool.ruff.format] preview = true diff --git a/src/bub/channels/telegram.py b/src/bub/channels/telegram.py index 9aa6046f..48b58b83 100644 --- a/src/bub/channels/telegram.py +++ b/src/bub/channels/telegram.py @@ -69,7 +69,7 @@ def bot_configs() -> list[BotConfig]: configs = [BotConfig(**item) for item in raw] if not configs: return [] - return configs + return configs # noqa: TRY300 except Exception as exc: logger.warning("telegram settings: failed to parse BUB_TELEGRAM_BOTS: %s", exc) return TelegramSettings._single_bot_config(settings) @@ -203,14 +203,15 @@ class TelegramChannel(Channel): def __init__(self, on_receive: MessageHandler, bot_config: BotConfig) -> None: self._on_receive = on_receive self._config = bot_config + self._channel_name = f"telegram-{self._config.name}" if self._config.name else "telegram" self._allow_users = {uid.strip() for uid in (self._config.allow_users or "").split(",") if uid.strip()} self._allow_chats = {cid.strip() for cid in (self._config.allow_chats or "").split(",") if cid.strip()} self._parser = TelegramMessageParser(bot_getter=lambda: self._app.bot) self._typing_tasks: dict[str, asyncio.Task] = {} @property - def name(self) -> str: - return f"telegram-{self._config.name}" if self._config.name else "telegram" + def name(self) -> str: # type: ignore[override] + return self._channel_name @property def enabled(self) -> bool: diff --git a/tests/test_channels.py b/tests/test_channels.py index 7c0156db..c1dc6f7c 100644 --- a/tests/test_channels.py +++ b/tests/test_channels.py @@ -15,7 +15,7 @@ from bub.channels.handler import BufferedMessageHandler from bub.channels.manager import ChannelManager from bub.channels.message import ChannelMessage -from bub.channels.telegram import BubMessageFilter, BotConfig, TelegramChannel, TelegramMessageParser +from bub.channels.telegram import BotConfig, BubMessageFilter, TelegramChannel, TelegramMessageParser from bub.turn_admission import AdmitDecision, SessionTurnController, SteeringBuffer From 618498171811acf77a402b1f7c1c3cde98e88ed0 Mon Sep 17 00:00:00 2001 From: yibie Date: Wed, 10 Jun 2026 12:22:43 +0800 Subject: [PATCH 4/4] style: apply ruff format fixes --- src/bub/channels/telegram.py | 7 +++++-- tests/test_builtin_cli.py | 4 ++-- tests/test_channels.py | 4 +++- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/bub/channels/telegram.py b/src/bub/channels/telegram.py index 48b58b83..5314f4a4 100644 --- a/src/bub/channels/telegram.py +++ b/src/bub/channels/telegram.py @@ -42,7 +42,7 @@ class TelegramSettings(Settings): token: str = Field(default="", description="Telegram bot token (backward compat, single-bot mode).") bots: str = Field( default="", - description="JSON array of bot configs for multi-bot mode, e.g. '[{\"name\":\"personal\",\"token\":\"xxx\"}]'.", + description='JSON array of bot configs for multi-bot mode, e.g. \'[{"name":"personal","token":"xxx"}]\'.', ) allow_users: str | None = Field( default=None, description="Comma-separated list of allowed Telegram user IDs (single-bot mode)." @@ -62,9 +62,12 @@ def bot_configs() -> list[BotConfig]: if settings.bots: try: import json as _json + raw = _json.loads(settings.bots) if not isinstance(raw, list): - logger.warning("telegram settings: BUB_TELEGRAM_BOTS is not a JSON array, falling back to single-bot") + logger.warning( + "telegram settings: BUB_TELEGRAM_BOTS is not a JSON array, falling back to single-bot" + ) return TelegramSettings._single_bot_config(settings) configs = [BotConfig(**item) for item in raw] if not configs: diff --git a/tests/test_builtin_cli.py b/tests/test_builtin_cli.py index f345c2b4..7f5eb56b 100644 --- a/tests/test_builtin_cli.py +++ b/tests/test_builtin_cli.py @@ -372,8 +372,8 @@ def fake_login_openai_codex_oauth(**kwargs: object) -> auth.OpenAICodexOAuthToke callback = prompt_for_redirect("https://auth.openai.com/authorize") assert callback == "http://localhost:1455/auth/callback?code=test" return auth.OpenAICodexOAuthTokens( - access_token="access", # noqa: S106 - refresh_token="refresh", # noqa: S106 + access_token="access", + refresh_token="refresh", expires_at=123, account_id="acct_123", ) diff --git a/tests/test_channels.py b/tests/test_channels.py index c1dc6f7c..120acdcb 100644 --- a/tests/test_channels.py +++ b/tests/test_channels.py @@ -774,7 +774,9 @@ async def test_telegram_channel_start_with_proxy_does_not_call_get_updates_proxy fake_builder = _FakeTelegramBuilder() monkeypatch.setattr("bub.channels.telegram.Application.builder", lambda: fake_builder) - channel = TelegramChannel(lambda message: None, bot_config=BotConfig(token="test_token", proxy="http://127.0.0.1:1087")) + channel = TelegramChannel( + lambda message: None, bot_config=BotConfig(token="test_token", proxy="http://127.0.0.1:1087") + ) await channel.start(asyncio.Event()) assert fake_builder.proxy_value == "http://127.0.0.1:1087"