Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 114 additions & 94 deletions cmd2/cmd2.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@
)

import rich.box
from prompt_toolkit import print_formatted_text
from prompt_toolkit import (
filters,
print_formatted_text,
)
from prompt_toolkit.application import get_app
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.completion import Completer, DummyCompleter
Expand Down Expand Up @@ -136,6 +139,8 @@
CompletionError,
EmbeddedConsoleExit,
EmptyStatement,
IncompleteStatement,
MacroError,
PassThroughException,
RedirectionError,
SkipPostcommandHooks,
Expand Down Expand Up @@ -200,6 +205,7 @@ def __init__(self, msg: str = '') -> None:
if TYPE_CHECKING: # pragma: no cover
StaticArgParseBuilder = staticmethod[[], argparse.ArgumentParser]
ClassArgParseBuilder = classmethod['Cmd' | CommandSet, [], argparse.ArgumentParser]
from prompt_toolkit.buffer import Buffer
else:
StaticArgParseBuilder = staticmethod
ClassArgParseBuilder = classmethod
Expand Down Expand Up @@ -510,12 +516,6 @@ def __init__(
# Used to keep track of whether we are redirecting or piping output
self._redirecting = False

# Used to keep track of whether a continuation prompt is being displayed
self._at_continuation_prompt = False

# The multiline command currently being typed which is used to complete multiline commands.
self._multiline_in_progress = ''

# Characters used to draw a horizontal rule. Should not be blank.
self.ruler = "─"

Expand Down Expand Up @@ -643,6 +643,39 @@ def __init__(
# the current command being executed
self.current_command: Statement | None = None

def _should_continue_multiline(self) -> bool:
"""Return whether prompt-toolkit should continue prompting the user for a multiline command."""
buffer: Buffer = get_app().current_buffer
line: str = buffer.text

used_macros = []

# Continue until all macros are resolved
while True:
try:
statement = self._check_statement_complete(line)
except IncompleteStatement:
# The statement (or the resolved macro) is incomplete.
# Keep prompting the user.
return True

except (Cmd2ShlexError, EmptyStatement):
# These are "finished" states (even if they are errors).
# Submit so the main loop can handle the exception.
return False

# Check if this command matches a macro and wasn't already processed to avoid an infinite loop
if statement.command in self.macros and statement.command not in used_macros:
used_macros.append(statement.command)
try:
line = self._resolve_macro(statement)
except MacroError:
# Resolve failed. Submit to let the main loop handle the error.
return False
else:
# No macro found or already processed. The statement is complete.
return False

def _create_main_session(self, auto_suggest: bool, completekey: str) -> PromptSession[str]:
"""Create and return the main PromptSession for the application.

Expand Down Expand Up @@ -671,9 +704,11 @@ def _(event: Any) -> None: # pragma: no cover
"complete_in_thread": True,
"complete_while_typing": False,
"completer": Cmd2Completer(self),
"history": Cmd2History(self),
"history": Cmd2History(item.raw for item in self.history),
"key_bindings": key_bindings,
"lexer": Cmd2Lexer(self),
"multiline": filters.Condition(self._should_continue_multiline),
"prompt_continuation": self.continuation_prompt,
"rprompt": self.get_rprompt,
}

Expand Down Expand Up @@ -2369,25 +2404,15 @@ def complete(
:return: a Completions object
"""
try:
# Check if we are completing a multiline command
if self._at_continuation_prompt:
# lstrip and prepend the previously typed portion of this multiline command
lstripped_previous = self._multiline_in_progress.lstrip()
line = lstripped_previous + line

# Increment the indexes to account for the prepended text
begidx = len(lstripped_previous) + begidx
endidx = len(lstripped_previous) + endidx
else:
# lstrip the original line
orig_line = line
line = orig_line.lstrip()
num_stripped = len(orig_line) - len(line)
# lstrip the original line
orig_line = line
line = orig_line.lstrip()
num_stripped = len(orig_line) - len(line)

# Calculate new indexes for the stripped line. If the cursor is at a position before the end of a
# line of spaces, then the following math could result in negative indexes. Enforce a max of 0.
begidx = max(begidx - num_stripped, 0)
endidx = max(endidx - num_stripped, 0)
# Calculate new indexes for the stripped line. If the cursor is at a position before the end of a
# line of spaces, then the following math could result in negative indexes. Enforce a max of 0.
begidx = max(begidx - num_stripped, 0)
endidx = max(endidx - num_stripped, 0)

# Shortcuts are not word break characters when completing. Therefore, shortcuts become part
# of the text variable if there isn't a word break, like a space, after it. We need to remove it
Expand Down Expand Up @@ -2843,6 +2868,36 @@ def runcmds_plus_hooks(

return False

def _check_statement_complete(self, line: str) -> Statement:
"""Check if the given line is a complete statement.

:param line: the current input string to check
:return: the completed Statement
:raises Cmd2ShlexError: if a shlex error occurs on a non-multiline command
:raises IncompleteStatement: if more input is needed for multiline
:raises EmptyStatement: if the command is blank
"""
try:
statement = self.statement_parser.parse(line)

# Check if we have a finished multiline command or a standard command
if (statement.multiline_command and statement.terminator) or not statement.multiline_command:
if not statement.command:
raise EmptyStatement
return statement

except Cmd2ShlexError:
# Check if the error is occurring within a multiline command
partial_statement = self.statement_parser.parse_command_only(line)
if not partial_statement.multiline_command:
# It's a standard command with a quoting error, raise it
raise

# If we reached here, the statement is incomplete:
# - Multiline command missing a terminator
# - Multiline command with an unclosed quotation mark
raise IncompleteStatement

def _complete_statement(self, line: str) -> Statement:
"""Keep accepting lines of input until the command is complete.

Expand All @@ -2853,52 +2908,22 @@ def _complete_statement(self, line: str) -> Statement:
"""
while True:
try:
statement = self.statement_parser.parse(line)
if statement.multiline_command and statement.terminator:
# we have a completed multiline command, we are done
break
if not statement.multiline_command:
# it's not a multiline command, but we parsed it ok
# so we are done
break
except Cmd2ShlexError:
# we have an unclosed quotation mark, let's parse only the command
# and see if it's a multiline
partial_statement = self.statement_parser.parse_command_only(line)
if not partial_statement.multiline_command:
# not a multiline command, so raise the exception
raise

# if we get here we must have:
# - a multiline command with no terminator
# - a multiline command with unclosed quotation marks
try:
self._at_continuation_prompt = True

# Save the command line up to this point for completion
self._multiline_in_progress = line + '\n'

# Get next line of this command
return self._check_statement_complete(line)
except IncompleteStatement: # noqa: PERF203
# If incomplete, we need to fetch the next line
try:
nextline = self._read_command_line(self.continuation_prompt)
except EOFError:
# Add a blank line, which serves as a command terminator.
nextline = '\n'
self.poutput(nextline)

line += f'\n{nextline}'

except KeyboardInterrupt:
self.poutput('^C')
statement = self.statement_parser.parse('')
break
finally:
self._at_continuation_prompt = False
try:
nextline = self._read_command_line(self.continuation_prompt)
except EOFError:
# Add a blank line, which serves as a command terminator.
nextline = '\n'
self.poutput(nextline)

if not statement.command:
raise EmptyStatement
line += f'\n{nextline}'

return statement
except KeyboardInterrupt:
self.poutput('^C')
raise EmptyStatement from None

def _input_line_to_statement(self, line: str) -> Statement:
"""Parse the user's input line and convert it to a Statement, ensuring that all macros are also resolved.
Expand All @@ -2913,7 +2938,7 @@ def _input_line_to_statement(self, line: str) -> Statement:

# Continue until all macros are resolved
while True:
# Make sure all input has been read and convert it to a Statement
# Get a complete statement (handling multiline input)
statement = self._complete_statement(line)

# If this is the first loop iteration, save the original line
Expand All @@ -2923,28 +2948,30 @@ def _input_line_to_statement(self, line: str) -> Statement:
# Check if this command matches a macro and wasn't already processed to avoid an infinite loop
if statement.command in self.macros and statement.command not in used_macros:
used_macros.append(statement.command)
resolve_result = self._resolve_macro(statement)
if resolve_result is None:
raise EmptyStatement
line = resolve_result
try:
line = self._resolve_macro(statement)
except MacroError as ex:
self.perror(ex)
raise EmptyStatement from None
else:
# No macro found or already processed. The statement is complete.
break

# If a macro was expanded, the 'statement' now contains the expanded text.
# We need to swap the 'raw' attribute back to the string the user typed
# so history shows the original line.
# Restore original 'raw' text if a macro was expanded
if orig_line != statement.raw:
statement_dict = statement.to_dict()
statement_dict["raw"] = orig_line
statement = Statement.from_dict(statement_dict)

return statement

def _resolve_macro(self, statement: Statement) -> str | None:
def _resolve_macro(self, statement: Statement) -> str:
"""Resolve a macro and return the resulting string.

:param statement: the parsed statement from the command line
:return: the resolved macro or None on error
:return: the resolved macro string
:raises KeyError: if its not a macro
:raises MacroError: if the macro cannot be resolved (e.g. not enough args)
"""
if statement.command not in self.macros:
raise KeyError(f"{statement.command} is not a macro")
Expand All @@ -2954,8 +2981,7 @@ def _resolve_macro(self, statement: Statement) -> str | None:
# Make sure enough arguments were passed in
if len(statement.arg_list) < macro.minimum_arg_count:
plural = '' if macro.minimum_arg_count == 1 else 's'
self.perror(f"The macro '{statement.command}' expects at least {macro.minimum_arg_count} argument{plural}")
return None
raise MacroError(f"The macro '{statement.command}' expects at least {macro.minimum_arg_count} argument{plural}")

# Resolve the arguments in reverse and read their values from statement.argv since those
# are unquoted. Macro args should have been quoted when the macro was created.
Expand Down Expand Up @@ -3399,25 +3425,18 @@ def _process_alerts(self) -> None:
# Clear the alerts
self._alert_queue.clear()

if alert_text:
if not self._at_continuation_prompt and latest_prompt is not None:
# Update prompt now so patch_stdout can redraw it immediately.
self.prompt = latest_prompt
if latest_prompt is not None:
# Update prompt so patch_stdout() or get_app().invalidate() can redraw it.
self.prompt = latest_prompt

if alert_text:
# Print the alert messages above the prompt.
with patch_stdout():
print_formatted_text(pt_filter_style(alert_text))

if self._at_continuation_prompt and latest_prompt is not None:
# Update state only. The onscreen prompt won't change until the next prompt starts.
self.prompt = latest_prompt

elif latest_prompt is not None:
self.prompt = latest_prompt

# Refresh UI immediately unless at a continuation prompt.
if not self._at_continuation_prompt:
get_app().invalidate()
# Refresh UI immediately to show the new prompt
get_app().invalidate()

def _read_command_line(self, prompt: str) -> str:
"""Read the next command line from the input stream.
Expand Down Expand Up @@ -4993,6 +5012,7 @@ def do_history(self, args: argparse.Namespace) -> bool | None:

# Clear command and prompt-toolkit history
self.history.clear()
cast(Cmd2History, self.main_session.history).clear()

if self.persistent_history_file:
try:
Expand Down
8 changes: 8 additions & 0 deletions cmd2/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,13 @@ class EmptyStatement(Exception): # noqa: N818
"""Custom exception class for handling behavior when the user just presses <Enter>."""


class IncompleteStatement(Exception): # noqa: N818
"""Raised when more input is required to complete a multiline statement."""


class MacroError(Exception):
"""Raised when a macro fails to resolve (e.g., insufficient arguments)."""


class RedirectionError(Exception):
"""Custom exception class for when redirecting or piping output fails."""
Loading
Loading