diff --git a/apps/sim/app/api/a2a/serve/[agentId]/route.ts b/apps/sim/app/api/a2a/serve/[agentId]/route.ts index d10ef0eea2..f865e2c396 100644 --- a/apps/sim/app/api/a2a/serve/[agentId]/route.ts +++ b/apps/sim/app/api/a2a/serve/[agentId]/route.ts @@ -19,6 +19,7 @@ import { validateUrlWithDNS } from '@/lib/core/security/input-validation.server' import { SSE_HEADERS } from '@/lib/core/utils/sse' import { getBaseUrl } from '@/lib/core/utils/urls' import { markExecutionCancelled } from '@/lib/execution/cancellation' +import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections' import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils' import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils' import { @@ -630,9 +631,11 @@ async function handleMessageStream( } const encoder = new TextEncoder() + let messageStreamDecremented = false const stream = new ReadableStream({ async start(controller) { + incrementSSEConnections('a2a-message') const sendEvent = (event: string, data: unknown) => { try { const jsonRpcResponse = { @@ -841,9 +844,19 @@ async function handleMessageStream( }) } finally { await releaseLock(lockKey, lockValue) + if (!messageStreamDecremented) { + messageStreamDecremented = true + decrementSSEConnections('a2a-message') + } controller.close() } }, + cancel() { + if (!messageStreamDecremented) { + messageStreamDecremented = true + decrementSSEConnections('a2a-message') + } + }, }) return new NextResponse(stream, { @@ -1016,16 +1029,34 @@ async function handleTaskResubscribe( let pollTimeoutId: ReturnType | null = null const abortSignal = request.signal - abortSignal.addEventListener('abort', () => { + abortSignal.addEventListener( + 'abort', + () => { + isCancelled = true + if (pollTimeoutId) { + clearTimeout(pollTimeoutId) + pollTimeoutId = null + } + }, + { once: true } + ) + + let sseDecremented = false + const cleanup = () => { isCancelled = true if (pollTimeoutId) { 
clearTimeout(pollTimeoutId) pollTimeoutId = null } - }) + if (!sseDecremented) { + sseDecremented = true + decrementSSEConnections('a2a-resubscribe') + } + } const stream = new ReadableStream({ async start(controller) { + incrementSSEConnections('a2a-resubscribe') const sendEvent = (event: string, data: unknown): boolean => { if (isCancelled || abortSignal.aborted) return false try { @@ -1041,14 +1072,6 @@ async function handleTaskResubscribe( } } - const cleanup = () => { - isCancelled = true - if (pollTimeoutId) { - clearTimeout(pollTimeoutId) - pollTimeoutId = null - } - } - if ( !sendEvent('status', { kind: 'status', @@ -1160,11 +1183,7 @@ async function handleTaskResubscribe( poll() }, cancel() { - isCancelled = true - if (pollTimeoutId) { - clearTimeout(pollTimeoutId) - pollTimeoutId = null - } + cleanup() }, }) diff --git a/apps/sim/app/api/mcp/events/route.ts b/apps/sim/app/api/mcp/events/route.ts index 6df91db5c0..7def26b345 100644 --- a/apps/sim/app/api/mcp/events/route.ts +++ b/apps/sim/app/api/mcp/events/route.ts @@ -14,6 +14,7 @@ import { getSession } from '@/lib/auth' import { SSE_HEADERS } from '@/lib/core/utils/sse' import { mcpConnectionManager } from '@/lib/mcp/connection-manager' import { mcpPubSub } from '@/lib/mcp/pubsub' +import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections' import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' const logger = createLogger('McpEventsSSE') @@ -41,10 +42,24 @@ export async function GET(request: NextRequest) { const encoder = new TextEncoder() const unsubscribers: Array<() => void> = [] + let cleaned = false + + const cleanup = () => { + if (cleaned) return + cleaned = true + for (const unsub of unsubscribers) { + unsub() + } + decrementSSEConnections('mcp-events') + logger.info(`SSE connection closed for workspace ${workspaceId}`) + } const stream = new ReadableStream({ start(controller) { + incrementSSEConnections('mcp-events') + const send = 
(eventName: string, data: Record) => { + if (cleaned) return try { controller.enqueue( encoder.encode(`event: ${eventName}\ndata: ${JSON.stringify(data)}\n\n`) @@ -82,6 +97,10 @@ export async function GET(request: NextRequest) { // Heartbeat to keep the connection alive const heartbeat = setInterval(() => { + if (cleaned) { + clearInterval(heartbeat) + return + } try { controller.enqueue(encoder.encode(': heartbeat\n\n')) } catch { @@ -91,20 +110,24 @@ export async function GET(request: NextRequest) { unsubscribers.push(() => clearInterval(heartbeat)) // Cleanup when client disconnects - request.signal.addEventListener('abort', () => { - for (const unsub of unsubscribers) { - unsub() - } - try { - controller.close() - } catch { - // Already closed - } - logger.info(`SSE connection closed for workspace ${workspaceId}`) - }) + request.signal.addEventListener( + 'abort', + () => { + cleanup() + try { + controller.close() + } catch { + // Already closed + } + }, + { once: true } + ) logger.info(`SSE connection opened for workspace ${workspaceId}`) }, + cancel() { + cleanup() + }, }) return new Response(stream, { headers: SSE_HEADERS }) diff --git a/apps/sim/app/api/wand/route.ts b/apps/sim/app/api/wand/route.ts index f868364ae6..abebcc1894 100644 --- a/apps/sim/app/api/wand/route.ts +++ b/apps/sim/app/api/wand/route.ts @@ -10,6 +10,7 @@ import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing' import { env } from '@/lib/core/config/env' import { getCostMultiplier, isBillingEnabled } from '@/lib/core/config/feature-flags' import { generateRequestId } from '@/lib/core/utils/request' +import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections' import { enrichTableSchema } from '@/lib/table/llm/wand' import { verifyWorkspaceMembership } from '@/app/api/workflows/utils' import { extractResponseText, parseResponsesUsage } from '@/providers/openai/utils' @@ -330,10 +331,14 @@ export async function POST(req: 
NextRequest) { const encoder = new TextEncoder() const decoder = new TextDecoder() + let wandStreamClosed = false const readable = new ReadableStream({ async start(controller) { + incrementSSEConnections('wand') const reader = response.body?.getReader() if (!reader) { + wandStreamClosed = true + decrementSSEConnections('wand') controller.close() return } @@ -478,6 +483,16 @@ export async function POST(req: NextRequest) { controller.close() } finally { reader.releaseLock() + if (!wandStreamClosed) { + wandStreamClosed = true + decrementSSEConnections('wand') + } + } + }, + cancel() { + if (!wandStreamClosed) { + wandStreamClosed = true + decrementSSEConnections('wand') } }, }) diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index b393ae492a..b2cb3c1f8c 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -22,6 +22,7 @@ import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/ev import { processInputFileFields } from '@/lib/execution/files' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' +import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections' import { cleanupExecutionBase64Cache, hydrateUserFilesWithBase64, @@ -763,6 +764,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const encoder = new TextEncoder() const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync) let isStreamClosed = false + let sseDecremented = false const eventWriter = createExecutionEventWriter(executionId) setExecutionMeta(executionId, { @@ -773,6 +775,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const stream = new ReadableStream({ async start(controller) { + incrementSSEConnections('workflow-execute') 
let finalMetaStatus: 'complete' | 'error' | 'cancelled' | null = null const sendEvent = (event: ExecutionEvent) => { @@ -1147,6 +1150,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: if (executionId) { await cleanupExecutionBase64Cache(executionId) } + if (!sseDecremented) { + sseDecremented = true + decrementSSEConnections('workflow-execute') + } if (!isStreamClosed) { try { controller.enqueue(encoder.encode('data: [DONE]\n\n')) @@ -1158,6 +1165,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: cancel() { isStreamClosed = true logger.info(`[${requestId}] Client disconnected from SSE stream`) + if (!sseDecremented) { + sseDecremented = true + decrementSSEConnections('workflow-execute') + } }, }) diff --git a/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts b/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts index 1f77ff391d..88e3c87447 100644 --- a/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts +++ b/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts @@ -7,6 +7,7 @@ import { getExecutionMeta, readExecutionEvents, } from '@/lib/execution/event-buffer' +import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections' import { formatSSEEvent } from '@/lib/workflows/executor/execution-events' import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils' @@ -73,8 +74,10 @@ export async function GET( let closed = false + let sseDecremented = false const stream = new ReadableStream({ async start(controller) { + incrementSSEConnections('execution-stream-reconnect') let lastEventId = fromEventId const pollDeadline = Date.now() + MAX_POLL_DURATION_MS @@ -142,11 +145,20 @@ export async function GET( controller.close() } catch {} } + } finally { + if (!sseDecremented) { + sseDecremented = true + decrementSSEConnections('execution-stream-reconnect') + } } }, 
cancel() { closed = true logger.info('Client disconnected from reconnection stream', { executionId }) + if (!sseDecremented) { + sseDecremented = true + decrementSSEConnections('execution-stream-reconnect') + } }, }) diff --git a/apps/sim/instrumentation-node.ts b/apps/sim/instrumentation-node.ts index c1d1f4bad8..4100a796c7 100644 --- a/apps/sim/instrumentation-node.ts +++ b/apps/sim/instrumentation-node.ts @@ -160,4 +160,7 @@ async function initializeOpenTelemetry() { export async function register() { await initializeOpenTelemetry() + + const { startMemoryTelemetry } = await import('./lib/monitoring/memory-telemetry') + startMemoryTelemetry() } diff --git a/apps/sim/lib/copilot/orchestrator/stream-buffer.ts b/apps/sim/lib/copilot/orchestrator/stream-buffer.ts index bc0524c4af..81e1cbc350 100644 --- a/apps/sim/lib/copilot/orchestrator/stream-buffer.ts +++ b/apps/sim/lib/copilot/orchestrator/stream-buffer.ts @@ -237,6 +237,15 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter { error: error instanceof Error ? 
error.message : String(error), }) pending = batch.concat(pending) + if (pending.length > config.eventLimit) { + const dropped = pending.length - config.eventLimit + pending = pending.slice(-config.eventLimit) + logger.warn('Dropped oldest pending stream events due to sustained Redis failure', { + streamId, + dropped, + remaining: pending.length, + }) + } } } diff --git a/apps/sim/lib/execution/event-buffer.ts b/apps/sim/lib/execution/event-buffer.ts index 4473a922f4..81373ccf42 100644 --- a/apps/sim/lib/execution/event-buffer.ts +++ b/apps/sim/lib/execution/event-buffer.ts @@ -10,6 +10,7 @@ const EVENT_LIMIT = 1000 const RESERVE_BATCH = 100 const FLUSH_INTERVAL_MS = 15 const FLUSH_MAX_BATCH = 200 +const MAX_PENDING_EVENTS = 1000 function getEventsKey(executionId: string) { return `${REDIS_PREFIX}${executionId}:events` @@ -184,6 +185,15 @@ export function createExecutionEventWriter(executionId: string): ExecutionEventW stack: error instanceof Error ? error.stack : undefined, }) pending = batch.concat(pending) + if (pending.length > MAX_PENDING_EVENTS) { + const dropped = pending.length - MAX_PENDING_EVENTS + pending = pending.slice(-MAX_PENDING_EVENTS) + logger.warn('Dropped oldest pending events due to sustained Redis failure', { + executionId, + dropped, + remaining: pending.length, + }) + } } } diff --git a/apps/sim/lib/execution/isolated-vm.ts b/apps/sim/lib/execution/isolated-vm.ts index 3da81142f5..9deffbe83c 100644 --- a/apps/sim/lib/execution/isolated-vm.ts +++ b/apps/sim/lib/execution/isolated-vm.ts @@ -716,9 +716,15 @@ function spawnWorker(): Promise { proc.on('message', (message: unknown) => handleWorkerMessage(workerId, message)) + const MAX_STDERR_SIZE = 64 * 1024 let stderrData = '' proc.stderr?.on('data', (data: Buffer) => { - stderrData += data.toString() + if (stderrData.length < MAX_STDERR_SIZE) { + stderrData += data.toString() + if (stderrData.length > MAX_STDERR_SIZE) { + stderrData = stderrData.slice(0, MAX_STDERR_SIZE) + } + } }) const 
startTimeout = setTimeout(() => {
diff --git a/apps/sim/lib/monitoring/memory-telemetry.ts b/apps/sim/lib/monitoring/memory-telemetry.ts
new file mode 100644
index 0000000000..a730e0b976
--- /dev/null
+++ b/apps/sim/lib/monitoring/memory-telemetry.ts
@@ -0,0 +1,68 @@
+/**
+ * Periodic memory telemetry for diagnosing heap growth in production.
+ * Logs process.memoryUsage(), V8 heap stats, and active SSE connection
+ * counts at a fixed interval (60s by default), enabling correlation
+ * between connection leaks and memory spikes.
+ */
+
+import v8 from 'node:v8'
+import { createLogger } from '@sim/logger'
+import {
+  getActiveSSEConnectionCount,
+  getActiveSSEConnectionsByRoute,
+} from '@/lib/monitoring/sse-connections'
+
+const logger = createLogger('MemoryTelemetry')
+
+const MB = 1024 * 1024
+
+/** Snapshot cadence used when the caller passes no usable interval. */
+const DEFAULT_INTERVAL_MS = 60_000
+
+/** Largest delay Node.js timers accept; anything larger is coerced to 1ms. */
+const MAX_TIMER_DELAY_MS = 2_147_483_647
+
+let started = false
+
+/**
+ * Starts the process-wide memory snapshot logger. Only the first call has
+ * any effect; later calls return immediately.
+ *
+ * @param intervalMs - Milliseconds between snapshots. Values that Node.js
+ *   timers would coerce to 1ms (NaN, below 1, above 2^31 - 1) fall back to
+ *   the 60s default so a bad argument cannot flood the logs.
+ */
+export function startMemoryTelemetry(intervalMs = DEFAULT_INTERVAL_MS): void {
+  if (started) return
+  started = true
+
+  const safeIntervalMs =
+    Number.isFinite(intervalMs) && intervalMs >= 1 && intervalMs <= MAX_TIMER_DELAY_MS
+      ? intervalMs
+      : DEFAULT_INTERVAL_MS
+
+  const timer = setInterval(() => {
+    const mem = process.memoryUsage()
+    const heap = v8.getHeapStatistics()
+
+    logger.info('Memory snapshot', {
+      heapUsedMB: Math.round(mem.heapUsed / MB),
+      heapTotalMB: Math.round(mem.heapTotal / MB),
+      rssMB: Math.round(mem.rss / MB),
+      externalMB: Math.round(mem.external / MB),
+      arrayBuffersMB: Math.round(mem.arrayBuffers / MB),
+      heapSizeLimitMB: Math.round(heap.heap_size_limit / MB),
+      nativeContexts: heap.number_of_native_contexts,
+      // Feature-detected; -1 means the runtime does not expose the API.
+      activeResources:
+        typeof process.getActiveResourcesInfo === 'function'
+          ? process.getActiveResourcesInfo().length
+          : -1,
+      uptimeMin: Math.round(process.uptime() / 60),
+      activeSSEConnections: getActiveSSEConnectionCount(),
+      sseByRoute: getActiveSSEConnectionsByRoute(),
+    })
+  }, safeIntervalMs)
+  // Unref so the telemetry timer alone never keeps the process alive.
+  timer.unref()
+}
diff --git a/apps/sim/lib/monitoring/sse-connections.ts b/apps/sim/lib/monitoring/sse-connections.ts
new file mode 100644
index 0000000000..b6394ddff6
--- /dev/null
+++ b/apps/sim/lib/monitoring/sse-connections.ts
@@ -0,0 +1,45 @@
+/**
+ * Tracks active SSE connections by route for memory leak diagnostics.
+ * Logged alongside periodic memory telemetry to correlate connection
+ * counts with heap growth.
+ */
+
+/** Open-connection count per route label; routes at zero are removed. */
+const connections = new Map<string, number>()
+
+/**
+ * Records one newly opened SSE connection for `route`.
+ *
+ * @param route - Label identifying the SSE endpoint.
+ */
+export function incrementSSEConnections(route: string): void {
+  connections.set(route, (connections.get(route) ?? 0) + 1)
+}
+
+/**
+ * Records one closed SSE connection for `route`. The entry is deleted once
+ * its count reaches zero, so an unmatched decrement never goes negative.
+ *
+ * @param route - Label identifying the SSE endpoint.
+ */
+export function decrementSSEConnections(route: string): void {
+  const count = (connections.get(route) ?? 0) - 1
+  if (count <= 0) connections.delete(route)
+  else connections.set(route, count)
+}
+
+/**
+ * @returns Total number of open SSE connections across all routes.
+ */
+export function getActiveSSEConnectionCount(): number {
+  let total = 0
+  for (const count of connections.values()) total += count
+  return total
+}
+
+/**
+ * @returns A plain-object snapshot mapping each active route to its count.
+ */
+export function getActiveSSEConnectionsByRoute(): Record<string, number> {
+  return Object.fromEntries(connections)
+}