From 46ed0666e648b292e3125e4e42ccdf658c772348 Mon Sep 17 00:00:00 2001 From: Devon Peticolas Date: Mon, 4 May 2026 01:04:26 -0400 Subject: [PATCH 1/2] perf(streaming): add MAXLEN to Redis xadd to prevent unbounded growth The SDK's RedisStreamRepository.send_event was calling xadd with no MAXLEN, so every task:* stream grew unbounded for the lifetime of the task. The accompanying comment ("Add to Redis stream with a reasonable max length") suggested the cap was intended but never wired up. This change matches the agentex server-side adapter, which has had maxlen=REDIS_STREAM_MAXLEN, approximate=True since Jan 2 (PR #111 in scaleapi/scale-agentex). Default is 10000 entries, overridable via the REDIS_STREAM_MAXLEN env var, same as the server. Note: this caps each stream's size during generation but does not delete streams when their task completes -- that's a separate fix. --- .../lib/core/adapters/streams/adapter_redis.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/agentex/lib/core/adapters/streams/adapter_redis.py b/src/agentex/lib/core/adapters/streams/adapter_redis.py index 7b355ee94..7d1a67385 100644 --- a/src/agentex/lib/core/adapters/streams/adapter_redis.py +++ b/src/agentex/lib/core/adapters/streams/adapter_redis.py @@ -15,18 +15,26 @@ logger = make_logger(__name__) +_DEFAULT_STREAM_MAXLEN = 10000 + + class RedisStreamRepository(StreamRepository): """ A simplified Redis implementation of the EventStreamRepository interface. Optimized for text/JSON streaming with SSE. """ - def __init__(self, redis_url: str | None = None): + def __init__(self, redis_url: str | None = None, stream_maxlen: int | None = None): # Get Redis URL from environment if not provided self.redis_url = redis_url or os.environ.get( "REDIS_URL", "redis://localhost:6379" ) self.redis = redis.from_url(self.redis_url) + self.stream_maxlen = ( + stream_maxlen + if stream_maxlen is not None + else int(os.environ.get("REDIS_STREAM_MAXLEN", _DEFAULT_STREAM_MAXLEN)) + ) @override async def send_event(self, topic: str, event: dict[str, Any]) -> str: @@ -47,10 +55,11 @@ async def send_event(self, topic: str, event: dict[str, Any]) -> str: # # Uncomment to debug # logger.info(f"Sending event to Redis stream {topic}: {event_json}") - # Add to Redis stream with a reasonable max length message_id = await self.redis.xadd( name=topic, fields={"data": event_json}, + maxlen=self.stream_maxlen, + approximate=True, ) return message_id From d97d6b13aa676548ec8bacb6cb9cfacf5fa73b11 Mon Sep 17 00:00:00 2001 From: Devon Peticolas Date: Tue, 5 May 2026 14:30:59 -0400 Subject: [PATCH 2/2] feat(streaming): add sliding TTL to Redis stream keys MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirror scaleapi/scale-agentex#215 (server-side adapter): pipeline XADD with EXPIRE so each task:* stream key gets a sliding TTL. Orphaned streams (no writes for the TTL window) self-delete in Redis without needing an explicit cleanup_stream call from the caller. This is the right shape of fix for the SDK's leak: an explicit DEL on terminal task transitions (an earlier draft of this PR) introduced a race where the server's task_updated event published to the same topic could be deleted before a connected frontend SSE consumer read it. EXPIRE sidesteps that — TTL only fires after inactivity, so an actively-streaming agent or actively-reading consumer keeps the key alive, and the key only ages out once everyone is done with it. Defaults match the server: REDIS_STREAM_TTL_SECONDS=3600 (1h), overridable via env var. Setting it to 0 short-circuits to plain XADD (no TTL refresh), matching the server's escape hatch. Implementation notes: - transaction=False on the pipeline: connection-level batching, no MULTI/EXEC overhead for what's already a fast op. - raise_on_error=False: an EXPIRE failure after a successful XADD must not surface to the caller. The message has been published; retrying would duplicate it. We log and move on. Next successful XADD will reset the TTL anyway. --- .../core/adapters/streams/adapter_redis.py | 56 ++++++++++++++++--- 1 file changed, 49 insertions(+), 7 deletions(-) diff --git a/src/agentex/lib/core/adapters/streams/adapter_redis.py b/src/agentex/lib/core/adapters/streams/adapter_redis.py index 7d1a67385..8446d67f1 100644 --- a/src/agentex/lib/core/adapters/streams/adapter_redis.py +++ b/src/agentex/lib/core/adapters/streams/adapter_redis.py @@ -16,6 +16,7 @@ _DEFAULT_STREAM_MAXLEN = 10000 +_DEFAULT_STREAM_TTL_SECONDS = 3600 class RedisStreamRepository(StreamRepository): @@ -24,7 +25,12 @@ class RedisStreamRepository(StreamRepository): Optimized for text/JSON streaming with SSE. """ - def __init__(self, redis_url: str | None = None, stream_maxlen: int | None = None): + def __init__( + self, + redis_url: str | None = None, + stream_maxlen: int | None = None, + stream_ttl_seconds: int | None = None, + ): # Get Redis URL from environment if not provided self.redis_url = redis_url or os.environ.get( "REDIS_URL", "redis://localhost:6379" @@ -35,6 +41,14 @@ def __init__(self, redis_url: str | None = None, stream_maxlen: int | None = Non if stream_maxlen is not None else int(os.environ.get("REDIS_STREAM_MAXLEN", _DEFAULT_STREAM_MAXLEN)) ) + # 0 disables sliding TTL. + self.stream_ttl_seconds = ( + stream_ttl_seconds + if stream_ttl_seconds is not None + else int( + os.environ.get("REDIS_STREAM_TTL_SECONDS", _DEFAULT_STREAM_TTL_SECONDS) + ) + ) @override async def send_event(self, topic: str, event: dict[str, Any]) -> str: @@ -55,12 +69,40 @@ async def send_event(self, topic: str, event: dict[str, Any]) -> str: # # Uncomment to debug # logger.info(f"Sending event to Redis stream {topic}: {event_json}") - message_id = await self.redis.xadd( - name=topic, - fields={"data": event_json}, - maxlen=self.stream_maxlen, - approximate=True, - ) + # Pipeline XADD + EXPIRE in one round-trip so the stream key gets + # a sliding TTL — orphaned streams (no writes for the TTL window) + # self-delete. Mirrors the server-side adapter (scaleapi/scale-agentex#215). + if self.stream_ttl_seconds > 0: + async with self.redis.pipeline(transaction=False) as pipe: + pipe.xadd( + name=topic, + fields={"data": event_json}, + maxlen=self.stream_maxlen, + approximate=True, + ) + pipe.expire(name=topic, time=self.stream_ttl_seconds) + # raise_on_error=False so an EXPIRE failure does not surface + # to the caller after XADD already succeeded — that would + # risk callers retrying and duplicating messages. A failed + # TTL refresh is recoverable: MAXLEN still caps RAM and the + # next write resets the clock. + results = await pipe.execute(raise_on_error=False) + # results[0] = xadd message ID (or Exception) + # results[1] = expire bool (or Exception) + message_id = results[0] + if isinstance(message_id, Exception): + raise message_id + if isinstance(results[1], Exception): + logger.warning( + f"Failed to refresh TTL on stream {topic}: {results[1]}" + ) + else: + message_id = await self.redis.xadd( + name=topic, + fields={"data": event_json}, + maxlen=self.stream_maxlen, + approximate=True, + ) return message_id except Exception as e: