Skip to content

Fix Redis stream leak: MAXLEN on xadd + cleanup_stream on terminal transitions#339

Open
x wants to merge 2 commits intomainfrom
devon.peticolas/fix-redis-stream-leak
Open

Fix Redis stream leak: MAXLEN on xadd + cleanup_stream on terminal transitions#339
x wants to merge 2 commits intomainfrom
devon.peticolas/fix-redis-stream-leak

Conversation

@x
Copy link
Copy Markdown
Contributor

@x x commented May 4, 2026

Summary

I think RedisStreamRepository has been leaking.

Every task:{id} stream grows unbounded for the lifetime of the task until it hits the MAXLEN set by agentex-server (not by the sdk). There's a clean_stream method but it's never called.

Two complementary fixes, one commit each:

Commit 1 — perf(streaming): add MAXLEN to Redis xadd

RedisStreamRepository.send_event was calling xadd with no MAXLEN, despite a comment claiming "Add to Redis stream with a reasonable max length".

This matches:
https://github.com/scaleapi/scale-agentex/blob/40ac50324bdf1c19f8c7d5e8121e69430eb7d146/agentex/src/adapters/streams/adapter_redis.py#L62

Commit 2 — feat(streaming): cleanup task Redis stream on terminal status transitions

The SDK already defined RedisStreamRepository.cleanup_stream(topic) but never called it. This change wires it into all terminal status transitions on TasksService: cancel, complete, fail, terminate, timeout, and delete.

Delete is best-effort. Failures are logged and swollowed.

Also TasksService now accepts an optional stream_repository in __init__. The two existing construction sites pass one in:

  • get_all_activities() reuses the same RedisStreamRepository already constructed for StreamingService.
  • TasksModule's lazy default constructs its own when no service is provided.

x added 2 commits May 4, 2026 01:04
The SDK's RedisStreamRepository.send_event was calling xadd with no
MAXLEN, so every task:* stream grew unbounded for the lifetime of the
task. The accompanying comment ("Add to Redis stream with a reasonable
max length") suggested the cap was intended but never wired up.

This change matches the agentex server-side adapter, which has had
maxlen=REDIS_STREAM_MAXLEN, approximate=True since Jan 2 (PR #111 in
scaleapi/scale-agentex). Default is 10000 entries, overridable via the
REDIS_STREAM_MAXLEN env var, same as the server.

Note: this caps each stream's size during generation but does not
delete streams when their task completes -- that's a separate fix.
…ions

The SDK defines RedisStreamRepository.cleanup_stream(topic) but never
calls it. As a result, every task:* stream that an agent writes to
during its lifetime stays in Redis forever, even after the task
completes. For long-lived clusters this leaks unbounded memory.

This change wires cleanup_stream into all terminal status transitions
on TasksService: cancel, complete, fail, terminate, timeout, and
delete. After the corresponding agentex API call returns successfully,
the task's Redis stream (task:{task_id}) is deleted best-effort -- any
failure is logged and swallowed so cleanup issues never break the
lifecycle call itself.

TasksService now accepts an optional stream_repository in __init__.
The two existing construction sites pass one in:
- get_all_activities() reuses the same RedisStreamRepository already
  constructed for StreamingService
- TasksModule's lazy default constructs its own when no service is
  provided

Note: this only fires when an application actually calls a terminal
transition method (e.g. adk.tasks.complete). Workflows that exit
without calling these still leak streams; that is a separate concern
for the application layer.
@smoreinis
Copy link
Copy Markdown
Contributor

on the backend side, i just made a change to pipeline xadd with an expire call to ensure streams get cleaned up by redis if nothing else, maybe worth doing something similar here? ref: scaleapi/scale-agentex#215

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants