diff --git a/mini_agent/streaming.py b/mini_agent/streaming.py new file mode 100644 index 00000000..3ae45144 --- /dev/null +++ b/mini_agent/streaming.py @@ -0,0 +1,168 @@ +"""Streaming output renderer with stable prefix and unstable suffix pattern. + +This module provides incremental rendering for long-running tasks like +npm test, pytest, etc., reducing visual flicker and improving UX. +""" + +import sys +from dataclasses import dataclass, field + + +@dataclass +class StreamingRenderer: + """Renders streaming output with stable prefix and unstable suffix. + + Lines that end with a newline are considered "stable" and will not + be re-rendered. The last line (without a trailing newline) is + considered "unstable" and will be updated on each render. + + This follows the Claude Code pattern for handling long-running + command output with minimal visual disruption. + """ + + stable_prefix: str = field(default="") + unstable_content: str = field(default="") + last_line_count: int = 0 + + def update(self, new_content: str) -> str: + """Process new content and return the portion that needs re-rendering. + + Args: + new_content: The complete accumulated content + + Returns: + The portion that should be re-rendered (unstable lines) + """ + if not new_content: + return "" + + # Split into lines + lines = new_content.split("\n") + + # Count complete lines (lines ending with \n are stable) + # Note: split("\n") adds empty string after trailing \n, so adjust accordingly + if new_content.endswith("\n"): + complete_lines = len(lines) - 1 # Exclude empty string after trailing \n + else: + complete_lines = len(lines) - 1 # Last line is incomplete + + # If we have fewer complete lines than before, content was reset + if complete_lines < self.last_line_count: + self.stable_prefix = "" + self.unstable_content = "" + self.last_line_count = 0 + lines = new_content.split("\n") + if new_content.endswith("\n"): + complete_lines = len(lines) - 1 + else: + complete_lines = len(lines) - 1 + # Handle the reset content directly and return + if complete_lines > 0: + self.stable_prefix = "\n".join(lines[:complete_lines]) + if self.stable_prefix: + self.stable_prefix += "\n" + if not new_content.endswith("\n"): + self.unstable_content = lines[-1] if lines else "" + else: + self.unstable_content = "" + self.last_line_count = complete_lines + return self.render() + + # Stable content: all complete lines + if complete_lines > 0: + # All lines except the last (potentially incomplete) one + self.stable_prefix = "\n".join(lines[:complete_lines]) + if complete_lines > 0 and self.stable_prefix: + self.stable_prefix += "\n" + + # Unstable content: the last line (may be growing or empty) + if not new_content.endswith("\n"): + # Last line is incomplete (still growing) + self.unstable_content = lines[-1] if lines else "" + else: + # All content is complete + self.unstable_content = "" + + self.last_line_count = complete_lines + + # Return what needs to be re-rendered + return self.render() + + def render(self) -> str: + """Return the complete rendered string. + + Returns: + Full rendered output with stable prefix and current unstable content + """ + result = self.stable_prefix + if self.unstable_content: + result += self.unstable_content + return result + + def clear(self) -> None: + """Clear all accumulated content.""" + self.stable_prefix = "" + self.unstable_content = "" + self.last_line_count = 0 + + +def render_streaming( + content: str, + renderer: StreamingRenderer | None = None, + clear_on_newline: bool = False, +) -> tuple[str, StreamingRenderer]: + """Helper function to render streaming content. + + Args: + content: New content to render + renderer: Existing renderer to update (creates new if None) + clear_on_newline: If True, clears screen and re-renders on newline + + Returns: + Tuple of (rendered_output, renderer_instance) + """ + if renderer is None: + renderer = StreamingRenderer() + + rendered = renderer.update(content) + + # Optionally handle terminal clear for newline detection + if clear_on_newline and "\n" in content: + # Move cursor to beginning of line and clear + sys.stdout.write("\r\033[K") + sys.stdout.flush() + + return rendered, renderer + + +def erase_lines(n: int) -> str: + """Generate ANSI escape sequence to erase n lines. + + Args: + n: Number of lines to erase + + Returns: + ANSI escape sequence + """ + return "\033[{}A\033[K".format(n) + + +def move_cursor_up(n: int) -> str: + """Generate ANSI escape sequence to move cursor up n lines. + + Args: + n: Number of lines to move up + + Returns: + ANSI escape sequence + """ + return "\033[{}A".format(n) + + +def clear_line() -> str: + """Generate ANSI escape sequence to clear current line. + + Returns: + ANSI escape sequence + """ + return "\r\033[K" diff --git a/tests/test_streaming.py b/tests/test_streaming.py new file mode 100644 index 00000000..e4d04b36 --- /dev/null +++ b/tests/test_streaming.py @@ -0,0 +1,209 @@ +"""Test cases for Streaming Renderer.""" + +import pytest + +from mini_agent.streaming import ( + StreamingRenderer, + clear_line, + erase_lines, + move_cursor_up, + render_streaming, +) + + +class TestStreamingRenderer: + """Tests for StreamingRenderer class.""" + + def test_empty_content(self): + """Test handling of empty content.""" + renderer = StreamingRenderer() + result = renderer.update("") + assert result == "" + assert renderer.stable_prefix == "" + assert renderer.unstable_content == "" + + def test_single_incomplete_line(self): + """Test a single line without newline (unstable).""" + renderer = StreamingRenderer() + result = renderer.update("test output") + assert result == "test output" + assert renderer.stable_prefix == "" + assert renderer.unstable_content == "test output" + + def test_single_complete_line(self): + """Test a single line with newline (stable).""" + renderer = StreamingRenderer() + result = renderer.update("test output\n") + assert result == "test output\n" + assert renderer.stable_prefix == "test output\n" + assert renderer.unstable_content == "" + + def test_multiple_lines_last_incomplete(self): + """Test multiple lines with last one incomplete.""" + renderer = StreamingRenderer() + result = renderer.update("line1\nline2\nline3") + assert result == "line1\nline2\nline3" + assert renderer.stable_prefix == "line1\nline2\n" + assert renderer.unstable_content == "line3" + + def test_multiple_lines_all_complete(self): + """Test multiple lines, all complete.""" + renderer = StreamingRenderer() + result = renderer.update("line1\nline2\nline3\n") + assert result == "line1\nline2\nline3\n" + assert renderer.stable_prefix == "line1\nline2\nline3\n" + assert renderer.unstable_content == "" + + def test_incremental_update(self): + """Test incremental updates to growing output.""" + renderer = StreamingRenderer() + + # Initial partial line + result1 = renderer.update("test") + assert result1 == "test" + assert renderer.unstable_content == "test" + assert renderer.stable_prefix == "" + + # Continue the line + result2 = renderer.update("test output") + assert result2 == "test output" + assert renderer.unstable_content == "test output" + + # Complete the line + result3 = renderer.update("test output\n") + assert result3 == "test output\n" + assert renderer.stable_prefix == "test output\n" + assert renderer.unstable_content == "" + + def test_incremental_with_multiple_lines(self): + """Test incremental updates with multiple lines.""" + renderer = StreamingRenderer() + + # First line complete + renderer.update("line1\n") + assert renderer.stable_prefix == "line1\n" + + # Second line incomplete + result = renderer.update("line1\nline2") + assert result == "line1\nline2" + assert renderer.stable_prefix == "line1\n" + assert renderer.unstable_content == "line2" + + # Second line completes + result = renderer.update("line1\nline2\n") + assert result == "line1\nline2\n" + assert renderer.stable_prefix == "line1\nline2\n" + assert renderer.unstable_content == "" + + def test_clear(self): + """Test clearing the renderer.""" + renderer = StreamingRenderer() + renderer.update("line1\nline2\n") + assert renderer.stable_prefix == "line1\nline2\n" + + renderer.clear() + assert renderer.stable_prefix == "" + assert renderer.unstable_content == "" + assert renderer.last_line_count == 0 + + def test_render_returns_full_content(self): + """Test that render() returns complete content.""" + renderer = StreamingRenderer() + renderer.update("line1\nline2\nline3") + + # render() should return same as last update() + result = renderer.render() + assert result == "line1\nline2\nline3" + + def test_content_reset_detection(self): + """Test detection of content reset (fewer lines than before).""" + renderer = StreamingRenderer() + + # Start with multiple lines + renderer.update("line1\nline2\nline3\n") + assert renderer.last_line_count == 3 + + # Simulate content reset - new content has fewer lines + renderer.update("new1\n") + assert renderer.stable_prefix == "new1\n" + assert renderer.unstable_content == "" + + def test_preserves_stable_lines(self): + """Test that stable lines are never re-rendered.""" + renderer = StreamingRenderer() + + # Build up stable content + renderer.update("stable1\n") + assert renderer.stable_prefix == "stable1\n" + + renderer.update("stable1\nstable2\n") + assert renderer.stable_prefix == "stable1\nstable2\n" + + # Update unstable line multiple times + renderer.update("stable1\nstable2\ngrowing...") + renderer.update("stable1\nstable2\ngrowing... more") + renderer.update("stable1\nstable2\ngrowing... complete") + + # Stable prefix should be unchanged + assert renderer.stable_prefix == "stable1\nstable2\n" + + def test_finalize_with_newline(self): + """Test finalizing unstable line with newline.""" + renderer = StreamingRenderer() + + renderer.update("test") + assert renderer.unstable_content == "test" + + renderer.update("test\n") + assert renderer.unstable_content == "" + assert renderer.stable_prefix == "test\n" + + +class TestHelperFunctions: + """Tests for helper functions.""" + + def test_clear_line(self): + """Test clear_line function.""" + result = clear_line() + assert result == "\r\033[K" + + def test_move_cursor_up(self): + """Test move_cursor_up function.""" + result = move_cursor_up(3) + assert result == "\033[3A" + + def test_erase_lines(self): + """Test erase_lines function.""" + result = erase_lines(5) + assert result == "\033[5A\033[K" + + +class TestRenderStreamingHelper: + """Tests for render_streaming helper function.""" + + def test_create_new_renderer(self): + """Test creating renderer via helper.""" + result, renderer = render_streaming("test content\n") + assert result == "test content\n" + assert isinstance(renderer, StreamingRenderer) + + def test_update_existing_renderer(self): + """Test updating existing renderer.""" + renderer = StreamingRenderer() + renderer.update("existing\n") + + result, returned_renderer = render_streaming("existing\nupdate", renderer=renderer) + assert returned_renderer is renderer + assert result == "existing\nupdate" + + def test_incremental_rendering(self): + """Test incremental rendering with helper.""" + renderer = None + + # Simulate streaming output + chunks = ["test", "ing s", "tream", "ing\n", "line2\n", "line3"] + + for chunk in chunks: + output, renderer = render_streaming(chunk, renderer=renderer) + # Just verify no errors occur + assert renderer is not None