Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 54 additions & 7 deletions lib/crewai/src/crewai/crew.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import (
TYPE_CHECKING,
Any,
Literal,
cast,
)
import uuid
Expand Down Expand Up @@ -106,6 +107,7 @@ def get_supported_content_types(provider: str, api: str | None = None) -> list[s
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks,
aggregate_summarized_outputs_from_task_outputs,
)
from crewai.utilities.i18n import get_i18n
from crewai.utilities.llm_utils import create_llm
Expand Down Expand Up @@ -259,6 +261,15 @@ class Crew(FlowTrackable, BaseModel):
default=None,
description="Path to the log file to be saved",
)
context_strategy: Literal["full", "summarized"] = Field(
default="full",
description=(
"Strategy for passing prior task outputs as context to subsequent tasks. "
"'full' (default) passes the complete raw output. "
"'summarized' condenses each output to 2-3 sentences before passing it, "
"which reduces token usage in long multi-task crews."
),
)
planning: bool | None = Field(
default=False,
description="Plan the crew execution and add the plan to the crew.",
Expand Down Expand Up @@ -1487,19 +1498,55 @@ def _update_manager_tools(
)
return tools

@staticmethod
def _get_context(task: Task, task_outputs: list[TaskOutput]) -> str:
def _get_context(self, task: Task, task_outputs: list[TaskOutput]) -> str:
if not task.context:
return ""

return (
aggregate_raw_outputs_from_task_outputs(task_outputs)
if task.context is NOT_SPECIFIED
else aggregate_raw_outputs_from_tasks(task.context)
)
effective_strategy = task.context_strategy or self.context_strategy

if task.context is NOT_SPECIFIED:
if effective_strategy == "summarized":
return aggregate_summarized_outputs_from_task_outputs(task_outputs)
return aggregate_raw_outputs_from_task_outputs(task_outputs)
else:
if effective_strategy == "summarized":
explicit_outputs = [
t.output for t in task.context if t.output is not None
]
return aggregate_summarized_outputs_from_task_outputs(explicit_outputs)
return aggregate_raw_outputs_from_tasks(task.context)

def _generate_context_summary(self, task: Task, output: TaskOutput) -> None:
"""Generate a condensed summary of a task output for use as context.

Called after task completion when context_strategy='summarized' is active
at the crew or task level. Falls back silently on any LLM error.
"""
if task.agent is None or not hasattr(task.agent, "llm"):
return
try:
prompt = (
"Summarize the following task output in 2-3 concise sentences, "
"preserving the key facts and conclusions:\n\n"
f"{output.raw}"
)
summary = task.agent.llm.call(messages=prompt)
if isinstance(summary, str) and summary.strip():
output.context_summary = summary.strip()
except Exception:
pass # Fall back to raw output in aggregate_summarized_outputs_from_task_outputs

def _process_task_result(self, task: Task, output: TaskOutput) -> None:
role = task.agent.role if task.agent is not None else "None"

effective_strategy = (
task.context_strategy
if task.context_strategy is not None
else self.context_strategy
)
if effective_strategy == "summarized":
self._generate_context_summary(task, output)

if self.output_log_file:
self._file_handler.log(
task_name=task.name, # type: ignore[arg-type]
Expand Down
9 changes: 9 additions & 0 deletions lib/crewai/src/crewai/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from typing import (
Any,
ClassVar,
Literal,
cast,
get_args,
get_origin,
Expand Down Expand Up @@ -134,6 +135,14 @@ class Task(BaseModel):
description="Other tasks that will have their output used as context for this task.",
default=NOT_SPECIFIED,
)
context_strategy: Literal["full", "summarized"] | None = Field(
description=(
"Override the crew-level context_strategy for this task. "
"'full' uses raw outputs; 'summarized' uses condensed summaries. "
"Defaults to None (inherit from crew)."
),
default=None,
)
async_execution: bool | None = Field(
description="Whether the task should be executed asynchronously or not.",
default=False,
Expand Down
4 changes: 4 additions & 0 deletions lib/crewai/src/crewai/tasks/task_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ class TaskOutput(BaseModel):
description="Expected output of the task", default=None
)
summary: str | None = Field(description="Summary of the task", default=None)
context_summary: str | None = Field(
description="Condensed summary of the raw output used when context_strategy='summarized'",
default=None,
)
raw: str = Field(description="Raw output of the task", default="")
pydantic: BaseModel | None = Field(
description="Pydantic output of task", default=None
Expand Down
20 changes: 20 additions & 0 deletions lib/crewai/src/crewai/utilities/formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,26 @@ def aggregate_raw_outputs_from_task_outputs(task_outputs: list[TaskOutput]) -> s
return DIVIDERS.join(output.raw for output in task_outputs)


def aggregate_summarized_outputs_from_task_outputs(
task_outputs: list[TaskOutput],
) -> str:
"""Generate condensed string context from summarized task outputs.

Falls back to raw output for any task that has no context_summary.

Args:
task_outputs: List of TaskOutput objects.

Returns:
A string containing the aggregated context summaries (or raw fallbacks).
"""
parts = [
output.context_summary if output.context_summary is not None else output.raw
for output in task_outputs
]
return DIVIDERS.join(parts)


def aggregate_raw_outputs_from_tasks(tasks: list[Task] | _NotSpecified) -> str:
"""Generate string context from the tasks.

Expand Down
Loading