Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,41 @@ describe('ConcurrentOutput', () => {
expect(logColumns[1]?.length).toBe(25 + 2)
})

test('renders large chunks split into batches without dropping lines', async () => {
// Given - simulate a large stack trace (>100 lines) arriving as a single write
const processSync = new Synchronizer()
const lineCount = 250
const largeOutput = Array.from({length: lineCount}, (_, i) => `line ${i + 1}`).join('\n')

const processes = [
{
prefix: 'pos-ext',
action: async (stdout: Writable, _stderr: Writable, _signal: AbortSignal) => {
stdout.write(largeOutput)
processSync.resolve()
},
},
]

// When - keepRunningAfterProcessesResolve prevents the component from unmounting
// before all setImmediate-batched state updates have been applied
const renderInstance = render(
<ConcurrentOutput
processes={processes}
abortSignal={new AbortController().signal}
keepRunningAfterProcessesResolve
/>,
)
await processSync.promise
await waitForContent(renderInstance, `line ${lineCount}`)

// Then - all lines should be rendered
const frame = unstyled(renderInstance.lastFrame()!)
for (let i = 1; i <= lineCount; i++) {
expect(frame).toContain(`line ${i}`)
}
})

test('rejects with the error thrown inside one of the processes', async () => {
// Given
const backendProcess = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import {Box, Static, Text, TextProps, useApp} from 'ink'
import figures from 'figures'
import stripAnsi from 'strip-ansi'

import {Writable} from 'stream'
import {Transform, Writable} from 'stream'
import {AsyncLocalStorage} from 'node:async_hooks'

const MAX_LINES_PER_BATCH = 20

export interface ConcurrentOutputProps {
processes: OutputProcess[]
prefixColumnSize?: number
Expand Down Expand Up @@ -132,27 +134,57 @@ const ConcurrentOutput: FunctionComponent<ConcurrentOutputProps> = ({

const writableStream = useCallback(
(process: OutputProcess, prefixes: string[]) => {
return new Writable({
write(chunk, _encoding, next) {
// Transform: splits incoming chunks into MAX_LINES_PER_BATCH-line pieces.
// Runs synchronously inside the writer's async context, so outputContextStore
// (prefix, stripAnsi overrides set by useConcurrentOutputContext) is available here.
const splitter = new Transform({
readableObjectMode: true,
transform(chunk, _encoding, callback) {
const context = outputContextStore.getStore()
const prefix = context?.outputPrefix ?? process.prefix
const shouldStripAnsi = context?.stripAnsi ?? true
const log = chunk.toString('utf8').replace(/(\n)$/, '')
const allLines = shouldStripAnsi ? stripAnsi(log).split(/\n/) : log.split(/\n/)
// Flag batches that came from a large chunk so the sink knows to yield
// between them. Single-batch writes keep synchronous next() to preserve
// existing behaviour for normal (small) output.
const isLargeChunk = allLines.length > MAX_LINES_PER_BATCH
for (let i = 0; i < allLines.length; i += MAX_LINES_PER_BATCH) {
this.push({prefix, lines: allLines.slice(i, i + MAX_LINES_PER_BATCH), isLargeChunk})
}
callback()
},
})

// Writable: renders each batch into React state.
// For large-chunk batches, setImmediate(next) yields the event loop so keyboard
// shortcuts (q, p) can fire between renders, and creates real Node.js backpressure:
// when next() is pending the pipe pauses the splitter, preventing unbounded
// memory growth from fast producers.
// For normal output (single-batch writes) next() is called synchronously to
// preserve the existing rendering behaviour.
const sink = new Writable({
objectMode: true,
write(
{prefix, lines, isLargeChunk}: {prefix: string; lines: string[]; isLargeChunk: boolean},
_encoding,
next,
) {
const index = addPrefix(prefix, prefixes)

const lines = shouldStripAnsi ? stripAnsi(log).split(/\n/) : log.split(/\n/)
setProcessOutput((previousProcessOutput) => [
...previousProcessOutput,
{
color: lineColor(index),
prefix,
lines,
},
{color: lineColor(index), prefix, lines},
])
next()
if (isLargeChunk) {
setImmediate(next)
} else {
next()
}
},
})

splitter.pipe(sink)
return splitter
},
[lineColor],
)
Expand Down
Loading