-
Notifications
You must be signed in to change notification settings - Fork 49
bug: replace cron job restarting agentex pod for oidc refresh with mongoclient refresh #338
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -49,6 +49,8 @@ def __init__(self): | |
| self.redis_pool: redis.ConnectionPool | None = None | ||
| self.database_async_read_only_engine: AsyncEngine | None = None | ||
| self.postgres_metrics_collector: PostgresMetricsCollector | None = None | ||
| self._mongodb_refresh_task: asyncio.Task | None = None | ||
| self._mongodb_close_tasks: set[asyncio.Task] = set() | ||
| self._loaded = False | ||
|
|
||
| async def create_temporal_client(self): | ||
|
|
@@ -122,17 +124,7 @@ async def load(self): | |
|
|
||
| logger.info("Connecting to MongoDB") | ||
|
|
||
| self.mongodb_client = AsyncMongoClient( | ||
| mongodb_uri, | ||
| serverSelectionTimeoutMS=20000, | ||
| connectTimeoutMS=20000, | ||
| socketTimeoutMS=20000, | ||
| retryWrites=False, # Disable retryable writes for AWS DocumentDB compatibility | ||
| maxPoolSize=self.environment_variables.MONGODB_MAX_POOL_SIZE, | ||
| minPoolSize=self.environment_variables.MONGODB_MIN_POOL_SIZE, | ||
| maxIdleTimeMS=30000, # Close connections after 30 seconds of inactivity | ||
| waitQueueTimeoutMS=5000, # Wait up to 5 seconds for a connection from pool | ||
| ) | ||
| self.mongodb_client = self._build_mongodb_client(mongodb_uri) | ||
| self.mongodb_database = self.mongodb_client[mongodb_database_name] | ||
|
|
||
| # Ping the database to verify connection | ||
|
|
@@ -226,10 +218,126 @@ async def load(self): | |
| service_name=service_name, | ||
| ) | ||
|
|
||
| self._start_mongodb_oidc_refresh_loop() | ||
|
|
||
| self._loaded = True | ||
|
|
||
def _build_mongodb_client(self, mongodb_uri: str) -> AsyncMongoClient:
    """Create an AsyncMongoClient with the shared pool and timeout options.

    Both the initial connection and the OIDC refresh go through this factory,
    so every client is created with exactly the same settings.

    Args:
        mongodb_uri: Connection string handed to the driver unchanged.

    Returns:
        A newly constructed client.
    """
    env = self.environment_variables
    client_options = {
        "serverSelectionTimeoutMS": 20000,
        "connectTimeoutMS": 20000,
        "socketTimeoutMS": 20000,
        # Retryable writes are disabled for AWS DocumentDB compatibility.
        "retryWrites": False,
        "maxPoolSize": env.MONGODB_MAX_POOL_SIZE,
        "minPoolSize": env.MONGODB_MIN_POOL_SIZE,
        # Idle connections are closed after 30 seconds.
        "maxIdleTimeMS": 30000,
        # Wait at most 5 seconds for a pooled connection.
        "waitQueueTimeoutMS": 5000,
    }
    return AsyncMongoClient(mongodb_uri, **client_options)
|
|
||
def _mongodb_uses_oidc(self) -> bool:
    """Report whether the configured Mongo URI mentions MONGODB-OIDC.

    The refresh loop is gated on this so that deployments using other auth
    mechanisms (standard auth, AWS DocumentDB) are never churned.

    Returns:
        True when the text "MONGODB-OIDC" occurs anywhere in the URI, compared
        case-insensitively; False when the URI is unset or empty.
    """
    configured_uri = self.environment_variables.MONGODB_URI
    if not configured_uri:
        return False
    return "MONGODB-OIDC" in configured_uri.upper()
|
|
||
async def refresh_mongodb_client(self) -> None:
    """Rebuild the Mongo client to renew the cached GCP OIDC token.

    pymongo's built-in GCP OIDC provider caches the access token for the life
    of the client and only refreshes it reactively (on a server reauth
    challenge). GCP tokens expire after ~1h, so a long-lived client eventually
    fails auth. A new client authenticates fresh, picking up a new token.

    The candidate client is built and pinged (which forces fresh auth) before
    the swap, so a broken client is never installed. If validation fails or
    is cancelled, the candidate is closed before the error propagates, so
    repeated failures do not pile up unclosed clients. The superseded client
    is closed by a background task after a drain delay, giving operations
    already running on it time to finish.

    Does nothing when no URI is configured or the URI is not OIDC.

    NOTE(review): only ``self.mongodb_client`` and ``self.mongodb_database``
    are repointed here. Database or collection objects that callers obtained
    earlier presumably stay bound to the old client and stop working once it
    is closed; confirm against long-lived consumers.

    Raises:
        Exception: Whatever the validation ping raises. The existing client
            and database stay in place in that case.
    """
    mongodb_uri = self.environment_variables.MONGODB_URI
    if not mongodb_uri or not self._mongodb_uses_oidc():
        return

    new_client = self._build_mongodb_client(mongodb_uri)
    try:
        # Force fresh OIDC auth and validate the new client before trusting it.
        await new_client.admin.command("ping")
    except BaseException:
        # BaseException so that cancellation mid-ping also releases the
        # candidate. A failure while closing must not hide the real error.
        try:
            await new_client.close()
        except Exception as close_error:
            logger.warning(
                f"Error closing failed MongoDB refresh candidate: {close_error}"
            )
        raise

    old_client = self.mongodb_client
    self.mongodb_client = new_client
    self.mongodb_database = new_client[
        self.environment_variables.MONGODB_DATABASE_NAME
    ]
    logger.info("Refreshed MongoDB client to renew OIDC credentials")

    if old_client is not None and old_client is not new_client:
        task = asyncio.create_task(
            self._close_mongodb_client_after_delay(old_client)
        )
        # Keep a strong reference until done so the task is not GC'd mid-flight.
        self._mongodb_close_tasks.add(task)
        task.add_done_callback(self._mongodb_close_tasks.discard)
