Skip to content
49 changes: 34 additions & 15 deletions apps/sim/app/api/a2a/serve/[agentId]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { validateUrlWithDNS } from '@/lib/core/security/input-validation.server'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils'
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
import {
Expand Down Expand Up @@ -630,9 +631,11 @@ async function handleMessageStream(
}

const encoder = new TextEncoder()
let messageStreamDecremented = false

const stream = new ReadableStream({
async start(controller) {
incrementSSEConnections('a2a-message')
const sendEvent = (event: string, data: unknown) => {
try {
const jsonRpcResponse = {
Expand Down Expand Up @@ -841,9 +844,19 @@ async function handleMessageStream(
})
} finally {
await releaseLock(lockKey, lockValue)
if (!messageStreamDecremented) {
messageStreamDecremented = true
decrementSSEConnections('a2a-message')
}
controller.close()
}
},
cancel() {
if (!messageStreamDecremented) {
messageStreamDecremented = true
decrementSSEConnections('a2a-message')
}
},
})

return new NextResponse(stream, {
Expand Down Expand Up @@ -1016,16 +1029,34 @@ async function handleTaskResubscribe(
let pollTimeoutId: ReturnType<typeof setTimeout> | null = null

const abortSignal = request.signal
abortSignal.addEventListener('abort', () => {
abortSignal.addEventListener(
'abort',
() => {
isCancelled = true
if (pollTimeoutId) {
clearTimeout(pollTimeoutId)
pollTimeoutId = null
}
},
{ once: true }
)

let sseDecremented = false
const cleanup = () => {
isCancelled = true
if (pollTimeoutId) {
clearTimeout(pollTimeoutId)
pollTimeoutId = null
}
})
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('a2a-resubscribe')
}
}

const stream = new ReadableStream({
async start(controller) {
incrementSSEConnections('a2a-resubscribe')
const sendEvent = (event: string, data: unknown): boolean => {
if (isCancelled || abortSignal.aborted) return false
try {
Expand All @@ -1041,14 +1072,6 @@ async function handleTaskResubscribe(
}
}

const cleanup = () => {
isCancelled = true
if (pollTimeoutId) {
clearTimeout(pollTimeoutId)
pollTimeoutId = null
}
}

if (
!sendEvent('status', {
kind: 'status',
Expand Down Expand Up @@ -1160,11 +1183,7 @@ async function handleTaskResubscribe(
poll()
},
cancel() {
isCancelled = true
if (pollTimeoutId) {
clearTimeout(pollTimeoutId)
pollTimeoutId = null
}
cleanup()
},
})

Expand Down
45 changes: 34 additions & 11 deletions apps/sim/app/api/mcp/events/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { getSession } from '@/lib/auth'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { mcpConnectionManager } from '@/lib/mcp/connection-manager'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'

const logger = createLogger('McpEventsSSE')
Expand Down Expand Up @@ -41,10 +42,24 @@ export async function GET(request: NextRequest) {

const encoder = new TextEncoder()
const unsubscribers: Array<() => void> = []
let cleaned = false

const cleanup = () => {
if (cleaned) return
cleaned = true
for (const unsub of unsubscribers) {
unsub()
}
decrementSSEConnections('mcp-events')
logger.info(`SSE connection closed for workspace ${workspaceId}`)
}

const stream = new ReadableStream({
start(controller) {
incrementSSEConnections('mcp-events')

const send = (eventName: string, data: Record<string, unknown>) => {
if (cleaned) return
try {
controller.enqueue(
encoder.encode(`event: ${eventName}\ndata: ${JSON.stringify(data)}\n\n`)
Expand Down Expand Up @@ -82,6 +97,10 @@ export async function GET(request: NextRequest) {

// Heartbeat to keep the connection alive
const heartbeat = setInterval(() => {
if (cleaned) {
clearInterval(heartbeat)
return
}
try {
controller.enqueue(encoder.encode(': heartbeat\n\n'))
} catch {
Expand All @@ -91,20 +110,24 @@ export async function GET(request: NextRequest) {
unsubscribers.push(() => clearInterval(heartbeat))

// Cleanup when client disconnects
request.signal.addEventListener('abort', () => {
for (const unsub of unsubscribers) {
unsub()
}
try {
controller.close()
} catch {
// Already closed
}
logger.info(`SSE connection closed for workspace ${workspaceId}`)
})
request.signal.addEventListener(
'abort',
() => {
cleanup()
try {
controller.close()
} catch {
// Already closed
}
},
{ once: true }
)

logger.info(`SSE connection opened for workspace ${workspaceId}`)
},
cancel() {
cleanup()
},
})

return new Response(stream, { headers: SSE_HEADERS })
Expand Down
15 changes: 15 additions & 0 deletions apps/sim/app/api/wand/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing'
import { env } from '@/lib/core/config/env'
import { getCostMultiplier, isBillingEnabled } from '@/lib/core/config/feature-flags'
import { generateRequestId } from '@/lib/core/utils/request'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import { enrichTableSchema } from '@/lib/table/llm/wand'
import { verifyWorkspaceMembership } from '@/app/api/workflows/utils'
import { extractResponseText, parseResponsesUsage } from '@/providers/openai/utils'
Expand Down Expand Up @@ -330,10 +331,14 @@ export async function POST(req: NextRequest) {
const encoder = new TextEncoder()
const decoder = new TextDecoder()

let wandStreamClosed = false
const readable = new ReadableStream({
async start(controller) {
incrementSSEConnections('wand')
const reader = response.body?.getReader()
if (!reader) {
wandStreamClosed = true
decrementSSEConnections('wand')
controller.close()
return
}
Expand Down Expand Up @@ -478,6 +483,16 @@ export async function POST(req: NextRequest) {
controller.close()
} finally {
reader.releaseLock()
if (!wandStreamClosed) {
wandStreamClosed = true
decrementSSEConnections('wand')
}
}
},
cancel() {
if (!wandStreamClosed) {
wandStreamClosed = true
decrementSSEConnections('wand')
}
},
})
Expand Down
11 changes: 11 additions & 0 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/ev
import { processInputFileFields } from '@/lib/execution/files'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import {
cleanupExecutionBase64Cache,
hydrateUserFilesWithBase64,
Expand Down Expand Up @@ -763,6 +764,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const encoder = new TextEncoder()
const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync)
let isStreamClosed = false
let sseDecremented = false

const eventWriter = createExecutionEventWriter(executionId)
setExecutionMeta(executionId, {
Expand All @@ -773,6 +775,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:

const stream = new ReadableStream<Uint8Array>({
async start(controller) {
incrementSSEConnections('workflow-execute')
let finalMetaStatus: 'complete' | 'error' | 'cancelled' | null = null

const sendEvent = (event: ExecutionEvent) => {
Expand Down Expand Up @@ -1147,6 +1150,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
if (executionId) {
await cleanupExecutionBase64Cache(executionId)
}
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('workflow-execute')
}
if (!isStreamClosed) {
try {
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
Expand All @@ -1158,6 +1165,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
cancel() {
isStreamClosed = true
logger.info(`[${requestId}] Client disconnected from SSE stream`)
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('workflow-execute')
}
},
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
getExecutionMeta,
readExecutionEvents,
} from '@/lib/execution/event-buffer'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import { formatSSEEvent } from '@/lib/workflows/executor/execution-events'
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'

Expand Down Expand Up @@ -73,8 +74,10 @@ export async function GET(

let closed = false

let sseDecremented = false
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
incrementSSEConnections('execution-stream-reconnect')
let lastEventId = fromEventId
const pollDeadline = Date.now() + MAX_POLL_DURATION_MS

Expand Down Expand Up @@ -142,11 +145,20 @@ export async function GET(
controller.close()
} catch {}
}
} finally {
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('execution-stream-reconnect')
}
}
},
cancel() {
closed = true
logger.info('Client disconnected from reconnection stream', { executionId })
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('execution-stream-reconnect')
}
},
})

Expand Down
3 changes: 3 additions & 0 deletions apps/sim/instrumentation-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,7 @@ async function initializeOpenTelemetry() {

export async function register() {
await initializeOpenTelemetry()

const { startMemoryTelemetry } = await import('./lib/monitoring/memory-telemetry')
startMemoryTelemetry()
}
9 changes: 9 additions & 0 deletions apps/sim/lib/copilot/orchestrator/stream-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,15 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter {
error: error instanceof Error ? error.message : String(error),
})
pending = batch.concat(pending)
if (pending.length > config.eventLimit) {
const dropped = pending.length - config.eventLimit
pending = pending.slice(-config.eventLimit)
logger.warn('Dropped oldest pending stream events due to sustained Redis failure', {
streamId,
dropped,
remaining: pending.length,
})
}
}
}

Expand Down
10 changes: 10 additions & 0 deletions apps/sim/lib/execution/event-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const EVENT_LIMIT = 1000
const RESERVE_BATCH = 100
const FLUSH_INTERVAL_MS = 15
const FLUSH_MAX_BATCH = 200
const MAX_PENDING_EVENTS = 1000

function getEventsKey(executionId: string) {
return `${REDIS_PREFIX}${executionId}:events`
Expand Down Expand Up @@ -184,6 +185,15 @@ export function createExecutionEventWriter(executionId: string): ExecutionEventW
stack: error instanceof Error ? error.stack : undefined,
})
pending = batch.concat(pending)
if (pending.length > MAX_PENDING_EVENTS) {
const dropped = pending.length - MAX_PENDING_EVENTS
pending = pending.slice(-MAX_PENDING_EVENTS)
logger.warn('Dropped oldest pending events due to sustained Redis failure', {
executionId,
dropped,
remaining: pending.length,
})
}
}
}

Expand Down
8 changes: 7 additions & 1 deletion apps/sim/lib/execution/isolated-vm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -716,9 +716,15 @@ function spawnWorker(): Promise<WorkerInfo> {

proc.on('message', (message: unknown) => handleWorkerMessage(workerId, message))

const MAX_STDERR_SIZE = 64 * 1024
let stderrData = ''
proc.stderr?.on('data', (data: Buffer) => {
stderrData += data.toString()
if (stderrData.length < MAX_STDERR_SIZE) {
stderrData += data.toString()
if (stderrData.length > MAX_STDERR_SIZE) {
stderrData = stderrData.slice(0, MAX_STDERR_SIZE)
}
}
})

const startTimeout = setTimeout(() => {
Expand Down
Loading