Skip to content

Conversation

@kabir
Copy link
Collaborator

@kabir kabir commented Nov 4, 2025

…cking calls

Fixes race condition where DefaultRequestHandler.onMessageSend() returns before all task events are fully processed and persisted to TaskStore, resulting in incomplete Task objects being returned to clients (missing artifacts, incorrect state).

Root Cause:

  • Blocking calls interrupted immediately after first event and returned to client before background event consumption completed
  • Agent execution and event processing happened asynchronously in background
  • No synchronization to ensure all events were consumed and persisted before returning Task to client

Solution (4-step process):

  1. Wait for agent to finish enqueueing events (5s timeout)
  2. Close the queue to signal consumption can complete (breaks dependency)
  3. Wait for consumption to finish processing events (2s timeout)
  4. Fetch final task state from TaskStore (has all artifacts and correct state)

This ensures blocking calls return complete Task objects with all artifacts and correct state, including support for fire-and-forget tasks that never emit final state events.

Added unit tests:

  • testBlockingFireAndForgetReturnsNonFinalTask: Validates fire-and-forget pattern
  • testBlockingCallReturnsCompleteTaskWithArtifacts: Ensures all artifacts included

Upstream: #431

…cking calls

Fixes race condition where DefaultRequestHandler.onMessageSend() returns
before all task events are fully processed and persisted to TaskStore,
resulting in incomplete Task objects being returned to clients (missing
artifacts, incorrect state).

Root Cause:
- Blocking calls interrupted immediately after first event and returned
  to client before background event consumption completed
- Agent execution and event processing happened asynchronously in background
- No synchronization to ensure all events were consumed and persisted
  before returning Task to client

Solution (4-step process):
1. Wait for agent to finish enqueueing events (5s timeout)
2. Close the queue to signal consumption can complete (breaks dependency)
3. Wait for consumption to finish processing events (2s timeout)
4. Fetch final task state from TaskStore (has all artifacts and correct state)

This ensures blocking calls return complete Task objects with all artifacts
and correct state, including support for fire-and-forget tasks that never
emit final state events.

Added unit tests:
- testBlockingFireAndForgetReturnsNonFinalTask: Validates fire-and-forget pattern
- testBlockingCallReturnsCompleteTaskWithArtifacts: Ensures all artifacts included
@kabir kabir added the 0.3.x label Nov 4, 2025
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @kabir, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical race condition that previously caused blocking API calls to return incomplete task objects, missing crucial artifacts or the final state. The solution introduces a comprehensive four-step synchronization mechanism within the DefaultRequestHandler, ensuring that all agent-generated events are fully processed and persisted before a task is returned to the client. This enhancement significantly improves the reliability and data integrity of blocking task operations, including support for fire-and-forget agents, by guaranteeing that the returned task object accurately reflects its complete and final state.

Highlights

  • Race Condition Fix: Resolved a race condition in DefaultRequestHandler.onMessageSend() where blocking calls would return incomplete Task objects due to asynchronous event processing not being fully synchronized.
  • Four-Step Synchronization Process: Implemented a robust four-step process for blocking calls to ensure task completeness: 1. Wait for agent event enqueuing, 2. Close the event queue, 3. Wait for event consumption, and 4. Fetch the final Task state from TaskStore.
  • Configurable Timeouts: Introduced two new configurable timeouts (a2a.blocking.agent.timeout.seconds and a2a.blocking.consumption.timeout.seconds) to manage the waiting periods for agent completion and event consumption respectively.
  • Refactored ResultAggregator: The ResultAggregator was updated to streamline event consumption logic, specifically by removing an unused eventCallback parameter and refining how consumptionCompletionFuture is handled for blocking calls.
  • New Unit Tests: Added testBlockingFireAndForgetReturnsNonFinalTask and testBlockingCallReturnsCompleteTaskWithArtifacts to validate the fix, ensuring blocking calls return complete tasks with all artifacts, and correctly handle fire-and-forget scenarios.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request effectively addresses a race condition in blocking calls by introducing explicit waits for agent execution and event consumption, ensuring that clients receive a complete Task object. The logic is sound and the new configurable timeouts are a welcome addition. The new unit tests are well-designed and thoroughly validate the fix. I have a couple of suggestions to further improve the robustness and readability of the implementation.

Comment on lines 291 to 295
} catch (java.util.concurrent.ExecutionException e) {
LOGGER.warn("Error during task {} execution", taskId, e.getCause());
} catch (java.util.concurrent.TimeoutException e) {
LOGGER.warn("Timeout waiting for consumption to complete for task {}", taskId);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Handling ExecutionException and TimeoutException by only logging a warning could result in returning an incomplete task to the client, which undermines the goal of this pull request. If the background processing fails or times out, it's better to signal a failure to the client. Consider re-throwing these exceptions as an InternalError and logging them as errors.

Suggested change
} catch (java.util.concurrent.ExecutionException e) {
LOGGER.warn("Error during task {} execution", taskId, e.getCause());
} catch (java.util.concurrent.TimeoutException e) {
LOGGER.warn("Timeout waiting for consumption to complete for task {}", taskId);
}
} catch (java.util.concurrent.ExecutionException e) {
LOGGER.error("Error during task {} execution", taskId, e.getCause());
throw new InternalError("Error waiting for task execution to complete", e.getCause());
} catch (java.util.concurrent.TimeoutException e) {
LOGGER.error("Timeout waiting for consumption to complete for task {}", taskId, e);
throw new InternalError("Timeout waiting for task consumption to complete", e);
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

// Step 1: Wait for agent to finish (with configurable timeout)
if (agentFuture != null) {
try {
agentFuture.get(agentCompletionTimeoutSeconds, java.util.concurrent.TimeUnit.SECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For improved readability, please import java.util.concurrent.TimeUnit at the top of the file and use TimeUnit.SECONDS directly here and on line 285.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@kabir kabir merged commit d89a677 into a2aproject:0.3.x Nov 4, 2025
10 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant