
fix(sse): fix memory leaks in SSE stream cleanup and add memory telemetry#3378

Open
waleedlatif1 wants to merge 6 commits into staging from waleedlatif1/add-aws-monitoring

Conversation

@waleedlatif1
Collaborator

Summary

  • Fix MCP events SSE missing cancel() handler — leaked intervals and pub/sub subscribers on disconnect
  • Fix workflow execute SSE cancel() not aborting execution — workflows continued running after client disconnect
  • Fix A2A serve abort listener missing { once: true } — listener closures held until GC
  • Cap event-buffer and stream-buffer pending arrays to prevent unbounded growth on Redis failure
  • Cap isolated-vm stderr accumulation at 64KB
  • Add periodic memory telemetry (heap, RSS, uptime) logged every 60s
  • Add active SSE connection counter by route for leak diagnostics
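
The first and third fixes above share one pattern: a single idempotent cleanup function reached from both the abort listener and the stream's cancel() handler. The following is a minimal sketch of that pattern, not the code from this PR. `createEventStream`, `subscribe`, and the heartbeat interval are hypothetical names chosen for illustration.

```typescript
// Hypothetical helper: names and shape are illustrative, not taken from the PR diff.
type Unsubscribe = () => void

function createEventStream(
  signal: AbortSignal,
  subscribe: (onEvent: (data: string) => void) => Unsubscribe,
  heartbeatMs: number = 15_000
): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder()
  let cleaned = false
  let heartbeat: ReturnType<typeof setInterval> | undefined
  let unsubscribe: Unsubscribe | undefined
  let onAbort: (() => void) | undefined

  // Idempotent: safe to reach from both the abort listener and cancel().
  const cleanup = (): void => {
    if (cleaned) return
    cleaned = true
    if (heartbeat !== undefined) clearInterval(heartbeat)
    if (unsubscribe) unsubscribe()
    if (onAbort) signal.removeEventListener('abort', onAbort)
  }

  return new ReadableStream<Uint8Array>({
    start(controller) {
      const send = (chunk: string): void => {
        if (!cleaned) controller.enqueue(encoder.encode(chunk))
      }
      onAbort = () => {
        cleanup()
        try {
          controller.close()
        } catch {
          // The stream was already closed or cancelled.
        }
      }
      if (signal.aborted) {
        onAbort()
        return
      }
      heartbeat = setInterval(() => send(': ping\n\n'), heartbeatMs)
      unsubscribe = subscribe((data) => send(`data: ${data}\n\n`))
      // once: true lets the listener closure be released after it fires.
      signal.addEventListener('abort', onAbort, { once: true })
    },
    // Runs when the consumer cancels the stream, e.g. on client disconnect.
    cancel() {
      cleanup()
    },
  })
}
```

Without the cancel() handler, a consumer-side cancellation would leave the interval and the subscription alive until the abort signal fired, if it ever did.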

Type of Change

  • Bug fix

Testing

  • TypeScript compiles clean (bunx tsc --noEmit)
  • All 4,162 tests pass (bunx vitest run)
  • Lint clean (bun run lint)

Checklist

  • Code follows project style guidelines
  • Self-reviewed my changes
  • Tests added/updated and passing
  • No new warnings introduced
  • I confirm that I have read and agree to the terms outlined in the Contributor License Agreement (CLA)

@vercel

vercel bot commented Feb 28, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

1 Skipped Deployment
Project Deployment Actions Updated (UTC)
docs Skipped Skipped Feb 28, 2026 1:35am


@waleedlatif1 waleedlatif1 force-pushed the waleedlatif1/add-aws-monitoring branch from e7dec5f to cf2a497 Compare February 28, 2026 01:11
@waleedlatif1
Collaborator Author

@cursor review

@waleedlatif1
Collaborator Author

@greptile

@greptile-apps
Contributor

greptile-apps bot commented Feb 28, 2026

Greptile Summary

Fixed critical memory leaks in SSE stream handling by adding missing cancel() handlers and proper cleanup logic across all SSE routes. These leaks caused intervals, pub/sub subscribers, and event listeners to persist after client disconnect.

Key fixes:

  • Added cancel() handlers to MCP events, workflow execute, A2A serve, wand, and execution stream routes
  • Fixed A2A serve abort listener missing { once: true } that kept listener closures in memory until GC
  • Hoisted cleanup function in A2A resubscribe to eliminate duplication between start() and cancel()
  • Capped event-buffer and stream-buffer pending arrays at 1000 events to prevent unbounded growth during Redis failures
  • Capped isolated-vm stderr accumulation at 64KB
  • Added periodic memory telemetry (60s intervals) with heap, RSS, uptime, and SSE connection counts
  • Added SSE connection counter by route for leak diagnostics
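
The pending-array cap described above can be sketched as a small bounded buffer that drops the oldest entries once the limit is exceeded. This is an illustration under assumptions: the `PendingBuffer` class and its method names are hypothetical, and only the 1000-event limit and the `MAX_PENDING_EVENTS` name come from the review text.

```typescript
// Limit quoted in the review summary; everything else here is a hypothetical sketch.
const MAX_PENDING_EVENTS = 1000

class PendingBuffer<T> {
  private pending: T[] = []
  private droppedTotal = 0
  private readonly limit: number
  private readonly warn: (message: string) => void

  constructor(
    limit: number = MAX_PENDING_EVENTS,
    warn: (message: string) => void = (message) => console.warn(message)
  ) {
    this.limit = limit
    this.warn = warn
  }

  // Append an event; if the cap is exceeded, drop from the front (oldest first).
  push(event: T): void {
    this.pending.push(event)
    const overflow = this.pending.length - this.limit
    if (overflow > 0) {
      this.pending.splice(0, overflow)
      this.droppedTotal += overflow
      this.warn(`pending buffer full, dropped ${overflow} oldest event(s)`)
    }
  }

  // Hand the buffered events to the caller (e.g. for a retry flush) and reset.
  drain(): T[] {
    const batch = this.pending
    this.pending = []
    return batch
  }

  get size(): number {
    return this.pending.length
  }

  get dropped(): number {
    return this.droppedTotal
  }
}
```

Dropping the oldest events keeps memory bounded while the backing store is unavailable, at the cost of losing the earliest part of the backlog; whether that trade-off is acceptable depends on how the events are consumed.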

Note: Workflow execute cancel() intentionally does NOT abort execution to preserve run-on-leave behavior where workflows continue running after client disconnect.

Confidence Score: 5/5

  • This PR is safe to merge with minimal risk
  • All changes are defensive memory leak fixes with proper cleanup patterns. The code adds missing cancel() handlers, caps unbounded arrays, and introduces observability without changing business logic. All tests pass and the changes follow established patterns.
  • No files require special attention

Important Files Changed

| Filename | Overview |
| --- | --- |
| apps/sim/app/api/mcp/events/route.ts | Added cancel() handler, cleanup() function with cleaned flag, { once: true } on abort listener, and SSE connection tracking |
| apps/sim/app/api/workflows/[id]/execute/route.ts | Added cancel() handler and SSE connection tracking; intentionally does not abort execution to preserve run-on-leave behavior |
| apps/sim/app/api/a2a/serve/[agentId]/route.ts | Added { once: true } to abort listener, hoisted cleanup function, added cancel() handlers for message and resubscribe streams, SSE tracking |
| apps/sim/lib/execution/event-buffer.ts | Added MAX_PENDING_EVENTS cap (1000) to prevent unbounded memory growth during Redis failures |
| apps/sim/lib/copilot/orchestrator/stream-buffer.ts | Added capping to pending array using config.eventLimit to prevent unbounded growth during Redis failures |
| apps/sim/lib/monitoring/memory-telemetry.ts | New file implementing periodic memory telemetry (60s interval) using stable process.getActiveResourcesInfo() API |
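
As a rough illustration of the telemetry row above, the sketch below turns a memory sample into a log payload and emits it on a fixed interval. It is not the contents of memory-telemetry.ts: `buildMemorySnapshot`, `startMemoryTelemetry`, and the payload field names are assumptions. The sampler is injected so the sketch does not depend on a particular runtime.

```typescript
// Shape mirrors the fields of Node's process.memoryUsage() that are used here.
interface MemoryUsageLike {
  heapUsed: number
  heapTotal: number
  rss: number
}

interface MemorySnapshot {
  heapUsedMB: number
  heapTotalMB: number
  rssMB: number
  uptimeMin: number
  sseConnections: Record<string, number>
}

// Bytes to megabytes, rounded to one decimal place.
const toMB = (bytes: number): number => Math.round((bytes / 1024 / 1024) * 10) / 10

// Hypothetical helper: builds the payload that would be logged each tick.
function buildMemorySnapshot(
  usage: MemoryUsageLike,
  uptimeSeconds: number,
  sseConnections: Record<string, number>
): MemorySnapshot {
  return {
    heapUsedMB: toMB(usage.heapUsed),
    heapTotalMB: toMB(usage.heapTotal),
    rssMB: toMB(usage.rss),
    uptimeMin: Math.round(uptimeSeconds / 60),
    sseConnections: { ...sseConnections },
  }
}

// Hypothetical helper: logs a snapshot every intervalMs and returns a stop function.
function startMemoryTelemetry(
  sample: () => MemorySnapshot,
  log: (snapshot: MemorySnapshot) => void,
  intervalMs: number = 60_000
): () => void {
  const timer = setInterval(() => log(sample()), intervalMs)
  return () => clearInterval(timer)
}
```

In a Node.js process the sampler could be built from `process.memoryUsage()` and `process.uptime()`; the returned stop function keeps the telemetry timer itself from becoming a leak in tests or on shutdown.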

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[Client connects to SSE endpoint] --> B[start: incrementSSEConnections]
    B --> C[Create intervals/subscriptions]
    C --> D[Stream data to client]
    D --> E{Disconnect or cancel?}
    E -->|Client abort signal| F[abort listener with once: true]
    E -->|Stream cancel called| G[cancel handler]
    F --> H[cleanup function]
    G --> H
    H --> I{Already cleaned?}
    I -->|Yes| J[Return early]
    I -->|No| K[Set cleaned/closed flag]
    K --> L[Clear intervals]
    L --> M[Unsubscribe from pub/sub]
    M --> N[decrementSSEConnections]
    N --> O[Close controller]
    
    P[Memory Telemetry 60s] --> Q[Log heap/RSS stats]
    Q --> R[Log SSE connection counts]
    R --> S[Log active resources]
    S --> P
    
    T[Redis failure] --> U[Events accumulate in pending array]
    U --> V{pending.length > cap?}
    V -->|Yes| W[Drop oldest events]
    V -->|No| X[Keep accumulating]
    W --> Y[Log warning]

Last reviewed commit: 96fea68
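
The flowchart's incrementSSEConnections and decrementSSEConnections steps could be backed by a per-route counter along these lines. Only the two function names appear in this PR; the Map-based storage and the `getSSEConnectionCounts` reader are assumptions made for the sketch.

```typescript
// Hypothetical in-memory counter keyed by route name.
const connectionsByRoute = new Map<string, number>()

function incrementSSEConnections(route: string): void {
  connectionsByRoute.set(route, (connectionsByRoute.get(route) ?? 0) + 1)
}

function decrementSSEConnections(route: string): void {
  const next = (connectionsByRoute.get(route) ?? 0) - 1
  if (next <= 0) {
    // Remove empty routes so the map cannot grow and never reports negatives.
    connectionsByRoute.delete(route)
  } else {
    connectionsByRoute.set(route, next)
  }
}

// Assumed reader for the telemetry logger: a plain-object copy of the counts.
function getSSEConnectionCounts(): Record<string, number> {
  const counts: Record<string, number> = {}
  connectionsByRoute.forEach((count, route) => {
    counts[route] = count
  })
  return counts
}
```

A count that keeps climbing for one route while traffic is flat would point at a cleanup path on that route that never reaches its decrement.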

Contributor

@greptile-apps greptile-apps bot left a comment


11 files reviewed, 4 comments


@waleedlatif1 waleedlatif1 force-pushed the waleedlatif1/add-aws-monitoring branch from cf2a497 to 2b37d5b Compare February 28, 2026 01:16
@waleedlatif1 waleedlatif1 force-pushed the waleedlatif1/add-aws-monitoring branch from 2b37d5b to 979cae7 Compare February 28, 2026 01:17
@waleedlatif1
Collaborator Author

@cursor review

@waleedlatif1
Collaborator Author

@greptile


@cursor cursor bot left a comment


✅ Bugbot reviewed your changes and found no new issues!

Comment @cursor review or bugbot run to trigger another review on this PR

@greptile-apps
Contributor

greptile-apps bot commented Feb 28, 2026

Additional Comments (1)

apps/sim/app/api/workflows/[id]/execute/route.ts
cancel() closes eventWriter while start() execution is still in-flight

cancel() calls await eventWriter.close() (line 1170), which immediately sets closed = true and flushes pending events. However, start() is still executing asynchronously — it awaits executeWorkflowCore, which may not respond to the abort signal instantaneously (e.g., if it is mid-way through an external AI API call).

Any events that start()'s execution writes to eventWriter after cancel() has set closed = true but before executeWorkflowCore sees the abort and stops will be silently dropped (the writeCore guard returns early when closed is true). These events — including block-completion events or error details — won't be persisted to Redis and will be lost from the audit log.

Since close() already awaits all inflight writes tracked in inflightWrites via Promise.allSettled, the issue is only with writes that are initiated after close() sets closed = true. The window is small but real: between timeoutController.abort() (line 1168) and the moment executeWorkflowCore honours the signal, the execution may still emit events.

One approach is to not call eventWriter.close() from cancel() at all, and instead let start()'s existing finally block handle it after the execution has fully unwound. The abort signal will cause executeWorkflowCore to return quickly, and the finally block will then flush and close correctly with no lost events.
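
The suggested approach can be sketched as follows, assuming a writer that discards anything written after close(). The `EventWriter` class here is a simplified stand-in, not the project's implementation, and `runWithWriter` and `waitForAbort` are hypothetical helpers. The stand-in counts discarded writes so the effect is visible, whereas the comment above describes them as silently dropped.

```typescript
// Simplified stand-in for the event writer described in the comment above.
class EventWriter {
  readonly persisted: string[] = []
  dropped = 0
  private closed = false

  write(event: string): void {
    if (this.closed) {
      // Mirrors the guard that returns early once the writer is closed.
      this.dropped += 1
      return
    }
    this.persisted.push(event)
  }

  close(): void {
    this.closed = true
  }
}

type Execute = (emit: (event: string) => void, signal: AbortSignal) => Promise<void>

// The writer is closed only after the execution has fully unwound, so events
// emitted between the abort and the actual stop are still persisted.
async function runWithWriter(
  writer: EventWriter,
  signal: AbortSignal,
  execute: Execute
): Promise<void> {
  try {
    await execute((event) => writer.write(event), signal)
  } finally {
    writer.close()
  }
}

// Resolves once the signal aborts; used to simulate work that is interrupted.
function waitForAbort(signal: AbortSignal): Promise<void> {
  return new Promise((resolve) => {
    if (signal.aborted) return resolve()
    signal.addEventListener('abort', () => resolve(), { once: true })
  })
}
```

Under this arrangement, cancel() would only trigger the abort; the finally block owns the close, so there is a single place where the writer's lifetime ends.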

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@waleedlatif1
Collaborator Author

@greptile

@waleedlatif1
Collaborator Author

@cursor review


@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.

Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

incrementSSEConnections('wand')
const reader = response.body?.getReader()
if (!reader) {
  decrementSSEConnections('wand')

Wand early-return decrements without setting guard flag

Low Severity

The early-return path when reader is falsy calls decrementSSEConnections('wand') without setting wandStreamClosed = true. Every other decrement site in this stream (the finally block and the cancel() handler) guards the decrement with the wandStreamClosed flag, but this path is the sole exception. While controller.close() on the next line typically prevents cancel() from firing, the unguarded decrement is inconsistent with the pattern established everywhere else in this PR and creates a subtle maintenance hazard.
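
One way to make every decrement site follow the same guard is to route them all through a small release-once helper. This is a sketch of that idea, not code from the PR: `createConnectionGuard` is a hypothetical name, and it plays the role that the wandStreamClosed flag plays in the route.

```typescript
interface ConnectionGuard {
  // Decrements at most once; returns true only for the call that did it.
  release(): boolean
  readonly released: boolean
}

// Hypothetical helper: wraps the decrement so the early-return path, the
// finally block, and cancel() can all call release() without double counting.
function createConnectionGuard(
  route: string,
  decrement: (route: string) => void
): ConnectionGuard {
  let released = false
  return {
    release(): boolean {
      if (released) return false
      released = true
      decrement(route)
      return true
    },
    get released(): boolean {
      return released
    },
  }
}
```

With a guard like this, the early-return path would call `release()` instead of decrementing directly, and a later cancel() or finally block calling `release()` again would be a no-op.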

Additional Locations (1)

