Fix Redis stream leak: MAXLEN on xadd + cleanup_stream on terminal transitions#339
Open
Fix Redis stream leak: MAXLEN on xadd + cleanup_stream on terminal transitions#339
Conversation
The SDK's RedisStreamRepository.send_event was calling xadd with no
MAXLEN, so every task:* stream grew unbounded for the lifetime of the
task. The accompanying comment ("Add to Redis stream with a reasonable
max length") suggested the cap was intended but never wired up.
This change matches the agentex server-side adapter, which has had
maxlen=REDIS_STREAM_MAXLEN, approximate=True since Jan 2 (PR #111 in
scaleapi/scale-agentex). Default is 10000 entries, overridable via the
REDIS_STREAM_MAXLEN env var, same as the server.
Note: this caps each stream's size during generation but does not
delete streams when their task completes -- that's a separate fix.
…ions
The SDK defines RedisStreamRepository.cleanup_stream(topic) but never
calls it. As a result, every task:* stream that an agent writes to
during its lifetime stays in Redis forever, even after the task
completes. For long-lived clusters this leaks unbounded memory.
This change wires cleanup_stream into all terminal status transitions
on TasksService: cancel, complete, fail, terminate, timeout, and
delete. After the corresponding agentex API call returns successfully,
the task's Redis stream (task:{task_id}) is deleted best-effort -- any
failure is logged and swallowed so cleanup issues never break the
lifecycle call itself.
TasksService now accepts an optional stream_repository in __init__.
The two existing construction sites pass one in:
- get_all_activities() reuses the same RedisStreamRepository already
constructed for StreamingService
- TasksModule's lazy default constructs its own when no service is
provided
Note: this only fires when an application actually calls a terminal
transition method (e.g. adk.tasks.complete). Workflows that exit
without calling these still leak streams; that is a separate concern
for the application layer.
Contributor
|
on the backend side, i just made a change to pipeline xadd with an expire call to ensure streams get cleaned up by redis if nothing else, maybe worth doing something similar here? ref: scaleapi/scale-agentex#215 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
I think
RedisStreamRepositoryhas been leaking.Every
task:{id}stream grows unbounded for the lifetime of the task until it hits the MAXLEN set by agentex-server (not by the sdk). There's aclean_streammethod but it's never called.Two complementary fixes, one commit each:
Commit 1 —
perf(streaming): add MAXLEN to Redis xaddRedisStreamRepository.send_eventwas callingxaddwith noMAXLEN, despite a comment claiming "Add to Redis stream with a reasonable max length".This matches:
https://github.com/scaleapi/scale-agentex/blob/40ac50324bdf1c19f8c7d5e8121e69430eb7d146/agentex/src/adapters/streams/adapter_redis.py#L62
Commit 2 —
feat(streaming): cleanup task Redis stream on terminal status transitionsThe SDK already defined
RedisStreamRepository.cleanup_stream(topic)but never called it. This change wires it into all terminal status transitions onTasksService:cancel,complete,fail,terminate,timeout, anddelete.Delete is best-effort. Failures are logged and swollowed.
Also
TasksServicenow accepts an optionalstream_repositoryin__init__. The two existing construction sites pass one in:get_all_activities()reuses the sameRedisStreamRepositoryalready constructed forStreamingService.TasksModule's lazy default constructs its own when no service is provided.