|
Comment on lines
+280
to
+286
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This refresh swaps only … Artifacts: Repro: pytest harness for cached Mongo collection across OIDC refresh
Repro: verbose pytest output showing cached collection uses the closed superseded client
Prompt To Fix With AI: This is a comment left during a code review.
Path: agentex/src/config/dependencies.py
Line: 280-286
Comment:
**Old clients still used**
This refresh swaps only `GlobalDependencies.mongodb_database`, but long-lived consumers can keep using `AsyncDatabase` and collection objects from the old client. For example, the Temporal worker builds retention repositories once at startup, and `MongoDBCRUDRepository.__init__` stores `self.collection = db[collection_name]`. After the first refresh, those activities still use collections bound to `old_client`; once this delayed close runs, retention cleanup Mongo operations can fail even though the global dependency points at a fresh client.
How can I resolve this? If you propose a fix, please make it concise. |
||
|
|
||
async def _close_mongodb_client_after_delay(
    self, client: AsyncMongoClient, delay: float = 60.0
) -> None:
    """Close a superseded Mongo client after letting in-flight ops drain.

    The client is closed exactly once on every exit path: after the delay
    elapses, or immediately if this task is cancelled while waiting. Closing
    is best-effort; a failure is logged and never replaces the cancellation
    that is propagating.

    Args:
        client: The superseded client to close.
        delay: Seconds to wait before closing.

    Raises:
        asyncio.CancelledError: Re-raised after the client has been closed
            when the task is cancelled during the wait.
    """
    try:
        await asyncio.sleep(delay)
    finally:
        # Runs for normal completion and for cancellation alike, so the client
        # is never left open.
        try:
            await client.close()
        except Exception as e:
            logger.warning(f"Error closing superseded MongoDB client: {e}")
|
|
||
def _start_mongodb_oidc_refresh_loop(self) -> None:
    """Launch the background OIDC refresh task when it applies.

    Nothing is started when there is no Mongo client, the URI is not OIDC,
    the configured interval is zero or negative, or a refresh task is already
    registered.
    """
    interval = self.environment_variables.MONGODB_OIDC_REFRESH_INTERVAL_SECONDS

    # Guards are checked in this order so later checks are skipped as soon as
    # one of them rules the loop out.
    if self.mongodb_client is None:
        return
    if not self._mongodb_uses_oidc():
        return
    if interval <= 0:
        return
    if self._mongodb_refresh_task is not None:
        return

    refresh_coro = self._mongodb_oidc_refresh_loop(interval)
    self._mongodb_refresh_task = asyncio.create_task(refresh_coro)
    logger.info(f"Started MongoDB OIDC refresh loop (interval={interval}s)")
|
|
||
async def _mongodb_oidc_refresh_loop(self, interval: int) -> None:
    """Refresh the Mongo client every ``interval`` seconds until cancelled.

    Each cycle waits first and then refreshes. A failed cycle is logged and
    the loop carries on with the next one; only cancellation ends the loop.

    Args:
        interval: Seconds to wait before each refresh attempt.

    Raises:
        asyncio.CancelledError: When the task running the loop is cancelled.
    """
    while True:
        try:
            await asyncio.sleep(interval)
            await self.refresh_mongodb_client()
        except asyncio.CancelledError:
            # Cancellation is how the loop is stopped; let it through.
            raise
        except Exception as refresh_error:
            failure_message = (
                f"MongoDB OIDC refresh failed; retrying next interval: {refresh_error}"
            )
            logger.error(failure_message)
|
|
||
async def _stop_mongodb_oidc_refresh_loop(self) -> None:
    """Stop the OIDC refresh task and close any superseded clients right away.

    Cancels the refresh task, if one is registered, and waits for it to
    finish. Then cancels every delayed-close task that is still pending and
    waits for those too, so they are not left orphaned when the caller tears
    things down. Calling this with nothing running is a no-op.
    """
    refresh_task = self._mongodb_refresh_task
    if refresh_task is not None:
        refresh_task.cancel()
        try:
            await refresh_task
        except asyncio.CancelledError:
            pass
        finally:
            self._mongodb_refresh_task = None

    # Snapshot first: each task's done-callback removes it from the set while
    # we wait.
    pending_closes = [
        task for task in self._mongodb_close_tasks if not task.done()
    ]
    for task in pending_closes:
        task.cancel()
    if pending_closes:
        # return_exceptions=True keeps one failing close from hiding the rest.
        await asyncio.gather(*pending_closes, return_exceptions=True)
|
|
||
| async def force_reload(self): | ||
| """Force reload all dependencies with fresh environment variables""" | ||
| # Stop the MongoDB OIDC refresh loop before tearing down the client | ||
| await self._stop_mongodb_oidc_refresh_loop() | ||
|
|
||
| # Stop metrics collection | ||
| if self.postgres_metrics_collector: | ||
| await self.postgres_metrics_collector.stop_collection() | ||
|
|
@@ -272,6 +380,9 @@ def shutdown(): | |
| async def async_shutdown(): | ||
| global_dependencies = GlobalDependencies() | ||
|
|
||
| # Stop the MongoDB OIDC refresh loop | ||
| await global_dependencies._stop_mongodb_oidc_refresh_loop() | ||
|
|
||
| # Stop PostgreSQL metrics collection | ||
| if global_dependencies.postgres_metrics_collector: | ||
| await global_dependencies.postgres_metrics_collector.stop_collection() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| """Integration tests for the MongoDB client-refresh swap against a real Mongo. | ||
|
|
||
| The unit tests mock the client; these prove the build-validate-swap-drain works | ||
| end-to-end against a live MongoDB container: data written before the swap is still | ||
| readable after it, the post-swap client is fully functional, and the superseded | ||
| client is drained and closed. (The container doesn't speak GCP OIDC, so the OIDC | ||
| gate is forced on to exercise the swap path itself.) | ||
| """ | ||
|
|
||
| from unittest.mock import AsyncMock | ||
|
|
||
| import pytest | ||
| from src.config.dependencies import GlobalDependencies, Singleton | ||
|
|
||
|
|
||
@pytest.fixture
def deps(mongodb_connection_string):
    """Yield a fresh GlobalDependencies pointed at the test Mongo container.

    The singleton cache entry is dropped before and after the test, so each
    test gets its own instance and leaves none behind.
    """
    database_name = "agentex_oidc_refresh_test"

    Singleton._instances.pop(GlobalDependencies, None)
    instance = GlobalDependencies()

    overrides = {
        "MONGODB_URI": mongodb_connection_string,
        "MONGODB_DATABASE_NAME": database_name,
    }
    instance.environment_variables = instance.environment_variables.model_copy(
        update=overrides
    )

    client = instance._build_mongodb_client(mongodb_connection_string)
    instance.mongodb_client = client
    instance.mongodb_database = client[database_name]

    yield instance

    Singleton._instances.pop(GlobalDependencies, None)
|
|
||
|
|
||
@pytest.mark.asyncio
@pytest.mark.integration
async def test_refresh_preserves_data_and_drains_old_client(deps, monkeypatch):
    """Refresh installs a working new client and closes the superseded one.

    Cleanup runs in ``finally``, so a failed assertion cannot leave the test
    database or an open client behind for later runs.
    """
    # Treat the container URI as OIDC so the refresh path actually runs, and
    # collapse the drain delay so the close completes within the test.
    monkeypatch.setattr(deps, "_mongodb_uses_oidc", lambda: True)
    original_close_after_delay = deps._close_mongodb_client_after_delay

    async def fast_close(client, delay=0.0):
        await original_close_after_delay(client, delay=0.0)

    monkeypatch.setattr(deps, "_close_mongodb_client_after_delay", fast_close)

    collection = "docs"
    try:
        await deps.mongodb_database[collection].insert_one({"_id": "before", "n": 1})

        old_client = deps.mongodb_client
        old_client.close = AsyncMock(wraps=old_client.close)

        await deps.refresh_mongodb_client()

        # A genuinely new client is now installed.
        assert deps.mongodb_client is not old_client

        # The new client can write, and reads the doc written before the swap.
        await deps.mongodb_database[collection].insert_one({"_id": "after", "n": 2})
        ids = {
            doc["_id"]
            async for doc in deps.mongodb_database[collection].find({}, {"_id": 1})
        }
        assert ids == {"before", "after"}

        # The superseded client is drained and closed.
        for task in list(deps._mongodb_close_tasks):
            await task
        old_client.close.assert_awaited_once()
    finally:
        # Uses whichever client is current, so this also works if the refresh
        # itself failed.
        await deps.mongodb_client.drop_database("agentex_oidc_refresh_test")
        await deps.mongodb_client.close()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the fresh client fails validation, the exception leaves
`refresh_mongodb_client()` before `new_client` is closed. The refresh loop catches the error and retries on the next interval, so an auth or network outage can accumulate failed `AsyncMongoClient` instances and their underlying resources for the lifetime of the process. Close the candidate client before re-raising when `admin.command("ping")` fails. Artifacts
Repro: pytest harness that forces failed MongoDB refresh candidates and asserts they are not closed
Repro: verbose pytest output showing two failed ping attempts with candidate_close_awaits=0
Prompt To Fix With AI