diff --git a/skills/update-a2a-proto/SKILL.md b/.agents/skills/update-a2a-proto/SKILL.md
similarity index 100%
rename from skills/update-a2a-proto/SKILL.md
rename to .agents/skills/update-a2a-proto/SKILL.md
diff --git a/.claude/architecture/EVENTQUEUE.md b/.claude/architecture/EVENTQUEUE.md
new file mode 100644
index 000000000..a72ece048
--- /dev/null
+++ b/.claude/architecture/EVENTQUEUE.md
@@ -0,0 +1,151 @@
+# EventQueue Architecture - A2A Java SDK
+
+> **Quick Reference** for event processing, queue management, and task lifecycle
+
+## Overview
+
+The EventQueue architecture guarantees:
+1. **Events persist BEFORE clients see them** (no unpersisted events visible)
+2. **Serial processing** eliminates concurrent update race conditions
+3. **Task state drives queue lifecycle** (fire-and-forget support, late reconnections)
+
+## Architecture Diagram
+
+```
+AgentExecutor.execute() [YOUR CODE]
+    ↓
+AgentEmitter → MainQueue.enqueueEvent()
+    ↓
+MainEventBus.submit() [ALL events queue here FIRST]
+    ↓
+MainEventBusProcessor.take() [single background thread]
+    ↓
+1. TaskStore.save() FIRST ← Persist before visibility
+2. PushNotificationSender.send()
+3. MainQueue.distributeToChildren() ← Clients see LAST
+    ↓
+ChildQueue → EventConsumer → ResultAggregator → Client
+```
+
+**Key Insight**: All events flow through a single-threaded processor that persists events BEFORE distributing to clients.
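As a rough illustration of the ordering in the diagram (persist, then notify, then distribute, all on one thread), here is a minimal self-contained sketch. It is not the SDK's implementation: `PersistFirstBusSketch`, its `tap()` and `submit()` methods, and the string events are hypothetical stand-ins for MainEventBus, MainEventBusProcessor, TaskStore and ChildQueue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch only: none of these names come from the SDK.
class PersistFirstBusSketch {
    private static final String STOP = "__stop__"; // poison pill that ends the worker loop

    private final BlockingQueue<String> bus = new LinkedBlockingQueue<>();
    private final List<BlockingQueue<String>> children = new CopyOnWriteArrayList<>();
    private final List<String> trace = new CopyOnWriteArrayList<>(); // order of side effects
    private final Thread worker = new Thread(this::drain, "bus-processor-sketch");

    PersistFirstBusSketch() {
        worker.start();
    }

    /** Registers a consumer-side queue, loosely analogous to tapping a ChildQueue. */
    BlockingQueue<String> tap() {
        BlockingQueue<String> child = new LinkedBlockingQueue<>();
        children.add(child);
        return child;
    }

    /** Producers only enqueue; every side effect happens on the worker thread. */
    void submit(String event) {
        bus.add(event);
    }

    /** Stops the worker after it has drained everything submitted so far. */
    List<String> shutdownAndGetTrace() {
        bus.add(STOP);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(trace);
    }

    private void drain() {
        try {
            for (String event = bus.take(); !STOP.equals(event); event = bus.take()) {
                trace.add("persist:" + event);    // 1. stand-in for saving to the task store
                trace.add("notify:" + event);     // 2. stand-in for push notifications
                for (BlockingQueue<String> child : children) {
                    child.add(event);             // 3. consumers see the event last
                }
                trace.add("distribute:" + event);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static List<String> demo() {
        PersistFirstBusSketch sketch = new PersistFirstBusSketch();
        BlockingQueue<String> client = sketch.tap();
        sketch.submit("working");
        sketch.submit("completed");
        List<String> trace = sketch.shutdownAndGetTrace();
        trace.add("client:" + new ArrayList<>(client));
        return trace;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Because one worker thread applies every side effect in this sketch, the persist/notify/distribute steps for one event always finish before the next event's steps begin, which is the serial-processing property described above.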
+
+---
+
+## Core Components
+
+### MainEventBus
+**Location**: `server-common/.../events/MainEventBus.java`
+
+- `@ApplicationScoped` CDI bean - single instance shared by all MainQueues
+- `LinkedBlockingDeque` - thread-safe centralized queue
+- `submit(taskId, eventQueue, item)` - enqueue events (called by MainQueue)
+- `take()` - blocking consumption (called by MainEventBusProcessor)
+
+**Guarantees**: Events persist BEFORE distribution, serial processing, push notifications AFTER persistence
+
+### MainEventBusProcessor
+**Location**: `server-common/.../events/MainEventBusProcessor.java`
+
+Single background thread "MainEventBusProcessor" that processes events in order:
+1. `TaskManager.process(event)` → persist to TaskStore
+2. `PushNotificationSender.send()` → notifications
+3. `mainQueue.distributeToChildren()` → clients receive
+
+**Exception Handling**: Converts `TaskStoreException` to `InternalError` events, continues processing
+
+### EventQueue System
+**Location**: `server-common/.../events/EventQueue.java`
+
+**Queue Types**:
+- **MainQueue**: No local queue - events submit directly to MainEventBus
+- **ChildQueue**: Has local queue for client consumption
+
+**Characteristics**: Bounded (1000 events), thread-safe, graceful shutdown, hook support
+
+### QueueManager
+**Location**: `server-common/.../events/QueueManager.java`
+
+- `createOrTap(taskId)` → Get existing MainQueue or create new
+- `tap(taskId)` → Create ChildQueue for existing MainQueue
+- **Default**: InMemoryQueueManager (thread-safe ConcurrentHashMap)
+- **Replicated**: ReplicatedQueueManager (Kafka-based)
+
+### EventConsumer & ResultAggregator
+**Locations**: `server-common/.../events/EventConsumer.java`, `server-common/.../tasks/ResultAggregator.java`
+
+**EventConsumer**: Polls queue, returns `Flow.Publisher`, closes queue on final event
+
+**ResultAggregator** bridges EventConsumer and DefaultRequestHandler:
+- `consumeAndBreakOnInterrupt()` - Non-streaming (polls until terminal/AUTH_REQUIRED)
+- `consumeAndEmit()` - Streaming (returns Flow.Publisher immediately)
+- `consumeAll()` - Simple consumption
+
+---
+
+## Key Concepts
+
+### Queue Structure
+- MainQueue has NO local queue (events → MainEventBus directly)
+- Only ChildQueues have local queues
+- `MainQueue.dequeueEventItem()` throws `UnsupportedOperationException`
+- `MainQueue.size()` returns `mainEventBus.size()`
+- `ChildQueue.size()` returns local queue size
+
+### Terminal Events
+Events that cause polling loop exit:
+- `TaskStatusUpdateEvent` with `isFinal() == true`
+- `Message` (legacy)
+- `Task` with state: COMPLETED, CANCELED, FAILED, REJECTED, UNKNOWN
+
+### AUTH_REQUIRED Special Case
+- Returns task to client immediately
+- Agent continues in background
+- Queue stays open, async cleanup
+- Future events update TaskStore
+
+---
+
+## Deep Dives
+
+For detailed documentation on specific aspects:
+
+- **[Queue Lifecycle & Two-Level Protection](eventqueue/LIFECYCLE.md)**
+  - THE BIG IDEA: fire-and-forget, late reconnections
+  - TaskStateProvider interface and state-driven cleanup
+  - Memory management and cleanup modes
+
+- **[Request Flows](eventqueue/FLOWS.md)**
+  - Non-streaming vs streaming flows
+  - DefaultRequestHandler orchestration
+  - Background cleanup patterns
+
+- **[Usage Scenarios & Pitfalls](eventqueue/SCENARIOS.md)**
+  - Fire-and-forget pattern (TCK)
+  - Late resubscription scenarios
+  - Tapping and multiple consumers
+  - Common mistakes to avoid
+
+---
+
+## Key Files Reference
+
+| Component | Path |
+|-----------|------|
+| MainEventBus | `server-common/.../events/MainEventBus.java` |
+| MainEventBusProcessor | `server-common/.../events/MainEventBusProcessor.java` |
+| EventQueue | `server-common/.../events/EventQueue.java` |
+| QueueManager | `server-common/.../events/QueueManager.java` |
+| InMemoryQueueManager | `server-common/.../events/InMemoryQueueManager.java` |
+| EventConsumer | `server-common/.../events/EventConsumer.java` |
+| ResultAggregator | `server-common/.../tasks/ResultAggregator.java` |
+| DefaultRequestHandler | `server-common/.../requesthandlers/DefaultRequestHandler.java` |
+| TaskStateProvider | `server-common/.../tasks/TaskStateProvider.java` |
+| AgentEmitter | `server-common/.../tasks/AgentEmitter.java` |
+
+---
+
+## Related Documentation
+
+- **Main Architecture**: `AGENTS.md` - High-level system overview
+- **Task Persistence**: See TaskStore exception handling in main docs
+- **Replication**: `extras/queue-manager-replicated/README.md`
diff --git a/.claude/architecture/eventqueue/FLOWS.md b/.claude/architecture/eventqueue/FLOWS.md
new file mode 100644
index 000000000..d2906a1f5
--- /dev/null
+++ b/.claude/architecture/eventqueue/FLOWS.md
@@ -0,0 +1,228 @@
+# Request Flows - EventQueue Processing
+
+> Deep-dive on streaming vs non-streaming request handling
+
+## Non-Streaming Flow (`onMessageSend()`)
+
+**Location**: `DefaultRequestHandler.java`
+
+```
+1. initMessageSend()
+   → Create TaskManager & RequestContext
+
+2. queueManager.createOrTap(taskId)
+   → Get/create EventQueue (MainQueue or ChildQueue)
+
+3. registerAndExecuteAgentAsync()
+   → Start AgentExecutor in background thread
+
+4. resultAggregator.consumeAndBreakOnInterrupt(consumer)
+   → Poll queue until terminal event or AUTH_REQUIRED
+   → Blocking wait for events
+
+5. cleanup(queue, task, async)
+   → Close queue immediately OR in background
+
+6. Return Task/Message to client
+```
+
+### Terminal Events
+
+Events that cause polling loop exit:
+- `TaskStatusUpdateEvent` with `isFinal() == true`
+- `Message` (legacy)
+- `Task` with state: COMPLETED, CANCELED, FAILED, REJECTED, UNKNOWN
+
+### AUTH_REQUIRED Special Case
+
+**Behavior**:
+- Returns current task to client immediately
+- Agent continues running in background
+- Queue stays open, cleanup happens async
+- Future events update TaskStore
+
+**Why**: Allows client to handle authentication prompt while agent waits for credentials.
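The exit rule for the non-streaming polling loop (stop on a terminal state, or return early on AUTH_REQUIRED while the agent keeps running) can be sketched as follows. This is a simplified model under stated assumptions, not the SDK's `ResultAggregator`: the `State` enum, the `Outcome` holder and the `consume()` method are hypothetical, and events are reduced to bare task states.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the exit rule; the SDK's ResultAggregator is not reproduced here.
class BreakOnInterruptSketch {
    enum State { WORKING, AUTH_REQUIRED, COMPLETED, CANCELED, FAILED, REJECTED, UNKNOWN }

    private static final Set<State> TERMINAL =
            EnumSet.of(State.COMPLETED, State.CANCELED, State.FAILED, State.REJECTED, State.UNKNOWN);

    /** What the caller gets back: the event that ended the loop and whether it was an early exit. */
    static final class Outcome {
        final State event;
        final boolean interrupted;

        Outcome(State event, boolean interrupted) {
            this.event = event;
            this.interrupted = interrupted;
        }
    }

    /** Blocks until a terminal state or AUTH_REQUIRED arrives on the queue. */
    static Outcome consume(BlockingQueue<State> queue) {
        try {
            while (true) {
                State next = queue.poll(500, TimeUnit.MILLISECONDS);
                if (next == null) {
                    continue;                       // nothing yet, keep waiting
                }
                if (next == State.AUTH_REQUIRED) {
                    return new Outcome(next, true); // early exit; the producer may still be running
                }
                if (TERMINAL.contains(next)) {
                    return new Outcome(next, false);
                }
                // Non-terminal updates such as WORKING do not end the loop.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<State> queue = new LinkedBlockingQueue<>();
        queue.add(State.WORKING);
        queue.add(State.AUTH_REQUIRED);
        queue.add(State.COMPLETED); // in the real flow this would arrive later from the agent
        Outcome outcome = consume(queue);
        System.out.println(outcome.event + " interrupted=" + outcome.interrupted
                + " left in queue=" + queue);
    }
}
```

In the sketch, the COMPLETED state that follows AUTH_REQUIRED stays in the queue after the early return, which mirrors why the queue has to remain open and cleanup has to happen asynchronously in that case.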
+
+---
+
+## Streaming Flow (`onMessageSendStream()`)
+
+**Location**: `DefaultRequestHandler.java`
+
+```
+1. initMessageSend()
+   → Same as non-streaming
+
+2. queueManager.createOrTap(taskId)
+   → Same
+
+3. registerAndExecuteAgentAsync()
+   → Same
+
+4. resultAggregator.consumeAndEmit(consumer)
+   → Returns Flow.Publisher immediately
+   → Non-blocking
+
+5. processor() wraps publisher:
+   - Validates task ID
+   - Adds task to QueueManager
+   - Stores push notification config
+   - Sends push notifications
+
+6. cleanup(queue, task, true)
+   → ALWAYS async for streaming
+
+7. Return Flow.Publisher
+```
+
+### Key Difference
+
+**Non-Streaming**: Blocks until terminal event, then returns Task/Message
+**Streaming**: Returns Flow.Publisher immediately, client receives events as they arrive
+
+**Cleanup**: Streaming ALWAYS uses async cleanup (background thread)
+
+---
+
+## EventConsumer Details
+
+**Location**: `server-common/.../events/EventConsumer.java`
+
+**Purpose**: Consumes events from EventQueue and exposes as reactive stream
+
+**Key Methods**:
+- `consume()` → Returns `Flow.Publisher`
+- Polls queue with 500ms timeout
+- Closes queue on final event
+- Thread-safe concurrent consumption
+
+**Usage**:
+```java
+EventConsumer consumer = new EventConsumer(eventQueue);
+Flow.Publisher publisher = consumer.consume();
+// Subscribe to receive events as they arrive
+```
+
+---
+
+## ResultAggregator Modes
+
+**Location**: `server-common/.../tasks/ResultAggregator.java`
+
+Bridges EventConsumer and DefaultRequestHandler with three consumption modes:
+
+### 1. consumeAndBreakOnInterrupt()
+
+**Used by**: `onMessageSend()` (non-streaming)
+
+**Behavior**:
+- Polls queue until terminal event or AUTH_REQUIRED
+- Returns `EventTypeAndInterrupt(event, interrupted)`
+- Blocking operation
+- Exits early on AUTH_REQUIRED (interrupted = true)
+
+**Use Case**: Non-streaming requests that need single final response
+
+### 2. consumeAndEmit()
+
+**Used by**: `onMessageSendStream()` (streaming)
+
+**Behavior**:
+- Returns all events as `Flow.Publisher`
+- Non-blocking, immediate return
+- Client subscribes to stream
+- Events delivered as they arrive
+
+**Use Case**: Streaming requests where client wants all events in real-time
+
+### 3. consumeAll()
+
+**Used by**: `onCancelTask()`
+
+**Behavior**:
+- Consumes all events from queue
+- Returns first `Message` or final `Task` found
+- Simple consumption without streaming
+- Blocks until queue exhausted
+
+**Use Case**: Task cancellation where final state matters
+
+---
+
+## Flow Comparison Table
+
+| Aspect | Non-Streaming | Streaming |
+|--------|---------------|-----------|
+| **ResultAggregator Mode** | consumeAndBreakOnInterrupt | consumeAndEmit |
+| **Return Type** | Task/Message | Flow.Publisher |
+| **Blocking** | Yes (until terminal event) | No (immediate return) |
+| **Cleanup** | Immediate or async | Always async |
+| **AUTH_REQUIRED** | Early exit, return task | Continue streaming |
+| **Use Case** | Simple request/response | Real-time event updates |
+
+---
+
+## Cleanup Integration
+
+### Actual Implementation: Always Asynchronous
+
+**Reality**: Cleanup is ALWAYS asynchronous in both streaming and non-streaming flows. The cleanup happens in the `finally` block via `cleanupProducer()`, which runs in a background thread.
+
+```java
+// Both flows (in finally block):
+cleanupProducer(agentFuture, consumptionFuture, taskId, queue, isStreaming)
+    .whenComplete((res, err) -> {
+        if (err != null) {
+            LOGGER.error("Error during async cleanup for task {}", taskId, err);
+        }
+    });
+```
+
+**Key Points**:
+- Cleanup is initiated in `finally` block regardless of flow outcome
+- `cleanupProducer()` waits for both agent and consumption futures to complete
+- Queue closure happens in background, never blocking the request thread
+- For streaming: EventConsumer manages queue lifecycle via `agentCompleted` flag
+- For non-streaming: Queue is closed directly after agent completes
+
+### Streaming Cleanup
+
+```java
+cleanup(queue, task, true); // ALWAYS async for streaming
+```
+
+**Logic**: Streaming always uses async cleanup because:
+- Publisher already returned to client
+- Events may still be processing
+- Queue cleanup happens in background
+
+---
+
+## Thread Model
+
+### Agent Execution Thread
+- `CompletableFuture.runAsync(agentExecutor::execute, executor)`
+- Agent runs in background thread pool
+- Enqueues events to MainQueue
+
+### MainEventBusProcessor Thread
+- Single background thread: "MainEventBusProcessor"
+- Processes events from MainEventBus
+- Persists to TaskStore, distributes to ChildQueues
+
+### Consumer Thread
+- Non-streaming: Request handler thread (blocking)
+- Streaming: Subscriber thread (reactive)
+- Polls ChildQueue for events
+
+### Cleanup Thread
+- Async cleanup: Background thread pool
+- Immediate cleanup: Request handler thread
+
+---
+
+## Related Documentation
+
+- **[Main Overview](../EVENTQUEUE.md)** - Architecture and components
+- **[Lifecycle](LIFECYCLE.md)** - Queue lifecycle and cleanup
+- **[Scenarios](SCENARIOS.md)** - Real-world usage patterns
diff --git a/.claude/architecture/eventqueue/LIFECYCLE.md b/.claude/architecture/eventqueue/LIFECYCLE.md
new file mode 100644
index 000000000..9cca21349
--- /dev/null
+++ b/.claude/architecture/eventqueue/LIFECYCLE.md
@@ -0,0 +1,202 @@
+# Queue Lifecycle - THE BIG IDEA
+
+> Deep-dive on task state-driven queue lifecycle management
+
+## Problem Solved
+
+- **Fire-and-forget tasks**: Agent finishes without emitting final state
+- **Client reconnections**: Late reconnections after disconnect
+- **Replicated events**: Late-arriving events for ongoing tasks
+- **Queue leaks**: Proper cleanup when tasks finalize
+
+## Solution: Two-Level Protection
+
+**Core Principle**: MainQueues stay open in QueueManager map as long as Task is in non-final state, enabling fire-and-forget and late resubscriptions.
+
+---
+
+## Level 1: Cleanup Callback
+
+**When**: MainQueue closes
+**Location**: `InMemoryQueueManager.getCleanupCallback()`
+
+```java
+Runnable cleanupCallback = () -> {
+    if (taskStateProvider != null && !taskStateProvider.isUnsatisfied()) {
+        boolean isFinalized = taskStateProvider.isTaskFinalized(taskId);
+        if (!isFinalized) {
+            LOGGER.info("Task {} is not finalized, keeping queue in map", taskId);
+            return; // Don't remove from map - task still active
+        }
+    }
+    queues.remove(taskId); // Only remove if finalized
+};
+```
+
+**Purpose**: Prevents removal from QueueManager map for non-final tasks (enables resubscription).
+
+---
+
+## Level 2: Auto-Close Prevention
+
+**When**: Last ChildQueue closes
+**Location**: `MainQueue.childClosing()`
+
+```java
+void childClosing(ChildQueue child, boolean immediate) {
+    children.remove(child);
+
+    if (!children.isEmpty()) {
+        return; // Other children still active
+    }
+
+    // No children left - check if task finalized before auto-closing
+    if (taskStateProvider != null && taskId != null) {
+        boolean isFinalized = taskStateProvider.isTaskFinalized(taskId);
+        if (!isFinalized) {
+            LOGGER.info("MainQueue for task {} has no children, but task is not finalized - keeping queue open", taskId);
+            return; // Keep MainQueue OPEN for resubscriptions!
+        }
+    }
+
+    this.doClose(immediate); // Close only if task finalized
+}
+```
+
+**Purpose**: Prevents auto-close when all children disconnect (keeps queue alive for late arrivals).
+
+---
+
+## TaskStateProvider Interface
+
+**Location**: `server-common/.../tasks/TaskStateProvider.java`
+
+```java
+public interface TaskStateProvider {
+    boolean isTaskActive(String taskId); // Is task still being worked on?
+    boolean isTaskFinalized(String taskId); // Is task in final state?
+}
+```
+
+### Implementations
+- `InMemoryTaskStore` implements TaskStateProvider
+- `JpaDatabaseTaskStore` implements TaskStateProvider
+- Injected via CDI: `Instance`
+
+### State Checks
+- `isTaskActive()`: Used by ReplicatedQueueManager to skip events for inactive tasks
+- `isTaskFinalized()`: Used by both protection levels to determine cleanup eligibility
+
+---
+
+## Queue Close Modes
+
+### Graceful Close (`queue.close()`)
+
+- Drains remaining events before closing
+- Used by normal termination
+- ChildQueues close individually
+
+### Immediate Close (`queue.close(true)`)
+
+- Clears all pending events immediately
+- Used by error conditions
+- Forces all children to close
+
+---
+
+## Background Cleanup
+
+**Location**: `DefaultRequestHandler.cleanup()`
+
+### Non-Streaming Cleanup
+
+```java
+if (event instanceof Message || isFinalEvent(event)) {
+    if (!interrupted) {
+        cleanup(queue, task, false); // Immediate: wait for agent, close queue
+    } else {
+        cleanup(queue, task, true); // Async: close in background
+    }
+}
+```
+
+### Streaming Cleanup (always async)
+
+```java
+cleanup(queue, task, true); // Background cleanup after streaming completes
+```
+
+### Cleanup Implementation
+
+```java
+private CompletableFuture cleanupProducer(
+        @Nullable CompletableFuture agentFuture,
+        @Nullable CompletableFuture consumptionFuture,
+        String taskId,
+        EventQueue queue,
+        boolean isStreaming) {
+
+    if (agentFuture == null) {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    // Wait for BOTH agent AND consumption to complete before cleanup
+    CompletableFuture bothComplete = agentFuture;
+    if (consumptionFuture != null) {
+        bothComplete = CompletableFuture.allOf(agentFuture, consumptionFuture);
+    }
+
+    return bothComplete.whenComplete((v, t) -> {
+        if (isStreaming) {
+            // EventConsumer manages queue lifecycle via agentCompleted flag
+            LOGGER.debug("Streaming: queue lifecycle managed by EventConsumer");
+        } else {
+            // Close ChildQueue directly (triggers Level 2 check)
+            queue.close(false, true);
+        }
+    });
+}
+```
+
+---
+
+## Memory Management
+
+### Non-Final Tasks
+- Queues retained in QueueManager map
+- Small memory footprint (queue object + taskId)
+- Enables fire-and-forget and resubscription patterns
+
+### Finalized Tasks
+- Queues cleaned up immediately
+- Removed from QueueManager map
+- Grace period in JpaDatabaseTaskStore (48 hours)
+
+### Replicated Scenario
+- Late-arriving events can still be processed
+- MainQueue stays in map until finalization
+- Each instance manages own queue lifecycle
+
+---
+
+## Why Two Levels?
+
+**Level 1** (Cleanup Callback):
+- Prevents removal from map for non-final tasks
+- Enables resubscription after queue close
+
+**Level 2** (Auto-Close Prevention):
+- Prevents auto-close when all children disconnect
+- Keeps queue alive for late arrivals
+- Supports fire-and-forget pattern
+
+**Together**: Guarantee that queues stay available for non-final tasks while cleaning up promptly when tasks complete.
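To see how the two checks combine, the following single-threaded model may help. It is a sketch under stated assumptions rather than the SDK code: `TwoLevelProtectionSketch`, `MainQueueModel`, the child counter and the `Predicate<String>` standing in for `isTaskFinalized` are all hypothetical simplifications.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical single-threaded model; names are illustrative, not the SDK's classes.
class TwoLevelProtectionSketch {
    private final Map<String, MainQueueModel> queues = new ConcurrentHashMap<>();
    private final Predicate<String> isTaskFinalized; // stand-in for a task-state lookup

    TwoLevelProtectionSketch(Predicate<String> isTaskFinalized) {
        this.isTaskFinalized = isTaskFinalized;
    }

    final class MainQueueModel {
        final String taskId;
        int children;
        boolean closed;

        MainQueueModel(String taskId) {
            this.taskId = taskId;
        }

        /** Level 2: the last child leaving closes the queue only for finalized tasks. */
        void childClosing() {
            children--;
            if (children > 0 || !isTaskFinalized.test(taskId)) {
                return;
            }
            close();
        }

        /** Closing always runs the cleanup callback, whatever triggered the close. */
        void close() {
            closed = true;
            cleanupCallback();
        }

        /** Level 1: the map entry is removed only for finalized tasks. */
        private void cleanupCallback() {
            if (isTaskFinalized.test(taskId)) {
                queues.remove(taskId);
            }
        }
    }

    /** Returns the existing queue for the task, or creates one, and registers a child. */
    MainQueueModel tap(String taskId) {
        MainQueueModel queue = queues.computeIfAbsent(taskId, id -> new MainQueueModel(id));
        queue.children++;
        return queue;
    }

    boolean isRetained(String taskId) {
        return queues.containsKey(taskId);
    }

    public static void main(String[] args) {
        Set<String> finalized = new HashSet<>();
        TwoLevelProtectionSketch manager = new TwoLevelProtectionSketch(finalized::contains);

        manager.tap("task-1").childClosing();              // client leaves while task is non-final
        System.out.println(manager.isRetained("task-1"));  // queue is still in the map

        finalized.add("task-1");                           // task reaches a final state
        manager.tap("task-1").childClosing();              // late subscriber leaves
        System.out.println(manager.isRetained("task-1"));  // entry has been removed
    }
}
```

The model keeps the two checks separate on purpose: `childClosing()` decides whether the queue closes at all, while the cleanup callback decides whether a closed queue also leaves the map, so an explicit `close()` on a non-final task still leaves the entry available for a later `tap()`.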
+
+---
+
+## Related Documentation
+
+- **[Main Overview](../EVENTQUEUE.md)** - Architecture and components
+- **[Request Flows](FLOWS.md)** - How cleanup integrates with request handling
+- **[Scenarios](SCENARIOS.md)** - Real-world usage patterns
diff --git a/.claude/architecture/eventqueue/SCENARIOS.md b/.claude/architecture/eventqueue/SCENARIOS.md
new file mode 100644
index 000000000..5ca673750
--- /dev/null
+++ b/.claude/architecture/eventqueue/SCENARIOS.md
@@ -0,0 +1,308 @@
+# Usage Scenarios & Common Pitfalls
+
+> Real-world patterns and mistakes to avoid
+
+## Scenario 1: Fire-and-Forget Pattern (TCK)
+
+**Pattern**: Agent emits WORKING status but never completes
+
+```java
+// Agent execution
+agentExecutor.execute(context, queue) {
+    Task workingTask = new Task.Builder()
+        .id(taskId)
+        .status(new TaskStatus(TaskState.WORKING)) // Non-final!
+        .build();
+    queue.enqueueEvent(workingTask);
+    // Agent finishes WITHOUT emitting COMPLETED/FAILED
+}
+
+// What happens:
+// 1. ChildQueue closes (client got WORKING event)
+// 2. MainQueue.childClosing() checks: isTaskFinalized(taskId) → false
+// 3. MainQueue stays OPEN in QueueManager map
+// 4. Late resubscription works: queueManager.tap(taskId) → success!
+```
+
+**Note**: Queue numbers grow during TCK run - this is EXPECTED and intentional for resubscription support.
+
+**Why This Works**:
+- Level 2 protection prevents auto-close when task is non-final
+- MainQueue stays in map even with no children
+- Later reconnections can tap into same MainQueue
+
+---
+
+## Scenario 2: Late Resubscription
+
+**Pattern**: Client disconnects then reconnects to ongoing task
+
+```
+Time 0: Client sends message, gets ChildQueue
+Time 1: Agent emits WORKING event
+Time 2: Client disconnects, ChildQueue closes
+Time 3: Agent still processing (non-final state)
+Time 4: All ChildQueues closed, MainQueue.childClosing() fires
+        → Checks isTaskFinalized() → false
+        → MainQueue stays open
+Time 5: Client reconnects: queueManager.tap(taskId)
+        → MainQueue still in map!
+        → New ChildQueue created
+        → Success!
+```
+
+**Key Insight**: The gap between Time 4 and Time 5 can be seconds, minutes, or hours. As long as task is non-final, MainQueue remains available.
+
+**Use Cases**:
+- Mobile app loses network connection
+- Browser tab closed and reopened
+- Load balancer routes to different instance
+- Debugging: stop client, fix bug, restart
+
+---
+
+## Scenario 3: Normal Completion
+
+**Pattern**: Task completes successfully
+
+```java
+// Agent completes
+Task completed = new Task.Builder()
+    .id(taskId)
+    .status(new TaskStatus(TaskState.COMPLETED)) // Final state!
+    .build();
+queue.enqueueEvent(completed);
+
+// Lifecycle:
+// 1. TaskStore persists COMPLETED task
+// 2. ChildQueue closes after consuming final event
+// 3. Level 2: MainQueue.childClosing()
+//    → isTaskFinalized(taskId) = true
+//    → mainQueue.doClose()
+// 4. Level 1: Cleanup callback fires
+//    → isTaskFinalized(taskId) = true
+//    → queues.remove(taskId)
+```
+
+**Timeline**:
+- Event enqueued → MainEventBus → persisted → distributed
+- ChildQueue receives COMPLETED event → closes
+- MainQueue detects no children + finalized task → closes
+- Cleanup callback removes from QueueManager map
+
+**Result**: Prompt cleanup when task actually finishes
+
+---
+
+## Scenario 4: Tapping (Multiple Consumers)
+
+**Pattern**: Multiple clients consuming same task events
+
+```java
+// Initial request creates MainQueue
+EventQueue mainQueue = queueManager.createOrTap(taskId);
+
+// Second client taps into existing MainQueue
+EventQueue childQueue = queueManager.tap(taskId);
+
+// Event distribution (ASYNCHRONOUS via MainEventBus)
+// NOTE: Distribution is NOT immediate!
+public void enqueueEvent(Event event) {
+    // Step 1: Submit to MainEventBus (async processing)
+    mainEventBus.submit(event);
+
+    // Step 2: MainEventBusProcessor thread (separate background thread):
+    // - Persists event to TaskStore
+    // - Distributes to all ChildQueues via child.internalEnqueueItem(item)
+    // - Invokes replication hook if configured
+
+    // Key Point: Events are NOT immediately in ChildQueues!
+    // There's a delay while MainEventBusProcessor persists and distributes.
+}
+```
+
+**Use Cases**:
+- **Resubscribing to ongoing tasks**: Late reconnection scenario
+- **Canceling tasks while receiving events**: Client sends cancel, still receives updates
+- **Multiple concurrent consumers**: Admin dashboard + user client both watching same task
+- **Testing/debugging**: Monitor task execution while client operates normally
+
+**Key Points**:
+- All ChildQueues receive ALL events
+- Each ChildQueue has independent consumption
+- MainQueue doesn't close until ALL children close AND task finalizes
+
+---
+
+## Common Pitfalls
+
+### 1. Closing EventQueue Before AgentExecutor Finishes
+
+**Problem**:
+```java
+// WRONG
+agentExecutor.execute(context, queue);
+queue.close(); // Too early! Agent may still be enqueueing events
+```
+
+**Solution**:
+```java
+// RIGHT - in DefaultRequestHandler.cleanup()
+Runnable cleanupTask = () -> {
+    agentFuture.join(); // Wait for agent to finish
+    queue.close(); // Then close queue
+};
+```
+
+**Why**: Agent runs asynchronously. Closing queue before agent finishes loses events.
+
+---
+
+### 2. Not Accounting for Async Cleanup in Streaming
+
+**Problem**:
+```java
+// WRONG assumption
+onMessageSendStream() returns → queue is closed
+```
+
+**Reality**:
+```java
+// RIGHT understanding
+onMessageSendStream() returns → queue still open
+cleanup() happens in background → queue closes later
+```
+
+**Why**: Streaming returns publisher immediately. Queue cleanup happens asynchronously after streaming completes.
+
+**Impact**: Tests may see queues still open after streaming response sent. This is expected.
+
+---
+
+### 3. Assuming MainQueue Has Local Queue
+
+**Problem**:
+```java
+// WRONG
+Event event = mainQueue.dequeueEventItem(); // Throws UnsupportedOperationException!
+```
+
+**Reality**:
+- MainQueue has NO local queue
+- Events submit directly to MainEventBus
+- Only ChildQueues have local queues
+
+**Why**: Design choice to centralize persistence through MainEventBus.
+
+**Correct Usage**:
+```java
+// Enqueue to MainQueue (goes to MainEventBus)
+mainQueue.enqueueEvent(event);
+
+// Dequeue from ChildQueue only
+Event event = childQueue.dequeueEventItem();
+```
+
+---
+
+### 4. Not Handling AUTH_REQUIRED Special Case
+
+**Problem**:
+```java
+// WRONG assumption
+AUTH_REQUIRED received → agent stopped → cleanup can be immediate
+```
+
+**Reality**:
+```java
+// RIGHT understanding
+AUTH_REQUIRED received → agent STILL RUNNING → cleanup must be async
+```
+
+**Why**: Agent waits for authentication credentials. It hasn't finished executing.
+
+**Impact**:
+- Non-streaming: Returns task to client immediately, cleanup happens async
+- Agent continues running in background
+- Future events (COMPLETED, FAILED) update TaskStore
+
+---
+
+### 5. Expecting Immediate Queue Cleanup
+
+**Problem**:
+```java
+// WRONG expectation
+task emits WORKING (non-final) → queue should be cleaned up
+```
+
+**Reality**:
+```java
+// RIGHT understanding
+task emits WORKING (non-final) → queue intentionally KEPT OPEN
+task emits COMPLETED (final) → queue cleaned up
+```
+
+**Why**: Two-level protection checks task finality before cleanup.
+
+**Impact**:
+- Non-final tasks: Queues retained intentionally (fire-and-forget support)
+- Finalized tasks: Queues cleaned up promptly
+- This is NOT a leak, it's intentional design
+
+**When to Worry**: If finalized tasks don't clean up queues (check TaskStateProvider implementation)
+
+---
+
+## Scenario Comparison Table
+
+| Scenario | Task State | Queue Behavior | Use Case |
+|----------|-----------|----------------|----------|
+| **Fire-and-Forget** | Non-final (WORKING) | Stays open indefinitely | TCK compliance, async agents |
+| **Late Resubscription** | Non-final | Stays open for reconnection | Network issues, debugging |
+| **Normal Completion** | Final (COMPLETED) | Closes promptly | Standard request/response |
+| **Tapping** | Any | Multiple ChildQueues share MainQueue | Monitoring, multi-client |
+
+---
+
+## Debugging Tips
+
+### Check Queue State
+```java
+// Is MainQueue in map?
+boolean exists = queueManager.tap(taskId) != null;
+
+// What's the queue size?
+int size = eventQueue.size(); // MainQueue = MainEventBus size, ChildQueue = local size
+```
+
+### Check Task State
+```java
+// Is task finalized?
+boolean finalized = taskStateProvider.isTaskFinalized(taskId);
+
+// Is task active?
+boolean active = taskStateProvider.isTaskActive(taskId);
+```
+
+### Trace Event Flow
+```java
+// Add EventEnqueueHook for logging
+EventQueue.builder()
+    .hook(event -> LOGGER.info("Event enqueued: {}", event))
+    .build();
+```
+
+### Monitor MainEventBus
+```java
+// Check queue depth
+int depth = mainEventBus.size(); // High depth = processing backlog
+```
+
+---
+
+## Related Documentation
+
+- **[Main Overview](../EVENTQUEUE.md)** - Architecture and components
+- **[Lifecycle](LIFECYCLE.md)** - Queue lifecycle and two-level protection
+- **[Flows](FLOWS.md)** - Request handling patterns
diff --git a/.claude/skills b/.claude/skills
new file mode 120000
index 000000000..2b7a412b8
--- /dev/null
+++ b/.claude/skills
@@ -0,0 +1 @@
+../.agents/skills
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 441ef20e1..a4a0b371f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,8 +7,6 @@ release.properties
 .flattened-pom.xml
 *.args
 
-#Claude
-CLAUDE.md
 
 # Eclipse
 .project
@@ -51,7 +49,7 @@ nbproject/
 .certs/
 
 # Private Claude config
-.claude/
+.claude/settings.local.json
 .serena/
 .bob/
 claudedocs
diff --git a/AGENTS.md b/AGENTS.md
index c5d2d7eed..3974c88b7 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -67,12 +67,24 @@ mvn clean install
 
 ### Skills
 
-- [update-a2a-proto](skills/update-a2a-proto/SKILL.md) — Update the gRPC proto file `a2a.proto` from upstream and regenerate Java sources
+- [update-a2a-proto](.agents/skills/update-a2a-proto/SKILL.md) — Update the gRPC proto file `a2a.proto` from upstream and regenerate Java sources
 
 ### Commands
 
 - `mvn clean install` — Clean build of the project
 
+## Architecture Deep Dives
+
+For detailed architectural documentation:
+
+- **EventQueue & Event Processing**: `.claude/architecture/EVENTQUEUE.md`
+  - Quick reference with architecture diagram and core components
+  - **[Queue Lifecycle](.claude/architecture/eventqueue/LIFECYCLE.md)**: Two-level protection, fire-and-forget, late reconnections
+  - **[Request Flows](.claude/architecture/eventqueue/FLOWS.md)**: Non-streaming vs streaming, cleanup patterns
+  - **[Usage Scenarios](.claude/architecture/eventqueue/SCENARIOS.md)**: Real-world patterns and common pitfalls
+
+> 💡 Deep-dive docs are loaded on-demand when working in related areas.
+
 ## Contributing
 
 See [CONTRIBUTING.md](CONTRIBUTING.md). Fork the repo, create a branch per issue, submit PRs against `main`.
diff --git a/CLAUDE.md b/CLAUDE.md
new file mode 120000
index 000000000..47dc3e3d8
--- /dev/null
+++ b/CLAUDE.md
@@ -0,0 +1 @@
+AGENTS.md
\ No newline at end of file